Kafkaの管理ツールを読む - kafka-broker-api-versions.sh編

この記事は Distributed computing Advent Calendar 2017 の 13 日目の記事です。

Apache Kafka Logo

Kafka のブローカー間、あるいはブローカーとクライアント間の通信は、TCP 経由のバイナリプロトコルです。 それぞれの通信は API として定義されており、どういうフォーマットのリクエスト・レスポンスなのかが定められています。

API にはバージョンがあります。 ブローカーがサポートする API のバージョンを取得するには、Kafka に標準に同梱されている kafka-broker-api-versions.sh が利用できます。 kafka-broker-api-versions.sh はクラスタ内に存在するブローカーのリストと、それぞれのブローカーがサポートする API バージョンを取得します。

$ kafka-broker-api-versions.sh --bootstrap-server kafka-1.myhost:9092
kafka-1.myhost:9092 (id: 0 rack: null) -> (
        Produce(0): 0 to 5 [usable: 3],
        Fetch(1): 0 to 6 [usable: 5],
        Offsets(2): 0 to 2 [usable: 2],
        Metadata(3): 0 to 5 [usable: 4],
        LeaderAndIsr(4): 0 to 1 [usable: 0],
        StopReplica(5): 0 [usable: 0],
        UpdateMetadata(6): 0 to 4 [usable: 3],
        ControlledShutdown(7): 0 to 1 [usable: 1],
        OffsetCommit(8): 0 to 3 [usable: 3],
        OffsetFetch(9): 0 to 3 [usable: 3],
        FindCoordinator(10): 0 to 1 [usable: 1],
        JoinGroup(11): 0 to 2 [usable: 2],
        Heartbeat(12): 0 to 1 [usable: 1],
        LeaveGroup(13): 0 to 1 [usable: 1],
        SyncGroup(14): 0 to 1 [usable: 1],
        DescribeGroups(15): 0 to 1 [usable: 1],
        ListGroups(16): 0 to 1 [usable: 1],
        SaslHandshake(17): 0 to 1 [usable: 0],
        ApiVersions(18): 0 to 1 [usable: 1],
        CreateTopics(19): 0 to 2 [usable: 2],
        DeleteTopics(20): 0 to 1 [usable: 1],
        DeleteRecords(21): 0 [usable: 0],
        InitProducerId(22): 0 [usable: 0],
        OffsetForLeaderEpoch(23): 0 [usable: 0],
        AddPartitionsToTxn(24): 0 [usable: 0],
        AddOffsetsToTxn(25): 0 [usable: 0],
        EndTxn(26): 0 [usable: 0],
        WriteTxnMarkers(27): 0 [usable: 0],
        TxnOffsetCommit(28): 0 [usable: 0],
        DescribeAcls(29): 0 [usable: 0],
        CreateAcls(30): 0 [usable: 0],
        DeleteAcls(31): 0 [usable: 0],
        DescribeConfigs(32): 0 [usable: 0],
        AlterConfigs(33): 0 [usable: 0],
        UNKNOWN(34): 0,
        UNKNOWN(35): 0,
        UNKNOWN(36): 0,
        UNKNOWN(37): 0
)

たとえば Produce(0): 0 to 5 [usable: 3] は、ブローカーが Produce API のバージョン 0 から 5 までをサポートしていることを意味し、 usable は現在のクライアントライブラリで利用可能なバージョンを指します。 カッコ内の数字は API key と呼ばれるもので、各 API を識別する番号です。

この記事では、kafka-broker-api-versions.sh を読んでいきたいと思います。

BrokerApiVersionsCommand

kafka-broker-api-versions.sh の実装は BrokerApiVersionsCommand.scala に実装があります。 このファイルは 100 行にも満たない小さなクラスですが、重要な部分は以下のコードのみです。

// https://github.com/apache/kafka/blob/1.0.0/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala#L41-L50
val adminClient = createAdminClient(opts)
adminClient.awaitBrokers()
val brokerMap = adminClient.listAllBrokerVersionInfo()
brokerMap.foreach { case (broker, versionInfoOrError) =>
  versionInfoOrError match {
    case Success(v) => out.print(s"${broker} -> ${v.toString(true)}\n")
    case Failure(v) => out.print(s"${broker} -> ERROR: ${v}\n")
  }
}
adminClient.close()

まず AdminClient.awaitBrokers() を呼び出して、クラスタ内にブローカーが現れるまで待ちます。 クラスタ内にブローカーが見つかると、どれか 1 つのブローカーに対して、クラスタに存在する全てのブローカーを問い合わせます。 その結果から、存在する全ブローカーに対してサポートする API のバージョンを問い合わせて、その結果を表示します。

