前回は、Elastic Stackのうち、ElasticsearchとKibanaの環境を構築しました。
今回は、Logstash環境を構築して、Elasticsearchにデータを投入していきたいと思います!
Logstashとは
Logstashはサーバーサイドデータ処理パイプラインとして、データの投入、変換、送信することができるプロダクトです。
Logstashは、非常に様々な形式のデータに対応しています。様々な形式データをインプットとすることができ、様々な形式のアウトプットとして出力できます。
これによって、色々な形式でばらばらになっているデータを一元的に管理することが可能になります。
データの投入だけでなく、加工もできる
Logstashの特徴として、投入前にデータの解析や変換ができるという点があります。
- 数値の計算
- データ型の変換
- IPアドレスから地理情報を得る
- 日時情報の設定 などなど、、、
これ以外にも紹介しきれないほどの機能があります。
出力先はElasticsearchだけではない
Logstashを使ってデータを投入するメインの出力先はElasticsearchですが、それ以外を出力先に指定することもできます。
csvファイルとして出力したり、mongoDBに書き込んだりといったこともできます。
Logstash環境を構築する
それでは実際に、Logstashの環境を作っていきましょう。
今回は、csvファイルをElasticsearchに投入していきます。
サンプルデータとして、「2015~2020までのYahooの株価推移」のデータを投入します。
Logstashをインストールする
公式サイトから、Logstashのzipファイルをインストールします。
https://www.elastic.co/jp/downloads/logstash
zipファイルを、Cドライブの直下に解凍します。
今回のバージョンは、10.7.2です。
logstash-sample.confを編集する
cpnfigフォルダ内のlogstash-sample.confファイルを編集します。
テキストエディタ等で開いてください。
本記事の冒頭の図にもある通り、Logstashは「入力(input)」「加工(Filter)」「出力(output)」の3段階に分かれています。
この3段階は、これから編集するコンフィグ(.conf)ファイルで表現します。
inputを編集する
今回投入するデータは、”C:/project” の下に置きました。
inputのサンプルの書き方は以下の通りです。
csvをinputとするときは、fileプラグインを用います。
“path”で取り込むファイルのパスを指定します。ワイルドカード(*)も使えるので、複数のファイルを指定して取り込むことも可能です。
注)ファイル名に日本語が含まれる場合はワイルドカードを使うことができません。
sincedbとは、「どのファイルの何行までを取り込んだか」を保持しておくファイルです。これによって、同じデータが取り込まれることを防ぐことができます。sincedbに拡張子は必要ありません。
start_positionは、ファイルをどこから読むかを指定するものです。今回は、先頭から読み込むようにしています(デフォルトはtailで後ろから)
filterを編集する
続いて、filterを編集していきましょう。
今回はcsvファイルを読み込んでいるので、csvプラグインを使います。
“skip_header”は、csvファイルのヘッダーをスキップするかを指定します。今回はヘッダーがあるので、trueにしてスキップします。
columnsでは列名を指定していきます。ここで指定しないと、elasticsearch上で項目に名前が付きません(column1とかcolumn2になってしまいます)。
続いてmutateプラグインを使います。ここでは、各項目のデータ型を変換しています。
データ型をこのように指定しないと、基本的に全データ文字列として登録されてしまいます(可視化するときに困ります)。
dateプラグインでは、時刻を示す列を指定し、そのフォーマットを指定します。
Kibanaでデータを可視化する際、時系列にデータを扱うことが多くなります。そのため、時刻を表すフィールドを指定することは非常に重要です。
outputを編集する
最後に、outputを編集していきます。
今回は、Elasticsearchにデータを投入するための設定をします。
“elasticsearch”プラグインを用います。
ホストを指定するときは、ElasticsearchのURLを指定します。
注)Elasticsearchサーバがローカルにない場合は、サーバPCのIPアドレスまたはマシン名を指定してください
“index”では、Elasticsearchに登録するインデックス名を指定します。%{+YYYY}とありますが、これは、データの年ごとにインデックスを分けるための設定です。
1つのcsvファイルの中でも、2016年のデータは”yahoo-2016″に、2018年のデータは”yahoo-2018″というインデックスに格納されます。こうすることで、データの管理がしやすくなるといったメリットがあります。
ちなみに、%{YYYY.MM.dd}と書けば、1日分のデータごとにインデックスを分けることができます。
これで、confファイルの編集は完了です。
pipelines.ymlを編集する
続いて、pipelines.ymlを編集していきます。
このファイルは、複数の.confファイルをまとめて処理するための設定ファイルです。
今回の記述例をいかに示します。
デフォルトではいろいろ書かれていますが、全部消してこれだけ記述すれば動きます。
“pipeline.workers”と”pipeline.batch.size”の値は、特別変更する必要はないと思います。
“path.config”で、処理するconfファイルのパスを指定します。
複数のconfファイルを指定するときは
複数のconfファイルを指定する場合は、上記の4行をコピーして、ファイルの下部に貼り付けます。そして、idとconfファイルのパスを変更すればOKです^^
これで準備は整いました。
LogstashからElasticsearchにデータを投入する
それでは、データを投入しましょう。
binフォルダ内のlogstash.batをクリックし、プログラムを実行します。
図のように表示されていれば成功です。
Kibanaでデータを確認する
Elasticsearchに投入したデータをKibanaで確認しましょう。
インデックスはきちんと作成されていました。
Date列の日時に合わせて、データが登録されています^^
まとめ
LogstashからElasticsearchへのデータ投入方法について解説しました。
今回の設定はあくまでも一例であり、実際にはもっと多くのプラグインを使ってデータの加工や変換等ができます。
しかし、csvファイルをElasticsearchに投入することに関しては、本記事の通りに実践することで実現すると思います。
慣れてくれば、もっと高度な加工等を試してみてもいいかもしれませんね^^
ではでは👋