How to Load Data from ElasticSearch to BigQuery?

Onur Taşhan
Published in Trendyol Tech · Dec 28, 2020 · 2 min read
Image Source: https://hevodata.com/learn/elasticsearch-to-bigquery-2-easy-methods/

Hi,

When ElasticSearch was added to our Trendyol DWH data sources, we needed code to import the data from this environment in batches.

In principle, the pipeline works as follows: data is written to ElasticSearch, and each document contains a timestamp field named creationDate. Every newly arriving document carries a newer value in this field. Sample data is given below :)
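(The embedded sample did not survive in this text, so the document below is a made-up illustration; only the creationDate field comes from the article, the other fields are invented.)

```json
{
  "orderId": "a1b2c3d4",
  "status": "CREATED",
  "creationDate": "2020-12-28T10:15:30.000Z"
}
```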

We take the data from ElasticSearch and write it to BigQuery as an append, using Python in our case.

In each batch run, we get the max value of the relevant delta column (creationDate) from the table in BigQuery. Let’s call this value our delta value.
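As a minimal sketch with the BigQuery Python client (the project, dataset, and table identifiers are placeholders, not the article’s actual ones):

```python
from google.cloud import bigquery

client = bigquery.Client(project="my-gcp-project")  # placeholder project id

# The newest creationDate already loaded into BigQuery is our delta value.
sql = """
    SELECT MAX(creationDate) AS delta_value
    FROM `my-gcp-project.my_dataset.my_table`
"""
delta_value = next(iter(client.query(sql).result())).delta_value
```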

Then, when we query ElasticSearch, we pass this delta value as a filter and fetch only the documents whose creationDate is greater than it.
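On the ElasticSearch side this is a range filter on creationDate. A sketch with the 7.x-era elasticsearch Python client, where delta_value comes from the previous snippet; the host, credentials, index name, and the process() handler are all placeholders:

```python
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan

# Placeholder connection details.
es = Elasticsearch(hosts=["https://es-host:9200"], http_auth=("user", "pwd"))

# Fetch only documents newer than the delta value taken from BigQuery.
query = {"query": {"range": {"creationDate": {"gt": delta_value}}}}

for hit in scan(es, index="my-index", query=query):
    process(hit["_source"])  # hypothetical handler; the article writes hits to JSON files
```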

On top of that, we prepared two Python files as follows.

params.py

Let me explain the code: when it runs, it takes the config file path as a parameter.

  • get_params(yaml_file)
    This function parses the yaml config file (elastic.yaml) passed as a parameter and sets the required input values as variables; a sketch of what such a config might contain follows this list.
  • run_delta_query(sql)
    This function sets the delta_value variable by running the sql defined in the yaml config file.
  • search_elastic(hosts, user, pwd, index, delta_column, delta_value)
    This function takes the values set from the yaml config file, queries the data in ElasticSearch, and writes the returned data as JSON files under the files folder.
  • load_files_to_bigquery(index, table_project, table_schema, table_name)
    This function takes the values set from the yaml config file and uploads the JSON files under the files folder to the Bucket you created in Google Cloud Storage. It then loads those files into the BigQuery table in append mode. If the load completes successfully, it also cleans up the files it created.
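The elastic.yaml gist itself is not reproduced in this text, so the following is only a guess at its shape, inferred from the function parameters above; every field name and value is illustrative:

```yaml
elastic:
  hosts: ["https://es-host:9200"]
  user: elastic_user
  pwd: "********"
  index: my-index
  delta_column: creationDate
bigquery:
  table_project: my-gcp-project
  table_schema: my_dataset
  table_name: my_table
  delta_sql: "SELECT MAX(creationDate) FROM `my-gcp-project.my_dataset.my_table`"
gcs:
  bucket: my-bucket
```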
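Likewise, here is a rough sketch of what load_files_to_bigquery could look like. This is not the article’s actual code: the bucket argument is my addition (the article presumably reads it from the config), and all identifiers are placeholders.

```python
import glob
import os

from google.cloud import bigquery


def load_files_to_bigquery(index, table_project, table_schema, table_name, bucket):
    # The JSON files under files/ are assumed to have been copied to the GCS
    # bucket with gsutil -m first (see the snippet after the next paragraph).
    client = bigquery.Client(project=table_project)
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,  # append, as described above
        autodetect=True,
    )
    load_job = client.load_table_from_uri(
        f"gs://{bucket}/{index}/*.json",
        f"{table_project}.{table_schema}.{table_name}",
        job_config=job_config,
    )
    load_job.result()  # blocks until the job finishes; raises if the load fails

    # Clean up the local files only after a successful load.
    for path in glob.glob(f"files/{index}_*.json"):
        os.remove(path)
```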

Have you noticed line 142 in the code? I used the gsutil tool there because the BigQuery Python library does not support parallel uploads when sending files to Google Cloud Storage.
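A parallel upload along those lines, called from Python (the paths are placeholders):

```python
import subprocess

# gsutil -m parallelizes the copy across multiple workers; the plain
# google-cloud-storage client uploads files one blob at a time.
subprocess.run(
    ["gsutil", "-m", "cp", "files/*.json", "gs://my-bucket/my-index/"],
    check=True,
)
```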

With this code, running as a batch operation, we were able to load the data from ElasticSearch into our table in BigQuery. Besides that, we are continuing our work on streaming data from ElasticSearch to Kafka, and from there to BigQuery. :)

Have a nice coding! :)
