How long does it take Google Dataflow to pick up and process Pub/Sub messages in “real time”?

Dong Sun
Google Cloud - Community
3 min read · Aug 18, 2020

(A test and its results)

Introduction

Organizations use Google Dataflow to process data and feed it to IoT devices in order to control or monitor them. Different use cases require different latencies. This test tries to get some idea of how long it takes Google Dataflow to pick up and start processing each Pub/Sub element, in the given test environment.

Quick disclaimer: by no means is this a baseline assessment.

Test overview

Test steps:

  1. Create a Pub/Sub topic and send data to it as mock stream data. The message content is the timestamp of when the message is published.
  2. Dataflow reads from Pub/Sub, processes each element, and writes the processing timestamp to BigQuery. (If you don’t want to write to BigQuery directly, you can write to the Stackdriver log and create a sink for the log later.)
  3. For the latency calculation, run a query in BigQuery for the time difference.

Test Environment: Python.

Test Architecture:

Test detailed steps:

  1. If you haven’t set up a Python environment for running Dataflow jobs, refer to this link to set one up.

2. Send mock stream data to a Pub/Sub topic:

# create a test topic
gcloud pubsub topics create TOPIC_NAME
# use Cloud Scheduler to run a publisher job every minute (mock stream data)
gcloud scheduler jobs create pubsub publisher-job \
  --schedule="* * * * *" \
  --topic=TOPIC_NAME --message-body="$(date +%s)"
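
Two caveats with the Scheduler approach: cron’s finest granularity is one minute, and the shell evaluates $(date +%s) once, when the job is created, so every message carries the same body. For a fresh timestamp every second, a small standalone publisher script works instead. A minimal sketch, assuming the google-cloud-pubsub client library is installed and PROJECT_NAME/TOPIC_NAME match the topic above:

import time

from google.cloud import pubsub_v1

# hypothetical standalone publisher, not part of the original test setup
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path('PROJECT_NAME', 'TOPIC_NAME')

while True:
    # message body = current unix timestamp in seconds, as a string
    publisher.publish(topic_path, str(int(time.time())).encode('utf-8'))
    time.sleep(1)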

3. Dataflow processes and logs information into BigQuery:

To record what time each Pub/Sub data element gets picked up, we capture the current processing time in a DoFn. The publish time comes from the message body itself; Beam also binds beam.DoFn.TimestampParam, which carries Pub/Sub’s own publish timestamp.

import datetime as dt
import json
import time
import apache_beam as beam

class LatencyFn(beam.DoFn):
    def process(self, element, publish_time=beam.DoFn.TimestampParam):
        # element carries the publish unix timestamp sent by the publisher
        yield {
            'msg_publish_time': json.loads(element),
            'msg_process_time': round(time.mktime(dt.datetime.now().timetuple()))
        }
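
To sanity-check the DoFn before wiring up Pub/Sub, you can run it over an in-memory element with the local runner (a quick sketch; the literal timestamp is made up):

import apache_beam as beam

# LatencyFn as defined above; the element mimics a published unix timestamp
with beam.Pipeline() as p:
    (p
     | beam.Create(['1597690000'])
     | beam.ParDo(LatencyFn())
     | beam.Map(print))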

Then write the time info to a BigQuery table. The snippet follows:

from apache_beam.options.pipeline_options import PipelineOptions

def run(input_topic, pipeline_args=None):
    pipeline_options = PipelineOptions(
        pipeline_args, streaming=True, save_main_session=True
    )
    with beam.Pipeline(options=pipeline_options) as p:
        msg = (p
               | 'ReadPubSubMsg' >> beam.io.ReadFromPubSub(topic=input_topic)
               | 'GetMsgInfo' >> beam.ParDo(LatencyFn())
               )
        # write both timestamps to BigQuery for the latency query later
        msg | 'WriteBQ' >> beam.io.WriteToBigQuery(
            table='test.test_latency_info',
            schema='msg_publish_time:STRING,msg_process_time:STRING',
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
        )
...
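
The elided tail of the script is the usual command-line entrypoint. A minimal sketch of what it could look like (the --input_topic flag name is an assumption, chosen to match the launch command below):

import argparse

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--input_topic',
                        help='projects/PROJECT_NAME/topics/TOPIC_NAME')
    # unrecognized flags (e.g. --runner, --project) pass through to Beam
    known_args, pipeline_args = parser.parse_known_args()
    run(known_args.input_topic, pipeline_args)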

4. Launch the job from the command line. The example below uses the DirectRunner to run locally; to run on the Dataflow service, set --runner=DataflowRunner (and supply --region and --temp_location).

python test.py --project=PROJECT_NAME --job_name=test --input_topic=projects/PROJECT_NAME/topics/TOPIC_NAME --runner=DirectRunner

5. Once the Dataflow job starts running, it reads each Pub/Sub message and writes the time info to the BigQuery table.

6. Go to BigQuery and analyze the logged data with a query like:

SELECT
  MIN(latency) AS min,
  MAX(latency) AS max,
  APPROX_QUANTILES(latency, 100)[OFFSET(50)] AS p50,
  APPROX_QUANTILES(latency, 100)[OFFSET(90)] AS p90,
  APPROX_QUANTILES(latency, 100)[OFFSET(99)] AS p99
FROM (
  SELECT CAST(msg_process_time AS INT64) - CAST(msg_publish_time AS INT64) AS latency
  FROM `PROJECT_ID.DATASET_ID.TABLE_ID`)
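
The same query can also be issued from Python, e.g. to poll the percentiles while the test runs. A minimal sketch, assuming the google-cloud-bigquery client library and the test.test_latency_info table used above:

from google.cloud import bigquery

client = bigquery.Client(project='PROJECT_NAME')
sql = """
SELECT
  MIN(latency) AS min,
  MAX(latency) AS max,
  APPROX_QUANTILES(latency, 100)[OFFSET(50)] AS p50,
  APPROX_QUANTILES(latency, 100)[OFFSET(90)] AS p90,
  APPROX_QUANTILES(latency, 100)[OFFSET(99)] AS p99
FROM (
  SELECT CAST(msg_process_time AS INT64) - CAST(msg_publish_time AS INT64) AS latency
  FROM `PROJECT_NAME.test.test_latency_info`)
"""
# run the query and print the latency distribution
for row in client.query(sql).result():
    print(row.min, row.max, row.p50, row.p90, row.p99)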

Test Result:

From the time Cloud Scheduler sent the data to Pub/Sub to the time Dataflow processed the element, the above query reports the min/max and percentile latencies. Since both timestamps are unix seconds, the computed latency is in seconds.

To present the results, we can feed the latency data into (say) Data Studio for visualization and further drill-down. This helps visually identify unusual latency spikes.

This flow could be extended to measure latency across as many Dataflow processing steps as needed.
