【Logstash】一定周期で取得するデータを集約してElasticsearchに取り込む方法

Logstashはデータを加工しながらElasticsearchにリアルタイムに投入するツールです。Logstashでは単にインプットのデータを送出するだけでなく、一定期間のデータを集約して平均値等を算出し、レコードとして出力できます。

今回は、一定期間のデータを集約してレコードとしてElasticsearchに投入する方法を紹介します。

内容

Pythonプログラム でCSVファイルにリアルタイム(1秒に一度)に追記していき、そのCSVをLogstashで読み、Elasticsearchに送出します。その際5秒に一度データを集約し、その間の平均値を算出したものもElasticsearchに送出します。

登録するデータ

データはPythonプログラムで1秒に1件CSVファイルに追記されます。

sample.csv

蛇足ですが、このデータを出力するPythonプログラムは以下の通りです。

import csv
import datetime
import numpy as np
import time
import os

filename = "./sample.csv"

if os.path.exists(filename) == False:
    
    with open(filename, 'w', newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["Time", "val1", "val2", "val3"])

while True:
    now = datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S")
    
    val1 = round(np.random.rand(), 3)
    val2 = np.random.randint(4, 10)
    
    val3 = "test for Logstash"
    
    with open(filename, 'a', newline="") as f:
        writer = csv.writer(f)
        writer.writerow([now, val1, val2, val3])
    
    time.sleep(1)

Logstashの設定

ここからが肝心のLogstashの設定です。データの集約は、FilterのAggregateプラグインを用います。

まずは設定ファイルの中身全体を紹介します。

input {
  file{
	path => "C:/Project/makecsv/sample.csv"
	sincedb_path => "C:/Project/makecsv/sample_sincedb"
	start_position => "beginning"
	}
}

filter{

	csv{
		skip_header => "true"
		columns => ["Time", "val1", "val2", "val3"]
		separator => ","
	}
	
	mutate{
		convert => {
			"val1" => "float"
			"val2" => "float"
		}
	}

	date{
		match => ["Time", "yyyy/MM/dd HH:mm:ss" ]
	}
	
	aggregate{
		task_id => "%{host}"
		push_map_as_event_on_timeout => true
		
		code => "map['sum_val1']||=0; map['sum_val1'] += event.get('val1');    
        map['count']||=0; map['count'] += 1;"
		
		timeout => 5
		timeout_code => "event.set('average_val1', 
        ((event.get('sum_val1')).fdiv(event.get('count'))));"
	}
	
}

output {
  elasticsearch {
    hosts => "http://localhost:9200"
    index => "sample-%{+YYYY.MM}"
    user => "elastic"
    password => "elastic"
  }
  
  
}

inputとoutputはCSVファイルからデータを取り込んでElasticsearchに送出するための設定です。詳細は以下記事を参考にしてみてください。

aggregateプラグインの中身

task_id

aggregateプラグインでは、task_idを指定する必要があります。task_idが同じレコードが集約されます。今回はelasticsearchへの登録時に自動でレコードに付けられる”host”をtask_idとします。”host”の値は全て同じなので、全てのレコードを1つのタスクとして集約が実行されます。

push_map_as_event_on_timeout

push_map_as_event_on_timeout はTrueにすることで、timeoutが発生した際にmap(後述)がLogstashのイベントとしてアウトプットされるようになります。集約データを出力する場合はTrueとしておきましょう。デフォルトはFalseです。

code

codeではどのようなルールで集約するかを指定します。mapは集約のためにデータを保持しておくためのものです。集約する期間の間ある項目の値を足していったりすることができます。これを利用して平均値を算出します。

map['sum_val1']||=0; map['sum_val1'] += event.get('val1');

これは、’sum_val1’というmap用の変数を作成するコードです。初期値を0とし、その後レコードを読み取るたびに’val1’の値を足していきます。

map['count']||=0; map['count'] += 1;

このコードでは、集約期間のレコードの件数をカウントします。

timeout

timeoutする秒数を指定します。最初のレコードから起算して秒数がカウントされます。集約はこの秒数の間で行われ、タイムアウトするたびにmapの情報などはリセットされます。

timeout_code

タイムアウトした際に実行する処理を書きます。今回はval1の平均値を求めます。

event.set('average_val1', 
((event.get('sum_val1')).fdiv(event.get('count'))));

平均値は’average_val’という変数に格納されます。タイムアウトするごとに、mapの値とtimeout_codeで設定した変数がレコードとして出力されます。

ちなみに、fdiv()は割り算をするための関数です。

結果

レコードを登録した結果をKibanaで確認してみます。

5秒に一度、集約された結果がレコードとして登録されています。変数’count’ から5件のレコードを集約していることがわかります。

まとめ

Logstashでデータを集約してElasticsearchに送出する方法を紹介しました。

Logstashでは単に1件のデータを加工するだけでなく、今回のように複数のレコードを集約して処理するといったことも可能です。

今後もLogstashについて便利な使い方を紹介していければと思います^^

ではでは👋