Kafka receiver
Receives metrics, logs, and traces from Kafka. Metrics and logs only support the OTLP format.
The Kafka receiver allows the Splunk Distribution of the OpenTelemetry Collector to collect traces, as well as metrics and logs in OTLP format, from Kafka. Message payload encoding is configurable. The supported pipeline types are metrics, logs, and traces. See Process your data with pipelines for more information.
Get started
Follow these steps to configure and activate the component:
1. Deploy the Splunk Distribution of the OpenTelemetry Collector to your host or container platform.
2. Configure the Kafka receiver as described in the next section.
3. Restart the Collector.
Sample configuration
To activate the receiver, add kafka to the receivers section of your configuration file:
receivers:
  kafka:
    protocol_version: 2.0.0
To complete the configuration, include the receiver in one or more pipelines of the service section of your configuration file. For example:
service:
  pipelines:
    metrics:
      receivers: [kafka]
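Because a given topic can carry only one telemetry type, a common pattern is to define one named instance of the receiver per signal. The following sketch is illustrative only: the instance names and explicit topics are assumptions that mirror the documented defaults, and processors and exporters are omitted for brevity.

receivers:
  kafka/traces:
    protocol_version: 2.0.0
    topic: otlp_spans
  kafka/metrics:
    protocol_version: 2.0.0
    topic: otlp_metrics
  kafka/logs:
    protocol_version: 2.0.0
    topic: otlp_logs

service:
  pipelines:
    # Exporters omitted for brevity; a real pipeline needs at least one.
    traces:
      receivers: [kafka/traces]
    metrics:
      receivers: [kafka/metrics]
    logs:
      receivers: [kafka/logs]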
Main settings
The following setting is required:
- protocol_version. The Kafka protocol version, for example 2.0.0.
The following settings are optional (a combined configuration sketch follows this list):
- brokers. localhost:9092 by default. The list of Kafka brokers.
- resolve_canonical_bootstrap_servers_only. false by default. Whether to resolve then reverse-lookup broker IPs during startup.
- topic. Defaults: otlp_spans for traces, otlp_metrics for metrics, otlp_logs for logs. The name of the Kafka topic to read from. You can only use one telemetry type for a given topic.
- encoding. otlp_proto by default. The encoding of the payload received from Kafka. The following encodings are available:
  - otlp_proto. The payload is deserialized to ExportTraceServiceRequest, ExportLogsServiceRequest, or ExportMetricsServiceRequest respectively.
  - jaeger_proto. The payload is deserialized to a single Jaeger proto Span.
  - jaeger_json. The payload is deserialized to a single Jaeger JSON span using jsonpb.
  - zipkin_proto. The payload is deserialized into a list of Zipkin proto spans.
  - zipkin_json. The payload is deserialized into a list of Zipkin V2 JSON spans.
  - zipkin_thrift. The payload is deserialized into a list of Zipkin Thrift spans.
  - raw. Only for logs. The payload's bytes are inserted as the body of a log record.
  - text. Only for logs. The payload is decoded as text and inserted as the body of a log record. By default, it uses UTF-8 to decode. You can use text_<ENCODING>, such as text_utf-8 or text_shift_jis, to customize this behavior.
  - json. Only for logs. The payload is decoded as JSON and inserted as the body of a log record.
  - azure_resource_logs. Only for logs. The payload is converted from Azure Resource Logs format to OTLP.
- group_id. otel-collector by default. The consumer group the receiver consumes messages from.
- client_id. otel-collector by default. The consumer client ID.
- initial_offset. latest by default. The initial offset to use if no offset was previously committed. Possible values are latest or earliest.
- auth. You can use the following options to authenticate:
  - plain_text. It has the following fields:
    - username. The username to use.
    - password. The password to use.
  - sasl. It has the following fields:
    - username. The username to use.
    - password. The password to use.
    - mechanism. The SASL mechanism to use: SCRAM-SHA-256, SCRAM-SHA-512, AWS_MSK_IAM, or PLAIN.
    - aws_msk.region. If using AWS_MSK_IAM, the AWS region.
    - aws_msk.broker_addr. If using AWS_MSK_IAM, the MSK broker address.
  - tls. It has the following fields:
    - ca_file. Use only if insecure is set to false. Path to the CA cert. For a client, it verifies the server certificate.
    - cert_file. Use only if insecure is set to false. Path to the TLS cert to use for TLS required connections.
    - key_file. Use only if insecure is set to false. Path to the TLS key to use for TLS required connections.
    - insecure. false by default. Deactivates the verification of the server's certificate chain and host name (InsecureSkipVerify in the TLS configuration).
    - server_name_override. Indicates the name of the server requested by the client in order to support virtual hosting.
  - kerberos. It has the following fields:
    - service_name. Kerberos service name.
    - realm. Kerberos realm.
    - use_keytab. If true, the keytab is used instead of the password.
    - username. The Kerberos username used to authenticate with the KDC.
    - password. The Kerberos password used to authenticate with the KDC.
    - config_file. Path to the Kerberos configuration, for example /etc/krb5.conf.
    - keytab_file. Path to the keytab file, for example /etc/security/kafka.keytab.
    - disable_fast_negotiation. false by default. Deactivates PA-FX-FAST negotiation (Pre-Authentication Framework - Fast). Some common Kerberos implementations do not support PA-FX-FAST negotiation.
- metadata. It has the following fields:
  - full. true by default. Whether to maintain a full set of metadata. When deactivated, the client does not make the initial request to the broker at startup.
  - retry. It has the following fields:
    - max. 3 by default. The number of retries to get metadata.
    - backoff. 250ms by default. How long to wait between metadata retries.
- autocommit. It has the following fields:
  - enable. true by default. Whether or not to auto-commit updated offsets back to the broker.
  - interval. 1s by default. How frequently to commit updated offsets. Ineffective unless auto-commit is enabled.
- message_marking. It has the following fields:
  - after. false by default. If true, the messages are marked after the pipeline is executed.
  - on_error. false by default. If false, only the successfully processed messages are marked. Note that this can block the entire partition if a processed message returns a permanent error.
- header_extraction. Determines how to extract headers. It has the following fields:
  - extract_headers. false by default. If true, header fields are attached to resource attributes.
  - headers. [] by default. The list of headers you want to extract from the Kafka records. The matching pattern is exact. Regular expressions are not supported at this time.
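To illustrate how several of these optional settings combine, the following sketch configures the broker list, topic, encoding, consumer group, offset behavior, and message marking. The broker addresses, topic name, and encoding are placeholders, not required values; adapt them to your environment.

receivers:
  kafka:
    protocol_version: 2.0.0
    # Placeholder broker addresses; replace with your own.
    brokers:
      - kafka-0.example.com:9092
      - kafka-1.example.com:9092
    # Placeholder topic; a topic can carry only one telemetry type.
    topic: app-logs
    # Decode each record as UTF-8 text into the log body.
    encoding: text_utf-8
    group_id: otel-collector
    client_id: otel-collector
    # Start from the oldest record if no offset was previously committed.
    initial_offset: earliest
    autocommit:
      enable: true
      interval: 1s
    message_marking:
      # Mark messages only after the pipeline has processed them.
      after: true
      on_error: false
    metadata:
      retry:
        max: 3
        backoff: 250ms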
Configuration example: Connect to Kafka using SASL and TLS
This is an example of how to configure the receiver to connect to Kafka using SASL and TLS:
receivers:
  kafka:
    auth:
      sasl:
        username: "user"
        password: "secret"
        mechanism: "SCRAM-SHA-512"
      tls:
        insecure: false
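For clusters that authenticate with Kerberos instead of SASL, a comparable sketch using the kerberos fields described earlier might look like the following. The service name, realm, principal, and file paths are placeholders for your environment.

receivers:
  kafka:
    protocol_version: 2.0.0
    auth:
      kerberos:
        service_name: kafka
        # Placeholder realm and principal.
        realm: "EXAMPLE.COM"
        username: "kafka-client"
        # Authenticate with a keytab instead of a password.
        use_keytab: true
        keytab_file: /etc/security/kafka.keytab
        config_file: /etc/krb5.conf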
Configuration example: Extract headers
This is an example of how to configure the receiver to extract headers:
receivers:
  kafka:
    topic: test
    header_extraction:
      extract_headers: true
      headers: ["header1", "header2"]
If you feed the receiver the following test message:
{
  event: Hello,
  headers: {
    header1: value1,
    header2: value2,
  }
}
You’ll obtain the following log record:
{
  ...
  body: Hello,
  resource: {
    kafka.header.header1: value1,
    kafka.header.header2: value2,
  },
  ...
}
The following applies:
- Kafka record headers header1 and header2 are added to the resource's attributes.
- Every matching Kafka header key is prefixed with the kafka.header string and attached to the resource's attributes.
Settings
For the complete list of configuration options for the Kafka receiver, see the metadata file at https://raw.githubusercontent.com/splunk/collector-config-tools/main/cfg-metadata/receiver/kafka.yaml.
Troubleshooting
If you are a Splunk Observability Cloud customer and are not able to see your data in Splunk Observability Cloud, you can get help in the following ways.
Available to Splunk Observability Cloud customers
- Submit a case in the Splunk Support Portal.
- Contact Splunk Support.
Available to prospective customers and free trial users
- Ask a question and get answers through community support at Splunk Answers.
- Join the Splunk community #observability Slack channel to communicate with customers, partners, and Splunk employees worldwide.