駄文型

プログラミングとか英語とかの話題を中心にした至極ちゃらんぽらんな日記です。

kafka-docker で作った kafka クラスタに Elixir クライアントから接続する

kafka-docker でローカルに kafka クラスタを構築する - 駄文型 の続き。kafkaex/kafka_ex という Kafka クライアントで Producer を作ってみます。

ライブラリの取得

mix.exs を編集して mix deps.get するだけ。

参考: Introduction to Mix - Elixir

defmodule ProducerSampleEx.Mixfile do
  # ...

  def application do
    [
      applications: [
        :kafka_ex,
        :snappy
      ]
    ]
  end

  defp deps do
    [
      {:kafka_ex, "~> 0.6.5"},
      {:snappy, git: "https://github.com/fdmanana/snappy-erlang-nif"}
    ]
  end
end
$ mix deps.get

設定

まず、kafka-docker でローカルに kafka クラスタを構築する - 駄文型 で作った Kafka コンテナのポートを確認。今回は 32776 32778

$ docker-compose ps
                Name                              Command               State                          Ports
-----------------------------------------------------------------------------------------------------------------------------------
66f23109d78a_kafkadocker_zookeeper_1   /bin/sh -c /usr/sbin/sshd  ...   Up      0.0.0.0:32777->2181/tcp, 22/tcp, 2888/tcp, 3888/tcp
kafkadocker_kafka_1                    start-kafka.sh                   Up      0.0.0.0:32778->9092/tcp
kafkadocker_kafka_2                    start-kafka.sh                   Up      0.0.0.0:32776->9092/tcp

次にKafkaEx.Config – kafka_ex を参考に config.exs を書く。以下は最低限の設定。

config :kafka_ex,
  brokers: [
    {"192.168.145.65", 32776}, # {"hostname", port}
    {"192.168.145.65", 32778}
  ],
  consumer_group: :no_consumer_group,
  use_ssl: false

iex で確認

iex -S mix で立ち上げる。設定がおかしいとここでエラーがでる。

$ iex -S mix
Erlang/OTP 19 [erts-8.3] [source] [64-bit] [smp:4:4] [async-threads:10] [hipe] [kernel-poll:false] [dtrace]


11:53:23.055 [debug] Succesfully connected to broker "192.168.145.65":32776

11:53:23.056 [debug] Succesfully connected to broker "192.168.145.65":32778

11:53:23.075 [debug] Establishing connection to broker 1009: "192.168.145.65" on port 32778

11:53:23.076 [debug] Succesfully connected to broker "192.168.145.65":32778

11:53:23.076 [debug] Establishing connection to broker 1010: "192.168.145.65" on port 32776

11:53:23.077 [debug] Succesfully connected to broker "192.168.145.65":32776
Interactive Elixir (1.4.2) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)>

つながった! metadata/1 で確認できる。

iex(1)> KafkaEx.metadata(topic: "topic")
%KafkaEx.Protocol.Metadata.Response{brokers: [%KafkaEx.Protocol.Metadata.Broker{host: "192.168.145.65",
   node_id: 1009, port: 32778, socket: nil},
  %KafkaEx.Protocol.Metadata.Broker{host: "192.168.145.65", node_id: 1010,
   port: 32776, socket: nil}],
 topic_metadatas: [%KafkaEx.Protocol.Metadata.TopicMetadata{error_code: :no_error,
   partition_metadatas: [%KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :leader_not_available,
     isrs: [], leader: -1, partition_id: 0, replicas: []},
    %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error,
     isrs: [1010], leader: 1010, partition_id: 3, replicas: [1010]},
    %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error,
     isrs: [1010], leader: 1010, partition_id: 1, replicas: [1010]},
    %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error,
     isrs: [1009], leader: 1009, partition_id: 2, replicas: [1009]}],
   topic: "topic"}]}

あとは produce/4 で送るだけ!トピック名、パーティション番号、メッセージを渡す。

iex(2)> KafkaEx.produce("topic", 0, "msg") # opt は省略
:leader_not_available

11:56:41.715 [error] Leader for topic topic is not available

あれ?どうやら、トピック名が topic だとだめそう。別のトピックを指定すると通る。

iex(3)> KafkaEx.produce("new_topic", 0, "msg")
:ok

(追記) 使えなかったのはトピックではなくパーティションだった。リーダーが使用不可になっている。それがなぜかは不明だが。。。

kafka-console-consumer.sh で確認

前回同様、 Kafka Shell を起動して

$ start-kafka-shell.sh 192.168.145.65 192.168.145.65:32777 # Kafka Shell を起動

kafka-console-consumer.sh を叩く。 KafkaEx.produce/4 でもう一度送り、メッセージが表示されればOK!!!

$ kafka-console-consumer.sh --topic new_topic --zookeeper $ZK
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by pas
sing [bootstrap-server] instead of [zookeeper].
msg

ハマったところ

  • config.exs の設定
    • brokers{"192.168.145.65", "32776"} と設定してしまう凡ミスを犯していた。
  • トピック名が topic だとだめそう。
  • mix.exs の設定
    • kafka_ex の README.md には mod: {MyApp, []}, という行があったので入れていた。
    • これはモジュールのコールバックの設定を行うためのもの。
    • 入れておくと MyApp.start/2 (今回の場合 ProducerSampleEx.start/2 )を実行しようとしてしまう。
    • 今回は不要なので削除した。