Kafkaの管理ツールを読む - kafka-topics.sh編

この記事は Distributed computing Advent Calendar 2017 の 6 日目の記事です。

Apache Kafka Logo

Apache Kafka にはクラスタの管理ツールが含まれており、ユーザはこれらのツールを使ってトピックやオフセットを管理できます。 コードは規模的にもあまり大きくないので、Kafka の勉強にはちょうどいい教材です。 Kafka の内部構造を知ることで、障害発生時に原因を特定しやすくなります。

この記事では、トピック管理用の kafka-topics.sh の、よく使う以下の 4 つの操作を追ってゆきます。 なお今回読むコードは、1.0.0を対象にします。

# トピックの一覧表示
kafka-topics.sh --list --zookeeper localhost:2181/kafka

# トピックの作成
kafka-topics.sh --create --zookeeper localhost:2181/kafka --topic new-topic --partitions 3 --replication-factor 1

# トピックの詳細表示
kafka-topics.sh --describe --zookeeper localhost:2181/kafka --topic new-topic

# トピックの削除
kafka-topics.sh --delete --zookeeper localhost:2181/kafka --topic new-topic

TopicCommand

まず kafka-topics.sh の中身ですが、中身は以下の 1 行のみです。

# kafka-topics.sh
exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"

Kafka の管理ツールは kafka-run-class.sh を経由して、コンパイルされたクラスを実行します。 コード上でのクラスの実装は、 TopicCommand.scala にあります。

Kafka のトピックの情報は Zookeeper に保存されており、kafka-topics.sh も Kafka ブローカーではなく Zookeeper にアクセスします。

トピックの一覧表示

Kafka のトピック情報は、Zookeeper 上の ${prefix}/brokers/topics/${topic} に保存されてます。 ${prefix}/brokers/topics 以下のノードを取得することでトピック一覧を取得できます。 TopicCommand でトピック一覧の取得は以下の部分でしてます。

// https://github.com/apache/kafka/blob/1.0.0/core/src/main/scala/kafka/admin/TopicCommand.scala#L85
val allTopics = zkUtils.getAllTopics().sorted

ZkUtils は Zookeeper 上の Kafka の情報にアクセするためのユーティリティクラスです。 ZkUtils.getAllTopics() の実装は次のようになってます。

// https://github.com/apache/kafka/blob/1.0.0/core/src/main/scala/kafka/utils/ZkUtils.scala#L911-L917
def getAllTopics(): Seq[String] = {
  val topics = getChildrenParentMayNotExist(BrokerTopicsPath)
  if(topics == null)
    Seq.empty[String]
  else
    topics
}

BrokerTopicsPath${prefix}/brokers/topics を指します。 ZkUtils.getChildrenParentMayNotExist() はあるパス以下のノードのを取得します

トピックの作成

トピックの作成には、トピック名の他にパーティション数とレプリカ数を指定します。 また --replica-assignment オプションで、レプリカをどのブローカーに割り当てるかを指定できます。 指定しないとクライアント側が割り当てを計算します。

コード的にはこの辺です。

// https://github.com/apache/kafka/blob/1.0.0/core/src/main/scala/kafka/admin/TopicCommand.scala#L101-L111
if (opts.options.has(opts.replicaAssignmentOpt)) {
  val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
  AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignment, configs, update = false)
} else {
  CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt)
  val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
  val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
  val rackAwareMode = if (opts.options.has(opts.disableRackAware)) RackAwareMode.Disabled
  else RackAwareMode.Enforced
  AdminUtils.createTopic(zkUtils, topic, partitions, replicas, configs, rackAwareMode)
}

--replica-assignmentが与えられてるなら、その値をパースして AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK()に渡します。 そうでないときはAdminUtils.createTopic()を呼び出してますが、 内部でレプリカの割り当てを計算して AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK() に渡します。

レプリカを手動で割り当てる時

上述のように、レプリカの割り当て情報をパースして、AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK()を呼び出します。 AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK()の実装は以下のようになってます。

// https://github.com/apache/kafka/blob/1.0.0/core/src/main/scala/kafka/admin/AdminUtils.scala#L504-L519
def createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils: ZkUtils,
                                                   topic: String,
                                                   partitionReplicaAssignment: Map[Int, Seq[Int]],
                                                   config: Properties = new Properties,
                                                   update: Boolean = false) {
  validateCreateOrUpdateTopic(zkUtils, topic, partitionReplicaAssignment, config, update)

  // Configs only matter if a topic is being created. Changing configs via AlterTopic is not supported
  if (!update) {
    // write out the config if there is any, this isn't transactional with the partition assignments
    writeEntityConfig(zkUtils, getEntityConfigPath(ConfigType.Topic, topic), config)
  }

  // create the partition assignment
  writeTopicPartitionAssignment(zkUtils, topic, partitionReplicaAssignment, update)
}

