Avoid duplicate HTTP requests in Apache Beam with SCIO, a custom BaseAsyncDoFn and State and Timers

Alberto López Serna
9 min read · Apr 24, 2024


Introduction

We must migrate an on-premises microservice (mediation-service) application that:

  • is implemented in Spring (running on OpenShift).
  • reads records from Kafka.
  • outputs HTTP requests to a notification hub endpoint.
  • keeps state (historical results) in an In-Memory Data Grid (IMDG) Hazelcast cluster (running on OpenShift).
  • scales horizontally by increasing its number of pods.
Image 1: old mediation-service

NOTE: we do not have a Hazelcast Enterprise cluster, to avoid the extra cost. Thus, on top of the Hazelcast cluster we need an extra “storage logic layer”, either as a PVC in Kubernetes or as a decoupled topic layer in Kafka, to repopulate the state when a disaster recovery or a maintenance restart in new regions comes about. This is managed by an additional hazelcast-manager microservice.

  • Could this design be simplified when migrating into GCP?
  • Could we use a distributed Big Data processing engine like Dataflow to perform massive async HTTP requests?
  • What about keeping the state of these sent HTTP requests to avoid duplicates?

Let’s move on to the next section!

Problem statement

Some of you may be familiar with mapWithState in Spark Streaming (https://www.databricks.com/blog/2016/02/01/faster-stateful-stream-processing-in-apache-spark-streaming.html), which keeps the state of elements across windows. Apache Beam also has a really cool and even more powerful pattern, https://beam.apache.org/blog/timely-processing/, called State and Timers (S & T), widely used in the industry and with some interesting underlying infrastructure when using the Dataflow runner. I would encourage you to go through some of the Beam Summit talks in the last section to figure it out.

  • How could we use S & T through an async ParDo? How would we attach an HTTP client? How would it scale?
  • How can we achieve this using SCIO Scala for Apache Beam?
  • Would this be cheaper than managing infrastructure, replicas, and reloading historical notifications after a node is brought up or shut down?
  • Which limitations do we face with streaming, windowing and S & T when asynchronous HTTP calls are involved?

If you want to figure some of these questions out, you are in the right place.

Some data streaming challenges

Duplicated data in the “distributed/Big Data” world is a reality, especially if your pipeline is using Kafka at some stage (rebalancing still comes about), reprocessing old transactions by mistake from your origin (e.g. with Change Data Capture from DB2), a failed ACK in a microservice when publishing into Pub/Sub, and so on. Sometimes it is okay to deal with them, but your pupils can get into proper autofocus mode when duplicated DEBIT or BILLS push notifications start popping up on your smartphone's screen, not cool at all…

At some point in your pipeline you must drop the duplicated processed data, or at least prevent it from being sent, but what if you have already sent millions of notifications? In addition, you need to face a high throughput of pushes to be delivered (end-of-month bills, Black Friday, holidays, etc.) and low-latency look-ups to be resolved before sending…
SOLUTION? You need to keep the state externally or maybe internally…???

Internally, you can easily face issues such as out-of-memory headaches, and keeping the state across distributed nodes can be challenging. Vertical scalability is actually available in Dataflow Prime, but let's explore other options.

Externally, you have some databases widely used in the distributed world like Cassandra, HBase (Bigtable), caches (Redis, Memorystore), In-Memory Data Grids (IMDG, Hazelcast), etc. Each of them has different pros and cons (not the purpose of this post), but the main drawback on the cloud is, I am sure you can guess: COST. Furthermore, if you need to deploy Regional or Multi-Regional instances, this cost increases proportionally.

Just give me the keywords, please:

  • State?
  • Time To Live (Timer) ?
  • Scalability?
  • High throughput?
  • Low latency look-ups?
  • Serverless?

The S & T pattern might look like a good fit then! Let's get going 😎

Design

But what about achieving the “async” parallelism?

Modelling

  1. Avro (for new/historical notifications): MyEventRecord.java
  2. HTTPResponse and HTTPRequest: model
  3. idempotent_key in MyEventRecordUtils.scala (see the sketch right after this list)
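
As a rough illustration of the idempotent key idea (a hedged sketch; the real MyEventRecordUtils.scala derives it from the actual Avro schema, and the field names below are just placeholders):

// Hypothetical sketch of the idempotent_key derivation; field names are assumptions,
// not the repo's actual MyEventRecord schema.
final case class MyEvent(customerId: String, eventId: String, payload: String)

object MyEventKeyUtils {
  // A stable key, so the same logical notification always lands in the same State cell.
  def idempotentKey(event: MyEvent): String =
    s"${event.customerId}-${event.eventId}"
}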

So, is there a way to keep the HTTP requests saved as S & T entries, fed by a custom HTTP client? Is this implemented anywhere? Not really… but let's get our hands dirty!

Flow

This diagram represents the flow of the BER notifications, from ingestion to delivery to the HTTP endpoint, through their state management within the S & T:

Image 2: mediation_service_high_level_design
  1. Reading **historical_notifications** from Google Cloud Storage (GCS) as OPTION 1; OPTION 2, reading them from Pub/Sub, is an alternative (to be implemented).
  2. Ingestion and pre-S & T flow. Notifications (BERs) are globally windowed and keyed by _idempotent_key_:
    - 2.1: Reading **new_notifications** from Pub/Sub.
    - 2.2: Treating **historical_notifications**: a SideInput approach is taken (as long as TTL is applied) to discard duplicates in **new_notifications** against **historical_notifications**.
    - 2.3: KO records are handled by the inValidBers method and dumped as toxic notifications in GCS.

NOTE 1: make sure you can fit all **historical_notifications** from GCS as a SideInput in your workers, allocating enough memory.

NOTE 2: A unionAll of the Bounded “historical” (GCS) and Unbounded “fresh” (Pub/Sub) collections has been discarded, as it stalls the process (only the first emitted pane from Pub/Sub comes out) despite the GroupByKey (GBK) applied for state (State and Timer) and the use of the GlobalWindow (other triggers were tried, but only the first pane of Pub/Sub records showed up). As mentioned in point 1, the potential OPTION 2 might be useful here: reloading **historical_notifications** into Pub/Sub and then loading them into the State and Timer, so that both new and historical notifications are wrapped seemingly as “unbounded”, applying S & T to **historical_notifications** as well.

3. applyKVState to flush Futures. Attempted notifications (**new_notifications** BERs) are saved as KV entries in State and released when the Timer expires (TTL), avoiding duplicates (race conditions are mitigated since distinctByKey is applied beforehand).

4. Checks whether the **new_notifications** have already been processed and keeps state by idempotent key, adding a Future:
- 4.1: processElement (ParDo) in StateBaseAsyncDoFn.java.
- 4.2: StateAsyncParDoWithHttpHandler.scala: sending the HTTP request.
- 4.3: flush() in StateBaseAsyncDoFn.java: the Future[HttpResponse] is asynchronous and StateBaseAsyncDoFn will deal with it (outputting to the main routine).

5. Output HTTP Response is emitted in the main routine.

6. Now you can publish your HTTP Response/NOTIFICATION_RESPONSE along with the “sent” **new_notification** to Pub/Sub (e.g. sinking it into GCS eventually, useful for analytics as an external table and as the **historical_notifications** load if mediation-service is restarted).
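
As a rough SCIO sketch of steps 1 to 3 above (using plain String records and a hypothetical idempotentKey helper; the real MediationService.scala wiring differs in the details):

import com.spotify.scio.values.{SCollection, SideInput}

object FlowSketch {
  // Placeholder for the MyEventRecordUtils.scala key derivation.
  def idempotentKey(ber: String): String = ber.take(16)

  def dedupAndKey(
    historical: SCollection[String], // historical_notifications (GCS, bounded) - step 1
    fresh: SCollection[String]       // new_notifications (Pub/Sub, unbounded)  - step 2.1
  ): SCollection[(String, String)] = {
    // historical_notifications as a side input, only used to drop duplicates (NOTE 1 applies).
    val historicalKeys: SideInput[Seq[String]] = historical.map(idempotentKey).asListSideInput

    fresh
      .withGlobalWindow()                                                      // step 2: GlobalWindow
      .withSideInputs(historicalKeys)
      .filter((ber, ctx) => !ctx(historicalKeys).contains(idempotentKey(ber))) // step 2.2: drop known duplicates
      .toSCollection
      .keyBy(idempotentKey)                                                    // keyed by idempotent_key
      .distinctByKey                                                           // step 3: mitigate race conditions
  }
}

The keyed output is then handed over to the stateful async DoFn of the next section (steps 3 to 5).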

Implementation

StateBaseAsyncDoFn

The StateBaseAsyncDoFn.java class is mainly based on the implementation of SCIO's BaseAsyncDoFn.java, abstracting out some methods and adding a Timer (TTL) along with a BagState (buffer). All of these are needed to use the S & T pattern, preventing duplicated HTTP requests from being sent:
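
The full class lives in the repository; as a minimal, hedged sketch (not the actual StateBaseAsyncDoFn.java, written here in Scala for brevity and omitting the asynchronous Future bookkeeping inherited from BaseAsyncDoFn), the two S & T building blocks it adds, a BagState buffer plus a TTL Timer, could look roughly like this:

import org.apache.beam.sdk.coders.StringUtf8Coder
import org.apache.beam.sdk.state._
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.DoFn.{OnTimer, ProcessElement, StateId, TimerId}
import org.apache.beam.sdk.values.KV
import org.joda.time.Duration

// Sketch only: buffers the first element seen per idempotent_key and clears it after a TTL,
// so a duplicate arriving before the Timer fires is silently dropped.
class TtlDedupDoFn(ttl: Duration) extends DoFn[KV[String, String], KV[String, String]] {

  // "State": what has already been attempted/sent for this key.
  @StateId("buffer")
  private val bufferSpec: StateSpec[BagState[String]] = StateSpecs.bag(StringUtf8Coder.of())

  // "Timer": the Time To Live after which the key is forgotten.
  @TimerId("ttl")
  private val ttlSpec: TimerSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME)

  @ProcessElement
  def processElement(
    context: DoFn[KV[String, String], KV[String, String]]#ProcessContext,
    @StateId("buffer") buffer: BagState[String],
    @TimerId("ttl") timer: Timer
  ): Unit = {
    val alreadySent = buffer.read().iterator().hasNext()
    if (!alreadySent) {
      buffer.add(context.element().getValue) // remember this key as attempted
      timer.offset(ttl).setRelative()        // arm/refresh the TTL
      context.output(context.element())      // only the first occurrence goes downstream
    }
  }

  @OnTimer("ttl")
  def onTtlExpired(@StateId("buffer") buffer: BagState[String]): Unit =
    buffer.clear()                           // TTL reached: free the state for this key
}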

Thus, notifications can be loaded into the “State” as an initialLoad or when they have not been sent yet. The Time To Live (TTL) can be set up as desired in the implementations or in settingElementTTLTimer.

An implementation of this abstract class is shown here: StateAsyncParDoWithHttpHandler.scala.

It must be stated that SCIO provides some options to deal with similar “caching” scenarios, please refer to https://spotify.github.io/scio/releases/migrations/v0.8.0-Migration-Guide.html#async-dofns: BaseAsyncLookupDoFn.java has a type parameter for a cache implementation, so you can plug in whatever cache supplier you want, e.g. a com.google.common.cache.Cache, and let it handle the TTL. A concern here would be scalability; the Dataflow Vertical Autoscaling feature might then come into play to tackle ingestion peaks.
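
For comparison, here is a minimal, hypothetical sketch of that cache-based alternative with Guava, where the TTL is handled by the cache itself rather than by a Beam Timer:

import java.util.concurrent.TimeUnit
import com.google.common.cache.{Cache, CacheBuilder}

object GuavaTtlCacheSketch {
  // Entries expire 24 hours after being written, roughly mimicking the S & T TTL,
  // but the cache lives in each worker's memory rather than in Beam's managed keyed state.
  val sentNotifications: Cache[String, String] =
    CacheBuilder.newBuilder()
      .maximumSize(1000000L)
      .expireAfterWrite(24, TimeUnit.HOURS)
      .build[String, String]()

  def alreadySent(idempotentKey: String): Boolean =
    sentNotifications.getIfPresent(idempotentKey) != null
}

This is roughly what a cache supplier handed to BaseAsyncLookupDoFn would wrap; the trade-off, as mentioned above, is that it is bounded by worker memory, which is where Vertical Autoscaling would come in.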

This SCIO caching capability, along with a workaround for adding something similar to StateAsyncParDoWithHttpHandler.scala using the S & T pattern, has been discussed as a potential future enhancement here: https://github.com/spotify/scio/issues/5055

HTTPClient

“Bring your own HTTP Client”, like akka or zio here in clients. Implement it as an AbstractHttpClient and include it as **httpClient** within the StateAsyncParDoWithHttpHandler.scala class:
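
As a hedged sketch of what such a pluggable abstraction can look like (the repo's AbstractHttpClient and its MyHttpRequest/MyHttpResponse model may differ in the details):

import scala.concurrent.Future

// Hypothetical shape of the "bring your own HTTP client" abstraction.
trait AbstractHttpClient[Req, Res] {
  // Fires the request asynchronously; the returned Future is what the stateful DoFn tracks and flushes.
  def sendAsync(request: Req): Future[Res]

  // Releases pools/connections when the DoFn is torn down.
  def close(): Unit
}

A concrete client (akka- or zio-backed) then just implements this trait and is included in the handler as httpClient.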

ZIORetry

Some retry mechanism has been implemented using ZIO retry for Scala (https://zio.dev/reference/schedule/retrying/), so that we can avoid Dead Letter Queue or retry-topic patterns. Retries are performed at process level, as shown in StateAsyncParDoWithHttpHandler.scala:
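
A minimal sketch of that idea (assuming ZIO 2.x; the actual policy and error handling in the repo may differ):

import zio._

object RetrySketch {
  // Hypothetical async HTTP call; in the pipeline this would delegate to the injected httpClient.
  def send(request: String): Task[String] =
    ZIO.attempt(s"200 OK for $request")

  // Exponential backoff, capped at 3 retries, instead of a dead letter / retry topic.
  val policy = Schedule.exponential(Duration.fromMillis(500)) && Schedule.recurs(3)

  def sendWithRetry(request: String): Task[String] =
    send(request).retry(policy)

  // Running the effect synchronously (e.g. from within a DoFn) with the default runtime.
  def main(args: Array[String]): Unit = {
    val response = Unsafe.unsafe { implicit u =>
      Runtime.default.unsafe.run(sendWithRetry("my-notification")).getOrThrowFiberFailure()
    }
    println(response)
  }
}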

Testing

StateAsyncParDoWithHttpHandler

To understand the main purpose and design of the State and Timer pattern, keeping the idempotent state for MyEventRecord.scala notifications before sending them as MyHttpRequest.scala, just run MediationServiceSpec in IntelliJ: “1 OK and 2 SENT_OR_DUPLICATED HTTP_RESPONSE should exist”.

HINT 1: it uses the DirectRunner on your local machine (do not worry about not having a GCP project).

HINT 2: the Pub/Sub emulator in Java needs this workaround: https://cloud.google.com/pubsub/docs/emulator#pubsub-emulator-java. I have not got it sorted (yet); in the meantime we can just get by with TestStream.
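
For a flavour of what a local DirectRunner test looks like, here is a trimmed-down, hypothetical spec (not the actual MediationServiceSpec, and sidestepping TestStream for brevity):

import com.spotify.scio.testing.PipelineSpec

// Hypothetical spec: checks that keyed deduplication keeps one record per idempotent_key.
class DedupSketchSpec extends PipelineSpec {
  "the keyed dedup step" should "emit each idempotent_key only once" in {
    runWithContext { sc =>
      val input = sc.parallelize(Seq("key-1|payload-a", "key-1|payload-b", "key-2|payload-c"))
      val deduped = input
        .keyBy(_.split('|').head) // placeholder for the idempotent_key derivation
        .distinctByKey
      deduped should haveSize(2)
    }
  }
}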

StressTest

Some “stress testing” has been undertaken with mock data sets (with different idempotent_key values), peaking at more than 200K notifications/min with the current AkkaHttpClient config, forcing the application to scale up and keeping approximately a million notifications saved as “TTL” State:

akka {
max-open-requests = 20000
max-open-connection = 20000
initial-timeout = 30
completion-timeout = 60
buffer = 20000
throttle-requests = 1000
throttle-per-second = 1
throttle-burst = 1000
}

Bearing in mind that the same mock data sets have been used in all scenarios, it is worth noting (all with e2-family GCE machine types):

- e2-highmem-4: scaling was needed (maxWorkers=3).
- e2-standard-4: a good balance between scalability, latency and cost. Scaling was not needed.
- e2-highcpu-4: we got some OutOfMemory issues (restarting a worker); it did handle the load, but with the worst performance.

Conclusions

A similar application to MediationService.scala has successfully been deployed on Dataflow in a real-world production end-to-end pipeline, replacing:

  • A Spring microservice (on OpenShift) that was reading notifications from Kafka and sending them to an HTTP endpoint.
  • An entire multi-node Hazelcast cluster (on OpenShift) that was acting as a low-latency IMDG storage layer for historical/sent notifications.
  • An additional hazelcast-manager microservice (on OpenShift) that was managing an extra “storage logic layer” for the sent notifications (realtime) and maintaining historical ones (in a Kubernetes PVC or Kafka topics) in case of disaster recovery, region swapping or OpenShift maintenance.

By using a:

  • Newly developed StateBaseAsyncDoFn.java integrated with the State & Timer pattern, bringing an alternative way of keeping the state instead of “bringing your own cache” (Guava).
  • An abstraction to attach HTTP Clients, “bringing your own HTTP client”.
  • Some (optional) retry mechanism using ZIO.

Allowing:

  • High throughput for notification processing and low latency for reading/writing notifications by applying the State & Timer pattern.
  • Great horizontal scalability.
  • High availability and robustness.
  • Huge cost savings in terms of applications to be maintained, avoiding expensive infrastructure, deployments and operations.

Code

You can find the open source adaptation of the new real-world productive mediation-service here: https://github.com/albertols/scio-db/blob/cleanup-for-medium/src/main/scala/com.db.myproject/mediation/README.md

Future work

There is a lot more to cover, so more posts might come about soon regarding:

  • Loading historical_notifications within the State & Timer.
  • Bringing more HTTP client options and comparing performance among them.
  • More detailed stress tests.
  • Future similar features in Beam or SCIO core?

Thanks for reading to the end and see you soon!

Background

Here comes some inspiration and guidance for this project, mainly from previous Beam Summits:

- Intro to State and Timer (from Beam Summit 2019) https://www.youtube.com/watch?v=Q_v5Zsjuuzg
- Great intro to SCIO Beam (from Beam Summit 2021) https://github.com/iht/scio-scala-beam-summit
- State and Timer (from Beam Summit 2023) https://beamsummit.org/sessions/2023/too-big-to-fail-a-beam-pattern-for-enriching-a-stream-using-state-and-timers/
- Unbreakable & Supercharged Beam Apps with Scala + ZIO (from Beam Summit 2023) https://beamsummit.org/sessions/2023/unbreakable-supercharged-beam-apps-with-scala-zio/

Special kudos to Israel Herraiz for his active contribution to the Beam community and summits, and for encouraging me at some stages of this journey. And to Dazbo (Darren Lester) for reviewing the post.
