Postgres Triggers with Listen / Notify

Ifat Ribon
Published in LaunchPad Lab
4 min read · Feb 28, 2023

Introduction

A common problem developers encounter is knowing when data in the database has changed and capturing that change so that subsequent processes can act on it. Luckily, Postgres offers several native capabilities that solve exactly that problem.

The main capabilities we’ll cover today are Postgres triggers and Postgres’ NOTIFY and LISTEN commands.

Postgres Triggers

Postgres triggers are essentially callbacks at the database level that execute a defined function before, after, or instead of specified operations (e.g., INSERT, UPDATE, DELETE) on the table(s) you specify. This lets you recognize and capture changed data.

Within the trigger function, Postgres surfaces metadata about the event, such as the operation (TG_OP), schema (TG_TABLE_SCHEMA), table name (TG_TABLE_NAME), and the OLD and NEW versions of the record’s attributes (you can find all the metadata available in the PG trigger function docs).

In the function you can include any logic you need, such as conditionals, other writes to the database (e.g., inserting a new record), and calls to other Postgres functions, which we’ll see later. Everything the function does runs inside the same transaction as the statement that fired the trigger.

CREATE OR REPLACE FUNCTION process_record() RETURNS TRIGGER AS $process_record$
BEGIN
  IF (TG_OP = 'DELETE') THEN
    -- Only OLD is available on DELETE
    INSERT INTO user_log(user_id, user_last_name, date_deleted)
    VALUES(OLD.id, OLD.last_name, current_date);
    RETURN OLD;
  ELSE
    -- INSERT or UPDATE: NEW holds the row as written.
    -- Note: the column is named date_deleted, but here it records the date of the insert or update.
    INSERT INTO user_log(user_id, user_last_name, date_deleted)
    VALUES(NEW.id, NEW.last_name, current_date);
    RETURN NEW;
  END IF;
END;
$process_record$ LANGUAGE plpgsql;

To apply the trigger to the relevant tables, you use the CREATE TRIGGER statement and give the trigger a name.

You then indicate whether the trigger should fire BEFORE, AFTER, or INSTEAD OF the specified operation(s) on the specified table. The operation can be INSERT, UPDATE, DELETE, or any combination of the three.

Next, you choose whether the function runs FOR EACH ROW modified by the operation, or FOR EACH STATEMENT, which results in one execution per statement regardless of how many rows it touches.

Finally, you use EXECUTE PROCEDURE with the name of the trigger function defined above. (PostgreSQL 11 and later also accept EXECUTE FUNCTION, which the documentation now prefers.)

CREATE TRIGGER users_trigger
  AFTER INSERT OR UPDATE OR DELETE ON users FOR EACH ROW
  EXECUTE PROCEDURE process_record();
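To make the shape of the statement easier to see, here is a minimal Ruby sketch that assembles a CREATE TRIGGER statement from the parts described above: timing, operations, table, row or statement level, and function name. The helper `build_trigger_sql` and its constants are hypothetical names made up for this illustration; they are not part of Postgres or Rails.

```ruby
# Hypothetical helper: assembles a CREATE TRIGGER statement from its parts.
VALID_TIMINGS = ["BEFORE", "AFTER", "INSTEAD OF"].freeze
VALID_EVENTS  = ["INSERT", "UPDATE", "DELETE"].freeze
VALID_LEVELS  = ["ROW", "STATEMENT"].freeze

def build_trigger_sql(name:, table:, function:, timing: "AFTER", events: VALID_EVENTS, level: "ROW")
  raise ArgumentError, "unknown timing: #{timing}" unless VALID_TIMINGS.include?(timing)
  raise ArgumentError, "unknown level: #{level}" unless VALID_LEVELS.include?(level)
  raise ArgumentError, "at least one event is required" if events.empty?

  unknown = events - VALID_EVENTS
  raise ArgumentError, "unknown events: #{unknown.join(', ')}" unless unknown.empty?

  # Identifiers are interpolated as-is, so only pass trusted names.
  [
    "CREATE TRIGGER #{name}",
    "#{timing} #{events.join(' OR ')} ON #{table} FOR EACH #{level}",
    "EXECUTE PROCEDURE #{function}();"
  ].join("\n")
end

puts build_trigger_sql(name: "users_trigger", table: "users", function: "process_record")
```

With the defaults, the helper reproduces the users_trigger statement from this section. In a Rails app, a string like this could be passed to `execute` inside a migration.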

Postgres Notify and Listen

Postgres offers native NOTIFY and LISTEN commands that let you implement a notification system in which one session broadcasts a message and a listening session receives it.

NOTIFY broadcasts a notification event, with an optional payload string, on a named channel.

LISTEN registers the current session on a named channel so that it receives notifications sent on that channel.


The pg_notify function takes the name of a channel and a string payload. A session that has run the LISTEN command for that channel then receives each notification along with the payload you sent.

SELECT pg_notify('create_or_update_record', '{"klass_name": "users", "crud_method": "create_or_update", "record_id": 2}');
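The same call can also be issued from application code. The sketch below shows one way to do that in Ruby, building the payload as JSON and passing the channel and payload as bind parameters. `send_notification` and `FakeConnection` are hypothetical names for this illustration. The sketch assumes `conn` responds to `exec_params` the way the pg gem's PG::Connection does, and it guards against the payload limit, which must be shorter than 8000 bytes in a default Postgres configuration.

```ruby
require "json"

# NOTIFY payloads must be shorter than 8000 bytes in a default Postgres build.
MAX_PAYLOAD_BYTES = 7999

# Hypothetical helper: `conn` is anything that responds to exec_params the way
# PG::Connection does (SQL string plus an array of bind values).
def send_notification(conn, channel, data)
  payload = JSON.generate(data)
  if payload.bytesize > MAX_PAYLOAD_BYTES
    raise ArgumentError, "payload is #{payload.bytesize} bytes; send a record id instead"
  end

  conn.exec_params("SELECT pg_notify($1, $2)", [channel, payload])
  payload
end

# Stand-in connection that records calls, so the sketch runs without a database.
class FakeConnection
  attr_reader :calls

  def initialize
    @calls = []
  end

  def exec_params(sql, params)
    @calls << [sql, params]
  end
end

conn = FakeConnection.new
puts send_notification(conn, "create_or_update_record",
                       { klass_name: "users", crud_method: "create_or_update", record_id: 2 })
```

Keeping the payload small (a table name and a record id rather than the whole row) leaves the listener to load whatever it needs and stays well under the size limit.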

In Ruby on Rails, for instance, you can set up your listener by checking out a database connection through the ActiveRecord PostgreSQL adapter and exposing the underlying PG::Connection object. From there you can run Postgres’ LISTEN command with the name of the channel to listen on.

class DatabaseListener
  def listen
    ActiveRecord::Base.connection_pool.with_connection do |connection|
      # raw_connection exposes the underlying PG::Connection
      conn = connection.raw_connection

      begin
        conn.async_exec "LISTEN create_or_update_record"
        loop do
          # Blocks until a notification arrives on a channel this session listens to
          conn.wait_for_notify do |channel, pid, payload|
            puts "Received NOTIFY on channel #{channel} with payload: #{payload}"
            process_notification(payload) # your code here
          end
        end
      ensure
        # Stop listening before the connection goes back to the pool
        conn.async_exec "UNLISTEN *"
      end
    end
  end
end

The infinite loop waits for notifications using wait_for_notify, a method the pg gem provides on PG::Connection. When a notification arrives, the block receives the channel name, the PID of the backend that sent it, and the payload defined earlier.
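The block's payload argument arrives as a plain string, so the listener has to decode it before acting on it. Here is a minimal sketch of that step, assuming the payload is sent as valid JSON with the klass_name, crud_method, and record_id keys used in this article. `Notification` and `parse_notification` are hypothetical names for this illustration.

```ruby
require "json"

# Hypothetical value object for the fields used in this article's payloads.
Notification = Struct.new(:channel, :klass_name, :crud_method, :record_id, keyword_init: true)

# Turns a raw NOTIFY payload into a Notification. Returns nil when the payload
# is not a JSON object, so one bad message does not kill the listener loop.
def parse_notification(channel, payload)
  data = JSON.parse(payload.to_s)
  return nil unless data.is_a?(Hash)

  Notification.new(
    channel: channel,
    klass_name: data["klass_name"],
    crud_method: data["crud_method"],
    record_id: data["record_id"]
  )
rescue JSON::ParserError
  nil
end

note = parse_notification(
  "create_or_update_record",
  '{"klass_name": "users", "crud_method": "create_or_update", "record_id": 2}'
)
puts "#{note.klass_name} ##{note.record_id} (#{note.crud_method})" if note
```

Returning nil for malformed input is one possible design choice; logging or re-raising may suit your application better.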

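You also want to manage your connection pooling. In the example, the ensure block runs Postgres’ UNLISTEN when the loop exits, so the connection does not go back to the pool still registered as a listener. Keep in mind that notifications are only delivered to sessions that are listening when they are sent, so anything broadcast while the listener is down is lost.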
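One way to make the loop easier to shut down cleanly is to pass a timeout to wait_for_notify, which then returns nil when nothing arrives in time, and to check a stop condition between waits. The sketch below is an assumption-laden variation on the listener, not the article's original class. `listen_until_stopped` and `ScriptedConnection` are hypothetical names, and `conn` is assumed to behave like PG::Connection for `exec` and `wait_for_notify(timeout)`.

```ruby
# Hypothetical listener loop. `conn` is expected to behave like PG::Connection:
# exec(sql) runs a statement, and wait_for_notify(timeout) yields
# (channel, pid, payload) or returns nil when the timeout elapses.
def listen_until_stopped(conn, channel, stop:, timeout: 1)
  conn.exec("LISTEN #{channel}") # channel is interpolated, so pass a trusted name
  until stop.call
    conn.wait_for_notify(timeout) do |name, pid, payload|
      yield name, pid, payload
    end
  end
ensure
  # Always deregister, even if the caller's block raised.
  conn.exec("UNLISTEN *")
end

# Stand-in connection that replays a fixed list of payloads, so the sketch
# runs without a database.
class ScriptedConnection
  attr_reader :statements

  def initialize(messages)
    @messages = messages
    @statements = []
  end

  def exec(sql)
    @statements << sql
  end

  def wait_for_notify(_timeout)
    message = @messages.shift
    return nil if message.nil?

    yield "create_or_update_record", 123, message
    "create_or_update_record"
  end
end

conn = ScriptedConnection.new(['{"record_id": 2}'])
ticks = 0
listen_until_stopped(conn, "create_or_update_record", stop: -> { (ticks += 1) > 3 }) do |name, _pid, payload|
  puts "#{name}: #{payload}"
end
```

In a real process, the `stop` callable might check a flag set by a signal handler, so the listener exits and runs UNLISTEN shortly after a shutdown request.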

Bringing It All Together

Using what we know about Postgres triggers, notify, and listen, we can build a data syncing system that captures changes made to data in real time and delivers those changes to the code that contains the business logic for managing system integrations.

For instance, perhaps we have a scenario where a change in a User record from one part of the system requires a change to be made in another table for another system. In this case, we can create a trigger function that is executed on an insert, update, or deletion of a user record, and sends a notification to the app with the record and changes in question. The listener can then pass off the metadata to a service that can process the changes that need to happen.

CREATE OR REPLACE FUNCTION process_record() RETURNS TRIGGER AS $process_record$
BEGIN
  -- json_build_object produces a valid JSON payload; TG_OP is 'INSERT', 'UPDATE', or 'DELETE'
  IF (TG_OP = 'DELETE') THEN
    PERFORM pg_notify('delete_record', json_build_object(
      'klass_name', TG_TABLE_NAME, 'crud_method', 'delete', 'record_id', COALESCE(OLD.id, 0))::text);
    RETURN OLD;
  ELSE
    PERFORM pg_notify('create_or_update_record', json_build_object(
      'klass_name', TG_TABLE_NAME, 'crud_method', lower(TG_OP), 'record_id', COALESCE(NEW.id, 0))::text);
    RETURN NEW;
  END IF;
END;
$process_record$ LANGUAGE plpgsql;

CREATE TRIGGER users_trigger
  AFTER INSERT OR UPDATE OR DELETE ON users FOR EACH ROW
  EXECUTE PROCEDURE process_record();
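On the application side, the listener can hand each notification to a service based on the channel and the table named in the payload. The following is a rough sketch of that hand-off, assuming the payload is valid JSON with klass_name, crud_method, and record_id keys and that the two channel names match the trigger function above. `NotificationRouter` and `UserSync` are hypothetical classes standing in for your own integration logic.

```ruby
require "json"

# Hypothetical router: maps the table name in the payload to a handler object.
class NotificationRouter
  def initialize(handlers)
    @handlers = handlers # e.g. { "users" => UserSync.new }
  end

  def call(channel, payload)
    data = JSON.parse(payload)
    handler = @handlers[data["klass_name"]]
    return :ignored if handler.nil? # no service registered for this table

    if channel == "delete_record"
      handler.deleted(data["record_id"])
    else
      handler.upserted(data["record_id"], data["crud_method"])
    end
  end
end

# Hypothetical service; a real one would update the other table or system.
class UserSync
  attr_reader :log

  def initialize
    @log = []
  end

  def upserted(id, method)
    @log << [:upserted, id, method]
  end

  def deleted(id)
    @log << [:deleted, id]
  end
end

sync = UserSync.new
router = NotificationRouter.new({ "users" => sync })
router.call("create_or_update_record", '{"klass_name": "users", "crud_method": "update", "record_id": 2}')
router.call("delete_record", '{"klass_name": "users", "crud_method": "delete", "record_id": 2}')
p sync.log
```

Because the trigger only sends identifiers, each handler would typically reload the record by id before syncing it.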
