Flink SQL in Practice: Create Your First Table in Confluent Cloud

CREATE TABLE by examples

Timo Walther
Confluent

--

Apache Flink® is a stream processor that enables scalable, low-latency data pipelines for event-driven architectures and real-time analytics. Flink SQL sits on top of this dataflow runtime for the look and feel of a database, while working with streams.

Confluent Cloud embeds Flink SQL into a complete and serverless solution. This blog series shows examples how to use Flink SQL in practice.

In Flink SQL, we work conceptually with tables that change over time. Traditional databases work similarly under the hood. In those systems, a table is often backed by a transaction log that serializes changes to be applied to physical storage.

Apache Kafka® can be viewed as a storage system for changelogs. Since Flink SQL is able to consume from those changelogs and produce new changes, Kafka topics are the default storage layer for Flink tables in Confluent Cloud.

Both Kafka and Flink come with many configuration knobs. Confluent Cloud aims to keep the number of configuration options low while still solving complex tasks.

The following examples show how to use the CREATE TABLE statement and covers which defaults the system adds for us.

Minimal table example

Let’s start with a very minimal table declaration:

CREATE TABLE t_minimal (s STRING);

Once the table has been created, we can run the following:

SHOW CREATE TABLE t_minimal;

This will reveal what the system has made out of our declaration:

CREATE TABLE `my_catalog`.`my_database`.`t_minimal` (
`s` VARCHAR(2147483647)
) DISTRIBUTED INTO 6 BUCKETS
WITH (
'changelog.mode' = 'append',
'connector' = 'confluent',
'kafka.cleanup-policy' = 'delete',
'kafka.max-message-size' = '2097164 bytes',
'kafka.retention.size' = '0 bytes',
'kafka.retention.time' = '7 d',
'scan.bounded.mode' = 'unbounded',
'scan.startup.mode' = 'earliest-offset',
'value.format' = 'avro-registry'
)

In other words:

  • A Kafka topic t_minimal has been created in the Confluent environment my_catalog and Kafka cluster my_database
  • A schema (s STRING) has been stored in Schema Registry
  • Data in the topic is round-robin distributed into 6 Kafka partitions

The WITH clause can be read as follows:


-- Let Flink know that we use a Confluent-specific connector
'connector' = 'confluent',

-- Let Flink know that only insert-only changes are consumed and produced
'changelog.mode' = 'append',

-- Start from the earliest entry in the changelog
'scan.startup.mode' = 'earliest-offset',

-- Consume the changelog infinitely
'scan.bounded.mode' = 'unbounded',

-- Configure Kafka-specific options
'kafka.cleanup-policy' = 'delete',
'kafka.max-message-size' = '2097164 bytes',
'kafka.retention.size' = '0 bytes',
'kafka.retention.time' = '7 d',

-- All columns of the schema should be stored in Schema Registry
-- in the Apache Avro format
'value.format' = 'avro-registry'

However, there is more that we can visualize:

DESCRIBE EXTENDED t_minimal;

This will reveal that the system has also added an implicit column:

+-------------+----------------------------+----------+-----------------------------------------------------+---------+
| Column Name | Data Type | Nullable | Extras | Comment |
+-------------+----------------------------+----------+-----------------------------------------------------+---------+
| s | STRING | NULL | | |
| $rowtime | TIMESTAMP_LTZ(3) *ROWTIME* | NOT NULL | METADATA VIRTUAL, WATERMARK AS `SOURCE_WATERMARK`() | SYSTEM |
+-------------+----------------------------+----------+-----------------------------------------------------+---------+

This means:

  • There is a metadata column called $rowtime
  • The column is virtual which means read-only and not part of the target table’s schema for an INSERT INTO statement
  • The column gives us access to the event timestamp that is stored in every Kafka message
  • $rowtime is a special time column because a watermark strategy has been defined as SOURCE_WATERMARK()
  • SOURCE_WATERMARK() is Confluent’s built-in algorithm for deriving watermarks for Flink

Primary key table example

In many cases, a table contains one or more columns that make a row identifiable and unique. In SQL, those columns make up the primary key.

CREATE TABLE t_pk (k INT PRIMARY KEY NOT ENFORCED, s STRING);

When working with changelogs, it’s often desirable that the primary key is interpreted as the upsert key. Thus, changes can be applied idempotently.

Once the table has been created, we can run the following:

SHOW CREATE TABLE t_pk;

This will reveal what the system has made out of our declaration this time:

CREATE TABLE `my_catalog`.`my_database`.`t_pk` (
`k` INT NOT NULL,
`s` VARCHAR(2147483647),
CONSTRAINT `PK_k` PRIMARY KEY (`k`) NOT ENFORCED
) DISTRIBUTED BY HASH(`k`) INTO 6 BUCKETS
WITH (
'changelog.mode' = 'upsert',
'connector' = 'confluent',
'kafka.cleanup-policy' = 'delete',
'kafka.max-message-size' = '2097164 bytes',
'kafka.retention.size' = '0 bytes',
'kafka.retention.time' = '7 d',
'key.format' = 'avro-registry',
'scan.bounded.mode' = 'unbounded',
'scan.startup.mode' = 'earliest-offset',
'value.format' = 'avro-registry'
)

In other words:

  • A Kafka topic t_pk has been created
  • A primary key constraint has been added that is NOT ENFORCED, meaning it is still the responsibility of the user to ensure uniqueness
  • Data in the topic is distributed into 6 Kafka partitions
  • However, instead of round-robin distribution, data is distributed by hashing the primary key
  • A schema (k INT NOT NULL) has been stored in Schema Registry for the Kafka message key due to the DISTRIBUTED BY clause
  • A schema (s STRING) has been stored in Schema Registry for the Kafka message value

The WITH clause can be read as follows:

-- Let Flink know that updates can be consumed and produced
'changelog.mode' = 'upsert',

-- The key columns of the schema (in DISTRIBUTED BY) should
-- be stored in Schema Registry in the Apache Avro format
'key.format' = 'avro-registry',

-- The value columns of the schema (not in DISTRIBUTED BY) should
-- be stored in Schema Registry in the Apache Avro format
'value.format' = 'avro-registry'

This means:

  • DISTRIBUTED BY defines what lands in the Kafka message key and the key.format decides about its binary format.
  • The PRIMARY KEY defines that k must not be nullable. The SQL standard dictates that primary key columns must not be NULL and an implicit NOT NULL constraint will be added if necessary.

We can also run:

DESCRIBE EXTENDED t_pk

This time it will list not only the $rowtime system column, but also the primary key and bucket key for the distribution:

+-------------+----------------------------+----------+-----------------------------------------------------+---------+
| Column Name | Data Type | Nullable | Extras | Comment |
+-------------+----------------------------+----------+-----------------------------------------------------+---------+
| k | INT | NOT NULL | PRIMARY KEY, BUCKET KEY | |
| s | STRING | NULL | | |
| $rowtime | TIMESTAMP_LTZ(3) *ROWTIME* | NOT NULL | METADATA VIRTUAL, WATERMARK AS `SOURCE_WATERMARK`() | SYSTEM |
+-------------+----------------------------+----------+-----------------------------------------------------+---------+

Summary

This article gave us some insight into the power of Flink SQL. In upcoming articles, we will dive deeper into individual concepts such as changelog modes, watermarks, or complex SQL queries.

The views expressed in this article are those of the author and do not necessarily reflect the position of Confluent.

--

--

Timo Walther
Confluent

Principal Software Engineer at Confluent and long-time committer in Apache Flink. Among the core architects of Flink SQL. Making stream processing for everyone.