Streaming Data Platform at Exness: Deployment Process

Ilya Soin
Published in Exness Tech Blog
5 min read · Mar 10, 2024

Co-authors: Gleb Shipilov, Aleksei Perminov, Yury Smirnov

Read more on Streaming Data Platform at Exness:

  1. Overview by Gleb Shipilov
  2. Flink SQL and PyFlink by Aleksei Perminov
  3. Deployment Process by Ilya Soin (you are here!)
  4. Monitoring and Alerting by Yuri Smirnov

Every team that uses our platform has a group of projects in GitLab.

Each group may contain one or more projects:

  • one for Python jobs;
  • one for Java jobs;
  • and one for Flink SQL jobs and Terraform configurations (which we’ll refer to as the Terraform project henceforth).

The Terraform project contains a collection of modules that define the following resources:

  • Kafka topic;
  • Kafka Connect Source / Sink connector;
  • Flink Session Job (Java, Python or SQL);
  • Flink Standalone Job (Java, Python or SQL);
  • Flink Data Replication Job, which is a templated SQL job.

Each module is essentially a Custom Resource Manifest, which describes a Custom Resource (CR) — an extension to the K8S API.

Every component in our K8S cluster is managed by a Kubernetes Operator (except for Kafka itself, which is managed by the PaaS team). A Kubernetes Operator automates the deployment and management of applications on Kubernetes by extending the Kubernetes API, which makes it a good fit for complex stateful applications. Each operator manages its own Custom Resources, which let users declare the desired state of their Flink and Kafka resources, such as topics, Kafka Connect connectors, Flink Session Jobs, etc. To deploy those CRs on the cluster via Terraform, we use the Kubectl provider.
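As a rough illustration of this flow, the sketch below declares a Strimzi `KafkaTopic` Custom Resource through Terraform. It assumes the community `gavinbunney/kubectl` provider and its `kubectl_manifest` resource (the article does not say which build of the Kubectl provider is used). The topic name, namespace, cluster label and sizing are placeholders, not values from our platform.

```hcl
terraform {
  required_providers {
    kubectl = {
      source  = "gavinbunney/kubectl" # assumption: community kubectl provider
      version = ">= 1.14.0"
    }
  }
}

# Declares the desired state of a topic; the operator watching KafkaTopic
# resources is expected to reconcile it on the Kafka cluster.
resource "kubectl_manifest" "example_topic" {
  yaml_body = yamlencode({
    apiVersion = "kafka.strimzi.io/v1beta2"
    kind       = "KafkaTopic"
    metadata = {
      name      = "example-topic" # placeholder
      namespace = "kafka"         # placeholder
      labels = {
        "strimzi.io/cluster" = "example-cluster" # placeholder Kafka cluster name
      }
    }
    spec = {
      partitions = 3
      replicas   = 3
    }
  })
}
```

Writing the manifest with `yamlencode` keeps it in HCL, so module variables can be plugged in without string templating.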

Deploying Kafka Connect connectors

The lifecycle of Kafka Connect connectors is managed by the Strimzi Kafka K8S Operator. To add a new source or sink connector, users define a new Terraform module and deploy it via a GitLab pipeline.

Example of a Terraform module with a Kafka Connect Sink job:

module "example-sink-vertica" {
  source         = "./templates/kafka-connect-sink-vertica"
  module_enabled = true
  name           = "sink-name"
  topics         = "source-topic"
  vertica_target = local.vertica_rw
  cluster        = local.cluster_di
  task_config = {
    "vc.table"                = "target_table_name"
    "insert.mode"             = "insert"
    "upsert.timestamp.fields" = "updated_at"
    ...
  }
}
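To make the idea of a template more concrete, here is a minimal sketch of what a connector module could render internally: a Strimzi `KafkaConnector` Custom Resource built from the module inputs. It assumes the `kubectl_manifest` resource of the community kubectl provider. The connector class, the Kafka Connect cluster name and the `tasksMax` value are hypothetical, and the real template is likely to do more (for example, wiring in `vertica_target` and `cluster`).

```hcl
# Hypothetical inputs mirroring the module call above.
variable "module_enabled" {
  type    = bool
  default = true
}

variable "name" {
  type = string
}

variable "topics" {
  type = string
}

variable "task_config" {
  type    = map(string)
  default = {}
}

resource "kubectl_manifest" "sink" {
  # Create the connector only when the module is enabled.
  count = var.module_enabled ? 1 : 0

  yaml_body = yamlencode({
    apiVersion = "kafka.strimzi.io/v1beta2"
    kind       = "KafkaConnector"
    metadata = {
      name = var.name
      labels = {
        # Must match the name of the KafkaConnect cluster; placeholder here.
        "strimzi.io/cluster" = "example-connect-cluster"
      }
    }
    spec = {
      class    = "com.example.VerticaSinkConnector" # hypothetical class name
      tasksMax = 1
      # User-supplied settings are merged on top of the topic list.
      config = merge({ topics = var.topics }, var.task_config)
    }
  })
}
```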

Deploying Flink Jobs

The lifecycle of Flink jobs is managed by the Flink K8S Operator. The deployment of a Flink job varies depending on whether it is an SQL job or a Python/Java job.

Deploying SQL jobs

As mentioned earlier, all Flink SQL jobs use the same Flink SQL Runner jar file. The user therefore only needs to write the DDLs and the SQL script, and then create a new Terraform module that describes the job. In this module, the only things to define are the name of the SQL script to run and the arguments for the job.

Example of a Terraform module with a Flink SQL job:

module "flink-sql-job" {
  source       = "./modules/flink/sql-job"
  enabled      = true
  job_name     = "flink-sql-job.sql"
  parallelism  = 1
  state        = "running"
  upgrade_mode = "savepoint"
  args = {
    "arg1" = 1
    "arg2" = "val2"
    "arg3" = "val3"
    "arg4" = "4"
  }
}

After the job is defined, Terraform deploys it to the Kubernetes cluster as a FlinkSessionJob manifest. A ConfigMap with the Flink catalog is deployed to the cluster as well.
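For readers unfamiliar with the FlinkSessionJob resource, the following sketch shows roughly what such a manifest could look like when rendered from Terraform. The `spec.deploymentName` and `spec.job` fields follow the Flink Kubernetes Operator CRD. The jar location, the session cluster name and the `--script` flag are hypothetical, since the article does not describe how the SQL Runner receives its arguments.

```hcl
locals {
  # Hypothetical values; a real module would receive them as inputs.
  script_name = "flink-sql-job.sql"
  args = {
    arg1 = "1"
    arg2 = "val2"
  }

  # Turn the args map into a flat, CLI-style list of "--key", "value" pairs.
  job_args = flatten([for k, v in local.args : ["--${k}", v]])
}

resource "kubectl_manifest" "sql_session_job" {
  yaml_body = yamlencode({
    apiVersion = "flink.apache.org/v1beta1"
    kind       = "FlinkSessionJob"
    metadata = {
      name = "flink-sql-job"
    }
    spec = {
      # Name of the FlinkDeployment running in session mode (placeholder).
      deploymentName = "example-session-cluster"
      job = {
        jarURI      = "https://example.invalid/flink-sql-runner.jar" # hypothetical
        args        = concat(["--script", local.script_name], local.job_args)
        parallelism = 1
        upgradeMode = "savepoint"
        state       = "running"
      }
    }
  })
}
```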

Deploying Python and Java jobs

Since we often run multiple Flink jobs on the same Flink cluster (Session mode), deploying a Java or Python job involves two steps:

  1. Building an artifact and deploying it to an artifact registry;
  2. Creating a new Terraform module that describes a new SessionJob and references the new artifact.

Example of a PyFlink job in Terraform:

module "pyflink-job" {
  source         = "./modules/flink/python-job"
  enabled        = true
  job_name       = "pyflink-job"
  parallelism    = 1
  py_module      = "pyflink.job"
  py_archive_uri = "http://pypi/packages/pyflink/1.0.1/pyflink-1.0.1.tar.gz"
  state          = "running"
  upgrade_mode   = "savepoint"
  args = {
    "arg1" = 1
    "arg2" = "val2"
    "arg3" = "val3"
    "arg4" = "4"
  }
}
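Because parameters such as `state` and `upgrade_mode` end up in the operator's job spec, a module can catch typos at plan time rather than at deploy time. The sketch below is one possible way to declare such inputs with Terraform validation blocks. The variable names mirror the example above, and the allowed values are the ones the Flink Kubernetes Operator accepts for `upgradeMode` and `state`. Whether our modules validate inputs this way is not stated here, so treat it as an illustration.

```hcl
variable "upgrade_mode" {
  type        = string
  default     = "savepoint"
  description = "How the operator upgrades the job when its spec changes."

  validation {
    condition     = contains(["stateless", "savepoint", "last-state"], var.upgrade_mode)
    error_message = "The upgrade_mode value must be one of: stateless, savepoint, last-state."
  }
}

variable "state" {
  type        = string
  default     = "running"
  description = "Desired state of the job."

  validation {
    condition     = contains(["running", "suspended"], var.state)
    error_message = "The state value must be either running or suspended."
  }
}

variable "parallelism" {
  type    = number
  default = 1

  validation {
    condition     = var.parallelism >= 1
    error_message = "The parallelism value must be at least 1."
  }
}
```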

Deploying Standalone jobs

Our platform also supports standalone deployment of any Flink job. This is especially useful when users want a custom image or aim for maximum isolation of job resources. To support this, we use the FlinkDeployment Custom Resource.

Example of a Standalone job in Terraform:


module "standalone-job" {
  source         = "./modules/flink/standalone-job"
  enabled        = true
  job_name       = "standalone-job"
  state          = "running"
  job_type       = "python"
  py_archive_uri = "http://pypi/packages/standalone-job/1.0.2/standalone_job-1.0.2.tar.gz"
  py_module      = "standalone.job"
  parallelism    = "1"
  args = {
    "source.topics"            = "topic1,topic2"
    "source.bootstrap.servers" = "..."
    ...
  }
  checkpoint_config = {
    "interval" = "2 min"
    "timeout"  = "2 min"
    "s3_path"  = "s3a://bucket/flink/standalone-job"
  }
  tm_resource = {
    "cpu"    = local.is_tst ? "1" : "2"
    "memory" = local.is_tst ? "4G" : "8G"
  }
  flink_image = "registry/flink-image:210224-02"
}
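To show how inputs like `checkpoint_config` and `tm_resource` could translate into a FlinkDeployment, here is a minimal sketch. The mapping onto Flink configuration keys is an assumption, not a description of our module. The Flink version, service account, JobManager sizing and the path of the `flink-python` jar are likewise hypothetical, and the way the Python archive reaches the container is left out.

```hcl
locals {
  # Hypothetical stand-ins for the module inputs shown above.
  is_tst = true

  checkpoint_config = {
    interval = "2 min"
    timeout  = "2 min"
    s3_path  = "s3a://bucket/flink/standalone-job"
  }

  tm_resource = {
    cpu    = local.is_tst ? "1" : "2"
    memory = local.is_tst ? "4G" : "8G"
  }
}

resource "kubectl_manifest" "standalone_job" {
  yaml_body = yamlencode({
    apiVersion = "flink.apache.org/v1beta1"
    kind       = "FlinkDeployment"
    metadata = {
      name = "standalone-job"
    }
    spec = {
      image          = "registry/flink-image:210224-02"
      flinkVersion   = "v1_17" # assumption
      serviceAccount = "flink" # assumption
      flinkConfiguration = {
        # Assumed mapping of checkpoint_config onto standard Flink options.
        "execution.checkpointing.interval" = local.checkpoint_config.interval
        "execution.checkpointing.timeout"  = local.checkpoint_config.timeout
        "state.checkpoints.dir"            = local.checkpoint_config.s3_path
      }
      jobManager = {
        resource = { cpu = 1, memory = "2G" } # assumption
      }
      taskManager = {
        resource = {
          cpu    = tonumber(local.tm_resource.cpu)
          memory = local.tm_resource.memory
        }
      }
      job = {
        # Python jobs are started through Flink's PythonDriver entry class.
        jarURI      = "local:///opt/flink/opt/flink-python-1.17.2.jar" # hypothetical path
        entryClass  = "org.apache.flink.client.python.PythonDriver"
        args        = ["-pym", "standalone.job"]
        parallelism = 1
        state       = "running"
      }
    }
  })
}
```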

Abstracting away common configs

Most Custom Resources and common parameters are abstracted away from the end user: they only have to provide the necessary module parameters to make their jobs run. To achieve this, we have a separate GitLab project with common modules and rely on Terraform module composition to plug in parameters. For example, to successfully run a Standalone Job, several manifests have to be declared:

  • FlinkDeployment;
  • Ingress;
  • Service;
  • ServiceMonitor.

These declarations are hidden away from users in a common module called `standalone-job`. Users reference this module from their own code like this:

module "flink-standalone-job" {
  source      = "git::https://…/sdp-terraform-modules.git//flink/standalone-job?ref=master"
  enabled     = var.enabled
  job_name    = var.job_name
  parallelism = var.parallelism
  ...
}

They then plug in the parameters from their own module, as in the example from the previous section.
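On the common-module side, an `enabled` flag is typically implemented with Terraform's `count` meta-argument. The sketch below shows this pattern for one of the hidden manifests, a Service in front of the job. The resource layout, the `-ui` name suffix, the label selector and port 8081 are assumptions made for illustration, not the contents of our `standalone-job` module.

```hcl
variable "enabled" {
  type    = bool
  default = true
}

variable "job_name" {
  type = string
}

# Service exposing the JobManager REST/UI port; created only when enabled.
resource "kubectl_manifest" "service" {
  count = var.enabled ? 1 : 0

  yaml_body = yamlencode({
    apiVersion = "v1"
    kind       = "Service"
    metadata = {
      name = "${var.job_name}-ui"
    }
    spec = {
      selector = {
        app = var.job_name # assumed pod label
      }
      ports = [{
        name       = "rest"
        port       = 8081
        targetPort = 8081
      }]
    }
  })
}

# Lets callers reference the Service without knowing how it is named.
output "service_name" {
  value = var.enabled ? "${var.job_name}-ui" : null
}
```

Setting `enabled = false` in the calling module then removes the manifests on the next apply without deleting the module definition itself.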

To deploy changes via Terraform, users run a GitLab pipeline that generates a new Terraform plan and applies it, deploying the changes to the K8S cluster. Once the Custom Resources are deployed, the corresponding Operator reads them and performs the necessary actions. For example, if a new CR with a Kafka Connect Source Connector is deployed, the Strimzi K8S Operator will “see” the new CR and create a new task on the Kafka Connect cluster. If a CR with a new Flink Session Job is deployed, the Flink K8S Operator will submit that new job to the cluster.
