Kafka ConnectがTasksを分散する様子
目次
- 目次
- Kafka Connect と Producer/Consumer
- Kafka Connect のTaskとWorker
- 対象とするログ収集基盤例
- Kafka Connect REST Interface
- Workerがダウンした場合の挙動
- tasks.max を増減した場合の挙動
- 最後に
Kafka Connect と Producer/Consumer
Kafkaでデータパイプラインを構築するとき、他のデータストアとのインタフェースには以下の2通りがあると理解している。
- 自前アプリケーションでKafka APIを叩く (Producer/Consumer)
- 実装の全体を把握できるので問題発生時に細部まで対応可能と思う
- スケーラビリティを自前で担保する必要がある
- Kafka Connectを使う(Source/Sink)
ということで、自分の作りたいパイプラインに登場するデータストアに対応したプラグインがあれば、Kafka Connectを採用する方が比較的楽ができそうな気がする。ただし、自分でプラグインを書くような茨の道を行くことになりそうな場合は状況をよく吟味し、前者の方法に倒す場合もあるようだ。
Why We Replaced Our Kafka Connector with a Kafka Consumer
以下、Kafka Connect を使う前提でいくつかのデータストアと連携させる様子を記載する。
Kafka Connect のTaskとWorker
まずは Kafka Connect を構成するコンポーネントを知っておく。参照するのは以下。
Kafka Connect Concepts — Confluent Platform
Tasks
これはデータ連携元から(Sourceの場合)であったり連携先へ(Sinkの場合)データをコピーするJobを分割したもの。Connectorを作成する際のパラメータとして tasks.max
に指定した数が最大分割数となる。タスク自体は状態を持たず、Kafka Server に作られる特殊なトピックあるいはConnectorインスタンスで状態管理している。
分割されたタスクは以下にあるWorkersに分散され、並行性やスケーラビリティが提供される。
Workers
Standalone とDistributed があるが、Getting Started 的な位置付けの前者は置いておく。Distributed Worker とは、実質Kafka Connect クラスタを構成するノードそのものと捉えて良いと思う。
このWorker上にConnectorインスタンスおよびタスクが分散される。例えば以下のように。
上図について追加説明をする。まず3ノードクラスタを考えている。つまりWorkerが3つある。Healthy と書かれた部分では、3ノード共が正常に稼働しており、Connectorインスタンスが1つ(C1)と実際に処理をするタスクが5つ(T1 - T5)あり、3ノードでタスクが分散されている。
Worker2 Failure と書かれた部分で、1ノードがダウンした様子が記載されている。ダウンしたノードに載っていたタスク(T2とT3)を、正常な2ノードで処理しなけれならなくなった。
Task Rebalance と書かれた部分で、T2とT3はそれぞれ別々のノードに移動し処理が継続されることが記載されている。
上記のようにタスクはFailoverしながらクラスタ内のいずれかのWorkerで処理されるようにうまくコントロールされる。
それ以外
Tasks と Workers の他に、 Converter と Transform の紹介があるが、これらはデータの加工処理に関わるコンポーネントである。本エントリでは興味の対象外とする。
対象とするログ収集基盤例
想定する簡単なログ収集基盤の概要は以下のようなものである。これ自体大した情報ではないが、この後で記載するタスク分散の挙動などは、この構成に特有のものである可能性があるので念の為書いておく。
- 各種ログをin_tailしているfluentdがfluent-plugin-kafkaで Kafka Clusterにメッセージを送信している
- Zookeeper / Kafka Broker / Kafka Connect それぞれのクラスタは同一の3ノード上に構築している
- Kafka Sink Connectorで、KafkaトピックからElasticsearchやinfluxDBに出力している。したがってConnector の Worker が3ノードである
Kafka関連の構成物は、基本的にconfluent-platformをドキュメント通りにインストールすれば問題ない。
Manual Install using Systemd on RHEL and CentOS — Confluent Platform
ElasticsearchやS3やHDFSといった代表的なデータストアのConnectorは同梱されておりそのまま使うことができるようになる。ただしinfluxDBのConnectorは自分で導入する必要がある(2019年3月時点プレビュー版のため)。Confluent公式から入手可能。ただ、自分の場合、これだとfluentdから送信されたJSONをうまく扱うことがどうしてもできず(java.util.HashMap cannot be cast to org.apache.kafka.connect.data.Struct
)、以下のLandoopのものを使用した。
Kafka Connect REST Interface
Kafka Connect にはREST APIがあり、たいていのアクションがこのAPIを通して行える。
Kafka Connect REST Interface — Confluent Platform
例えば、ElasticsearchにつなげるSink Connectorを作るには以下のようなリクエストを実行する。
curl -X POST http://kafka-broker01:8083/connectors \ -H 'Content-Type: application/json' \ -H 'Accept: application/json' \ -d '{ "name": "syslog_topic_es", "config": { "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", "type.name": "type", "topics": "syslog_topic", "tasks.max": "5", "value.converter.schemas.enable": "false", "connection.url": "http://elasticsearch:9200", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "key.ignore": "true", "schema.ignore": "true" } }'
直感的に理解しやすい項目として、 topics
でKafka Broker上の対象トピックを、connection.url
で出力先のElasticsearchを指定するなどしているが、ここで注目したいのは tasks.max
である。上記の例では 5
としており、ここで指定した数値がWorkers上で展開されるTasksの数となる。
この後にTasksの状態を確認してみる。
curl http://kafka-broker01:8083/connectors/syslog_topic_es/status { "name": "syslog_topic_es", "connector": { "state": "RUNNING", "worker_id": "kafka-broker01:8083" }, "tasks": [ { "id": 0, "state": "RUNNING", "worker_id": "kafka-broker01:8083" }, { "id": 1, "state": "RUNNING", "worker_id": "kafka-broker02:8083" }, { "id": 2, "state": "RUNNING", "worker_id": "kafka-broker03:8083" }, { "id": 3, "state": "RUNNING", "worker_id": "kafka-broker01:8083" }, { "id": 4, "state": "RUNNING", "worker_id": "kafka-broker02:8083" } ], "type": "sink" }
Connector インスタンスが kafka-broker01
ノードで稼働しており、5つのTaskが分散されて稼働している。
Workerがダウンした場合の挙動
Workerがダウンした場合、どのようにタスクがFailoverするかを観察してみる。
三番目のWorker(kafka-broker03)をstopしてみる。stopした直後に先のコマンドでstatusを確認してみると、以下のような状態を取得できた。
curl http://kafka-broker01:8083/connectors/syslog_topic_es/status { ... { "id": 0, "state": "RUNNING", "worker_id": "kafka-broker01:8083" }, { "id": 1, "state": "RUNNING", "worker_id": "kafka-broker02:8083" }, { "id": 2, "state": "RUNNING", "worker_id": "kafka-broker01:8083" }, { "id": 3, "state": "RUNNING", "worker_id": "kafka-broker02:8083" }, { "id": 4, "state": "UNASSIGNED", "worker_id": "kafka-broker02:8083" } ... },
最終的には以下のように収束した。
0 -> kafka-broker01, 1 -> kafka-broker02, 2 -> kafka-broker01, 3 -> kafka-broker02, 4 -> kafka-broker01
次にダウンしたノードを復帰してみる。ちなみに、Kafka Connect のサービスはプラグインを順次ロードしていくことになり、プラグインの数によっては起動に要する時間が膨らみがちである。Confluent Platformを通常通りインストールしただけでも標準的なプラグインが豊富に含まれるので、不要なプラグインはそもそも削除するほうが運用上は良さそうである。復帰後は以下のような状態が取得できた。
0 -> kafka-broker01, 1 -> kafka-broker02, 2 -> kafka-broker03, 3 -> kafka-broker01, 4 -> kafka-broker02
上記より、おそらく、ダウン/復帰時に必要最低限のTasksをFailoverさせるというわけではなく、task id の若番から単純に順番にWorkersにAssignされていく仕組みのようである。
tasks.max を増減した場合の挙動
REST APIから設定値を変更することも容易である。
tasks.max
を変更するのには、例えば以下のようなリクエストを実行する。
curl -H 'Content-Type:application/json' -XPUT http://kafka-broker01:8083/connectors/syslog_topic_es/config -d '{ "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", "type.name": "type", "topics": "syslog_topic", "tasks.max": "10", "value.converter.schemas.enable": "false", "connection.url": "http://elasticsearch:9200", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "key.ignore": "true", "schema.ignore": "true" } '
最終的な結果は以下となる。
{"id": 0, "state": "RUNNING", "worker_id": "kafka-broker01:8083"}, {"id": 1, "state": "RUNNING", "worker_id": "kafka-broker02:8083"}, {"id": 2, "state": "RUNNING", "worker_id": "kafka-broker03:8083"}, {"id": 3, "state": "RUNNING", "worker_id": "kafka-broker01:8083"}, {"id": 4, "state": "RUNNING", "worker_id": "kafka-broker02:8083"}, {"id": 5, "state": "RUNNING", "worker_id": "kafka-broker03:8083"}, {"id": 6, "state": "RUNNING", "worker_id": "kafka-broker01:8083"}, {"id": 7, "state": "RUNNING", "worker_id": "kafka-broker02:8083"}, {"id": 8, "state": "RUNNING", "worker_id": "kafka-broker03:8083"}, {"id": 9, "state": "RUNNING", "worker_id": "kafka-broker01:8083"}
Tasksを再度5に戻してみる。
{"id": 0, "state": "RUNNING", "worker_id": "kafka-broker01:8083"}, {"id": 1, "state": "RUNNING", "worker_id": "kafka-broker02:8083"}, {"id": 2, "state": "RUNNING", "worker_id": "kafka-broker03:8083"}, {"id": 3, "state": "RUNNING", "worker_id": "kafka-broker01:8083"}, {"id": 4, "state": "RUNNING", "worker_id": "kafka-broker02:8083"}, {"id": 5, "state": "UNASSIGNED", "worker_id": "kafka-broker03:8083"}, {"id": 6, "state": "UNASSIGNED", "worker_id": "kafka-broker01:8083"}, {"id": 7, "state": "UNASSIGNED", "worker_id": "kafka-broker02:8083"}, {"id": 8, "state": "UNASSIGNED", "worker_id": "kafka-broker03:8083"}, {"id": 9, "state": "UNASSIGNED", "worker_id": "kafka-broker01:8083"}
一度設定されたタスク情報が削除されることはなく、 UNASSIGNED
となることがわかった。
Connectorインスタンスが稼働しているWorkerがダウンした場合の挙動
最後に、Connector インスタンスが稼働しているWorkerがダウンした場合の挙動を見る。最初に確認した通り、Connectorインスタンスは kafka-broker01
で稼働している。それを落とす。
ダウン直後は以下のように Connector が UNASSIGNED
となっている。
... "connector": { "state": "UNASSIGNED", "worker_id": "kafka-broker01:8083" }, ...
最終的には以下の状態に収束した。
... "connector": { "state": "RUNNING", "worker_id": "kafka-broker02:8083" }, ...
ざっとTasksがWorker上に分散される様子を観察した。
最後に
Kafka Connect は、よく利用されるデータストアに対しての実装は大抵用意されており、利用できる場面では推奨される。自前実装することもでき、しかも分散処理も容易に実現できるところも利点ではあるが、その場合分散システム特有の問題などが絡みトラブルシュートは辛くなるかもしれないのでよく検討する。
Tasksが分散される様子をみたが、アサインし直す場合、とにかくラウンドロビンにWorkerを割り当てていくことがわかった。巨大なクラスタでは、場合によっては、タスクの大移動が発生する可能性がありそうである。そうなったときのタスク移動の負荷は運用上無視できなくなる可能性はあるのかもしれない。