透過 Jaeger with OpenTelemetry 追蹤 Kafka 資料傳遞路徑與運行狀況

林鼎淵
Dean Lin
Published in
19 min readNov 23, 2022

--

我們可以把 Kafka 當成傳遞消息的「轉運站」,但有時就算 Consumer 順利拿到 Producer 的消息,我們也無法保證接下來的運作都符合預期。

而且會用到 Kafka 的系統,大多以微服務(Microservices)為主;若沒有搭配合適的追蹤系統,我們很難排查 Consumer 收到消息後發生的錯誤。

這篇文章以實作和邏輯講解為主,如果想了解 Jaeger 這個分布式追蹤系統,請參考連結;想了解 Kafka 這個分布式消息串流平台,請參考連結1連結2

建議讀者直接到筆者的 Github Clone 一份下來跑跑看,會更好理解裏面的邏輯(順手按個 ⭐ 更好 🤩)。

大綱

一、自定義追蹤的 Parent & Child 關聯(Manually propagating span context)
二、使用 Kafka 作為傳遞資料的轉運站
三、在 Local 建立 Node.js 專案來達成目標
四、啟動專案,透過 Jaeger UI 確認 Trace 路徑是否符合預期

ㄧ、自定義追蹤的 Parent & Child 關聯(Manually propagating span context)

其實 OpenTelemetry 有提供許多 Instrumentation 的方案,如:

透過上面的套件可以自動產生追蹤紀錄,但有時我們只需要追蹤特定路徑,而使用上面的套件會產生許多不需要的資訊,還容易產生效能瓶頸。

比如使用「instrumentation-http」這款套件時,它會把所有 http 相關的操作都記錄下來,但我們實際上只關心其中的幾個路徑。

因此在某些情況下,自定義追蹤的 Parent & Child 關聯是有必要的;下面筆者擷取關鍵的程式碼來做說明:

➤ 透過 W3CTraceContextPropagator 把 span 資訊 inject 到 carrier

let span = tracer.startSpan('example');
const carrier = {};
const propagator = new W3CTraceContextPropagator();
propagator.inject(
api.trace.setSpanContext(api.context.active(), span.spanContext()),
carrier,
api.defaultTextMapSetter,
);
span.end()

➤ 透過 W3CTraceContextPropagator extract carrier 資訊,使其成為能使用的 Context

const propagator = new W3CTraceContextPropagator();
// 解析 carrier
const parentCtx = propagator.extract(api.context.active(), carrier, api.defaultTextMapGetter);
// 設定 parent 關聯
const span = tracer.startSpan(`report_consumer`, undefined, parentCtx);
span.end()

二、使用 Kafka 作為傳遞資料的轉運站

假使今天有個功能是「產出報表」,但它的執行時間長達「3 分鐘」;我想正常的使用者是不會等一隻 API 回傳等這麼久,針對這個議題,我們不妨改個思路:

SETP 1:在報表的 Table 新增一個「status」欄位,用來表達報表建立的狀況(init、processing、done、fail)。

SETP 2:前端呼叫產出報表的 API 時

  • 在報表的 Table 新增初始資訊,並將 Status 設定為「init」。
  • 將相關資訊透過 Producer 塞入 Kafka。

SETP 3:前端收到 API 回傳的報表 id 後

  • 透過 Consumer 監聽報表執行的狀況;可以用報表的 id 作為 Topic 命名邏輯,ex:reports/{id}。

SETP 4:監聽產出報表的 Consumer 從 Kafka 收到命令後

  • 透過 id 從資料庫撈出對應的報表資訊,將 Status 設定為「processing」。
  • 呼叫相關服務產出報表,如果其中一個服務發生錯誤,就將 Status 設定為「fail」。
  • 如果報表順利產出,就將 Status 設定為「done」。

對 Status 的任何變動,都會透過 Producer 將資訊傳遞到 reports/{id} 這個 Topic,讓前端監聽的的 Consumer 可以作出對應的變動。

上面是比較完整的業務邏輯,建議讀者可以自行實做看看,下面筆者就單純示範透過 Kafka 傳遞 carrier 資訊的部分:

➤ 在 Producer 的 value 放上 carrier 的資訊(記得要轉為字串)

await producer.connect()
await producer
.send({
topic,
messages: Array({
key: `report`,
value: JSON.stringify(carrier)
})
});

➤ 從 Consumer 收到的 message 中取出 carrier 的資訊

await consumer.connect()
await consumer.subscribe({ topic, fromBeginning: true })
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log("topic:" + topic + " message:" + message.value)
const carrier = JSON.parse(message.value)
},
})

三、在 Local 建立 Node.js 專案來達成目標

專案的目標就如同標題,我們要透過 Jaeger with OpenTelemetry 追蹤 Kafka 資料傳遞路徑與運行狀況。

