Streaming Data Platform at Exness: Flink SQL and PyFlink

Aleksei Perminov
Published in Exness Tech Blog · Mar 10, 2024

Co-authors: Gleb Shipilov, Ilya Soin, Yury Smirnov

Read more on Streaming Data Platform at Exness:

  1. Overview by Gleb Shipilov
  2. Flink SQL and PyFlink by Aleksei Perminov (you are here!)
  3. Deployment Process by Ilya Soin
  4. Monitoring and Alerting by Yury Smirnov

Flink SQL

The ability to use SQL as the main way of declaring streaming jobs is one of the crucial requirements for our platform. We wanted to create an environment for people who know little to nothing about Java or any language other than SQL. Flink, with its powerful SQL syntax and Table API, fulfilled our needs.

There are four common steps in the application development process:

  1. Development;
  2. Testing;
  3. Deployment;
  4. Monitoring.

We needed to make those steps as simple as possible.

Development

The main idea behind our platform solution is that customers should be able to develop streaming applications using only SQL. No Java, no Python, just Flink SQL. To address this, we created the Flink SQL Runner. Essentially, it is a Flink application that loads a Flink SQL query from an external source and executes it in a Table Environment. Overall, it is a very straightforward solution, and you can find a basic example here (keep in mind that even though the Statement Set support is mentioned in this example, it does not work as it should).
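In its simplest form, the idea behind the runner can be sketched as follows (a simplified illustration, not our production code; the way the query location is passed is an assumption):

// Minimal sketch of the Flink SQL Runner idea: load a Flink SQL statement
// from an external source (here, a file path passed as an argument) and
// execute it in a Table Environment.
import java.nio.file.Files;
import java.nio.file.Paths;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class FlinkSqlRunner {
    public static void main(String[] args) throws Exception {
        String query = new String(Files.readAllBytes(Paths.get(args[0])));

        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // A single INSERT INTO ... SELECT ... statement is submitted as a streaming job.
        tableEnv.executeSql(query);
    }
}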

There were a few things that we needed to cover in addition to the basic implementation:

  1. Declaration of Flink tables and views;
  2. Easy User-Defined Functions (UDF) import;
  3. Statement Sets support;
  4. Convenient parameterization of the DDL/DML scripts.
Flink SQL Runner workflow

While considering the optimal method for defining tables and views, we devised the following solution: users define all the tables and views within a specific directory in the Git repository (each team has its own repository), and these DDLs are then mounted to the Flink Cluster Pods as a ConfigMap. When a Flink SQL job is deployed to the Flink Cluster, Flink SQL Runner scans the designated directory in the Pod where all the DDLs are mounted and proceeds to create them in the newly established Table Environment. It is also possible to create catalogs in this environment; the process is identical.

One of the primary challenges encountered is the creation of views. When creating a view, it is essential to ensure that all dependent objects are created prior to the view itself. To address this issue, we require users to add a numeric prefix to each file name, allowing Flink SQL Runner to sort all DDLs and create them in the specified sequence.

How DDL names look:

  • 001-kafka_table_a.sql
  • 002-kafka_table_b.sql
  • 003-view_tables_a_b.sql
Flink SQL Tables Catalog delivery to K8S cluster
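Inside Flink SQL Runner, the catalog bootstrap is conceptually just reading the mounted directory in order and executing each file. A simplified sketch (the class name and directory handling are illustrative):

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.table.api.TableEnvironment;

public class CatalogBootstrap {
    // Executes every DDL found in the mounted ConfigMap directory, ordered by
    // the numeric prefix in the file name (001-, 002-, ...), so that views are
    // created only after the tables they depend on.
    public static void createCatalogObjects(TableEnvironment tableEnv, Path ddlDir) throws Exception {
        List<Path> ddlFiles = Files.list(ddlDir)
                .filter(p -> p.getFileName().toString().endsWith(".sql"))
                .sorted(Comparator.comparing((Path p) -> p.getFileName().toString()))
                .collect(Collectors.toList());

        for (Path ddl : ddlFiles) {
            tableEnv.executeSql(new String(Files.readAllBytes(ddl)));
        }
    }
}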

Statement Sets and UDF support were implemented purely using syntax analysis, i.e. we check all the BEGIN STATEMENT SET; … END; and IMPORT …; statements and perform the corresponding operations in the Table Environment.

Example of the syntax we use in our SQL jobs:

IMPORT com.exness.flink.sql.CustomUDF;

BEGIN STATEMENT SET;

INSERT INTO table_a
SELECT CustomUDF(val) FROM table_b WHERE flag = 0;

INSERT INTO table_c
SELECT CustomUDF(val) FROM table_b WHERE flag != 0;

END;
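On the runner side, this boils down to fairly simple string processing. A rough sketch of the idea (heavily simplified; for example, it naively splits on semicolons, and the class name is illustrative):

import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;

public class ScriptExecutor {
    // Simplified handling of IMPORT ...; and BEGIN STATEMENT SET; ... END; blocks.
    public static void execute(TableEnvironment tableEnv, String script) {
        StatementSet statementSet = null;

        for (String part : script.split(";")) {
            String stmt = part.trim();
            if (stmt.isEmpty()) {
                continue;
            }
            if (stmt.toUpperCase().startsWith("IMPORT ")) {
                // Register the UDF under its simple class name, e.g. CustomUDF.
                String className = stmt.substring("IMPORT ".length()).trim();
                String functionName = className.substring(className.lastIndexOf('.') + 1);
                tableEnv.executeSql(
                        "CREATE TEMPORARY FUNCTION " + functionName + " AS '" + className + "'");
            } else if (stmt.toUpperCase().startsWith("BEGIN STATEMENT SET")) {
                statementSet = tableEnv.createStatementSet();
            } else if (stmt.toUpperCase().equals("END")) {
                statementSet.execute(); // submit all buffered INSERTs as a single job
                statementSet = null;
            } else if (statementSet != null) {
                statementSet.addInsertSql(stmt);
            } else {
                tableEnv.executeSql(stmt);
            }
        }
    }
}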

The parameterization was carried out using org.apache.commons.lang3.text.StrSubstitutor. Whenever you wish to pass specific parameters to an SQL script or a DDL, you can use the following syntax:

CREATE TABLE kafka_table_a
(

)
WITH (
'connector' = 'kafka',
'topic' = '${kafka.topic}',
'properties.bootstrap.servers' = '${kafka.bootstrap.servers}',
'properties.group.id' = '${kafka.group.id}'

)

This approach not only allows us to pass sensitive information to the DDLs, but also lets us template them, simplifying the creation of routine jobs such as one-to-one replication.

Parameterization can be used inside the SQL scripts as well:

INSERT INTO table_a
SELECT CustomUDF(val) FROM ${table.name:-default_table_name}
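On the runner side, the substitution itself takes only a couple of lines. A sketch (where the parameter values come from, e.g. job configuration or environment variables, is deployment-specific):

import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.text.StrSubstitutor;

public class SqlParameterizer {
    // Replaces ${key} placeholders in a DDL/DML script; the ${key:-default}
    // form falls back to the default value when the key is not provided.
    public static String resolve(String sqlTemplate, Map<String, String> params) {
        return new StrSubstitutor(params).replace(sqlTemplate);
    }

    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        params.put("table.name", "table_b");

        String sql = "INSERT INTO table_a SELECT CustomUDF(val) FROM ${table.name:-default_table_name}";
        System.out.println(resolve(sql, params));          // ... FROM table_b
        System.out.println(resolve(sql, new HashMap<>())); // ... FROM default_table_name
    }
}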

Testing

We also wanted to give our users the ability to test their SQL jobs in the most convenient way possible. To achieve this, we have created a Flink SQL Testing Framework that can execute all the declared Flink SQL tasks, utilizing the provided test data.

Brief description of how the Flink SQL Testing Framework works:

  1. It reads all the tables from the ./catalog directory and creates them in the dev database.
  2. Afterward, it creates a test database and mocks all the created tables using the LIKE syntax.
  3. All the mocked tables are configured with a filesystem connector and JSON format, enabling users to easily create test data using JSON files (see the sketch below).
  4. Once all the tables and views are created, the Framework begins reading the ./sql-tasks directory, where all the Flink SQL scripts are stored.
  5. The Framework expects that each Flink SQL script that needs to be tested will have a corresponding directory in the Git repository that contains the mock data in JSON format (for all sources and targets).
  6. All used user-defined functions (UDFs) should also be mocked.
  7. The Flink SQL Testing Framework executes all the scripts that have associated test data and verifies whether the output matches the data that the user expects.
Flink SQL Testing Framework workflow
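For illustration, a mocked table from steps 2 and 3 is conceptually created like this (a sketch; the database names, path layout and LIKE options are assumptions, not the framework's exact code):

import org.apache.flink.table.api.TableEnvironment;

public class TableMocker {
    // Re-creates a dev table in the test database, backed by a local JSON file,
    // so that test inputs and expected outputs can be provided as plain JSON.
    public static void mockTable(TableEnvironment tableEnv, String tableName, String testDataDir) {
        tableEnv.executeSql(
                "CREATE TABLE test_db." + tableName + " WITH (\n"
              + "  'connector' = 'filesystem',\n"
              + "  'path' = '" + testDataDir + "/" + tableName + ".json',\n"
              + "  'format' = 'json'\n"
              + ") LIKE dev_db." + tableName + " (EXCLUDING OPTIONS EXCLUDING WATERMARKS)");
    }
}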

Example of the project working tree

.
├── catalog
│   ├── 001-table_a.sql
│   ├── 002-table_b.sql
│   ├── 003-table_c.sql
├── sql-tasks
│   ├── 001-task-for-tests.sql
│   └── 002-task-with-udf.sql
└── sql-tests
    ├── 001-task-for-tests.sql-case1
    │   ├── table_a.json
    │   ├── table_b.json
    │   └── table_c.out.json
    ├── 001-task-for-tests.sql-case2
    │   ├── table_a.json
    │   ├── table_b.json
    │   └── table_c.out.json
    └── 002-task-with-udf.sql
        ├── CustomUDF.tableudf.json
        ├── table_a.json
        └── table_b.out.json

As you can see, it is possible to provide several cases for the same script by using the ‘-<case_name>’ suffix in the directory name. Output mock files should also be marked with the ‘out’ keyword in the name, since the same table can be used as both an input and an output in some cases.

There are a few key things that we would like to highlight about the framework:

  1. All the tests are run in BATCH mode, so all the watermark configs are removed during the test stage;
  2. The filesystem connector does not support TEMPORAL JOINS, so our framework converts all temporal joins to regular ones.

We mentioned earlier that UDFs can be mocked as well. To facilitate this, our framework analyzes all UDF imports and generates mocks for each detected UDF. These mocks are simply implementations of the corresponding UDF classes. Similar to table mocks, UDF mock files must be named precisely after their corresponding UDFs, with the addition of the UDF type in the name, e.g. tableudf, scalarudf, etc. However, the format of these mock files is a bit different from the table mock files, as it is necessary to establish a relationship between the input parameters and the resulting output data.

For example, suppose we have a UDF that takes two string parameters and returns two boolean values. Here is what the UDF mock file looks like:

[
  {
    "input": ["str1", "str2"],
    "output": [true, true]
  },
  {
    "input": ["str3", "str4"],
    "output": [true, true]
  }
]
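Conceptually, the generated mock for such a table UDF behaves like the hand-written class below (an illustration of the idea, not the generated code; the output column names are made up):

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

// Mocked table UDF: instead of executing real logic, it returns the output rows
// that were declared for a given input in CustomUDF.tableudf.json.
@FunctionHint(output = @DataTypeHint("ROW<flag_a BOOLEAN, flag_b BOOLEAN>"))
public class CustomUDF extends TableFunction<Row> {

    // In the real framework this map is loaded from the JSON mock file.
    private static final Map<List<String>, Row> MOCKED = new HashMap<>();
    static {
        MOCKED.put(Arrays.asList("str1", "str2"), Row.of(true, true));
        MOCKED.put(Arrays.asList("str3", "str4"), Row.of(true, true));
    }

    public void eval(String a, String b) {
        Row output = MOCKED.get(Arrays.asList(a, b));
        if (output != null) {
            collect(output);
        }
    }
}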

The framework is written in Java, and we simply add the execution of this framework to the platform’s Git repository pipeline. We utilize the Git submodule functionality to distribute this framework across all our platform repositories. It is also possible to use it in a more traditional way as a Docker image.

PyFlink

SQL is a very convenient way of creating simple Flink jobs, but when it comes to complex logic, state manipulation, and session joins, SQL is not sufficient. The DataStream API is the best way to address these problems. Unfortunately, our platform users have little to no experience with Java. Since Python is one of the most widely used programming languages, and the most popular one in our company, we needed to provide our users with access to PyFlink. We attempted to use the built-in PythonDriver but encountered one major issue: how to deliver the Python scripts to the Flink Cluster so that the PythonDriver can execute them.

There were three possible solutions to the problem:

  1. Add all the necessary Python scripts to the Flink image;
  2. Mount scripts via ConfigMap;
  3. Use external storage, such as S3.

All those solutions had their advantages and disadvantages. In the case of the first solution, we needed to rebuild the image every time we updated the Python job. If we wanted to run the job in Standalone (Application) mode (one cluster — one job), this solution worked nicely. However, we aimed to run our jobs in Session mode (one cluster — multiple jobs) because most of our jobs do not need that level of isolation.

The second solution has one major disadvantage: it is impossible to divide the Python job into several packages because of Kubernetes ConfigMap limitations. It is possible to create a ConfigMap for each package and mount them, but this would mean that we would need to restart the Flink Cluster each time we need to mount a new ConfigMap, which would make the entire deployment pipeline too complicated.

The third solution worked for us just fine, with one exception: it is not a very good solution when we need to version our Python jobs. Of course, it is possible to create a custom CI/CD pipeline that will create a new folder each time we publish a new version of the Python job.

But why develop a custom pipeline if we already have a PyPI repository? We decided to fork PythonDriver and add the capability to handle HTTP/HTTPS protocols so that we can deploy Python jobs from PyPI just like we deploy our Java jobs from the Nexus repository.

Keep in mind that whenever you deploy the Python job in Standalone mode, Flink attempts to analyze the arguments that you pass. If it detects any Python flag in the program arguments, it will ignore the jarUri that you have set and will instead set the default PythonDriver. Whenever you fork PythonDriver, ensure that you either use different flag names or add additional flags for Standalone mode.
