Uploading data to Firestore using Dataflow

Firestore is next generation of NoSQL databases on Google Cloud Platform adding various improvements over Cloud Datastore like real time updates, strongly consistent queries etc. It was recently promoted to General Availability, so it’s about time to dive more into it.

Firestore exists in 2 flavours. One is Native mode which resembles all features of new Firestore, whereas second one, Datastore mode is good for users who wants to continue using Datastore (it’s backwards compatible) and properties although under the hood in runs on Firestore.

Firestore offers data export/import through Cloud SDK, but what if you need initial bulk import to load data to Firestore?

Based on my previous example Uploading data to Cloud Datastore using Dataflow I wanted again to use Apache Beam running on Dataflow. Dataflow is great in that sense, that it’s sufficient to write code for pipeline and it automatically provision servers (workers) for batch job and when it completes, it cleans up everything.

Since Firestore is relatively new, not everywhere is natively supported for Firestore, which also relates to Apache Beam, but situation is not so dark. I’ll describe 2 ways which I used to upload bulk data with Beam to Firestore Native. Just to give context, I will be uploading 1.2 millions of records containing product information based on Best Buy dataset. Python (2.7) code is on Github.

1. Using datastoreio

Since there is no native Transformation to write data to Firestore, just out of curiosity I tried to use same the one for Datastore. In my example for Datastore I created custom DoFn to create Datastore entity which I used in this case as well.

Whole pipeline looks like this:

with beam.Pipeline(options=options) as p:
(p | 'Reading input file' >> beam.io.ReadFromText(input_file_path)
| 'Converting from csv to dict' >> beam.ParDo(CSVtoDict(),['sku', 'name', 'regularPrice', 'salePrice', 'type', 'url', 'image', 'inStoreAvailability'])
| 'Create entities' >> beam.ParDo(CreateEntities())
| 'Write entities into Datastore' >> WriteToDatastore(PROJECT)
)

in CSVtoDict I am parsing csv lines to Python dictionary and CreateEntities Transformation looks like this:

class CreateEntities(beam.DoFn):
"""Creates Datastore entity"""
def process(self, element):
entity = entity_pb2.Entity()
sku = int(element.pop('sku'))
element['regularPrice'] = float(element['regularPrice'])
element['salePrice'] = float(element['salePrice'])
element['name'] = unicode(element['name'].decode('utf-8'))
element['type'] = unicode(element['type'].decode('utf-8'))
element['url'] = unicode(element['url'].decode('utf-8'))
element['image'] = unicode(element['image'].decode('utf-8'))
element['inStoreAvailability'] = unicode(element['inStoreAvailability'])

datastore_helper.add_key_path(entity.key, 'Product', sku)
datastore_helper.add_properties(entity, element)
return [entity]

Again in my project I am using Firestore in Native mode and it uploads data without any problems. Only unusual thing I noticed is that the key is altered, for example if I set id to be 1, in Firestore document id is __id1__, i.e. it adds before my id “__id” and after “__”, not sure why.

This approach feels little bit like cheating since as far as I understand it shouldn’t be possible to use Datastore API with Firestore Native (or vice versa), although all comes down to protocol buffers or maybe this is some transitional state, I would be curious for more in depth explanation.

Bottom line: If you can live with altered id and you are not uploading some crazy-complicated — multiple-times-nested documents, this should be fine.

Here is code example how it would look like creating with other supported field types

class CreateEntities(beam.DoFn):
"""Creates Datastore entity with different types of fields, just to verify what is possible"""

def process(self, idx):
entity = entity_pb2.Entity()
embed = entity_pb2.Entity() # embedded document
ss = {
'ss': unicode('X'), # string
'bb': False, # boolean
'll': [1, 2, 3], # list of integers
'ii': 123 # integer
}
datastore_helper.add_properties(embed, ss) # setting properties for embedded document
element = dict()
element['s'] = unicode('String') # string
element['b'] = True # boolean
element['l'] = [unicode('s'), unicode('a'), unicode('b')] # list of strings
element['i'] = 1 # integer
element['e'] = embed # embedded document
datastore_helper.add_key_path(entity.key, 'ds', idx) # setting id for document
datastore_helper.add_properties(entity, element) # setting properties for document
return [entity]

And in Firestore UI it looks like this:

Firestore document

2. Using Firestore Client library

When you don’t know something, you google it and if it’s related to software development, first answer comes from StackOverflow, which inspired me to create my own simple Transformation using Python’s Client library for Firestore.

class FirestoreWriteDoFn(beam.DoFn):
MAX_DOCUMENTS = 200

def __init__(self, project, collection):
self._project = project
self._collection = collection
super(FirestoreWriteDoFn, self).__init__()

def start_bundle(self):
self._mutations = []

def finish_bundle(self):
if self._mutations:
self._flush_batch()

def process(self, element, *args, **kwargs):
self._mutations.append(element)
if len(self._mutations) > self.MAX_DOCUMENTS:
self._flush_batch()

def _flush_batch(self):
db = firestore.Client(project=self._project)
batch = db.batch()
for mutation in self._mutations:
if len(mutation) == 1:
# autogenerate document_id
ref = db.collection(self._collection).document()
batch.set(ref, mutation)
else:
ref = db.collection(self._collection).document(mutation[0])
batch.set(ref, mutation[1])
r = batch.commit()
logging.debug(r)
self._mutations = []

This is inspired by datastoreio module which is a bit more complex and robust, i.e. it has features like retries, auto adjusting write based on latency etc.

Although Firestore supports maximally 500 documents per batch write or 10MB per API call, I am using 200 just to be safe. in _flush_batch method, I added case when document is uploaded without id, although I didn’t use it.

Whole pipeline has same steps as previous with few changes under the hood:

with beam.Pipeline(options=options) as p:
(p | 'Reading input file' >> beam.io.ReadFromText(input_file_path)
| 'Converting from csv to dict' >> beam.ParDo(CSVtoDict(),
['sku', 'name', 'regularPrice', 'salePrice', 'type', 'url', 'image',
'inStoreAvailability'])
| 'Create entities' >> beam.ParDo(CreateEntities())
| 'Write entities into Firestore' >> WriteToDatastore(PROJECT)
)

And in CreateEntities Transformation I extract data and prepare for write:

class CreateEntities(beam.DoFn):
"""Creates Datastore entity"""

def process(self, element):
document_id = unicode(element.pop('sku'))
element['regularPrice'] = float(element['regularPrice'])
element['salePrice'] = float(element['salePrice'])
element['name'] = unicode(element['name'].decode('utf-8'))
element['type'] = unicode(element['type'].decode('utf-8'))
element['url'] = unicode(element['url'].decode('utf-8'))
element['image'] = unicode(element['image'].decode('utf-8'))
element['inStoreAvailability'] = unicode(element['inStoreAvailability'])
return [(document_id, element)]

Here is detail from Dataflow job.

Firestore Dataflow

Whole job took 14–15 minutes (including provisioning). Dataflow cost 0.12$ and Firebase 2.26$ (~1.2M write operations). Workers were increasing to the number of 19 before job completion. Time is similar as with my Datastore upload job, although in this case it used less number of workes (19 apposed to 60 for Datastore job).