To instrument Kafka consumer entry points using KafkaConsumer.poll(), identify the method in which the consumer reads messages in a loop with a custom interceptor definition. We instrument the iterator's next method to start and end the Business Transaction for each message. There could be many iterators used for iterating messages but we only support iterators of the following types:
kafka.consumer.ConsumerIterator
org.apache.kafka.clients.consumer.ConsumerRecords$ConcatenatedIterable$1
- Identify the class and method of the loop that processes messages from Kafka.
Consider, for example, a class
MyConsumer that employs the following loop to poll and process messages from Kafka:
ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
pollMessages(records);
private void pollMessages(ConsumerRecords<String, String>records) throws Exception {
//AppDynamics instrumentation gets applied here
for (ConsumerRecord<String, String> record : records) {
//Processing of the records
System.out.println(record.value());
}
}
For this case, you want to intercept:
- Class:
MyConsumer
- Method:
pollMessages
The interceptor can also be applied to a method that processes individual records, not just a loop. For example:
ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
pollMessages(records);
private void pollMessages(ConsumerRecords<String, String>records) throws Exception {
//AppDynamics instrumentation gets applied here
for (ConsumerRecord<String, String> record : records) {
processRecord(record)
}
}
- your preferred text editor to create and edit a file named custom-interceptors.xml at the following path:
<agent_home>/<version_number>/conf
For example:
/usr/home/appdynamics/appagent/ver4.3.1.0/conf/custom-interceptors.xml
- Copy the following XML to custom-interceptors.xml:
l
<custom-interceptors>
<custom-interceptor>
<interceptor-class-name>com.singularity.KafkaMarkerMethodInterceptor</interceptor-class-name>
<match-class type="matches-class">
<name filter-type="equals">my-fully-qualified-class-name</name>
</match-class>
<match-method>
<name>my-method-name</name>
</match-method>
</custom-interceptor>
</custom-interceptors>
- Set the value of the class name to the name of your consumer class.
For instance, to specify the
MyConsumer class:
<match-class type="matches-class">
<name filter-type="equals">com.mycompany.mypackage.MyConsumer</name>
</match-class>
- Set the value of the method name to the name of your message processing loop method.
For instance, to specify the
pollMessages method:
<match-method>
<name>pollMessages</name>
</match-method>
After the Java Agent reads the updated configuration, it detects consumer activity and upstream Kafka queue. The application flow map shows the tier receiving data from the Kafka queue.