Kafka レシーバー

Kafka からメトリクス、ログ、トレースを受信します。メトリクスとログは OTLP 形式のみをサポートします。

Kafka レシーバーは、Splunk Distribution of OpenTelemetry Collector が、Kafka からメトリクスとログ(OTLP 形式)およびトレースを収集できるようにします。メッセージペイロードのエンコーディングは設定可能です。サポートされるパイプラインタイプは、metricslogstraces です。詳細については「パイプラインでデータを処理する」を参照してください。

はじめに

以下の手順に従って、コンポーネントの設定とアクティベーションを行ってください:

  1. Splunk Distribution of the OpenTelemetry Collector をホストまたはコンテナプラットフォームにデプロイします:

  2. 次のセクションで説明するようにKafkaレシーバーを設定します。

  3. Collector を再起動します。

サンプル構成

レシーバーをアクティブにするには、設定ファイルの kafka セクションに receivers を追加します:

receivers:
  kafka:
    protocol_version: 2.0.0

設定を完了するには、構成ファイルの service セクションの 1 つ以上のパイプラインにレシーバーを含めます。例:

service:
  pipelines:
    metrics:
      receivers: [kafka]

主な設定

以下の設定が必要です:

  • protocol_versionします。Kafka プロトコルバージョン、たとえば 2.0.0

以下の設定はオプションです:

  • brokers。デフォルトでは localhost:9092 です。Kafka ブローカーのリスト。

  • resolve_canonical_bootstrap_servers_only。デフォルトでは false です。起動時にブローカー IP を解決してからルックアップするかどうか。

  • topicします。デフォルト:トレースの場合は otlp_spans、メトリクスの場合は otlp_metrics、ログの場合は otlp_logs。読み取る Kafka トピックの名前。特定のトピックに使用できるテレメトリタイプは 1 つのみです。

  • encoding。デフォルトでは otlp_proto です。Kafka から受信したペイロードのエンコーディング。次のエンコーディングを使用できます。

    • otlp_protoします。ペイロードはそれぞれ ExportTraceServiceRequestExportLogsServiceRequest または ExportMetricsServiceRequest に逆シリアル化されます。

    • jaeger_protoします。ペイロードは、単一の Jaeger proto Span に逆シリアル化されます。

    • jaeger_jsonします。ペイロードは、jsonpb を使用して、単一の Jaeger JSON Span に逆シリアル化されます。

    • zipkin_protoします。ペイロードは Zipkin proto スパンのリストに逆シリアル化されます。

    • zipkin_jsonします。ペイロードは Zipkin V2 JSON スパンのリストに逆シリアル化されます。

    • zipkin_thriftします。ペイロードは Zipkin Thrift スパンのリストに逆シリアル化されます。

    • raw``します。ログの場合のみ。ペイロードのバイトはログレコードの本文として挿入されます。

    • textします。ログの場合のみ。ペイロードはテキストとしてデコードされ、ログレコードの本文として挿入されます。デフォルトでは、UTF-8 を使用してデコードを行います。この動作は、text_utf-8text_shift_jis などの text_<ENCODING> を使用してカスタマイズできます。

    • jsonします。ログの場合のみ。ペイロードは JSON としてデコードされ、ログレコードの本文として挿入されます。

    • azure_resource_logsします。ログの場合のみ。ペイロードは Azure Resource Logs フォーマットから OTLP に変換されます。

  • group_id。デフォルトでは otel-collector です。レシーバーがメッセージを消費するコンシューマグループ。

  • client_id。デフォルトでは otel-collector です。コンシューマクライアント ID。

  • initial_offset。デフォルトでは latest です。以前にコミットしたオフセットがない場合に使用する初期オフセット。有効な値は latest または earliest です。

  • authします。以下のオプションを使用して認証できます。

    • plain_textします。次のフィールドがあります。

      • usernameします。使用するユーザー名。

      • passwordします。使用するパスワード。

    • saslします。次のフィールドがあります。

      • usernameします。使用するユーザー名。

      • passwordします。使用するパスワード。

      • mechanismします。使用する SASL メカニズム: SCRAM-SHA-256SCRAM-SHA-512AWS_MSK_IAM、または PLAIN

      • aws_msk.regionします。AWS_MSK_IAM を使用している場合は、AWS リージョン。

      • aws_msk.broker_addrします。AWS_MSK_IAM を使用している場合は、MSK ブローカーのアドレス。

    • tlsします。次のフィールドがあります。

      • ca_fileします。insecurefalse に設定されている場合にのみ使用します。CA 証明書へのパスで、クライアントではサーバー証明書を検証します。

      • cert_fileします。insecurefalse に設定されている場合にのみ使用します。TLS 必須接続に使用する TLS 証明書へのパス。

      • key_fileします。insecurefalse に設定されている場合にのみ使用します。TLS 必須接続に使用する TLS キーへのパス。

      • insecure。デフォルトでは false です。TLS 設定で、サーバーの証明書チェーンとホスト名、InsecureSkipVerify の検証を無効にします。

      • server_name_overrideします。バーチャルホスティングをサポートするためにクライアントが要求したサーバー名を示します。

    • kerberosします。次のフィールドがあります。

      • service_nameします。Kerberos サービス名。

      • realmします。Kerberos レルム。

      • use_keytabします。true の場合、パスワードの代わりに keytab が使われます。

      • usernameします。KDC での認証に使用する Kerberos ユーザー名。

      • passwordします。KDC での認証に使用する Kerberos パスワード。

      • config_fileします。Kerberos 設定へのパス、たとえば /etc/krb5.conf

      • keytab_fileします。keytab ファイルへのパス、たとえば /etc/security/kafka.keytab

      • disable_fast_negotiation。デフォルトでは false です。PA-FX-FAST ネゴシエーション(事前認証フレームワーク:高速)を無効にします。一般的な Kerberos 実装の一部は、PA-FX-FAST ネゴシエーションをサポートしていません。

  • metadataします。次のフィールドがあります。

    • full。デフォルトでは true です。メタデータの完全なセットを維持するかどうか。無効にすると、クライアントはスタートアップ時にブローカーに最初の要求を行いません。

    • retryします。次のフィールドがあります。

      • max。デフォルトでは 3 です。メタデータ取得の再試行回数。

      • backoff。デフォルトでは 250ms です。メタデータの再試行の間隔。

  • autocommitします。次のフィールドがあります。

    • enable。デフォルトでは true です。更新されたオフセットをブローカーに自動コミットするかどうか。

    • interval。デフォルトでは 1s です。更新されたオフセットをコミットする頻度。 auto-commit が有効でない限り無効です。

  • message_markingします。次のフィールドがあります。

    • after。デフォルトでは false です。true の場合、メッセージはパイプラインが実行された後にマークされます。

    • on_error。デフォルトでは false です。false の場合、正常に処理されたメッセージのみがマークされます。処理されたメッセージが恒久的なエラーを返した場合、パーティション全体がブロックされる可能性があることに注意してください。

  • header_extractionします。ヘッダーを抽出する方法を決定します。次のフィールドがあります。

    • extract_headers。デフォルトでは false です。true の場合、ヘッダーフィールドはリソース属性にアタッチされます。

    • headers。デフォルトでは [] です。Kafka レコードから抽出するヘッダーのリスト。一致パターンは exact です。現在、正規表現はサポートされていません。

