Streaming Data to BigQuery with Dataflow and Updating the Schema in Real-Time

Robert Delaunay, “Relief-disques,” 1936.

In our previous story, we saw how to stream data to BigQuery and add new columns when needed. That solution, though, is not really real-time; I think we can do better.

Another approach, one I’ve seen discussed online but haven’t found any code samples for, is this: we enable streaming inserts to BigQuery using Dataflow; if the new data contain new fields, the insert fails; we then collect all the failed rows, detect the new schema, update the schema in BigQuery and re-insert the rows.

A really simple pipeline that streams data to BigQuery looks like this:

import apache_beam as beam


def run(argv):
    with beam.Pipeline(options=pipeline_options) as pipeline:
        realtime_data = (
            pipeline
            | "Read PubSub Messages" >>
            beam.io.ReadFromPubSub(
                subscription=options.input_subscription,
                with_attributes=True)
            | f"Write to {options.bq_table}" >>
            beam.io.WriteToBigQuery(
                table=f"{options.bq_dataset}.{options.bq_table}",
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
        )
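Quick note: the snippet above references a pipeline_options object and an options object that aren’t shown here. As a minimal sketch of what they could look like, here is a hypothetical PipelineOptions subclass (the class name MyOptions is made up; the option names are the ones used throughout the snippets):

from apache_beam.options.pipeline_options import PipelineOptions


class MyOptions(PipelineOptions):
    """Hypothetical options class exposing the flags used in the snippets."""

    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument("--input_subscription", required=True)
        parser.add_argument("--bq_dataset", required=True)
        parser.add_argument("--bq_table", required=True)
        # Window size (in minutes) used later when batching the failed rows.
        parser.add_argument("--bq_window_size", type=float, default=1)


pipeline_options = PipelineOptions(streaming=True, save_main_session=True)
options = pipeline_options.view_as(MyOptions)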

Now, if the PubSub message contains new fields that are missing from BigQuery, then beam.io.WriteToBigQuery, according to the documentation, will emit the failed rows to

beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS

So all we have to do is read them, group them in a small window (I use 1 minute) just to catch any other messages that happen to arrive at the same time, and re-insert them into BigQuery:

(
    realtime_data[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS]
    | "Window" >>
    GroupWindowsIntoBatches(window_size=options.bq_window_size)
    | f"Failed Rows for {options.bq_table}" >>
    beam.ParDo(ModifyBadRows(options.bq_dataset, options.bq_table))
)
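One detail worth knowing before wiring this up: as far as I can tell, the elements coming out of FAILED_ROWS in the Python SDK are (destination table, row) tuples rather than bare row dictionaries, which would also explain why the GroupByKey inside GroupWindowsIntoBatches further down works without us adding a key by hand. A throwaway logging step makes it easy to inspect them (the log_failed_row helper below is just for illustration):

import logging


def log_failed_row(element):
    # Assumed shape: ("project:dataset.table", {"field": "value", ...})
    destination, row = element
    logging.warning("Streaming insert into %s failed for row: %s", destination, row)
    return element


(
    realtime_data[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS]
    | "Log Failed Rows" >> beam.Map(log_failed_row)
)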

BUT!

Before we start testing, there are a few gotchas.

1. We need to change the beam.io.WriteToBigQuery retry policy to never retry, so that the failed rows actually get emitted to FAILED_ROWS instead of being retried internally.

beam.io.WriteToBigQuery(
    table=f"{options.bq_dataset}.{options.bq_table}",
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    insert_retry_strategy=beam.io.gcp.bigquery_tools.RetryStrategy.RETRY_NEVER
)

2. The default GroupWindowsIntoBatches we find in Google’s documentation doesn’t work here. Messages coming from BigQueryWriteFn.FAILED_ROWS are not timestamped, so we need to timestamp them ourselves:

import time

from apache_beam.transforms import window


class GroupWindowsIntoBatches(beam.PTransform):

    def __init__(self, window_size):
        # Convert minutes into seconds.
        self.window_size = int(window_size * 60)

    def expand(self, pcoll):
        return (pcoll
                # Failed rows carry no event timestamps, so stamp them with
                # the local processing time before windowing.
                | "Add Timestamps" >>
                beam.Map(lambda x: beam.window.TimestampedValue(x, time.time()))
                | "Window into Fixed Intervals" >>
                beam.WindowInto(window.FixedWindows(self.window_size))
                | "Groupby" >> beam.GroupByKey()
                | "Abandon Dummy Key" >> beam.MapTuple(lambda _, val: val)
                )
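If you want to see the batching behaviour without deploying anything, you can push a couple of fake failed-row tuples through the transform on the DirectRunner. This is purely illustrative and assumes the (table, row) tuple shape mentioned earlier; the table and field names are made up:

fake_failed_rows = [
    ("my_dataset.my_table", {"user_id": 1, "brand_new_field": "a"}),
    ("my_dataset.my_table", {"user_id": 2, "brand_new_field": "b"}),
]

with beam.Pipeline() as p:
    (
        p
        | beam.Create(fake_failed_rows)
        | GroupWindowsIntoBatches(window_size=1)
        # Each output element is one batch of row dicts per table and window.
        | beam.Map(lambda rows: print(list(rows)))
    )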

Finally, to detect the new schema we use the BigQuery Schema Generator, like last time:

import logging

from google.cloud import bigquery
from bigquery_schema_generator.generate_schema import (
    SchemaGenerator, read_existing_schema_from_file)


class ModifyBadRows(beam.DoFn):

    def __init__(self, bq_dataset, bq_table):
        self.bq_dataset = bq_dataset
        self.bq_table = bq_table

    def start_bundle(self):
        self.client = bigquery.Client()

    def process(self, batch):
        # Materialize the grouped rows so we can take len() and iterate twice.
        batch = list(batch)
        logging.info(f"Got {len(batch)} bad rows")
        table_id = f"{self.bq_dataset}.{self.bq_table}"

        generator = SchemaGenerator(input_format='dict',
                                    quoted_values_are_strings=True)

        # Get the original schema to assist the deduce_schema function.
        # If the table doesn't exist, proceed with an empty original_schema_map.
        try:
            table_file_name = f"original_schema_{self.bq_table}.json"
            table = self.client.get_table(table_id)
            self.client.schema_to_json(table.schema, table_file_name)
            original_schema_map = read_existing_schema_from_file(table_file_name)
        except Exception:
            logging.info(f"{table_id} does not exist. Proceeding without the existing schema")
            original_schema_map = {}

        # Generate the new schema from the failed rows plus the existing schema.
        schema_map, error_logs = generator.deduce_schema(
            input_data=batch,
            schema_map=original_schema_map)
        schema = generator.flatten_schema(schema_map)

        job_config = bigquery.LoadJobConfig(
            source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
            schema_update_options=[
                bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION
            ],
            write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
            schema=schema
        )

        try:
            load_job = self.client.load_table_from_json(
                batch,
                table_id,
                job_config=job_config,
            )  # Make an API request.

            load_job.result()  # Waits for the job to complete.
            if load_job.errors:
                logging.info(f"error_result = {load_job.error_result}")
                logging.info(f"errors = {load_job.errors}")
            else:
                logging.info(f"Loaded {len(batch)} rows.")

        except Exception as error:
            logging.info(f"Error: {error} while loading the rows")
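If you haven’t used the schema generator before, here is a tiny standalone sketch of what deduce_schema and flatten_schema do with a row that contains a brand-new field (the field names are made up):

from bigquery_schema_generator.generate_schema import SchemaGenerator

generator = SchemaGenerator(input_format='dict',
                            quoted_values_are_strings=True)
schema_map, error_logs = generator.deduce_schema(
    input_data=[{"user_id": 1, "brand_new_field": "hello"}])

# Prints a BigQuery-style schema containing both fields,
# with brand_new_field deduced as a NULLABLE STRING.
print(generator.flatten_schema(schema_map))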

And that’s it! Now our pipeline will stream the data to BigQuery in real time, and if we get a message that contains a field for which we don’t have a column in BigQuery:

  • that insertion will fail,
  • we’ll gather all the failed rows and group them in a 1-minute window,
  • our pipeline will automatically detect the new schema,
  • update BigQuery, and
  • re-insert the failed rows

and all that without stopping the pipeline at all; messages that arrive after the failed one will keep getting inserted into BigQuery.

You can find the full code here:
https://github.com/aFrag/PubsubToBigQuery/blob/main/StreamingPubsubToBQ.py


