Understanding Streaming Query Metrics

Tharun Kumar Sekar
Dec 10, 2023


To optimize a Streaming Pipeline, the Streaming Query Metrics are the right place to begin your analysis.

For illustration purposes, I’m picking a Kafka topic as the source and a Delta table as the destination. Here is a sample of the Streaming Query Metrics, which can be found in the driver’s log4j file.

INFO MicroBatchExecution: Streaming query made progress: {
  "id" : "8734d7c4-f46d-4e28-a7d6-5b4498ec9fc0",
  "runId" : "848g9a09-8141-4a78-8c05-c2138c1b4e09",
  "name" : "ingest-stream",
  "timestamp" : "2023-11-30T09:03:00.000Z",
  "batchId" : 2,
  "numInputRows" : 43,
  "inputRowsPerSecond" : 725573.7333333333,
  "processedRowsPerSecond" : 5705.508014487084,
  "durationMs" : {
    "addBatch" : 7629498,
    "getBatch" : 0,
    "commitOffsets" : 203,
    "queryPlanning" : 176,
    "triggerExecution" : 7630245,
    "walCommit" : 131
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[event-tree]]",
    "startOffset" : {
      "event-tree" : {
        "1" : 143,
        "0" : 149
      }
    },
    "endOffset" : {
      "event-tree" : {
        "1" : 166,
        "0" : 169
      }
    },
    "latestOffset" : {
      "event-tree" : {
        "1" : 199,
        "0" : 199
      }
    },
    "numInputRows" : 43,
    "inputRowsPerSecond" : 725573.7333333333,
    "processedRowsPerSecond" : 5705.508014487084,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "estimatedTotalBytesBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[s3://datalake/bronze/event-tree]",
    "numOutputRows" : -1
  }
}
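
If you don’t want to grep the driver logs, the same progress report is also exposed programmatically on the StreamingQuery handle. Below is a minimal PySpark sketch; the broker address, checkpoint location, and session setup are placeholders for illustration, not values taken from the pipeline above.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ingest-stream").getOrCreate()

# Read from the Kafka topic (broker address is a placeholder).
df = (spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("subscribe", "event-tree")
      .load())

# Write to the Delta sink (checkpoint path is a placeholder).
query = (df.writeStream
         .format("delta")
         .option("checkpointLocation", "s3://datalake/checkpoints/event-tree")
         .start("s3://datalake/bronze/event-tree"))

# lastProgress holds the most recent progress report as a dict with the same
# fields as the JSON above (None until the first micro-batch completes);
# recentProgress keeps a short history of them.
print(query.lastProgress)
for progress in query.recentProgress:
    print(progress["batchId"], progress["numInputRows"])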

A few of the basic entries in these metrics are:

  • id — The Streaming Pipeline’s ID. This will not change across different runs.
  • runId — Unique ID of the individual run. This changes on every restart.
  • batchId — Number of the micro-batch that is being processed.
  • numInputRows — Number of records consumed in this micro-batch.
  • processedRowsPerSecond — Number of records processed per second. The listener sketch after this list shows how to surface these fields for every micro-batch.
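
If you want these fields for every micro-batch without digging through log4j, one option is a StreamingQueryListener, available in PySpark since Spark 3.4. A small sketch that simply prints them (in practice you would likely push them to a metrics system), assuming the spark session from the earlier snippet:

from pyspark.sql.streaming import StreamingQueryListener

class ProgressLogger(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"query started: id={event.id} runId={event.runId}")

    def onQueryProgress(self, event):
        # event.progress carries the same fields as the JSON shown above.
        p = event.progress
        print(f"batchId={p.batchId} numInputRows={p.numInputRows} "
              f"processedRowsPerSecond={p.processedRowsPerSecond}")

    def onQueryTerminated(self, event):
        print(f"query terminated: id={event.id}")

# Register the listener on the SparkSession from the first snippet.
spark.streams.addListener(ProgressLogger())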

Now let’s get into the metrics that help us understand the processing time.

  • durationMs — This section contains all the timing information for the micro-batch.
  • getBatch — Time taken to retrieve the metadata about the next micro-batch, such as offsets. This doesn’t include reading the actual data, so this value is usually very small.
  • walCommit — Time taken to commit the offsets to the write-ahead log in the checkpoint. This value is also usually very small.
  • queryPlanning — Time taken by Spark to generate the execution plan. This value is also usually small.
  • addBatch — Time taken to read, process, and write the data to the sink. This metric will almost always have the highest value, since the actual processing time is measured here.
  • commitOffsets — Time taken to write to the commit log after the micro-batch has been processed. This value is also usually small.
  • triggerExecution — The total time for the trigger, roughly the summation of all the above metrics. The sketch after this list shows how to check what share of it goes to addBatch.
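
Putting these together, a quick sanity check is to see what share of triggerExecution is spent in addBatch. A minimal sketch, assuming query is the handle from the first snippet:

# Where does the micro-batch spend its time?
p = query.lastProgress
if p is not None:
    durations = p["durationMs"]
    total = durations.get("triggerExecution", 0)
    add_batch = durations.get("addBatch", 0)
    if total:
        print(f"addBatch: {add_batch} ms "
              f"({100.0 * add_batch / total:.1f}% of triggerExecution)")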

The next category is about the Source (Kafka in our case).

  • description — The source description, which includes the topic from which we are reading the data.
  • startOffset — The offsets at which this micro-batch started reading, broken down by topic and partition.
  • event-tree — Topic name.
  • 0, 1 — Partition IDs.
  • The value is the record number (offset) from which this micro-batch started reading the data.
  • endOffset — The offsets up to which records have been read in this micro-batch.
  • latestOffset — The latest offset currently available in each partition. If this value matches the endOffset, it means we have processed all available records in the partition. The sketch below shows how to compute the remaining backlog from these values.
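
To turn these offsets into an actionable number, you can compare endOffset with latestOffset per partition, or simply read the lag metrics Spark already reports. A rough sketch, again assuming query from the first snippet and that the offsets appear in lastProgress as nested maps, as in the JSON above:

p = query.lastProgress
if p is not None and p["sources"]:
    source = p["sources"][0]
    end, latest = source["endOffset"], source["latestOffset"]
    for topic, partitions in latest.items():
        for partition, latest_offset in partitions.items():
            behind = latest_offset - end.get(topic, {}).get(partition, latest_offset)
            print(f"{topic}[{partition}]: {behind} offsets behind the latest")
    # Spark also summarizes the lag directly in the source metrics.
    print("avgOffsetsBehindLatest =", source["metrics"]["avgOffsetsBehindLatest"])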

If you liked this article, click the 👏 so other people will see it here on Medium.