設定例:SASLとTLSを使用してKafkaに接続する

この例では、SASLとTLSを使用してKafkaに接続するレシーバーを設定する方法を示します:

receivers:
  kafka:
    auth:
      sasl:
        username: "user"
        password: "secret"
        mechanism: "SCRAM-SHA-512"
      tls:
        insecure: false

設定例:ヘッダーを抽出

この例では、レシーバーを設定してヘッダーを抽出する方法を示します:

receivers:
  kafka:
    topic: test
    header_extraction:
      extract_headers: true
      headers: ["header1", "header2"]

レシーバーに次の test をフィードした場合:

{
  event: Hello,
  headers: {
    header1: value1,
    header2: value2,
  }
}

以下のログレコードを取得します。

{
  ...
  body: Hello,
  resource: {
    kafka.header.header1: value1,
    kafka.header.header2: value2,
  },
  ...
}

以下が該当します:

  • Kafka レコードヘッダー header1header2 がリソースの属性に追加されます。

  • 一致する Kafka ヘッダーキーはすべて、kafka.header の文字列が先頭に付加され、リソースの属性に付加されます。

設定

次の表に、Kafkaレシーバーの設定オプションを示します:

同梱

https://raw.githubusercontent.com/splunk/collector-config-tools/main/cfg-metadata/receiver/kafka.yaml

トラブルシューティング

__ ___ ___ _ ______ _____________ _____ ________ ___ ___ ___ ____ __ ___ ____ ____ __ ______ _____________ ______ ___ ___ ___ ____ __ ___ _________ _____

_________ __ ______ _____________ _____ _________

_________ __ ___________ _________ ___ ____ _____ _____

  • ___ _ ________ ___ ___ _______ _______ _________ _______ __ ______ ________

  • ____ ___ ______ ______________ ____ _____ _____ _______ __ ___________ ____ __________ _________ ___ ______ _________ __________ __ _____ ___ ____ _______