Accessing RisingWave Data in PostgreSQL

RisingWave Labs, published in ILLUMINATION, Jul 4, 2024

As a streaming database, RisingWave often serves as the streaming engine in a big data stack. It reads data from various sources, cleans, transforms, and aggregates it, and writes the results to multiple destinations.

RisingWave offers a variety of methods for users to interact with it and retrieve computational results. So, what’s the best way to interact with RisingWave? There’s no one-size-fits-all answer because different users have different needs.

In this blog, we'll walk through the two common ways to interact with RisingWave, along with their pros and cons, to help you find the best method for your needs. We'll also introduce a new option for PostgreSQL users: accessing RisingWave data directly from PostgreSQL.

Existing options to interact with RisingWave

Query directly in RisingWave

In RisingWave, streaming jobs are represented as materialized views (MVs). An MV stores its computation results in the database and is continuously updated as the streaming job processes new data. You can retrieve these results interactively with a simple query:

SELECT * FROM {target_mv};

This works just like querying a traditional database, and you can also use Java or Python client libraries to access MVs programmatically. The approach is straightforward and easy to implement. However, it requires a separate client connection to RisingWave to retrieve results, which makes it less ideal for scenarios that need frequent result retrieval.
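As a minimal sketch of programmatic access, the Python helpers below build the `SELECT * FROM {target_mv}` query with the MV name safely quoted and run it through any DB-API 2.0 connection. Because RisingWave is PostgreSQL-compatible, a psycopg2 connection should work; the connection parameters in the docstring (host 127.0.0.1, port 4566, database dev, user root) are assumptions that mirror the defaults used later in this post, and the helper names are hypothetical.

```python
def quote_ident(name: str) -> str:
    """Quote a SQL identifier PostgreSQL-style: wrap in double quotes, double embedded quotes."""
    return '"' + name.replace('"', '""') + '"'


def build_select_all(mv_name: str) -> str:
    """Build `SELECT * FROM {target_mv}` with the MV name quoted as an identifier."""
    return f"SELECT * FROM {quote_ident(mv_name)};"


def fetch_mv(conn, mv_name: str) -> list:
    """Fetch all rows of a materialized view through a DB-API 2.0 connection.

    Hypothetical helper: `conn` could come from, for example,
    psycopg2.connect(host="127.0.0.1", port=4566, dbname="dev", user="root").
    """
    cur = conn.cursor()
    try:
        cur.execute(build_select_all(mv_name))
        return cur.fetchall()
    finally:
        cur.close()  # release the cursor even if the query fails
```

Quoting preserves case, so pass the name exactly as RisingWave stores it (lowercase for the MVs in this post).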

Deliver data to a destination and query from there

RisingWave supports delivering computation results to various destinations using sinks, such as Redis, S3, Kafka, and PostgreSQL. You can then retrieve these results through the APIs of these destinations. This method leverages your existing serving systems in your production environment, which can be a big advantage if you already have a suitable system in place.

Once a sink is configured, RisingWave automatically outputs computation results to it. However, if you don’t have an existing serving system, you’ll need to deploy an additional sink system. This adds complexity to the overall architecture.

Access RisingWave data in PostgreSQL

PostgreSQL is a powerful and flexible open-source relational database management system, known for its performance and reliability. One of its standout features is the Foreign Data Wrapper (FDW), which lets PostgreSQL connect to external data sources such as other databases, files, and APIs, and query that data as if it were stored locally. Since version 9.3, the postgres_fdw extension, which provides access to other PostgreSQL instances, has shipped with PostgreSQL as a standard module.

RisingWave, which is part of the PostgreSQL ecosystem, supports PostgreSQL's FDW starting from version 1.9.0 (see the documentation). With this support, PostgreSQL can use the postgres_fdw extension to access RisingWave's computation results as foreign tables, and you can aggregate, filter, and join these foreign tables just as you would regular PostgreSQL tables. This integration removes the need for an additional client or sink system to handle computation results.

Comparisons using an example

Let's walk through an example from the e-commerce field, which is also featured in another blog. We'll compare different methods for calculating the total payment amount of orders over different time periods. The raw data is stored in a PostgreSQL database, and we'll use RisingWave's postgres-cdc connector to ingest and process it.

First, import the data from PostgreSQL into RisingWave using the postgres-cdc connector:

--- Run in RisingWave
CREATE TABLE pg_orders (
o_orderkey BIGINT,
o_custkey INTEGER,
o_totalprice NUMERIC,
o_orderdate TIMESTAMP WITH TIME ZONE,
...
PRIMARY KEY (o_orderkey)
) WITH (
connector = 'postgres-cdc',
hostname = '127.0.0.1',
port = '5432',
username = 'postgresuser',
password = 'postgrespw',
database.name = 'mydb',
schema.name = 'public',
table.name = 'orders'
);

Next, create three materialized views (MVs) that calculate the total payment amount of orders at different time granularities: minute, hour, and day. Each coarser MV is built on the finer one: the hour-level MV aggregates the minute-level MV, and the day-level MV aggregates the hour-level MV.

--- Run in RisingWave
-- minute level
CREATE MATERIALIZED VIEW orders_total_price_per_min AS
SELECT date_trunc('minute', o_orderdate) AS minute, SUM(o_totalprice) AS totalprice
FROM pg_orders
GROUP BY date_trunc('minute', o_orderdate);

-- hour level (rolls up the minute-level MV)
CREATE MATERIALIZED VIEW orders_total_price_per_hour AS
SELECT date_trunc('hour', minute) AS hour, SUM(totalprice) AS totalprice
FROM orders_total_price_per_min
GROUP BY date_trunc('hour', minute);

-- day level (rolls up the hour-level MV)
CREATE MATERIALIZED VIEW orders_total_price_per_day AS
SELECT date_trunc('day', hour) AS date, SUM(totalprice) AS totalprice
FROM orders_total_price_per_hour
GROUP BY date_trunc('day', hour);
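The cascade works because SUM can be computed in stages: summing per-minute totals into hours, and hours into days, gives the same result as summing the raw orders per day in one step. The Python sketch below checks that property on a few hypothetical orders (made-up data, not from the example dataset), using Decimal for exact arithmetic. As a simplifying assumption, truncation happens in UTC, whereas date_trunc on a timestamptz column uses the session time zone.

```python
from collections import defaultdict
from datetime import datetime, timezone
from decimal import Decimal


def truncate(ts: datetime, unit: str) -> datetime:
    """Mimic date_trunc for 'minute', 'hour', and 'day' on an aware datetime."""
    if unit == "minute":
        return ts.replace(second=0, microsecond=0)
    if unit == "hour":
        return ts.replace(minute=0, second=0, microsecond=0)
    if unit == "day":
        return ts.replace(hour=0, minute=0, second=0, microsecond=0)
    raise ValueError(f"unsupported unit: {unit}")


def rollup(rows, unit):
    """Model GROUP BY date_trunc(unit, ts) with SUM(amount); rows are (ts, amount) pairs."""
    totals = defaultdict(Decimal)
    for ts, amount in rows:
        totals[truncate(ts, unit)] += amount
    return dict(totals)


# Hypothetical sample orders: (o_orderdate, o_totalprice)
orders = [
    (datetime(2024, 4, 18, 9, 15, 30, tzinfo=timezone.utc), Decimal("120.50")),
    (datetime(2024, 4, 18, 9, 15, 45, tzinfo=timezone.utc), Decimal("80.00")),
    (datetime(2024, 4, 18, 23, 59, 1, tzinfo=timezone.utc), Decimal("10.25")),
    (datetime(2024, 4, 19, 0, 0, 5, tzinfo=timezone.utc), Decimal("99.99")),
]

per_min = rollup(orders, "minute")            # like orders_total_price_per_min
per_hour = rollup(per_min.items(), "hour")    # like orders_total_price_per_hour
per_day = rollup(per_hour.items(), "day")     # like orders_total_price_per_day

# Staged aggregation matches aggregating the raw orders by day directly.
assert per_day == rollup(orders, "day")
```

The same reasoning would not hold for an aggregate like an exact distinct count, which is why staged MVs fit naturally with SUM.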

We now have three MVs at the minute, hour, and day levels, all updated in real time. To calculate the total revenue for the past seven days, we have three options to choose from.

Query directly in RisingWave

The first approach is to connect to RisingWave with a client, such as psql, and run the following query:

--- Run in RisingWave
SELECT SUM(totalprice)
FROM orders_total_price_per_day
WHERE date BETWEEN date_trunc('day', NOW() - INTERVAL '7 days') AND date_trunc('day', NOW());
------
sum
---------------
1725458400.05
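Note that BETWEEN includes both endpoints, so this filter matches eight daily buckets: the seven previous full days plus the current, still-accumulating day. The Python sketch below reproduces the predicate for a hypothetical fixed value of NOW() and lists the buckets it selects. It assumes UTC, whereas in the database date_trunc on a timestamptz uses the session time zone.

```python
from datetime import datetime, timedelta, timezone


def day_start(ts: datetime) -> datetime:
    """Equivalent of date_trunc('day', ts) for an aware datetime, in its own time zone."""
    return ts.replace(hour=0, minute=0, second=0, microsecond=0)


def buckets_in_window(now: datetime, days_back: int = 7) -> list:
    """Daily buckets satisfying:
    date BETWEEN date_trunc('day', now - days_back days) AND date_trunc('day', now)
    """
    lower = day_start(now - timedelta(days=days_back))
    upper = day_start(now)
    buckets = []
    day = lower
    while day <= upper:  # BETWEEN is inclusive on both ends
        buckets.append(day)
        day += timedelta(days=1)
    return buckets


# Hypothetical "current time" for illustration.
now = datetime(2024, 4, 20, 15, 30, tzinfo=timezone.utc)
window = buckets_in_window(now)
print(len(window), window[0].date(), window[-1].date())  # 8 2024-04-13 2024-04-20
```

If exactly seven complete days are wanted, a half-open range such as `date >= date_trunc('day', NOW() - INTERVAL '7 days') AND date < date_trunc('day', NOW())` is one option.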

The entire process can be visually represented by the following diagram:

Deliver data with a sink and query from there

With this approach, you first create the table in PostgreSQL that the sink connector will write to:

--- Run in PostgreSQL
CREATE TABLE orders_total_price_per_day (
date timestamptz primary key,
totalprice numeric
);

Next, create a sink in RisingWave with the following statement. It keeps the orders_total_price_per_day table in PostgreSQL synchronized in real time with the orders_total_price_per_day materialized view in RisingWave:

--- Run in RisingWave
CREATE SINK orders_total_price_per_day_sink
FROM orders_total_price_per_day
WITH (
connector = 'jdbc',
jdbc.url = 'jdbc:postgresql://127.0.0.1:5432/mydb?user=postgresuser&password=postgrespw',
table.name = 'orders_total_price_per_day',
type = 'upsert',
primary_key = 'date'
);
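With type = 'upsert' and primary_key = 'date', changes from the MV are applied to the PostgreSQL table by key: a new or updated daily total overwrites the row with the same date, and a deleted row is removed. This matches the PostgreSQL table above declaring date as its primary key. The Python sketch below is a simplified model of that behavior, using a dict as the table; it is not RisingWave's implementation, and the event format is hypothetical.

```python
from decimal import Decimal


def apply_upsert_changes(table: dict, changes) -> dict:
    """Apply ('upsert', key, value) and ('delete', key, None) events to a keyed table.

    Simplified model of an upsert sink: the latest value per primary key wins.
    """
    for op, key, value in changes:
        if op == "upsert":
            table[key] = value       # insert, or overwrite the row with this key
        elif op == "delete":
            table.pop(key, None)     # remove the row if it exists
        else:
            raise ValueError(f"unknown op: {op}")
    return table


# Hypothetical change stream for orders_total_price_per_day, keyed by date.
changes = [
    ("upsert", "2024-04-19", Decimal("100.00")),
    ("upsert", "2024-04-20", Decimal("50.00")),
    ("upsert", "2024-04-20", Decimal("75.50")),  # the day's total grew: same key, new value
    ("delete", "2024-04-19", None),
]
pg_table = apply_upsert_changes({}, changes)
print(pg_table)  # {'2024-04-20': Decimal('75.50')}
```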

You can then query the results directly in PostgreSQL:

--- Run in PostgreSQL
SELECT SUM(totalprice) AS total_price, DATE_TRUNC('day', NOW()) AS day
FROM orders_total_price_per_day
WHERE date BETWEEN DATE_TRUNC('day', NOW() - INTERVAL '7 days') AND DATE_TRUNC('day', NOW());
------
total_price | day
---------------+---------------------
1725458400.05 | 2024-04-20 00:00:00

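Alternatively, you can create a sink from a query in RisingWave so that the 7-day result itself is written to PostgreSQL. As with the previous sink, the target table (here, orders_total_price_in_7_days, keyed on day) must already exist in PostgreSQL.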

--- Run in RisingWave
CREATE SINK orders_total_price_in_7_days
AS
SELECT SUM(totalprice) AS total_price, DATE_TRUNC('day', NOW()) AS day
FROM orders_total_price_per_day
WHERE date BETWEEN DATE_TRUNC('day', NOW() - INTERVAL '7 days') AND DATE_TRUNC('day', NOW())
WITH (
connector = 'jdbc',
jdbc.url = 'jdbc:postgresql://127.0.0.1:5432/mydb?user=postgresuser&password=postgrespw',
table.name = 'orders_total_price_in_7_days',
type = 'upsert',
primary_key = 'day'
);

The entire process can be visually represented by the following diagram:

Access RisingWave data in PostgreSQL via foreign data wrapper

The third method, which is the main focus of this blog, uses PostgreSQL's Foreign Data Wrapper (FDW) feature. postgres_fdw lets you query tables in another PostgreSQL database, and because RisingWave is PostgreSQL-compatible, it can act as that remote database. To set this up, run a few statements in PostgreSQL that install the extension, register RisingWave as a foreign server, map users, and import the remote tables:

--- Run in PostgreSQL
--- install the `postgres_fdw` extension
CREATE EXTENSION postgres_fdw;

--- Fill with your RisingWave server information, name the foreign server as `risingwave`
CREATE SERVER risingwave
FOREIGN DATA WRAPPER postgres_fdw
OPTIONS (host '127.0.0.1', port '4566', dbname 'dev');

--- Create a user mapping for the foreign server: the PostgreSQL user `postgresuser` connects to RisingWave as the RisingWave user `root`
CREATE USER MAPPING FOR postgresuser
SERVER risingwave
OPTIONS (user 'root', password 'xxx');

--- Import the definitions of all tables and materialized views in RisingWave's `public` schema into the `public` schema in PostgreSQL.
IMPORT FOREIGN SCHEMA public
FROM SERVER risingwave INTO public;

After running the statements above, all tables and materialized views (MVs) in RisingWave's public schema are imported into PostgreSQL and can be accessed as foreign tables. To check which relations have been mapped, run the psql meta-command \d:

--- Run in PostgreSQL
\d
------
List of relations
Schema | Name | Type | Owner
--------+-----------------------------+---------------+--------------
public | orders | table | postgresuser
public | orders_total_price_per_day | foreign table | postgresuser
public | orders_total_price_per_hour | foreign table | postgresuser
public | orders_total_price_per_min | foreign table | postgresuser
public | pg_orders | foreign table | postgresuser

The tables and materialized views in RisingWave now appear as foreign tables in PostgreSQL (orders is a regular local table). Instead of importing a whole schema, you can also map a single table or MV, such as orders_total_price_per_day, by defining the foreign table explicitly:

--- Run in PostgreSQL
CREATE FOREIGN TABLE orders_total_price_per_day(
date timestamp with time zone,
totalprice NUMERIC
)
SERVER risingwave
OPTIONS (schema_name 'public', table_name 'orders_total_price_per_day');
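Between these two extremes, IMPORT FOREIGN SCHEMA also accepts a LIMIT TO (...) clause that imports only the named tables, which is convenient when you want several MVs but not the whole schema. If you generate that statement from a script, a hypothetical Python helper might look like the sketch below. Identifiers are double-quoted, so names must match their stored (lowercase) form.

```python
def quote_ident(name: str) -> str:
    """Double-quote a PostgreSQL identifier, escaping embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def import_foreign_schema_sql(remote_schema: str, server: str,
                              local_schema: str, tables: list) -> str:
    """Build an IMPORT FOREIGN SCHEMA ... LIMIT TO (...) statement for selected relations."""
    if not tables:
        raise ValueError("tables must not be empty")
    table_list = ", ".join(quote_ident(t) for t in tables)
    return (
        f"IMPORT FOREIGN SCHEMA {quote_ident(remote_schema)} "
        f"LIMIT TO ({table_list}) "
        f"FROM SERVER {quote_ident(server)} INTO {quote_ident(local_schema)};"
    )


# Example: import only the hour- and minute-level MVs from the `risingwave` server.
print(import_foreign_schema_sql(
    "public", "risingwave", "public",
    ["orders_total_price_per_hour", "orders_total_price_per_min"],
))
```

The generated statement is meant to run in PostgreSQL after CREATE SERVER and CREATE USER MAPPING, in place of the full-schema import.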

Although this method requires some configuration in PostgreSQL and may look more complex, the setup is done once and the parameters are straightforward. With IMPORT FOREIGN SCHEMA, you don't even need to list table and MV names, because PostgreSQL imports them for you. Keep in mind that it imports the definitions that exist when it runs, so relations created in RisingWave afterward need to be imported again. Once everything is configured, you can query the remote data directly in PostgreSQL, as if the tables existed locally.

--- Run in PostgreSQL
SELECT SUM(totalprice)
FROM orders_total_price_per_day
WHERE date BETWEEN date_trunc('day', NOW() - INTERVAL '7 days') AND date_trunc('day', NOW());
------
sum
---------------
1725458400.05

The entire architecture can be represented by the following diagram (assuming all tables and materialized views are imported):

Benefits of accessing RisingWave data in PostgreSQL

The Foreign Data Wrapper approach offers two significant advantages. Firstly, it gives you access to intermediate computation results in RisingWave at any time. For example, you can query orders_total_price_per_hour and orders_total_price_per_min directly in PostgreSQL without creating extra sinks or target tables. This is particularly useful for ad-hoc work, such as debugging or temporarily inspecting data.

Secondly, since FDW is a standard PostgreSQL extension, you can use all PostgreSQL features on foreign tables, such as joining them with ordinary tables. For example, the following query joins the local orders table with the orders_total_price_per_day foreign table to count the orders behind each daily total.

--- Run in PostgreSQL
SELECT count(*) AS totalcount, d.date, d.totalprice
FROM orders_total_price_per_day AS d
JOIN orders AS o ON date_trunc('day', o.o_orderdate) = d.date
GROUP BY d.date, d.totalprice;
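To make the result of this federated query concrete, the Python sketch below performs the same inner join and grouping over small in-memory rows. The rows are hypothetical; in practice PostgreSQL fetches the foreign rows from RisingWave through postgres_fdw and joins them with the local orders table. As a simplifying assumption, day truncation happens in UTC.

```python
from collections import Counter
from datetime import datetime, timezone
from decimal import Decimal

UTC = timezone.utc

# Hypothetical rows of the foreign table orders_total_price_per_day (date -> totalprice)
daily_totals = {
    datetime(2024, 4, 18, tzinfo=UTC): Decimal("210.75"),
    datetime(2024, 4, 19, tzinfo=UTC): Decimal("99.99"),
}

# Hypothetical o_orderdate values from the local PostgreSQL orders table
order_dates = [
    datetime(2024, 4, 18, 9, 15, 30, tzinfo=UTC),
    datetime(2024, 4, 18, 9, 15, 45, tzinfo=UTC),
    datetime(2024, 4, 18, 23, 59, 1, tzinfo=UTC),
    datetime(2024, 4, 19, 0, 0, 5, tzinfo=UTC),
    datetime(2024, 4, 21, 8, 0, tzinfo=UTC),  # no matching daily total, so the inner join drops it
]


def join_and_count(daily_totals, order_dates):
    """Model of: JOIN ON date_trunc('day', o_orderdate) = date, then count(*) per (date, totalprice)."""
    # Count orders per day bucket.
    orders_per_day = Counter(
        ts.replace(hour=0, minute=0, second=0, microsecond=0) for ts in order_dates
    )
    # Inner join: emit a row only for days present on both sides.
    return [
        (orders_per_day[day], day, total)
        for day, total in sorted(daily_totals.items())
        if orders_per_day[day] > 0
    ]


for totalcount, day, totalprice in join_and_count(daily_totals, order_dates):
    print(totalcount, day.date(), totalprice)
# 3 2024-04-18 210.75
# 1 2024-04-19 99.99
```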

Unlike the other two approaches, the Foreign Data Wrapper makes working across multiple databases seamless. Analyzing data from two databases usually requires manually importing data from one into the other, which is tedious; FDW removes that step. Querying data from multiple databases in a single query like this is known as federated querying.

Supplemental information

If you’re curious about the CDC and FDW support in different Postgres distributions, here’s a list that was put together towards the end of April, 2024.

Conclusion

In this blog, we explored three methods to interact with RisingWave and PostgreSQL: querying directly in RisingWave, delivering data to a destination with a sink and querying it there, and using postgres_fdw to access RisingWave data from PostgreSQL.

If you need seamless integration between RisingWave and PostgreSQL, using FDW to access RisingWave data as foreign tables is a great choice.
