Calculate Real Time Daily GMV With Apache Beam

Veli Can Ünal · Trendyol Tech · Jan 6, 2023

What is Apache Beam?

Apache Beam is a unified model for defining both batch and streaming data pipelines. It is open source, so you can easily contribute to its development. You can define many kinds of operations in an Apache Beam pipeline: data ingestion, cleaning, transformation and many more. The interface of Apache Beam is simple and easy to understand. One of the main goals of this unified model is to separate the definition of the pipeline from its execution.

When you define a pipeline in Apache Beam, you can use several programming languages. It provides multiple SDKs for defining your pipeline, such as Python, Java, Go and Scala (via Scio). You can execute your pipeline on different platforms as well: the direct runner of Apache Beam, Flink, Spark or Cloud Dataflow. The Apache Beam API was originally written to support Google Cloud Dataflow. Runners provide different levels of support for Apache Beam, so you have to read the documentation carefully to see what is supported; the runner capability matrix in the official Apache Beam documentation is a good starting point.

What is a Pipeline?

We can define a pipeline as a series of data transformation steps. When we get the data from a source, we modify it according to our purpose. After several transformations, the data is written to a sink, which can be any storage where you can put your data.
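As a minimal sketch of this source-to-sink flow (the file names are placeholders, not part of our project, and imports are omitted as in the other snippets in this post), a Beam pipeline written with the Java SDK could look like this:

Pipeline pipeline = Pipeline.create();

pipeline
    // source: read raw text lines (placeholder file name)
    .apply("Read from source", TextIO.read().from("orders.txt"))
    // transformation: modify each element according to our purpose
    .apply("Transform", MapElements.into(TypeDescriptors.strings())
        .via((String line) -> line.toUpperCase()))
    // sink: write the transformed data to storage (placeholder output prefix)
    .apply("Write to sink", TextIO.write().to("orders-upper"));

pipeline.run().waitUntilFinish();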

What is a PCollection?

A PCollection is a distributed data set in our pipeline. You can easily create a PCollection from other operations in the pipeline. Apache Beam works as an embarrassingly parallel model: when the pipeline is executed, the execution engine takes care of distributing these PCollections across processing nodes in parallel. A PCollection can be used not only as an input but also as an output of the data transformations in our pipeline.

A PCollection does not have to have a fixed size; it can be bounded or unbounded. A PCollection can only be used within the pipeline that created it. We are not able to share it across several pipelines; it belongs to just one pipeline.
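As a small illustrative sketch (the city names are arbitrary sample data), a bounded PCollection can be created directly from in-memory values:

Pipeline pipeline = Pipeline.create();

// a bounded PCollection built from in-memory data;
// it belongs to this pipeline and cannot be shared with another one
PCollection<String> cities = pipeline.apply("Create cities",
    Create.of("Istanbul", "Ankara", "Izmir"));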

What is a Transform?

An Apache Beam transform is simply a processing step in our pipeline. A transform takes at least one PCollection as input and produces at least one PCollection as output. All kinds of aggregating, joining and grouping of data can be considered transforms. Since many transforms are already defined in Apache Beam, you can easily apply them in your application.
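For example, continuing with the cities PCollection from the previous sketch, a built-in transform takes that PCollection as input and produces a new PCollection as output:

// MapElements: a one-to-one transform from PCollection<String> to PCollection<Integer>
PCollection<Integer> nameLengths = cities.apply("Name lengths",
    MapElements.into(TypeDescriptors.integers())
        .via((String city) -> city.length()));

// Count: a built-in aggregation transform that produces a single-element PCollection
PCollection<Long> cityCount = cities.apply("Count cities", Count.<String>globally());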

What is a Runner?

Apache Beam pipelines can run on several processing platforms. The role of a runner is to execute the Apache Beam pipeline that you defined. Behind the scenes, Apache Beam translates your pipeline so that each runner can execute it in its own way.

Since you are able to use several platforms without changing your code, we can confidently say that Apache Beam offers both an easy-to-use development environment and portability. Additionally, by using the native runner of Apache Beam, called the Direct Runner, you can do your development on your local machine. When your pipeline is ready, you can deploy it to any runner you want, such as Google Cloud Dataflow, the Flink Runner or the Spark Runner. The Flink Runner executes your pipeline on Apache Flink clusters, and the Spark Runner executes it on Apache Spark clusters.
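Because the runner is selected through pipeline options rather than in the pipeline code, switching platforms is mostly a matter of changing a command-line flag (plus adding the runner-specific dependencies). A minimal sketch:

// the same pipeline code runs on different platforms; the runner is picked from the arguments,
// e.g. --runner=DirectRunner, --runner=FlinkRunner or --runner=DataflowRunner
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
Pipeline pipeline = Pipeline.create(options);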

Apache Beam Window Types

Windowing is a grouping operation that you apply to real-time data by creating finite sets. You can divide unbounded data into mini batches and transform each mini batch. Windowing is one of the most important concepts of real-time data streaming. Let's take a look from the Apache Beam perspective.

In Apache Beam, windowing subdivides a PCollection into finite groups of elements. A PCollection may be an unbounded data set, but we limit the size of the data we operate on by using windows.

When new data arrives in our pipeline, we apply the corresponding windowing strategy to that new data as well. Apache Beam has several windowing strategies. Let's check the windowing types; a short code sketch comparing them follows the list.

  • Global Time Windows
    A global window is used to calculate global cumulative metrics at defined periodic intervals. In the global window, data is accumulated from the start of your pipeline.
  • Fixed Time Windows
    In fixed time windows, we define fixed intervals. When a fixed window's duration is completed, a new one starts. They do not overlap each other. In our example, we use 6-hour fixed windows.
  • Sliding Time Windows
    Sliding time windows are very similar to fixed windows, but unlike fixed windows, they overlap each other, so an element can belong to multiple windows at once. By using sliding time windows, you can, for example, aggregate your order count over the last hour. In our example, we use 6-hour windows that slide every 2 hours.
  • Per-session Windows
    For session windows, we have to define a session gap duration. Data that arrives within this gap belongs to the same session. When no data arrives for longer than the gap duration we defined, the next data is automatically processed in a new window. Let's say you defined your session gap as 3 hours: if you don't receive data for more than 3 hours, a new window is created when the first transaction arrives after the break. The session gap can basically be considered a "break time duration".
  • Calendar Window
    This window type is overlooked in almost every Apache Beam document. The logic of the calendar window is to create a fixed window that renews itself when the termination date arrives. Until the termination time, we process all data that falls inside the current date. It is very useful for calendar-aligned periods. For instance, if you create a daily calendar window and start your application at 14:00, you will get a new window after 00:00. It is very similar to a fixed time window; however, with a fixed time window you would have to start your window exactly at 00:00 to make daily calculations. Since we need daily calculations, we use the calendar window in our project :)
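To make the differences concrete, here is a short sketch of how each strategy is applied to a hypothetical PCollection<Orders> called orders (the durations match the examples above):

// global window: a single window accumulating data from the start of the pipeline
orders.apply(Window.<Orders>into(new GlobalWindows()));

// fixed 6-hour windows: consecutive, non-overlapping blocks
orders.apply(Window.<Orders>into(FixedWindows.of(Duration.standardHours(6))));

// sliding 6-hour windows that start every 2 hours, so the windows overlap
orders.apply(Window.<Orders>into(
    SlidingWindows.of(Duration.standardHours(6)).every(Duration.standardHours(2))));

// session windows: a new window starts after a 3-hour break without data
orders.apply(Window.<Orders>into(Sessions.withGapDuration(Duration.standardHours(3))));

// daily calendar windows that roll over at midnight in the given time zone
orders.apply(Window.<Orders>into(
    CalendarWindows.days(1).withTimeZone(DateTimeZone.forID("Europe/Moscow"))));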

Project Architecture & Details

Real Time Daily Gross Merchandise Volume Calculation

At Trendyol, we provide important metrics on our dashboards by using a batch processing system. Since data freshness is crucial for important decisions, we wanted to try real-time stream processing with Apache Beam, starting with the calculation of daily Gross Merchandise Volume. We used a calendar window for that operation.

First you have to define your time zone; Turkey currently uses the same UTC+3 offset as the Europe/Moscow time zone. We read the data from Kafka topics and map the messages we retrieve so they are easier to process. We then apply some filtering operations to clean our data; as you can see below, this is a very easy operation once you define your filter function. After that we aggregate all data inside the daily calendar window, and the result is ready to be written to Firebase. The pipeline triggers a function that writes our data into Firebase every 10 seconds, and our Firebase-integrated dashboard shows real-time data to our stakeholders.

// Options, Orders, ExtractAndSumTotalPrice and triggerCloudFunction are custom classes
// defined elsewhere in the project; GSON is a shared com.google.gson.Gson instance.
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
options.setStreaming(true); // Kafka is an unbounded source, so the pipeline runs in streaming mode
DateTimeZone timeZone = DateTimeZone.forID("Europe/Moscow");

Pipeline pipeline = Pipeline.create(options);

pipeline
    .apply("Read messages from Kafka",
        KafkaIO.<String, String>read()
            .withBootstrapServers(options.getBootstrapServer())
            .withConsumerConfigUpdates(ImmutableMap.of("group.id", "streamers"))
            .withTopic(options.getInputTopic())
            .withReadCommitted()
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withoutMetadata())
    .apply("Get message contents", Values.<String>create())
    .apply("Log messages", MapElements.into(TypeDescriptor.of(String.class))
        .via(message -> {
            // pass-through step; per-message logging could be added here
            return message;
        }))
    .apply("Parse JSON", MapElements.into(TypeDescriptor.of(Orders.class))
        .via(message -> GSON.fromJson(message, Orders.class)))
    .apply("Filter orders", Filter.by((Orders order) -> order.getOrderStatus().getName().equals("Created")))
    .apply("CalendarWindows", Window.<Orders>into(CalendarWindows.days(1).withTimeZone(timeZone))
        // emit the accumulated daily pane every 10 seconds of processing time
        .triggering(Repeatedly.forever(
            AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(10))))
        .accumulatingFiredPanes()
        .withAllowedLateness(Duration.standardSeconds(0)))
    .apply("Agg Ops", new ExtractAndSumTotalPrice())
    .apply("Trigger", MapElements.via(new triggerCloudFunction()));

pipeline.run();

We live in a time of rapid change, and making instant decisions is vital. Batch processing systems will soon be too slow to adapt to our needs. Real-time data processing enables more responsive tracking systems, more effective marketing strategies and better customer service. We can use real-time data processing tools such as Apache Beam and KSQL to improve data freshness.

Let's follow the latest improvements in real-time data processing!

If you have any questions or suggestions, please don’t hesitate to send a message.

Don’t delay the moment!
