Consuming RSS feeds from Flink SQL

Nora Almasi
Cloudera
Published in
5 min readOct 5, 2021
Photo by Rishabh Sharma on Unsplash

Data surrounds us in every aspect of our life, thus effective data processing is of utmost importance. When it comes to real-time time-series data, web server events, trades from a stock exchange or sensor readings of any device, the unbounded nature of data brings up new challenges.

Apache Flink is an open-source stream processing framework that is a powerful choice when it comes to the processing of unbounded real-time streams. It offers a high-throughput low-latency solution with advanced event-time semantics and support for watermarking. Instead of supplying an own data-storage system, data management is solved by providing pre-defined data-source and sink connectors to systems like Apache Kafka or HDFS. Not only built-in table sources and sinks but user-defined connectors are supported too. This post presents a custom connector for RSS sources.

User-defined sources in Flink

Flink uses the logical concept of dynamic tables, for which the data is loaded from external systems. Dynamic sources and sinks can be used to read and write data from and into them. The term connector stands for any of these sinks or sources. Connectors can be used with tables by defining the connector type and other attributes in CREATE TABLE statements.

Flink program structure

RSS connector

Flink RSS connector is a connector that facilitates the processing of RSS feeds as data streams. An example query is shown below.

CREATE TABLE news (
`title` STRING,
`description` STRING
) WITH (
'connector' = 'rss-connector',
'uri' =
'https://rss.nytimes.com/services/xml/rss/nyt/Europe.xml,'
'https://rss.nytimes.com/services/xml/rss/nyt/US.xml,'
'https://rss.nytimes.com/services/xml/rss/nyt/Africa.xml',
'refresh-interval' = '5000',
'format' = 'xml'
);
​SELECT * FROM news;

Though the usage of the connector is quite intuitive, some explanation follows. The connector rss-connector is always to be used with the xml format, as it is the standard RSS format. The selectable table field names correspond to the standard RSS <item> tag element names, while field types are always strings. Field names not being present in the XML item will leave the corresponding row field empty. The refresh-interval is an optional attribute that specifies the query interval in milliseconds. Its default value is set for 10 minutes. The query of multiple RSS URIs is also supported, these must be added with commas as separators.

Implementation

The RSS connector implementation follows the general structure of custom sources presented in the Flink documentation. The classes are structured into three levels, metadata, planning and runtime, as shown below.

Class structure

The two main components to realize are the RSS connector itself, and the user-defined XML format which decodes keys and values. Both are realized through 3 main classes.

At the planning level, FlinkRssConnectorTableSourceFactory class is responsible for creating a table source with validated parameters. It declares URI and format as required options, and refresh interval as an optional one. It also sets the connector name as rss-connector. The class FlinkRssConnectorTableSource is on the planning level too, and is responsible for planning and optimization of the table program. Finally, FlinkRssConnectirSourceFunction is on the runtime level, specifying the exact behavior of the connector. The open() function solves the problem of dividing the given URIs among the threads evenly. Then, the run() function defines the main behavior. It obtains the response for the given RSS URI and builds an XML document object of its content. The acquired <item> elements contain the content of the table rows, which is acquired by the deserializer. Finally, these rows are collected by the source context.

The other main component is the user-defined format. XmlFormatFactory is the class defining the format name and options. The other class at the planning level is XmlFormat, the runtime class is XmlDeserializer. This solves the problem of transforming the message of bytes into table rows. In our case, it means creating an XML object of the message and reading the field values according to the current table schema.

Design decisions

In this section, I describe some design decisions which had to be made during the development process.

First, there was the question of the refresh intervals. As the connector may get various RSS feeds as an input, the optimal time gap between refreshes may vary. Thus, instead of hard-coding a general value, the refresh interval may be set by the user according to the usage.

Another question was how to use the deserializer and the XML format. As mentioned before, the RSS connector must be used with the XML format, no other format is suitable. The reason for that is that RSS feeds are always given in XML format. However it may seem unnecessary to define a custom format for this use case, but the main goal of this project was to show an example of a custom connector realizing the common class structure, including classes of a custom format.

The most interesting question was filtering table rows so that no duplicates are collected. For unbounded streams, it turned out to be a way harder task, than it is for bounded tables. The data structure of the Bloom filter offered a solution by realizing lookups in constant time. The Bloom filter is a probabilistic data structure that is used to test whether an element is a member of a set. Thus, as the items are added to the filter, the information of whether a new item is already in the table may be acquired, where the correctness of the answer has a high probability. In our case, this probability is set to 0.01%, which means that the rows are added to the filter, and we can expect that in 10000 inserted rows we would get only one duplicate. We decided to work with two Bloom filters which have 10000 expected insertions. When one is halfway full, we start filling the other one too, and vice versa. If one gets full, empty it. Thus the solution may be used with an arbitrary amount of RSS items, it will not fail with a greater amount either.

Finally, a question of polling several URIs arose too. This means to support the collecting of multiple RSS feeds into one data stream. In this solution, the given URIs are being divided evenly among the Flink subtasks. This provides efficient polling among RSS feeds.

RSS connector and SQL Stream Builder

The implemented module works together with Cloudera's SQL Stream Builder. We could try it after copying the rss-connector jar on the used cluster and editing some CSD dependency scripts to make the jar recognized. A template was also added to SQLIO to make writing queries in SSB Console easier. After these, it became possible to create tables with the RSS connector. An example query result is shown below.

Example query with SSB

--

--