ブローカーを見つける

AdminClient.awaitBrokers() はクラスタ内にブローカーが現れるまで待ちます。

// https://github.com/apache/kafka/blob/1.0.0/core/src/main/scala/kafka/admin/AdminClient.scala#L151-L158
def awaitBrokers() {
  var nodes = List[Node]()
  do {
    nodes = findAllBrokers()
    if (nodes.isEmpty)
      Thread.sleep(50)
  } while (nodes.isEmpty)
}

AdminClient.findAllBrokers() は、クラスタ内の全てのブローカーを探します。 Kafka のクライアントを利用する時、ブートストラップサーバーを指定します。 しかし分散システムなので、常に全てのノードが生きている保証はありません。 findAllBrokers() はブートストラップのブローカーどれか 1 つに、現在クラスタ内に存在するブローカーを問い合わせます。

AdminClient.findAllBrokers() の実装は以下のようになってます。 AdminClient.sendAnyNode() は 1 番目に見つかったブローカーにリクエストを送ります。 全てのブローカーにリクエストを送れないと例外を投げます。

// https://github.com/apache/kafka/blob/1.0.0/core/src/main/scala/kafka/admin/AdminClient.scala#L160-L167
def findAllBrokers(): List[Node] = {
  val request = MetadataRequest.Builder.allTopics()
  val response = sendAnyNode(ApiKeys.METADATA, request).asInstanceOf[MetadataResponse]
  val errors = response.errors
  if (!errors.isEmpty)
    debug(s"Metadata request contained errors: $errors")
  response.cluster.nodes.asScala.toList
}
// https://github.com/apache/kafka/blob/1.0.0/core/src/main/scala/kafka/admin/AdminClient.scala#L93-L105
private def sendAnyNode(api: ApiKeys, request: AbstractRequest.Builder[_ <: AbstractRequest]): AbstractResponse = {
  bootstrapBrokers.foreach { broker =>
    try {
      return send(broker, api, request)
    } catch {
      case e: AuthenticationException =>
        throw e
      case e: Exception =>
        debug(s"Request $api failed against node $broker", e)
    }
  }
  throw new RuntimeException(s"Request $api failed on brokers $bootstrapBrokers")
}

クラスタ内に存在するブローカーを問い合わせるのに、 Metadata API を使用します。 Metadata API は、クラスタ内のトピックやパーティション、リーダー情報やクラスタ情報を取得できる API で、任意のノードに対し利用できる API でもあります。

API バージョンを取得する

ここまでで、クラスタ内に存在するブローカーが取得できました。 あとはそれぞれのブローカーに対して、API バージョンを問い合わせるだけです。 AdminClient.listAllBrokerVersionInfo() は、 クラスタ内のブローカーのリストを取得し、それぞれに対し API バージョンを取得します。

// https://github.com/apache/kafka/blob/1.0.0/core/src/main/scala/kafka/admin/AdminClient.scala#L207-L210
def listAllBrokerVersionInfo(): Map[Node, Try[NodeApiVersions]] =
  findAllBrokers.map { broker =>
    broker -> Try[NodeApiVersions](new NodeApiVersions(getApiVersions(broker).asJava))
  }.toMap

API バージョンを取得するには、対象のブローカーに ApiVersions API を利用します。 レスポンスに各 API のサポートしてる API バージョンが含まれます。

// https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/AdminClient.scala#L142-L146
def getApiVersions(node: Node): List[ApiVersion] = {
  val response = send(node, ApiKeys.API_VERSIONS, new ApiVersionsRequest.Builder()).asInstanceOf[ApiVersionsResponse]
  response.error.maybeThrow()
  response.apiVersions.asScala.toList
}

まとめ

ここまでの処理をまとめると、次のようなステップでクラスタ内に存在するブローカーの、サポートする API バージョンを取得します。

  1. クラスタにブローカーが現れるまで待つ
  2. ブローカーが現れると、どれかのノードに Metadata API を利用して、クラスタ内のブローカーリストを取得する
  3. ブローカーリストの各ブローカーにたいし ApiVersions API を利用してサポートする API のバージョンを取得する
  4. 求まったブローカーリストとそれぞれのブローカーのサポートする API のバージョンを表示する

Kafka のバイナリプロトコルについては、Kafka protocol guide にあります。 興味があれば読んでみてはいかがでしょうか。


Profile picture

Shin'ya Ueoka

B2B向けSaaSを提供する会社の、元Webエンジニア。今はエンジニアリング組織のマネジメントをしている。