Flink SQL in Practice: Create Your First Table in Confluent Cloud
CREATE TABLE by examples
Apache Flink® is a stream processor that enables scalable, low-latency data pipelines for event-driven architectures and real-time analytics. Flink SQL sits on top of this dataflow runtime, providing the look and feel of a database while working with streams.
Confluent Cloud embeds Flink SQL into a complete, serverless solution. This blog series shows by example how to use Flink SQL in practice.
In Flink SQL, we work conceptually with tables that change over time. Traditional databases work similarly under the hood. In those systems, a table is often backed by a transaction log that serializes changes to be applied to physical storage.
Apache Kafka® can be viewed as a storage system for changelogs. Since Flink SQL is able to consume from those changelogs and produce new changes, Kafka topics are the default storage layer for Flink tables in Confluent Cloud.
Both Kafka and Flink come with many configuration knobs. Confluent Cloud aims to keep the number of configuration options low while still solving complex tasks.
The following examples show how to use the `CREATE TABLE` statement and cover which defaults the system adds for us.
Minimal table example
Let’s start with a very minimal table declaration:
CREATE TABLE t_minimal (s STRING);
Once the table has been created, we can run the following:
SHOW CREATE TABLE t_minimal;
This will reveal what the system has made out of our declaration:
CREATE TABLE `my_catalog`.`my_database`.`t_minimal` (
`s` VARCHAR(2147483647)
) DISTRIBUTED INTO 6 BUCKETS
WITH (
'changelog.mode' = 'append',
'connector' = 'confluent',
'kafka.cleanup-policy' = 'delete',
'kafka.max-message-size' = '2097164 bytes',
'kafka.retention.size' = '0 bytes',
'kafka.retention.time' = '7 d',
'scan.bounded.mode' = 'unbounded',
'scan.startup.mode' = 'earliest-offset',
'value.format' = 'avro-registry'
)
In other words:
- A Kafka topic `t_minimal` has been created in the Confluent environment `my_catalog` and Kafka cluster `my_database`
- A schema `(s STRING)` has been stored in Schema Registry
- Data in the topic is round-robin distributed into 6 Kafka partitions
The `WITH` clause can be read as follows:
-- Let Flink know that we use a Confluent-specific connector
'connector' = 'confluent',
-- Let Flink know that only insert-only changes are consumed and produced
'changelog.mode' = 'append',
-- Start from the earliest entry in the changelog
'scan.startup.mode' = 'earliest-offset',
-- Consume the changelog infinitely
'scan.bounded.mode' = 'unbounded',
-- Configure Kafka-specific options
'kafka.cleanup-policy' = 'delete',
'kafka.max-message-size' = '2097164 bytes',
'kafka.retention.size' = '0 bytes',
'kafka.retention.time' = '7 d',
-- All columns of the schema should be stored in Schema Registry
-- in the Apache Avro format
'value.format' = 'avro-registry'
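Most of these defaults can be overridden directly in the declaration. As a sketch (the table name and option values here are illustrative, not recommendations), a table with a different bucket count and retention time could be declared as:

```sql
-- Override the bucket count and one Kafka option at creation time
CREATE TABLE t_custom (s STRING)
DISTRIBUTED INTO 4 BUCKETS
WITH (
  'kafka.retention.time' = '30 d'
);
```

Options that are not set explicitly still receive the system defaults shown above.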
However, there is more that we can visualize:
DESCRIBE EXTENDED t_minimal;
This will reveal that the system has also added an implicit column:
+-------------+----------------------------+----------+-----------------------------------------------------+---------+
| Column Name | Data Type | Nullable | Extras | Comment |
+-------------+----------------------------+----------+-----------------------------------------------------+---------+
| s | STRING | NULL | | |
| $rowtime | TIMESTAMP_LTZ(3) *ROWTIME* | NOT NULL | METADATA VIRTUAL, WATERMARK AS `SOURCE_WATERMARK`() | SYSTEM |
+-------------+----------------------------+----------+-----------------------------------------------------+---------+
This means:
- There is a metadata column called `$rowtime`
- The column is virtual, meaning it is read-only and not part of the target table's schema for an `INSERT INTO` statement
- The column gives us access to the event timestamp that is stored in every Kafka message
- `$rowtime` is a special time column because a watermark strategy has been defined as `SOURCE_WATERMARK()`
- `SOURCE_WATERMARK()` is Confluent's built-in algorithm for deriving watermarks for Flink
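Putting this together, we can append a few rows and read them back along with their Kafka message timestamps. The values below are illustrative:

```sql
-- Append rows to the changelog; each row becomes a Kafka message
INSERT INTO t_minimal VALUES ('Alice'), ('Bob');

-- Read the value column together with the event timestamp of each message
SELECT s, $rowtime FROM t_minimal;
```

Because `$rowtime` is virtual, it does not appear in the `INSERT INTO` target schema; the timestamp is assigned by Kafka when the message is written.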
Primary key table example
In many cases, a table contains one or more columns that make a row identifiable and unique. In SQL, those columns make up the primary key.
CREATE TABLE t_pk (k INT PRIMARY KEY NOT ENFORCED, s STRING);
When working with changelogs, it’s often desirable that the primary key is interpreted as the upsert key. Thus, changes can be applied idempotently.
Once the table has been created, we can run the following:
SHOW CREATE TABLE t_pk;
This will reveal what the system has made out of our declaration this time:
CREATE TABLE `my_catalog`.`my_database`.`t_pk` (
`k` INT NOT NULL,
`s` VARCHAR(2147483647),
CONSTRAINT `PK_k` PRIMARY KEY (`k`) NOT ENFORCED
) DISTRIBUTED BY HASH(`k`) INTO 6 BUCKETS
WITH (
'changelog.mode' = 'upsert',
'connector' = 'confluent',
'kafka.cleanup-policy' = 'delete',
'kafka.max-message-size' = '2097164 bytes',
'kafka.retention.size' = '0 bytes',
'kafka.retention.time' = '7 d',
'key.format' = 'avro-registry',
'scan.bounded.mode' = 'unbounded',
'scan.startup.mode' = 'earliest-offset',
'value.format' = 'avro-registry'
)
In other words:
- A Kafka topic `t_pk` has been created
- A primary key constraint has been added that is `NOT ENFORCED`, meaning it is still the responsibility of the user to ensure uniqueness
- Data in the topic is distributed into 6 Kafka partitions
- However, instead of round-robin distribution, data is distributed by hashing the primary key
- A schema `(k INT NOT NULL)` has been stored in Schema Registry for the Kafka message key due to the `DISTRIBUTED BY` clause
- A schema `(s STRING)` has been stored in Schema Registry for the Kafka message value
The `WITH` clause can be read as follows:
-- Let Flink know that updates can be consumed and produced
'changelog.mode' = 'upsert',
-- The key columns of the schema (in DISTRIBUTED BY) should
-- be stored in Schema Registry in the Apache Avro format
'key.format' = 'avro-registry',
-- The value columns of the schema (not in DISTRIBUTED BY) should
-- be stored in Schema Registry in the Apache Avro format
'value.format' = 'avro-registry'
This means:
- `DISTRIBUTED BY` defines what lands in the Kafka message key, and the `key.format` decides about its binary format.
- The `PRIMARY KEY` defines that `k` must not be nullable. The SQL standard dictates that primary key columns must not be `NULL`, and an implicit `NOT NULL` constraint will be added if necessary.
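The implicit `NOT NULL` constraint can be observed directly: even if the key column is declared without it, the derived schema marks the column as non-nullable. A sketch with an illustrative table name:

```sql
-- `k` is declared plainly, but the primary key makes it NOT NULL
CREATE TABLE t_pk_implicit (k INT, s STRING, PRIMARY KEY (k) NOT ENFORCED);

-- The output should show `k` INT NOT NULL in the schema
SHOW CREATE TABLE t_pk_implicit;
```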
We can also run:
DESCRIBE EXTENDED t_pk;
This time, it will list not only the `$rowtime` system column but also the primary key and the bucket key used for distribution:
+-------------+----------------------------+----------+-----------------------------------------------------+---------+
| Column Name | Data Type | Nullable | Extras | Comment |
+-------------+----------------------------+----------+-----------------------------------------------------+---------+
| k | INT | NOT NULL | PRIMARY KEY, BUCKET KEY | |
| s | STRING | NULL | | |
| $rowtime | TIMESTAMP_LTZ(3) *ROWTIME* | NOT NULL | METADATA VIRTUAL, WATERMARK AS `SOURCE_WATERMARK`() | SYSTEM |
+-------------+----------------------------+----------+-----------------------------------------------------+---------+
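To illustrate the upsert semantics, consider inserting two changes for the same key. The values are illustrative; under the `upsert` changelog mode, the later change for a key replaces the earlier one when the table is read:

```sql
-- Two changes for key 1; the later change wins per key
INSERT INTO t_pk VALUES (1, 'first'), (1, 'second'), (2, 'other');

-- A query over the table conceptually yields the latest value per key,
-- i.e. (1, 'second') and (2, 'other')
SELECT * FROM t_pk;
```

This is what makes primary key tables idempotent: replaying the same changes produces the same final state.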
Summary
This article gave us some insight into the power of Flink SQL. In upcoming articles, we will dive deeper into individual concepts such as changelog modes, watermarks, and complex SQL queries.
The views expressed in this article are those of the author and do not necessarily reflect the position of Confluent.