STEP 1:建立專案、安裝必要套件。

mkdir -p kafka-jaeger-demo
cd kafka-jaeger-demo
mkdir -p deploy/kafkaCluster/kraft

npm init -f

npm install --save @opentelemetry/api
npm install --save @opentelemetry/resources
npm install --save @opentelemetry/semantic-conventions
npm install --save @opentelemetry/sdk-trace-base
npm install --save @opentelemetry/exporter-jaeger
npm install --save axios
npm install --save express
npm install --save kafkajs

SETP 2:新增「docker-compose.yml」貼上如下程式,並將 docker 啟動。

version: '2.0'
services:
jaeger:
image: jaegertracing/all-in-one:1.38 # 當時用 1.39 或 latest 有出現一些 bug,故選用此版本
environment:
- COLLECTOR_ZIPKIN_HOST_PORT=:9411
ports:
- "5775:5775/udp"
- "6831:6831/udp"
- "6832:6832/udp"
- "5778:5778"
- "16686:16686"
- "14250:14250"
- "14268:14268"
- "14269:14269"
- "9411:9411"
kafka:
image: 'bitnami/kafka:latest'
ports:
- '9092:9092'
environment:
- KAFKA_ENABLE_KRAFT=yes
- KAFKA_CFG_PROCESS_ROLES=broker,controller
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
- KAFKA_BROKER_ID=1
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@127.0.0.1:9093
- ALLOW_PLAINTEXT_LISTENER=yes
volumes:
# 這是設定資料要存在哪路的路徑
- ./deploy/kafkaCluster/kraft:/bitnami/kafka:rw

接著輸入指令拉下並啟動 Kafka & Jaeger 環境。

docker-compose up -d

STEP 3:新增「tracing.js」貼上如下程式,基本上就是「Opentelemetry」的基礎運用。

'use strict';

const opentelemetry = require('@opentelemetry/api');
const { Resource } = require('@opentelemetry/resources');
const { SemanticResourceAttributes } = require('@opentelemetry/semantic-conventions');
const { BasicTracerProvider, BatchSpanProcessor } = require('@opentelemetry/sdk-trace-base');
const { JaegerExporter } = require('@opentelemetry/exporter-jaeger');

module.exports = (serviceName) => {
const provider = new BasicTracerProvider({
resource: new Resource({
[SemanticResourceAttributes.SERVICE_NAME]: serviceName,
})
});

let exporter;
exporter = new JaegerExporter({ endpoint: "http://127.0.0.1:14268/api/traces" });

provider.addSpanProcessor(new BatchSpanProcessor(exporter));

// Initialize the OpenTelemetry APIs to use the NodeTracerProvider bindings
provider.register();

return opentelemetry.trace.getTracer('my-service-tracer');
};

STEP 4:新增「service.js」貼上如下程式;我們在這裡建立供使用者呼叫的 API(/api/report),當 API 被呼叫後會開始追蹤,並透過 Producer 將資料塞入 Kafka。

const api = require('@opentelemetry/api');
const tracer = require('./tracing')('service');
const express = require("express");
const PORT = "8080";
const app = express();
const { Kafka, logLevel } = require('kafkajs')
const kafka = new Kafka({
logLevel: logLevel.ERROR,
brokers: [`localhost:9092`],
clientId: 'example-producer',
})
const producer = kafka.producer()
const topic = 'report'
const { W3CTraceContextPropagator } = require('@opentelemetry/core');

async function report(parentSpan) {
try {
const ctx = api.trace.setSpan(
api.context.active(),
parentSpan
);
let span = tracer.startSpan('report_producer', undefined, ctx);
const carrier = {};
const propagator = new W3CTraceContextPropagator();
propagator.inject(
api.trace.setSpanContext(api.context.active(), span.spanContext()),
carrier,
api.defaultTextMapSetter,
);
await producer.connect()
await producer
.send({
topic,
messages: Array({
key: `report`,
value: JSON.stringify(carrier)
})
});
span.end()
} catch (e) {
console.log(e)
throw e
}
}
app.get("/api/report", async (req, res) => {
try {
let span = tracer.startSpan('report_request');
let result = await report(span)
span.end()
res.send(result);
} catch (e) {
res.send(e);
}
});

app.listen(parseInt(PORT, 10), () => {
console.log(`Service listening for requests on http://localhost:${PORT}`);
});

STEP 5:新增「consumer.js」貼上如下程式;專門監聽「report」這個 Topic,在收到 message 後會開始後續的處理(有刻意設計一個執行錯誤的結果)。

