Kafka レシーバー
Kafka からメトリクス、ログ、トレースを受信します。メトリクスとログは OTLP 形式のみをサポートします。
Kafka レシーバーは、Splunk Distribution of OpenTelemetry Collector が、Kafka からメトリクスとログ(OTLP 形式)およびトレースを収集できるようにします。メッセージペイロードのエンコーディングは設定可能です。サポートされるパイプラインタイプは、metrics、logs、traces です。詳細については「パイプラインでデータを処理する」を参照してください。
はじめに
以下の手順に従って、コンポーネントの設定とアクティベーションを行ってください:
-
Splunk Distribution of the OpenTelemetry Collector をホストまたはコンテナプラットフォームにデプロイします:
-
次のセクションで説明するようにKafkaレシーバーを設定します。
-
Collector を再起動します。
サンプル構成
レシーバーをアクティブにするには、設定ファイルの kafka セクションに receivers を追加します:
receivers:
kafka:
protocol_version: 2.0.0
設定を完了するには、構成ファイルの service セクションの 1 つ以上のパイプラインにレシーバーを含めます。例:
service:
pipelines:
metrics:
receivers: [kafka]
主な設定
以下の設定が必要です:
-
protocol_versionします。Kafka プロトコルバージョン、たとえば2.0.0。
以下の設定はオプションです:
-
brokers。デフォルトではlocalhost:9092です。Kafka ブローカーのリスト。 -
resolve_canonical_bootstrap_servers_only。デフォルトではfalseです。起動時にブローカー IP を解決してからルックアップするかどうか。 -
topicします。デフォルト:トレースの場合はotlp_spans、メトリクスの場合はotlp_metrics、ログの場合はotlp_logs。読み取る Kafka トピックの名前。特定のトピックに使用できるテレメトリタイプは 1 つのみです。 -
encoding。デフォルトではotlp_protoです。Kafka から受信したペイロードのエンコーディング。次のエンコーディングを使用できます。-
otlp_protoします。ペイロードはそれぞれExportTraceServiceRequest、ExportLogsServiceRequestまたはExportMetricsServiceRequestに逆シリアル化されます。 -
jaeger_protoします。ペイロードは、単一の Jaeger protoSpanに逆シリアル化されます。 -
jaeger_jsonします。ペイロードは、jsonpbを使用して、単一の Jaeger JSON Span に逆シリアル化されます。 -
zipkin_protoします。ペイロードは Zipkin proto スパンのリストに逆シリアル化されます。 -
zipkin_jsonします。ペイロードは Zipkin V2 JSON スパンのリストに逆シリアル化されます。 -
zipkin_thriftします。ペイロードは Zipkin Thrift スパンのリストに逆シリアル化されます。 -
raw``します。ログの場合のみ。ペイロードのバイトはログレコードの本文として挿入されます。 -
textします。ログの場合のみ。ペイロードはテキストとしてデコードされ、ログレコードの本文として挿入されます。デフォルトでは、UTF-8 を使用してデコードを行います。この動作は、text_utf-8やtext_shift_jisなどのtext_<ENCODING>を使用してカスタマイズできます。 -
jsonします。ログの場合のみ。ペイロードは JSON としてデコードされ、ログレコードの本文として挿入されます。 -
azure_resource_logsします。ログの場合のみ。ペイロードは Azure Resource Logs フォーマットから OTLP に変換されます。
-
-
group_id。デフォルトではotel-collectorです。レシーバーがメッセージを消費するコンシューマグループ。 -
client_id。デフォルトではotel-collectorです。コンシューマクライアント ID。 -
initial_offset。デフォルトではlatestです。以前にコミットしたオフセットがない場合に使用する初期オフセット。有効な値はlatestまたはearliestです。 -
authします。以下のオプションを使用して認証できます。-
plain_textします。次のフィールドがあります。-
usernameします。使用するユーザー名。 -
passwordします。使用するパスワード。
-
-
saslします。次のフィールドがあります。-
usernameします。使用するユーザー名。 -
passwordします。使用するパスワード。 -
mechanismします。使用する SASL メカニズム:SCRAM-SHA-256、SCRAM-SHA-512、AWS_MSK_IAM、またはPLAIN。 -
aws_msk.regionします。AWS_MSK_IAMを使用している場合は、AWS リージョン。 -
aws_msk.broker_addrします。AWS_MSK_IAMを使用している場合は、MSK ブローカーのアドレス。
-
-
tlsします。次のフィールドがあります。-
ca_fileします。insecureがfalseに設定されている場合にのみ使用します。CA 証明書へのパスで、クライアントではサーバー証明書を検証します。 -
cert_fileします。insecureがfalseに設定されている場合にのみ使用します。TLS 必須接続に使用する TLS 証明書へのパス。 -
key_fileします。insecureがfalseに設定されている場合にのみ使用します。TLS 必須接続に使用する TLS キーへのパス。 -
insecure。デフォルトではfalseです。TLS 設定で、サーバーの証明書チェーンとホスト名、InsecureSkipVerifyの検証を無効にします。 -
server_name_overrideします。バーチャルホスティングをサポートするためにクライアントが要求したサーバー名を示します。
-
-
kerberosします。次のフィールドがあります。-
service_nameします。Kerberos サービス名。 -
realmします。Kerberos レルム。 -
use_keytabします。trueの場合、パスワードの代わりに keytab が使われます。 -
usernameします。KDC での認証に使用する Kerberos ユーザー名。 -
passwordします。KDC での認証に使用する Kerberos パスワード。 -
config_fileします。Kerberos 設定へのパス、たとえば/etc/krb5.conf。 -
keytab_fileします。keytab ファイルへのパス、たとえば/etc/security/kafka.keytab。 -
disable_fast_negotiation。デフォルトではfalseです。PA-FX-FAST ネゴシエーション(事前認証フレームワーク:高速)を無効にします。一般的な Kerberos 実装の一部は、PA-FX-FAST ネゴシエーションをサポートしていません。
-
-
-
metadataします。次のフィールドがあります。-
full。デフォルトではtrueです。メタデータの完全なセットを維持するかどうか。無効にすると、クライアントはスタートアップ時にブローカーに最初の要求を行いません。 -
retryします。次のフィールドがあります。-
max。デフォルトでは3です。メタデータ取得の再試行回数。 -
backoff。デフォルトでは250msです。メタデータの再試行の間隔。
-
-
-
autocommitします。次のフィールドがあります。-
enable。デフォルトではtrueです。更新されたオフセットをブローカーに自動コミットするかどうか。 -
interval。デフォルトでは1sです。更新されたオフセットをコミットする頻度。auto-commitが有効でない限り無効です。
-
-
message_markingします。次のフィールドがあります。-
after。デフォルトではfalseです。trueの場合、メッセージはパイプラインが実行された後にマークされます。 -
on_error。デフォルトではfalseです。falseの場合、正常に処理されたメッセージのみがマークされます。処理されたメッセージが恒久的なエラーを返した場合、パーティション全体がブロックされる可能性があることに注意してください。
-
-
header_extractionします。ヘッダーを抽出する方法を決定します。次のフィールドがあります。-
extract_headers。デフォルトではfalseです。trueの場合、ヘッダーフィールドはリソース属性にアタッチされます。 -
headers。デフォルトでは[]です。Kafka レコードから抽出するヘッダーのリスト。一致パターンはexactです。現在、正規表現はサポートされていません。
-
設定例:SASLとTLSを使用してKafkaに接続する
この例では、SASLとTLSを使用してKafkaに接続するレシーバーを設定する方法を示します:
receivers:
kafka:
auth:
sasl:
username: "user"
password: "secret"
mechanism: "SCRAM-SHA-512"
tls:
insecure: false
設定例:ヘッダーを抽出
この例では、レシーバーを設定してヘッダーを抽出する方法を示します:
receivers:
kafka:
topic: test
header_extraction:
extract_headers: true
headers: ["header1", "header2"]
レシーバーに次の test をフィードした場合:
{
event: Hello,
headers: {
header1: value1,
header2: value2,
}
}
以下のログレコードを取得します。
{
...
body: Hello,
resource: {
kafka.header.header1: value1,
kafka.header.header2: value2,
},
...
}
以下が該当します:
-
Kafka レコードヘッダー
header1とheader2がリソースの属性に追加されます。 -
一致する Kafka ヘッダーキーはすべて、
kafka.headerの文字列が先頭に付加され、リソースの属性に付加されます。
設定
次の表に、Kafkaレシーバーの設定オプションを示します:
同梱
https://raw.githubusercontent.com/splunk/collector-config-tools/main/cfg-metadata/receiver/kafka.yaml
トラブルシューティング
__ ___ ___ _ ______ _____________ _____ ________ ___ ___ ___ ____ __ ___ ____ ____ __ ______ _____________ ______ ___ ___ ___ ____ __ ___ _________ _____
_________ __ ______ _____________ _____ _________
-
______ _ ____ __ ___ ______ _______ _______
-
_______ ______ ________
_________ __ ___________ _________ ___ ____ _____ _____
-
___ _ ________ ___ ___ _______ _______ _________ _______ __ ______ ________
-
____ ___ ______ ______________ ____ _____ _____ _______ __ ___________ ____ __________ _________ ___ ______ _________ __________ __ _____ ___ ____ _______