fluent-plugin-dynamodb-streams を作った

Amazon DynamoDB Streams のレコードを Fluentd で扱いたい要件があり、ググっても見当たらなかったのでプラグインを実装してみました。

takus/fluent-plugin-dynamodb-streams

使い方

README に書いてある通りですが、こんな設定でお使いいただけます。他の Fluentd のプラグインと組み合わせることで、AWS Official Blog にある、Logstash + Elasticsearch のような用途にもお使いいただけるのではないかと思います。

<source>
  type dynamodb_streams
  stream_arn YOUR_DDB_STREAMS_ARN
  pos_file /var/lib/fluent/dynamodb_streams_table_name
  fetch_interval 1
  fetch_size 100
</source>

また、example にあるように、 fluent-plugin-filter-jq と組み合わせると特定の条件で、特定のフィールドだけ取得するみたいなこともできるかと思います。

<source>
  type dynamodb_streams
  tag stream
  aws_region ddblocal
  stream_arn "#{ENV['STREAM_ARN']}"
</source>

# Only pass MODIFY event
<filter stream>
  type grep
  regexp1 event_name MODIFY
</filter>

# Only keep new_image
<filter stream>
  type jq
  jq '.dynamodb|{new_image:.new_image}'
</filter>

<match stream>
  type stdout
</match>

そのほか

Amazon KCL (Kinesis Client Library) では、DynamoDB に Checkpoint を保持して耐障害性をあげていたり、ワーカーが並列にレコードを取得することで性能を稼いでいたりしますが、とりあえず自分の要件を満たすのには不要なので実装しておらず、若干不十分なところもありますのでご注意くださいませ。Issue / PR もお待ちしております。