const api = require('@opentelemetry/api');
const tracer = require('./tracing')('consumer');
const { W3CTraceContextPropagator } = require('@opentelemetry/core');
const { Kafka, logLevel } = require('kafkajs')
const kafka = new Kafka({
logLevel: logLevel.INFO,
brokers: [`localhost:9092`],
clientId: 'example-consumer',
})
const topic = 'report'
const consumer = kafka.consumer({ groupId: 'test-group' })

const run = async () => {
await consumer.connect()
await consumer.subscribe({ topic, fromBeginning: true })
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log("topic:" + topic + " message:" + message.value)
const carrier = JSON.parse(message.value)
const propagator = new W3CTraceContextPropagator();
const parentCtx = propagator.extract(api.context.active(), carrier, api.defaultTextMapGetter);
if (topic === "report") {
const span = tracer.startSpan(`report_consumer`, undefined, parentCtx);
report(span)
span.end()
}
},
})
}

function report(parentSpan) {
const parentCtx = api.trace.setSpan(
api.context.active(),
parentSpan
);
let span = tracer.startSpan('handle_report', undefined, parentCtx);
for (let i = 0; i < 10; i++) {
doWork(span, i);
}
span.end();
}
function doWork(parentSpan, i) {
const parentCtx = api.trace.setSpan(
api.context.active(),
parentSpan
);
let span = tracer.startSpan(`doWork:${i}`, undefined, parentCtx);
// 讓每個執行的時間不一樣
for (let i = 0; i <= Math.floor(Math.random() * 40000000); i++) {
// empty
}
if (i === 5) { // 產生錯誤訊息
span.recordException(new Error(`doWork:${i} error`));
span.setStatus({ code: api.SpanStatusCode.ERROR, message: "Something wrong!" });
span.addEvent('log', {
'log.severity': 'error',
'log.message': 'doWork:${i} error'
});
}
span.end();
}

run().catch(e => console.error(`[example/consumer] ${e.message}`, e))

const errorTypes = ['unhandledRejection', 'uncaughtException']
const signalTraps = ['SIGTERM', 'SIGINT', 'SIGUSR2']
// 收到無法處理錯誤時的處理
errorTypes.forEach(type => {
process.on(type, async e => {
try {
console.log(`process.on ${type}`)
console.error(e)
await consumer.disconnect()
process.exit(0)
} catch (_) {
process.exit(1)
}
})
})
// 收到中斷訊號時的處理
signalTraps.forEach(type => {
process.once(type, async () => {
try {
await consumer.disconnect()
} finally {
process.kill(process.pid, type)
}
})
})

四、啟動專案,透過 Jaeger UI 確認 Trace 路徑是否符合預期

建議使用有分頁功能的終端機(如 iTerm2),這樣比較好觀察與操作。

STEP 1:在專案根目錄下輸入 node service.js 啟動 Service。

STEP 2:在終端機貼上指令 curl http://localhost:8080/api/report 來呼叫 API。

STEP 3:在專案根目錄下輸入 node consumer.js 啟動監聽模式;這邊提醒一下,如果監聽不存在的 Topic 會報錯。

STEP 4:到 Jaeger UI 上(http://localhost:16686/search)確認 Trace 路徑是否符合預期。

上面是呼叫 API 時留下的紀錄

下圖「report_producer」跟「report_consumer」之間的空白,就是資料在 Kafka 轉運站傳遞的時間。

我們可以看到 trace 中每個 span 執行的狀況(這邊能看到錯誤是出現在 dowork5 的 span 上)

範例到這裡告一段落,希望文中 Jaeger 與 Kafka 的組合應用,有幫讀者打開不一樣的思路。

▶︎ 相關技術文章

善用分布式追蹤系統,幫你找出程式的效能瓶頸與問題影響範圍
帶你了解 Kafka 這個分布式消息串流平台,以及它能解決什麼問題!
手把手帶你建立 Node.js 專案 & Kafka 環境
透過 Jaeger with OpenTelemetry 追蹤 Kafka 資料傳遞路徑與運行狀況(本篇)


▶︎ 如果這篇文章有幫助到你

1. 可以點擊下方「Follow」來追蹤我~
2. 可以對文章拍手讓我知道 👏🏻

你們的追蹤與鼓勵是我繼續寫作的動力 🙏🏼

▶︎ 如果你對工程師的職涯感到迷茫

1. 也許我在iT邦幫忙發表的系列文可以給你不一樣的觀點 💡
2. 也歡迎您到書局選購支持,透過豐富的案例來重新檢視自己的職涯

--

--

林鼎淵
Dean Lin

職涯中培育過多名工程師,🧰 目前在外商公司擔任 Software Specialist |✍️ 我專注寫 (1)最新技術 (2)團隊合作 (3)工程師職涯的文章,出版過 5 本專業書籍|👏🏻 如果對這些主題感興趣,歡迎點擊「Follow」來關注我~