Async event scheduling and validation using GCP Cloud Tasks

Alekhya Chowdhury
5 min read · Nov 19, 2023


Use case:

Let's consider a requirement for a pathology lab. The lab collects blood samples from patients and, as part of its business offering, has promised to deliver reports within a pre-defined amount of time, failing which the test is done for free. We want to make sure that the admin is notified of any potential violation of the delivery deadline so that penalties can be avoided.

As part of the current setup, an event is generated once a blood sample has been collected; let's call this event "collected". Similarly, once the corresponding report has been delivered, another event is generated: "delivered".

The corresponding JSON payloads for two patients, with patient IDs p_id_1 and p_id_2, are shown below.

For Patient 1 (Success scenario)
Collected
{
  "patient_id": "p_id_1",
  "status": "collected",
  "test_id": "test_1",
  "timestamp": "14-11-2023 07:29:46"
}

Delivered
{
  "patient_id": "p_id_1",
  "status": "delivered",
  "test_id": "test_1",
  "timestamp": "15-11-2023 06:15:00"
}

For Patient 2 (Error scenario)
Collected
{
  "patient_id": "p_id_2",
  "status": "collected",
  "test_id": "test_1",
  "timestamp": "16-11-2023 08:45:20"
}

The events are linked via patient_id and test_id, and are streamed via a Pub/Sub topic to GCP for further processing.

Design:

The mechanism for handling this requirement is as follows:

(1) As events arrive in the receiver Pub/Sub topic, a push subscription sends the data to a cloud function.

(2) The first cloud function stores all incoming events in Datastore.

(3) It also inspects each event and, based on the status field, filters the "collected" events and pushes them to a Cloud Tasks queue along with a pre-defined schedule. This schedule should be shorter than the deadline promised for delivery. (For this POC, let's take the actual deadline as 90 seconds; we will therefore set the schedule to 60 seconds.)

(4) The second cloud function is triggered by the events buffered in the Cloud Tasks queue after the pre-defined schedule passes. It queries Datastore using patient_id, test_id, and status as filters and checks whether we have received the "delivered" event.

(5) If the "delivered" event is present, there is no risk of violating the deadline and no action needs to be taken. If, on the other hand, the "delivered" event is not found, an alert is triggered to the admin for proactive resolution or investigation of the delay.

Implementation:

As a first step, I created the Cloud Tasks push queue. This will act as the buffer and hold the "invocation" for 60 seconds.

We can set the maximum throughput using maximum dispatches (Max Rate), while the number of parallel task executions is set by Max Concurrent.

We can also configure various retry parameters for tasks arriving in this queue.
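
For reference, here is a minimal sketch of creating such a queue with the google-cloud-tasks Python client; the project, region, queue name, and specific limits are assumed values, and the same queue can equally be created from the console or gcloud.

from google.cloud import tasks_v2

client = tasks_v2.CloudTasksClient()
# Assumed project and region for illustration.
parent = client.common_location_path("my-project", "us-central1")

queue = tasks_v2.Queue(
    name=client.queue_path("my-project", "us-central1", "report-sla-queue"),
    rate_limits=tasks_v2.RateLimits(
        max_dispatches_per_second=500,  # Max Rate: maximum throughput
        max_concurrent_dispatches=10,   # Max Concurrent: parallel task executions
    ),
    retry_config=tasks_v2.RetryConfig(max_attempts=5),  # retry behaviour
)
client.create_queue(parent=parent, queue=queue)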

Cloud function 1:

There are two parts to the first cloud function:

send events to the Cloud Tasks queue whenever we receive a "collected" event

store events in Datastore
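
Before diving into each part, here is a minimal sketch of the function's entry point, assuming the Pub/Sub push subscription POSTs to an HTTP-triggered function; handle_event, store_event, and enqueue_sla_check are illustrative names, with the two helpers sketched in the parts below.

import base64
import json

import functions_framework

@functions_framework.http
def handle_event(request):
    # A Pub/Sub push subscription wraps the message in an envelope;
    # the actual payload is base64-encoded under message.data.
    envelope = request.get_json()
    event = json.loads(base64.b64decode(envelope["message"]["data"]))

    store_event(event)  # Part 2: persist every event in Datastore

    if event.get("status") == "collected":
        enqueue_sla_check(event)  # Part 1: schedule the delayed SLA check

    return ("", 204)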

Part 1

The code below is used to send data to the Cloud Tasks queue. We need to specify the project ID where the Cloud Tasks queue is hosted, the region where the queue has been created, the queue name, and the URL of the second cloud function that is to be invoked for processing the task.

For creating the task we also need to specify the payload to be sent to the cloud function URL and the HTTP method (POST).

"scheduled_seconds_from_now" indicates the wait time of the message in the task queue; here we have taken 60 seconds. This is added to the current time, converted to protobuf format, and then attached to the task.

Finally, we invoke the task queue and send the task using the client.
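
A minimal sketch of this logic, assuming the google-cloud-tasks client; the project, region, queue name, and function URL below are placeholders.

import datetime
import json

from google.cloud import tasks_v2
from google.protobuf import timestamp_pb2

# Placeholder values; replace with the real project, region, queue and URL.
PROJECT_ID = "my-project"
LOCATION = "us-central1"
QUEUE_NAME = "report-sla-queue"
FUNCTION_URL = "https://us-central1-my-project.cloudfunctions.net/check_sla"
SCHEDULED_SECONDS_FROM_NOW = 60

def enqueue_sla_check(event: dict) -> None:
    client = tasks_v2.CloudTasksClient()
    parent = client.queue_path(PROJECT_ID, LOCATION, QUEUE_NAME)

    # Add the wait time to the current time and convert to protobuf format.
    schedule_time = timestamp_pb2.Timestamp()
    schedule_time.FromDatetime(
        datetime.datetime.now(datetime.timezone.utc)
        + datetime.timedelta(seconds=SCHEDULED_SECONDS_FROM_NOW)
    )

    # The task carries the payload, the target URL and the HTTP method (POST).
    task = tasks_v2.Task(
        http_request=tasks_v2.HttpRequest(
            http_method=tasks_v2.HttpMethod.POST,
            url=FUNCTION_URL,
            headers={"Content-Type": "application/json"},
            body=json.dumps(event).encode(),
        ),
        schedule_time=schedule_time,
    )

    # Send the task to the queue using the client.
    client.create_task(parent=parent, task=task)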

Once enqueued, the task is visible in the Cloud Tasks console along with its scheduled dispatch time.

Part 2

The second part of the cloud function stores the events in Datastore so that we can later retrieve them and verify whether the "delivered" event has arrived after 60 seconds. Here we are using the patient_id and test_id along with a 10-character UUID to create the unique entity key for Datastore.
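
A sketch of this part, assuming the google-cloud-datastore client and a hypothetical PatientEvent kind:

import uuid

from google.cloud import datastore

def store_event(event: dict) -> None:
    client = datastore.Client()
    # Unique key: patient_id + test_id + a 10-character random suffix.
    key_name = f"{event['patient_id']}_{event['test_id']}_{uuid.uuid4().hex[:10]}"
    entity = datastore.Entity(key=client.key("PatientEvent", key_name))
    entity.update(event)  # stores patient_id, status, test_id and timestamp
    client.put(entity)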

Once stored, both events are present in Datastore for patient p_id_1, while only the "collected" event is present for patient p_id_2.

Cloud function 2:

The job of the second cloud function is to verify whether we have received the "delivered" event within the predefined SLA of 60 seconds. It does this by querying Datastore using patient_id, test_id, and status as filters.

Based on whether the "delivered" event is found, a log entry is written; we can then use log sinks or log-based alerts to match specific patterns and notify the admin.

We need to define a composite index on patient_id, test_id, and status in Datastore for the query below to work.
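
A minimal sketch of the second function, reusing the illustrative names from the earlier snippets; Cloud Tasks POSTs back the payload that was attached when the task was created.

import functions_framework
from google.cloud import datastore

@functions_framework.http
def check_sla(request):
    # Cloud Tasks delivers the JSON payload attached at task-creation time.
    event = request.get_json()
    client = datastore.Client()

    # Query backed by the composite index on patient_id, test_id and status.
    query = client.query(kind="PatientEvent")
    query.add_filter("patient_id", "=", event["patient_id"])
    query.add_filter("test_id", "=", event["test_id"])
    query.add_filter("status", "=", "delivered")

    if list(query.fetch(limit=1)):
        print(f"SLA ok: report delivered for {event['patient_id']}/{event['test_id']}")
    else:
        # A log sink or log-based alert can match this pattern and notify the admin.
        print(f"SLA ALERT: no delivered event for {event['patient_id']}/{event['test_id']}")

    return ("", 204)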
