LogstashからElasticsearchにデータを送る! 設定方法を解説

前回は、Elastic Stackのうち、ElasticsearchとKibanaの環境を構築しました。

今回は、Logstash環境を構築して、Elasticsearchにデータを投入していきたいと思います!

Logstashとは

Logstashはサーバーサイドデータ処理パイプラインとして、データの投入、変換、送信することができるプロダクトです。

Logstashは、非常に様々な形式のデータに対応しています。様々な形式データをインプットとすることができ、様々な形式のアウトプットとして出力できます。

これによって、色々な形式でばらばらになっているデータを一元的に管理することが可能になります。

データの投入だけでなく、加工もできる

Logstashの特徴として、投入前にデータの解析や変換ができるという点があります。

  • 数値の計算
  • データ型の変換
  • IPアドレスから地理情報を得る
  • 日時情報の設定  などなど、、、

これ以外にも紹介しきれないほどの機能があります。

出力先はElasticsearchだけではない

Logstashを使ってデータを投入するメインの出力先はElasticsearchですが、それ以外を出力先に指定することもできます。

csvファイルとして出力したり、mongoDBに書き込んだりといったこともできます。

Logstash環境を構築する

それでは実際に、Logstashの環境を作っていきましょう。

今回は、csvファイルをElasticsearchに投入していきます。

サンプルデータとして、「2015~2020までのYahooの株価推移」のデータを投入します。

今回投入するサンプルデータ

Logstashをインストールする

公式サイトから、Logstashのzipファイルをインストールします。

https://www.elastic.co/jp/downloads/logstash

zipファイルを、Cドライブの直下に解凍します。

今回のバージョンは、10.7.2です。

logstash-sample.confを編集する

cpnfigフォルダ内のlogstash-sample.confファイルを編集します。

テキストエディタ等で開いてください。

本記事の冒頭の図にもある通り、Logstashは「入力(input)」「加工(Filter)」「出力(output)」の3段階に分かれています。

この3段階は、これから編集するコンフィグ(.conf)ファイルで表現します。

inputを編集する

今回投入するデータは、”C:/project” の下に置きました。

inputのサンプルの書き方は以下の通りです。

csvをinputとするときは、fileプラグインを用います。

“path”で取り込むファイルのパスを指定します。ワイルドカード(*)も使えるので、複数のファイルを指定して取り込むことも可能です。

注)ファイル名に日本語が含まれる場合はワイルドカードを使うことができません

sincedbとは、「どのファイルの何行までを取り込んだか」を保持しておくファイルです。これによって、同じデータが取り込まれることを防ぐことができます。sincedbに拡張子は必要ありません。

start_positionは、ファイルをどこから読むかを指定するものです。今回は、先頭から読み込むようにしています(デフォルトはtailで後ろから)

filterを編集する

続いて、filterを編集していきましょう。

今回はcsvファイルを読み込んでいるので、csvプラグインを使います。

“skip_header”は、csvファイルのヘッダーをスキップするかを指定します。今回はヘッダーがあるので、trueにしてスキップします。

columnsでは列名を指定していきます。ここで指定しないと、elasticsearch上で項目に名前が付きません(column1とかcolumn2になってしまいます)。

続いてmutateプラグインを使います。ここでは、各項目のデータ型を変換しています。

データ型をこのように指定しないと、基本的に全データ文字列として登録されてしまいます(可視化するときに困ります)。

dateプラグインでは、時刻を示す列を指定し、そのフォーマットを指定します。

Kibanaでデータを可視化する際、時系列にデータを扱うことが多くなります。そのため、時刻を表すフィールドを指定することは非常に重要です。

outputを編集する

最後に、outputを編集していきます。

今回は、Elasticsearchにデータを投入するための設定をします。

“elasticsearch”プラグインを用います。

ホストを指定するときは、ElasticsearchのURLを指定します。

注)Elasticsearchサーバがローカルにない場合は、サーバPCのIPアドレスまたはマシン名を指定してください

“index”では、Elasticsearchに登録するインデックス名を指定します。%{+YYYY}とありますが、これは、データの年ごとにインデックスを分けるための設定です。

1つのcsvファイルの中でも、2016年のデータは”yahoo-2016″に、2018年のデータは”yahoo-2018″というインデックスに格納されます。こうすることで、データの管理がしやすくなるといったメリットがあります。

ちなみに、%{YYYY.MM.dd}と書けば、1日分のデータごとにインデックスを分けることができます。

これで、confファイルの編集は完了です。

pipelines.ymlを編集する

続いて、pipelines.ymlを編集していきます。

このファイルは、複数の.confファイルをまとめて処理するための設定ファイルです。

今回の記述例をいかに示します。

デフォルトではいろいろ書かれていますが、全部消してこれだけ記述すれば動きます

“pipeline.workers”と”pipeline.batch.size”の値は、特別変更する必要はないと思います。

“path.config”で、処理するconfファイルのパスを指定します。

複数のconfファイルを指定するときは

複数のconfファイルを指定する場合は、上記の4行をコピーして、ファイルの下部に貼り付けます。そして、idとconfファイルのパスを変更すればOKです^^

これで準備は整いました。

LogstashからElasticsearchにデータを投入する

それでは、データを投入しましょう。

binフォルダ内のlogstash.batをクリックし、プログラムを実行します。

図のように表示されていれば成功です。

Kibanaでデータを確認する

Elasticsearchに投入したデータをKibanaで確認しましょう。

インデックスはきちんと作成されていました。

Date列の日時に合わせて、データが登録されています^^

まとめ

LogstashからElasticsearchへのデータ投入方法について解説しました。

今回の設定はあくまでも一例であり、実際にはもっと多くのプラグインを使ってデータの加工や変換等ができます。

しかし、csvファイルをElasticsearchに投入することに関しては、本記事の通りに実践することで実現すると思います。

慣れてくれば、もっと高度な加工等を試してみてもいいかもしれませんね^^

ではでは👋