Upgrading Data Flow Pipeline @ CARS24

Rajesh Dhanda
CARS24 Data Science Blog
3 min read · Jan 10, 2023

This blog is a continuation of the primary blog and focuses on upgrades that optimize cost and execution time. Reading the previous blog first is required to understand the context here.

previous workflow

Paying for idle CPU

In our previous workflow, we sent requests to cloud functions through a Pub/Sub mechanism. The Snowflake connector inside the cloud function executed the query synchronously, retrieved the data, and uploaded it to a GCS bucket.
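For context, the synchronous path looked roughly like the sketch below; the credentials, query, and bucket name are placeholders, not our actual configuration.

```python
import snowflake.connector
from google.cloud import storage

def snowflake_to_gcs(event, context):
    # Pub/Sub-triggered cloud function (illustrative names and query).
    conn = snowflake.connector.connect(user="...", password="...", account="...")
    cur = conn.cursor()

    # Synchronous execution: the function blocks for the full compile + queue +
    # execution time in Snowflake, and GCP bills the cloud function for that wait.
    cur.execute("SELECT * FROM example_db.public.example_table")
    rows = cur.fetchall()

    # Write the result to a GCS bucket.
    bucket = storage.Client().bucket("example-bucket")
    blob = bucket.blob("exports/example_table.csv")
    blob.upload_from_string("\n".join(",".join(map(str, r)) for r in rows))
```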

However, in Snowflake the total query duration consists of compile time, queued (overload) time, execution time, and more. With synchronous submission, the Snowflake connector in the cloud function simply waits for the query to complete, so GCP bills for CPU that sits idle. Assuming an average total query duration of just 20 seconds in Snowflake, 15,000 jobs amount to roughly 300,000 seconds of waiting, i.e. about 83 hours of extra charges per day.

Async query submission

To avoid waiting for query completion, we can instead submit async queries to Snowflake. Async query submission returns a query id associated with the submitted job; this query id can later be used to check the status of the job and fetch the data. Consequently, we split our cloud function (Snowflake to GCS) into two parts:

Query ID generator

Async query submission, retrieval of the query id, and saving it into EdgeDB.

Query ID data fetcher

Given a query id, check its QueryStatus, retrieve the data from Snowflake, and save it to the GCS bucket.

We also need an additional DAG to routinely harvest all recently submitted query ids from EdgeDB and send them to the Query ID data fetcher cloud function.
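A minimal sketch of the two pieces, assuming the Snowflake Python connector's async API (execute_async, get_query_status, get_results_from_sfqid); the EdgeDB persistence and the GCS upload are left to the caller.

```python
import snowflake.connector
from snowflake.connector.constants import QueryStatus

conn = snowflake.connector.connect(user="...", password="...", account="...")

def generate_query_id(query: str) -> str:
    """Query ID generator: submit the query asynchronously and return its query id."""
    cur = conn.cursor()
    cur.execute_async(query)   # returns immediately; Snowflake keeps running the query
    return cur.sfqid           # the caller saves this query id into EdgeDB

def fetch_query_data(query_id: str):
    """Query ID data fetcher: check the QueryStatus and pull the results once done."""
    status = conn.get_query_status(query_id)
    if conn.is_still_running(status):
        return None            # not ready yet; the harvesting DAG will retry later
    if status != QueryStatus.SUCCESS:
        raise RuntimeError(f"Query {query_id} failed with status {status}")
    cur = conn.cursor()
    cur.get_results_from_sfqid(query_id)
    return cur.fetchall()      # the caller uploads these rows to the GCS bucket
```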

Still paying for idle CPU

Submitting asynchronous queries to Snowflake is a good idea, but it can be too fast (depending on the use case). Submitting lots of async queries against a limited number of maximum active warehouse instances in Snowflake increases the queue on the warehouse, which is followed by lots of queue timeouts and therefore lots of job failures. It was now time to reduce/control the query submission rate to Snowflake, which can be accomplished by:

  1. Decreasing the maximum number of instances in the cloud function's auto-scaling. We reduced it to 1, but that was still too fast for our use case.
  2. Controlling the request submission rate to Pub/Sub in the DAG, which eventually delays the Google Sheets update (the DAG hits its max active runs, resulting in an abrupt request submission rate and delayed jobs).
  3. Adding time.sleep(1.5) in the cloud function, which amounts to ~6.25 hours of idle CPU for 15k jobs.

So we were once more faced with a trade-off: a minor rise in the query submission rate causes many queue timeouts, while a slight drop increases idle CPU time. Although the cost had already decreased, we still wanted to reduce it as much as possible.

Migrate the Query ID generator

Snowflake — GCS flow

The SHOW WAREHOUSES query provides information on the current number of queued queries in the warehouse and does not require an active warehouse to execute. To optimize the usage of resources, the strategy was to migrate the Query ID generator from a cloud function to an Airflow DAG, enabling live monitoring of queued queries in the warehouse at regular intervals. By implementing a minor modification to the Query ID generator, a queue threshold was established for each Query Submission Warehouse, allowing for the submission of queries in batches when the warehouse queue dropped below the threshold. This approach also facilitated the definition and control of query submission rates to the warehouse.
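As a rough sketch of that DAG-side loop (the warehouse name, threshold, batch size, and polling interval below are illustrative, and the pending queries are assumed to come from EdgeDB):

```python
import time
import snowflake.connector
from snowflake.connector import DictCursor

QUEUE_THRESHOLD = 5   # illustrative queue threshold per Query Submission Warehouse
BATCH_SIZE = 50       # illustrative batch size

conn = snowflake.connector.connect(user="...", password="...", account="...")

def queued_queries(warehouse: str) -> int:
    # SHOW WAREHOUSES does not need an active warehouse and reports the queue depth.
    cur = conn.cursor(DictCursor)
    cur.execute(f"SHOW WAREHOUSES LIKE '{warehouse}'")
    return int(cur.fetchone()["queued"])

def submit_in_batches(warehouse: str, pending_queries: list) -> list:
    """Submit async queries batch by batch, waiting while the queue is above the threshold."""
    query_ids = []
    for i in range(0, len(pending_queries), BATCH_SIZE):
        while queued_queries(warehouse) >= QUEUE_THRESHOLD:
            time.sleep(30)               # re-check the warehouse queue at a regular interval
        for query in pending_queries[i:i + BATCH_SIZE]:
            cur = conn.cursor()
            cur.execute_async(query)
            query_ids.append(cur.sfqid)  # in our pipeline these ids are persisted to EdgeDB
    return query_ids
```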

Conclusion

These upgrades helped us lower costs by improving monitoring of and control over the warehouse load. Recent EdgeDB updates that enabled the EdgeDB UI also helped us better monitor jobs data.

Authors: Rajesh Dhanda, ML Engineer @ CARS24; Vijendra Maurya, Data Engineer @ CARS24; Swapnesh Khare, Senior ML Engineer @ CARS24; Abhay Kansal, Staff Data Scientist @ CARS24
