Amazon DynamoDB Streams のレコードを Fluentd で扱いたい要件があり、ググっても見当たらなかったのでプラグインを実装してみました。
takus/fluent-plugin-dynamodb-streams
使い方
README に書いてある通りですが、こんな設定でお使いいただけます。他の Fluentd のプラグインと組み合わせることで、AWS Official Blog にある、Logstash + Elasticsearch のような用途にもお使いいただけるのではないかと思います。
<source>
type dynamodb_streams
stream_arn YOUR_DDB_STREAMS_ARN
pos_file /var/lib/fluent/dynamodb_streams_table_name
fetch_interval 1
fetch_size 100
</source>
また、example にあるように、 fluent-plugin-filter-jq と組み合わせると特定の条件で、特定のフィールドだけ取得するみたいなこともできるかと思います。
<source>
type dynamodb_streams
tag stream
aws_region ddblocal
stream_arn "#{ENV['STREAM_ARN']}"
</source>
# Only pass MODIFY event
<filter stream>
type grep
regexp1 event_name MODIFY
</filter>
# Only keep new_image
<filter stream>
type jq
jq '.dynamodb|{new_image:.new_image}'
</filter>
<match stream>
type stdout
</match>
そのほか
Amazon KCL (Kinesis Client Library) では、DynamoDB に Checkpoint を保持して耐障害性をあげていたり、ワーカーが並列にレコードを取得することで性能を稼いでいたりしますが、とりあえず自分の要件を満たすのには不要なので実装しておらず、若干不十分なところもありますのでご注意くださいませ。Issue / PR もお待ちしております。