トピック更新時は update=false となり、writeEntityConfig() を呼び出します。 Kafka ではトピックごとの設定を Zookeeper 上の ${prefix}/config/topics/${topic} に記録します。 writeEntityConfig() は上記の場所にトピックの設定を記録するノードを作成します。 たとえばあるトピックに flush.message が設定されていると、以下のような JSON で記録されます。

{ "version": 1, "config": { "flush.messages": "1" } }

writeTopicPartitionAssignment()${prefix}/brokers/topics/${topic} にパーティションの割り当て情報を記録します。 たとえばパーティション 0 をブローカー 0,1 に、パーティション 1 をブローカー 1,2 に割り当てる場合、以下のような JSON が記録されます。

{ "version": 1, "partitions": { "0": [0, 1], "1": [1, 2] } }

レプリカを自動で割り当てる時

レプリカを自動で割り当ててトピックを作成する、AdminUtils.createTopic() の中身は以下のようになってます。

// https://github.com/apache/kafka/blob/1.0.0/core/src/main/scala/kafka/admin/AdminUtils.scala#L461-L463
val brokerMetadatas = getBrokerMetadatas(zkUtils, rackAwareMode)
val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerMetadatas, partitions, replicationFactor)
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, replicaAssignment, topicConfig)

まず getBrokerMetadatas() でブローカーのラック情報を取得します。 そして AdminUtils.assignReplicasToBrokers() でレプリカの割り当てを計算して、 その結果をAdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK()に渡します。

AdminUtils.assignReplicasToBrokers() は以下の 3 つのゴールを達成するように割り当てを計算します。

  1. レプリカはブローカー間で偏りが無いこと
  2. 同パーティションのレプリカはブローカー間で分散されること
  3. ブローカーがラック情報を持つなら、ラック間でも分散されること

詳しいアルゴリズムは AdminUtils.assignReplicasToBrokers() のコメントに記述されてます。

トピックの削除

Kafka クラスタ内でのトピックの削除は、直接データを消しに行くのではなく、Zookeeper に削除リクエストのためのノードを作成します。 ${prefix}/admin/delete_topic/${topic}にノードを作ると、ラスタ内のコントローラノードが削除処理を開始し、各ブローカーや Zookeeper 上からトピックの情報を削除します。 トピックの削除が完了すると、Zookeeper 上からこのノードが削除されます。

具体的な削除リクエストを作成する処理は以下の部分のみです。

// https://github.com/apache/kafka/blob/1.0.0/core/src/main/scala/kafka/admin/TopicCommand.scala#L188
zkUtils.createPersistentPath(getDeleteTopicPath(topic))

トピックの詳細表示

--describe サブコマンドは、指定したトピックに対する以下の情報を取得します

  • トピック
    • パーティション数
    • レプリカ数
    • トピックの設定
    • 削除予定か否か
  • パーティション
    • レプリカの割り当て
    • リーダー
    • in-sync レプリカ
$ kafka-topics.sh --zookeeper localhost:2181/kafka --create \
    --topic my-topic --partitions 2 --replication-factor 2 --config flush.messages=1
Created topic "my-topic".

$ kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic my-topic
Topic:my-topic  PartitionCount:2        ReplicationFactor:2     Configs:flush.messages=1
        Topic: my-topic Partition: 0    Leader: 0       Replicas: 0,1   Isr: 0,1
        Topic: my-topic Partition: 1    Leader: 1       Replicas: 1,0   Isr: 1,0

以下の 3 つのオプションは、特定のトピック、パーティションのみを表示できます。

  • --topics-with-overrides: トピックの設定があるトピック名のみ表示
  • --under-replicated-partitions: レプリカ中のパーティションのみ表示
  • --unavailable-partitions: リーダーがいないパーティションのみ表示

これらの情報は全て Zookeeper 上から取得できます。 ${prefix}/brokers/topics/${topic}には、レプリカの割り当てが記録されてます。 またこの情報から、トピックのパーティション数、レプリカ数も求まります。

{ "version": 1, "partitions": { "1": [1, 0], "0": [0, 1] } }

${prefix}/config/topics/${topic} には、トピックの設定が記録されてます。

{ "version": 1, "config": { "flush.messages": "1" } }

${prefix}/brokers/topics/${topic}/partitions/${partition}/state には、リーダーおよび in-sync レプリカが記録されてます。

