KafkaConsumer.poll

KafkaConsumer.poll() を使用して Kafka コンシューマ エントリ ポイントをインストゥルメント化するには、コンシューマがカスタムインターセプタ定義のループ内のメッセージを読み取るメソッドを特定します。各メッセージのビジネストランザクションを開始して終了するために、反復子の次のメソッドがインストゥルメント化されます。反復するメッセージに使用される反復子はさまざまなものが考えられますが、次のタイプの反復子のみがサポートされています。

  • kafka.consumer.ConsumerIterator
  • org.apache.kafka.clients.consumer.ConsumerRecords$ConcatenatedIterable$1
  1. Kafkaからメッセージを処理するループのクラスとメソッドを識別します。
    たとえば、以下のループを使用して Kafka からメッセージをポーリングして処理するクラス MyConsumer があるとします。
    ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
    pollMessages(records);
    private void pollMessages(ConsumerRecords<String, String>records) throws Exception {
       //AppDynamics instrumentation gets applied here
    for (ConsumerRecord<String, String> record : records) {
       //Processing of the records
       System.out.println(record.value());
       }
    }

    この場合、次をインターセプトします。

    • クラス:MyConsumer
    • 方式:pollMessages
    また、このインターセプタは、ループだけでなく個々のレコードを処理するメソッドにも適用できます。例:
    ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
    pollMessages(records);
    private void pollMessages(ConsumerRecords<String, String>records) throws Exception {
       //AppDynamics instrumentation gets applied here
    for (ConsumerRecord<String, String> record : records) {
       processRecord(record)
       }
    }
  2. 任意のテキストエディタを使用して、次のパスで custom-interceptors.xml という名前のファイルを作成および編集します。例:
    <agent_home>/<version_number>/conf

    例:

    /usr/home/appdynamics/appagent/ver4.3.1.0/conf/custom-interceptors.xml
  3. 以下のXMLをcustom-interceptors.xmlにコピーします:
    l
    <custom-interceptors>
        <custom-interceptor>
            <interceptor-class-name>com.singularity.KafkaMarkerMethodInterceptor</interceptor-class-name>
            <match-class type="matches-class">
                <name filter-type="equals">my-fully-qualified-class-name</name>
            </match-class>
            <match-method>
                <name>my-method-name</name>
            </match-method>
        </custom-interceptor>
    </custom-interceptors>
  4. クラス名の値をコンシューマクラスの名前に設定します。
    たとえば、MyConsumer クラスを指定するには、次のようにします。
    <match-class type="matches-class">
        <name filter-type="equals">com.mycompany.mypackage.MyConsumer</name>
    </match-class>
  5. メソッド名の値をメッセージ処理ループメソッドの名前に設定します。
    たとえば、pollMessages メソッドを指定するには、次のようにします。
    <match-method>
        <name>pollMessages</name>
    </match-method>
    Javaエージェントは構成の更新を読み取ると、コンシューマアクティビティとアップストリームKafkaキューを検出します。アプリケーションフローマップには、Kafkaキューからデータを受信するティアが表示されます。