{
  "controller_epoch": 2,
  "leader": 0,
  "version": 1,
  "leader_epoch": 0,
  "isr": [0, 1]
}

describe の表示処理は、TopicCommand.describeTopics() に記述されてます。 まず、トピックの一覧(--topicオプションで指定したトピック、または全て)と、現在クラスタ内に存在するブローカーのリスト(${prefix}/brokers/ids以下)を取得します。 ブローカーのリストは、後にクラスタ内でリーダーが存在しているかをチェックするのに使います。

// https://github.com/apache/kafka/blob/1.0.0/core/src/main/scala/kafka/admin/TopicCommand.scala#L204
val topics = getTopics(zkUtils, opts)
// ...
val liveBrokers = zkUtils.getAllBrokersInCluster().map(_.id).toSet

各トピックに対して、トピックの設定とレプリカの割り当てを取得して、必要な情報を抜き出します。

// https://github.com/apache/kafka/blob/1.0.0/core/src/main/scala/kafka/admin/TopicCommand.scala#L217-L225
val configs = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic).asScala
if (!reportOverriddenConfigs || configs.nonEmpty) {
  val numPartitions = topicPartitionAssignment.size
  val replicationFactor = topicPartitionAssignment.head._2.size
  val configsAsString = configs.map { case (k, v) => s"$k=$v" }.mkString(",")
  val markedForDeletionString = if (markedForDeletion) "\tMarkedForDeletion:true" else ""
  println("Topic:%s\tPartitionCount:%d\tReplicationFactor:%d\tConfigs:%s%s"
    .format(topic, numPartitions, replicationFactor, configsAsString, markedForDeletionString))
}

AdminUtils.fetchEntityConfig()${prefix}/config/topics/${topic} からトピックの設定を取得します。 --topic-with-overridesが付与されてかつ、設定が空の場合トピックは表示しません。 パーティション数、レプリカ数はレプリカの割り当てから求めます。 また ${prefix}/admin/delete_topic/${topic} の存在を確認して、削除予定ならそれも表示します。

パーティションの情報に関しては、in-sync レプリカとリーダーを ${prefix}/brokers/topics/${topic}/partitions/${partition}/state から取得します。

// https://github.com/apache/kafka/blob/1.0.0/core/src/main/scala/kafka/admin/TopicCommand.scala#L229-L244
val inSyncReplicas = zkUtils.getInSyncReplicasForPartition(topic, partitionId)
val leader = zkUtils.getLeaderForPartition(topic, partitionId)
if ((!reportUnderReplicatedPartitions && !reportUnavailablePartitions) ||
    (reportUnderReplicatedPartitions && inSyncReplicas.size < assignedReplicas.size) ||
    (reportUnavailablePartitions && (leader.isEmpty || !liveBrokers.contains(leader.get)))) {

  val markedForDeletionString =
    if (markedForDeletion && !describeConfigs) "\tMarkedForDeletion: true" else ""
  print("\tTopic: " + topic)
  print("\tPartition: " + partitionId)
  print("\tLeader: " + (if(leader.isDefined) leader.get else "none"))
  print("\tReplicas: " + assignedReplicas.mkString(","))
  print("\tIsr: " + inSyncReplicas.mkString(","))
  print(markedForDeletionString)
  println()
}

in-sync レプリカとリーダーは、それぞれ ZkUtils.getInSyncReplicasForPartition()ZkUtils.getLeaderForPartition() で取得できます。 --under-replicated-partitions オプションおよび --unavailable-partitions オプションが付与されてないときは、パーティションを表示します。 --under-replicated-partitions オプションが付与されてかつ in-sync レプリカ数がレプリカ数より小さいなら、レプリカ中のパーティションを表示します。 --unavailable-partitions オプションが付与されてかつリーダーが空またはクラスタ内に存在しないなら、パーティションを表示します。

まとめ

この記事では、kafka-topics.sh 機能のうち、トピックの一覧表示、作成、削除、詳細表示する機能を、コードベースで読み解いてゆきました。 分散システムの特性上 Kafka は Zookeeper にも多くのデータを残しているので、トラブル時に Zookeeper を見ると障害対応の手がかりになります。

この記事では Zookeeper を利用したトピックの操作を紹介しましたが、 経由の実際 Kafka 0.11.0.0 から、トピックの作成と削除に関する API が追加されました (CreateTopics APIDeleteTopics API)。 今後もクライアント側は Zookeeper へ依存しなくても、Kafka のトピックの操作やその他の管理ができるようになるかも知れません。


Profile picture

Shin'ya Ueoka

B2B向けSaaSを提供する会社の、元Webエンジニア。今はエンジニアリング組織のマネジメントをしている。