PostgreSQL Trigger-Based Audit Log

Stav Barak
Published in Israeli Tech Radar
10 min read · Oct 5, 2023

If your app has data that is editable by users, you’ll probably want to keep a log of those changes. There are many approaches to doing so, and some are more frowned upon than others. Of course, I am not here to frown but to show you a little thing I did (then, you can frown at me in the comments).

Photo by Henry & Co. on Unsplash

The Stack and the Actors

(TLDR: React, Python (fastapi, sqlalchemy), PostgreSQL)

We have a web app built using React and a Postgres database. The web app shows case studies, each may contain some series, and each series contains images. The user is able to edit all of those. They can add a new case study, change a case study name or any metadata on the image, delete it, and so on.

Upon such a change, a client will make an API call to our FastAPI Python server which will execute the database query to create, update, or delete an entry in its corresponding database table.

Now we want to also log those changes.

Get to Work

Let’s start by creating an audit log table:

CREATE TABLE IF NOT EXISTS audit_log (
    id serial PRIMARY KEY,
    table_name TEXT,
    record_id TEXT,
    operation_type TEXT,
    changed_at TIMESTAMP DEFAULT now(),
    changed_by TEXT,
    original_values jsonb,
    new_values jsonb
);

Of course, the columns may vary according to the needs, but here’s what I went with as a baseline:

  • id: a serial key of the entry.
  • table_name: the name of the table in which the update occurred.
  • record_id: the id of the entry in the changed table.
  • operation_type: INSERT, UPDATE, or DELETE.
  • changed_at: time of the update.
  • changed_by: which user made the update (not as trivial as it seems, but we will get to it).
  • original_values: a json object that holds the changed key with the original value.
  • new_values: a json object that holds the changed key with the new value.

Log to the Audit Log Table

Okay, so we created the table, now what?

As I mentioned before, there is more than one way to perform the logging action. One approach is to just have the server make an additional query to insert an entry to the audit log after every update query. This could be done using middleware or a decorator to make it feel cleaner, but it is still an extra query every time. I’m not saying it’s not a legitimate solution. However, what I’m going to showcase in this post is not that, but how to make the database log the changes automatically.
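To make the "extra query" approach concrete before we leave it behind, here is a minimal sketch of application-level logging. It uses Python's built-in sqlite3 with an in-memory database purely as a stand-in so the snippet runs anywhere; the helper name audited_update and the simplified tables are my own assumptions, not part of the real app.

```python
import json
import sqlite3


def audited_update(conn, table, record_id, changes, changed_by):
    """Apply `changes` to one row and write a matching audit_log entry."""
    columns = list(changes)
    # Table and column names are interpolated into the SQL, so they must come
    # from trusted code, never from user input.
    row = conn.execute(
        f"SELECT {', '.join(columns)} FROM {table} WHERE id = ?", (record_id,)
    ).fetchone()
    original = dict(zip(columns, row))

    assignments = ", ".join(f"{column} = ?" for column in columns)
    conn.execute(
        f"UPDATE {table} SET {assignments} WHERE id = ?",
        (*changes.values(), record_id),
    )
    # This is the extra query: one more INSERT for every change.
    conn.execute(
        "INSERT INTO audit_log (table_name, record_id, operation_type, changed_by,"
        " original_values, new_values) VALUES (?, ?, 'UPDATE', ?, ?, ?)",
        (table, str(record_id), changed_by, json.dumps(original), json.dumps(changes)),
    )
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE series (id INTEGER PRIMARY KEY, title TEXT)")
conn.execute(
    "CREATE TABLE audit_log (id INTEGER PRIMARY KEY, table_name TEXT, record_id TEXT,"
    " operation_type TEXT, changed_by TEXT, original_values TEXT, new_values TEXT)"
)
conn.execute("INSERT INTO series (id, title) VALUES (1, 'old title')")

audited_update(conn, "series", 1, {"title": "new title"}, changed_by="example_user_id")
log_rows = conn.execute(
    "SELECT table_name, changed_by, original_values, new_values FROM audit_log"
).fetchall()
```

Every code path that changes data has to remember to go through a helper like this, which is exactly the burden the trigger removes.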

PostgreSQL Triggers

A PostgreSQL trigger is a special user-defined function invoked automatically whenever an event associated with a table occurs. An event could be any of the following: INSERT, UPDATE, DELETE, or TRUNCATE.

So we need to:

  1. Create a trigger function
  2. Subscribe a table to it (or several tables actually, because logging changes is not a table-specific task by nature)

Enough talk. Let’s break down some code.

Don’t Panic

You’re about to see a long and rather weird function. If you’re not familiar with the PostgreSQL syntax, that’s okay. We will get through it together.

Here goes:

CREATE OR REPLACE FUNCTION audit_trigger() RETURNS TRIGGER AS $$
DECLARE
    new_data jsonb;
    old_data jsonb;
    key text;
    new_values jsonb;
    old_values jsonb;
    user_id text;
BEGIN

    -- A custom setting can come back as NULL (never set) or as an empty
    -- string (set locally in an earlier transaction); treat both as "not set".
    user_id := NULLIF(current_setting('audit.user_id', true), '');

    IF user_id IS NULL THEN
        user_id := current_user;
    END IF;

    new_values := '{}';
    old_values := '{}';

    IF TG_OP = 'INSERT' THEN
        new_data := to_jsonb(NEW);
        new_values := new_data;

    ELSIF TG_OP = 'UPDATE' THEN
        new_data := to_jsonb(NEW);
        old_data := to_jsonb(OLD);

        FOR key IN SELECT jsonb_object_keys(new_data) INTERSECT SELECT jsonb_object_keys(old_data)
        LOOP
            -- IS DISTINCT FROM also catches changes to or from NULL,
            -- which a plain != comparison would silently skip.
            IF (new_data ->> key) IS DISTINCT FROM (old_data ->> key) THEN
                new_values := new_values || jsonb_build_object(key, new_data ->> key);
                old_values := old_values || jsonb_build_object(key, old_data ->> key);
            END IF;
        END LOOP;

    ELSIF TG_OP = 'DELETE' THEN
        old_data := to_jsonb(OLD);
        old_values := old_data;

        -- Re-store every value as text, matching the format used for UPDATE.
        FOR key IN SELECT jsonb_object_keys(old_data)
        LOOP
            old_values := old_values || jsonb_build_object(key, old_data ->> key);
        END LOOP;

    END IF;

    IF TG_OP = 'INSERT' OR TG_OP = 'UPDATE' THEN
        INSERT INTO audit_log (table_name, record_id, operation_type, changed_by, original_values, new_values)
        VALUES (TG_TABLE_NAME, NEW.id, TG_OP, user_id, old_values, new_values);

        RETURN NEW;
    ELSE
        INSERT INTO audit_log (table_name, record_id, operation_type, changed_by, original_values, new_values)
        VALUES (TG_TABLE_NAME, OLD.id, TG_OP, user_id, old_values, new_values);

        RETURN OLD;
    END IF;
END;
$$ LANGUAGE plpgsql;

Take it all in for a minute, and keep reading:

Trigger Breakdown

CREATE OR REPLACE FUNCTION audit_trigger() RETURNS TRIGGER AS $$

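RETURNS TRIGGER declares this as a trigger function: it takes no arguments, receives its context through special variables such as NEW, OLD, and TG_OP, and returns either a row or NULL. PostgreSQL only lets you attach a function to a trigger if it is declared this way.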

The next section is the declaration of local variables used in the function:

DECLARE
    new_data jsonb;
    old_data jsonb;
    key text;
    new_values jsonb;
    old_values jsonb;
    user_id text;

Moving on to the main block of the function (after BEGIN):

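user_id := NULLIF(current_setting('audit.user_id', true), '');
IF user_id IS NULL THEN
    user_id := current_user;
END IF;

  • current_setting() is used to retrieve the current value of a configuration parameter. Passing true as the second argument makes it return NULL instead of raising an error when the parameter does not exist.
  • In this case, it’s looking for the value of audit.user_id. NULLIF turns an empty string into NULL, because a setting that was set locally in an earlier transaction can come back as an empty string rather than NULL. If the value is not set, user_id is assigned the value of current_user, which is a keyword that references the current database user that triggered the change.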

Now let’s take a short break to explain that part because this is actually the heart and soul of the feature.

Photo by Scott Blake on Unsplash

Where Does the Trigger Get audit.user_id From?

That is a great question. It has to get it from the database session.

A database session refers to a period during which a user or application interacts with a database, typically starting when a connection is established and ending when it is terminated, encompassing all related transactions and interactions in between.

Say we were to run this command just before running the update:

SELECT set_config('audit.user_id', 'test user', true);

This is how we would set the variable ‘audit.user_id’ to ‘test user’. The third argument, true, makes the setting local to the current transaction, so it is discarded when that transaction ends. But obviously, we are not going to hardcode it, since we want to know the actual app user who made the change. The server is where the users are authenticated, so the server has to open the database session, set whatever variables the trigger needs (in our case, audit.user_id), and run all of the request's changes in that same session and transaction.
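Here is a rough sketch of what "set the variable, then run the change in the same transaction" looks like from code. It assumes a DB-API style cursor that uses %s placeholders (psycopg2 does); the helper name set_audit_user and the RecordingCursor stand-in are made up so the snippet can run without a database.

```python
from typing import Any, List, Tuple

SET_AUDIT_USER_SQL = "SELECT set_config('audit.user_id', %s, true)"


def set_audit_user(cursor: Any, user_id: str) -> None:
    """Store the app user's id for the trigger, scoped to the open transaction."""
    cursor.execute(SET_AUDIT_USER_SQL, (user_id,))


class RecordingCursor:
    """Stand-in for a real cursor: it only records what would be executed."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, tuple]] = []

    def execute(self, sql: str, params: tuple = ()) -> None:
        self.calls.append((sql, tuple(params)))


cursor = RecordingCursor()
# The setting has to be in place before the change runs, and both statements
# have to share one transaction, because the setting is transaction-local.
set_audit_user(cursor, "example_user_id")
cursor.execute(
    "UPDATE series SET title = %s WHERE id = %s",
    ("I am the new title", "1234"),
)
```

With a real connection, committing or rolling back after the UPDATE also discards the setting, so it cannot leak into the next request that reuses the connection.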

Just a quick side note: in ‘audit.user_id’, the word audit is used as a namespace. You can call it whatever you want, but you do have to have one. PostgreSQL requires custom parameters to have a two-part name (a prefix, a dot, and then the parameter name); a name without a dot is treated as a built-in parameter and rejected if no such parameter exists. It is tempting to blame SQLAlchemy for this, but the rule comes from PostgreSQL itself.

Trigger Breakdown: Continue

IF TG_OP = 'INSERT' THEN
    new_data := to_jsonb(NEW);
    new_values := new_data;

ELSIF TG_OP = 'UPDATE' THEN
    new_data := to_jsonb(NEW);
    old_data := to_jsonb(OLD);

    FOR key IN SELECT jsonb_object_keys(new_data) INTERSECT SELECT jsonb_object_keys(old_data)
    LOOP
        -- IS DISTINCT FROM also catches changes to or from NULL
        IF (new_data ->> key) IS DISTINCT FROM (old_data ->> key) THEN
            new_values := new_values || jsonb_build_object(key, new_data ->> key);
            old_values := old_values || jsonb_build_object(key, old_data ->> key);
        END IF;
    END LOOP;

ELSIF TG_OP = 'DELETE' THEN
    old_data := to_jsonb(OLD);
    old_values := old_data;

    -- Re-store every value as text, matching the format used for UPDATE.
    FOR key IN SELECT jsonb_object_keys(old_data)
    LOOP
        old_values := old_values || jsonb_build_object(key, old_data ->> key);
    END LOOP;

END IF;

In PostgreSQL, OLD holds the row as it was before the operation, while NEW holds the row as it will be after it (see docs). An INSERT has no OLD row and a DELETE has no NEW row.
This block handles the different cases of TG_OP, a special variable that holds the type of operation (INSERT, UPDATE, or DELETE). If it’s an INSERT, new_data is assigned the JSONB representation of the new row (NEW) and stored in full. If it’s an UPDATE, both new_data and old_data are assigned the JSONB representations of the new and old rows, and the loop iterates over the keys present in both. If it’s a DELETE, the whole old row is stored.

Let’s get inside the loop (for example in the case of UPDATE):

FOR key IN SELECT jsonb_object_keys(new_data) INTERSECT SELECT jsonb_object_keys(old_data)
LOOP
    -- IS DISTINCT FROM also catches changes to or from NULL
    IF (new_data ->> key) IS DISTINCT FROM (old_data ->> key) THEN
        new_values := new_values || jsonb_build_object(key, new_data ->> key);
        old_values := old_values || jsonb_build_object(key, old_data ->> key);
    END IF;
END LOOP;

new_values and old_values on the left side of the || operator refer to the variables we declared at the beginning of the function; they accumulate the modified values and the original values respectively. On the right side of ||, jsonb_build_object creates a single-key JSON object for the changed column, and || merges it into the object accumulated so far.
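If PL/pgSQL is not your thing, the same loop is easy to picture in Python. This is only an illustration of the logic, with the rows represented as plain dicts; diff_rows is a name I made up. One difference worth knowing: Python's != treats None like any other value, whereas in SQL a != comparison involving NULL yields NULL, so a change to or from NULL needs IS DISTINCT FROM to be detected.

```python
from typing import Any, Dict, Tuple


def diff_rows(
    old: Dict[str, Any], new: Dict[str, Any]
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Return (old_values, new_values) holding only the columns that changed."""
    old_values: Dict[str, Any] = {}
    new_values: Dict[str, Any] = {}
    # Only look at keys present in both rows, like the INTERSECT in the trigger.
    for key in old.keys() & new.keys():
        if old[key] != new[key]:
            old_values[key] = old[key]
            new_values[key] = new[key]
    return old_values, new_values


old_row = {"id": "1234", "title": "I am the old title", "description": None}
new_row = {"id": "1234", "title": "I am the new title", "description": None}
old_values, new_values = diff_rows(old_row, new_row)
```

Here old_values ends up holding only the old title and new_values only the new one; id and description are left out because they did not change.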

IF TG_OP = 'INSERT' OR TG_OP = 'UPDATE' THEN
    INSERT INTO audit_log (table_name, record_id, operation_type, changed_by, original_values, new_values)
    VALUES (TG_TABLE_NAME, NEW.id, TG_OP, user_id, old_values, new_values);

    RETURN NEW;
ELSE
    INSERT INTO audit_log (table_name, record_id, operation_type, changed_by, original_values, new_values)
    VALUES (TG_TABLE_NAME, OLD.id, TG_OP, user_id, old_values, new_values);

    RETURN OLD;
END IF;

The last IF block is the actual insertion into audit_log table.

An important note: here we are inserting NEW.id and OLD.id into the column record_id. We can only do this because all the triggering tables have a column called id. If that weren’t the case, we’d have to think of a different implementation.

Now we’re done with the trigger function!

Add Trigger to Table

Now let’s add the trigger to our three user-editable tables, like so:

CREATE TRIGGER audit_log_trigger
BEFORE INSERT OR UPDATE OR DELETE
ON public.case_studies
FOR EACH ROW
EXECUTE FUNCTION audit_trigger();


CREATE TRIGGER audit_log_trigger
BEFORE INSERT OR UPDATE OR DELETE
ON public.series
FOR EACH ROW
EXECUTE FUNCTION audit_trigger();


CREATE TRIGGER audit_log_trigger
BEFORE INSERT OR UPDATE OR DELETE
ON public.images
FOR EACH ROW
EXECUTE FUNCTION audit_trigger();
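If the list of audited tables grows, repeating that statement by hand gets tedious. Here is a small sketch that generates the same statements from a list of table names, for example inside a migration script. The helper name build_trigger_statements is hypothetical, and the table names are formatted straight into the SQL, so they must come from your own code and never from user input.

```python
from typing import List

AUDITED_TABLES = ["case_studies", "series", "images"]

TRIGGER_TEMPLATE = """CREATE TRIGGER audit_log_trigger
BEFORE INSERT OR UPDATE OR DELETE
ON public.{table}
FOR EACH ROW
EXECUTE FUNCTION audit_trigger();"""


def build_trigger_statements(tables: List[str]) -> List[str]:
    """Return one CREATE TRIGGER statement per audited table."""
    return [TRIGGER_TEMPLATE.format(table=table) for table in tables]


statements = build_trigger_statements(AUDITED_TABLES)
for statement in statements:
    print(statement, end="\n\n")
```

Trigger names only have to be unique per table, which is why all three can share the name audit_log_trigger.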

And now we have almost(!) everything we need. Except we still don’t have a way of getting the id of the user who made the changes into the session.

Photo by Sean D on Unsplash

The Server Part

In order to have the user id in the session, we need a FastAPI dependency that creates an SQLAlchemy SessionLocal for each request, attaches it to the request, and then closes it once the request is finished (see docs).

Let’s break this down as well.

First, we create the SessionLocal (You use your actual database URL of course):

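from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# Placeholder URL: replace it with your actual PostgreSQL connection string.
SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"

engine = create_engine(SQLALCHEMY_DATABASE_URL)
# Assumes SQLAlchemy 1.4 or newer. autocommit=True was removed in 2.0;
# the request dependency commits the transaction explicitly instead.
SessionLocal = sessionmaker(autocommit=False, autoflush=True, bind=engine)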

Then, we create the first dependency:

# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

Then, we can write two more dependencies. One to save the session in the request state, and another one to start the session and execute the command that sets the audit.user_id to the current user id (assuming your server already has an implementation for authenticating the user and getting its id):

import logging
from typing import Any

from fastapi import Depends, Request
from sqlalchemy import text

# let's assume a user implementation:
from user_model import User
from user_implementation_with_id import user
# end of assumptions

LOGGER = logging.getLogger(__name__)


def start_session_with_user_id(user_id: str, session: Any):
    session.begin()
    # The final `true` keeps the setting local to this transaction.
    session.execute(
        text("select set_config('audit.user_id', :user_id, true)"),
        {"user_id": str(user_id)},
    )


def set_session_to_request(
    request: Request, user: User = user, db=Depends(get_db)
):
    # Only requests that can change data need the audited transaction.
    if request.method != "GET":
        request.state.db_session = db

        start_session_with_user_id(user.id, session=request.state.db_session)

        try:
            yield
            if request.state.db_session.is_active:
                request.state.db_session.commit()
        except Exception:
            try:
                request.state.db_session.rollback()
            except Exception as e:
                LOGGER.error(f"Failed to rollback transaction: {e}")
            # Re-raise the original error with its traceback intact.
            raise
    else:
        yield
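The commit-or-rollback flow in set_session_to_request is easier to see in isolation. The sketch below reproduces the same pattern with contextlib.contextmanager and a fake session, so it runs without FastAPI or a database. FakeSession and transaction_scope are stand-ins I made up; as far as I understand, a FastAPI dependency with yield behaves much like a context manager wrapped around the route handler.

```python
from contextlib import contextmanager
from typing import Iterator, List


class FakeSession:
    """Records the calls a real SQLAlchemy session would receive."""

    def __init__(self) -> None:
        self.events: List[str] = []

    def begin(self) -> None:
        self.events.append("begin")

    def commit(self) -> None:
        self.events.append("commit")

    def rollback(self) -> None:
        self.events.append("rollback")


@contextmanager
def transaction_scope(session: FakeSession) -> Iterator[FakeSession]:
    """Begin a transaction, commit if the body succeeds, roll back if it raises."""
    session.begin()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise


# A handler that finishes normally: the transaction is committed.
ok = FakeSession()
with transaction_scope(ok):
    pass

# A handler that raises: the transaction is rolled back and the error propagates.
failed = FakeSession()
try:
    with transaction_scope(failed):
        raise ValueError("simulated handler error")
except ValueError:
    pass
```

The point for the audit log is that the data change and its audit_log row live in the same transaction, so they are committed or rolled back together.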

We now need to configure our server to use set_session_to_request:

APP = FastAPI(dependencies=[Depends(set_session_to_request)])

Now we can make another dependency to make it easy for any update function to grab the session from the request:

from typing import Any
from fastapi import Request


def get_db_session(request: Request) -> Any:
    return request.state.db_session

And finally use it in an actual request! For example, assume we can update the series title:

from typing import Any

from fastapi import APIRouter, Body, Depends, HTTPException, status
# assuming the validation errors come from pydantic:
from pydantic import ValidationError

# some more implementation assumptions here:
from user_model import User
from user_implementation_with_id import user
from series_model import Series
# end of assumptions

router = APIRouter()


@router.put("/series/{series_id}", response_model=str)
def update_series(
    series_id: str,
    title: str = Body(),
    user: User = user,
    session: Any = Depends(get_db_session),
) -> str:
    """
    Update Series
    """

    try:
        # No user id is passed along: the trigger reads it from the session.
        Series.update_data(
            series_id,
            {
                "title": title,
            },
            session=session,
        )

    except ValidationError as exception:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail=str(exception)
        )

    return series_id

Notice how the route handler receives the session as a dependency. Then we use it in the update_data class method, which might look like this (again, just an example!):

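import uuid
from typing import Any, Dict

from sqlalchemy import TEXT, Column
from sqlalchemy.dialects.postgresql import UUID

# Database.Base is assumed to be your app's declarative base.


class Series(Database.Base):
    __tablename__ = "series"
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    title = Column(TEXT)
    # probably more fields...

    @classmethod
    def update_data(
        cls,
        series_id: str,
        data: Dict[str, str],
        session: Any = None,
    ):
        # Read and write through the request's session, so the UPDATE runs
        # in the same transaction in which audit.user_id was set.
        series = session.query(cls).filter(cls.id == series_id).one_or_none()
        if series is None:
            raise LookupError(f"Series {series_id} not found")

        changed_fields = {}

        if series.title != data["title"]:
            changed_fields["title"] = data["title"]

        # Skip the UPDATE (and the audit entry) when nothing changed.
        if changed_fields:
            update_query = session.query(cls).filter(cls.id == series_id)  # type: ignore
            update_query.update(changed_fields)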

In this function, we run the update through the session we got from the route. Say we run it on a series with id 1234 (for example), our user id is example_user_id, and we changed the title from “I am the old title” to “I am the new title”. The trigger function on the series table will fire, and we expect to see a new entry in the audit_log table that looks like this:

1234 under record_id, series under table_name, UPDATE under operation_type, and {"title": "I am the old title"} and {"title": "I am the new title"} under original_values and new_values respectively. And most importantly, the actual user id under changed_by, so you can go and blame them after!
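Speaking of blaming: once the rows are there, turning one into a readable sentence takes only a few lines. This is a sketch that assumes the audit row has already been fetched into a dict whose keys match the audit_log columns; describe_change is a name I made up.

```python
from typing import Any, Dict, List


def describe_change(entry: Dict[str, Any]) -> List[str]:
    """Build one human-readable line per changed column of an audit_log row."""
    lines = []
    for column, new_value in entry["new_values"].items():
        old_value = entry["original_values"].get(column)
        lines.append(
            f"{entry['changed_by']} changed {entry['table_name']}.{column} "
            f"on record {entry['record_id']}: {old_value!r} -> {new_value!r}"
        )
    return lines


entry = {
    "table_name": "series",
    "record_id": "1234",
    "operation_type": "UPDATE",
    "changed_by": "example_user_id",
    "original_values": {"title": "I am the old title"},
    "new_values": {"title": "I am the new title"},
}
summary = describe_change(entry)
```

For an INSERT, original_values is empty, so the old value in each line shows up as None.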

Photo by Wesley Tingey on Unsplash

If you made it this far, let’s sum up what we just did:

  1. A trigger function that is executed upon changes in certain tables and inserts information into an audit log table.
  2. A FastAPI dependency that saves important variables into the database session so that the trigger can access them.

Some Important Notes to Keep in Mind

In this example we only used the session to pass the user id, but I would also recommend passing the change timestamp the same way. The changed_at default, now(), returns the start time of the database transaction, which is usually close to the time of the request but is not the same thing.
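One way to do that is to build all the settings for a request in one place. In the sketch below, audit.requested_at is a name I made up, and the trigger would need its own current_setting('audit.requested_at', true) lookup (cast to a timestamp) to use it. The statements use named :parameters so they can be wrapped in SQLAlchemy's text() and executed on the session.

```python
from datetime import datetime, timezone
from typing import Dict, List, Tuple

SET_CONFIG_SQL = "SELECT set_config(:name, :value, true)"


def build_audit_settings(
    user_id: str, requested_at: datetime
) -> List[Tuple[str, Dict[str, str]]]:
    """Return (sql, params) pairs that store the audit context for one request."""
    settings = {
        "audit.user_id": user_id,
        # Settings are always text, so send the timestamp in ISO 8601 format.
        "audit.requested_at": requested_at.isoformat(),
    }
    return [
        (SET_CONFIG_SQL, {"name": name, "value": value})
        for name, value in settings.items()
    ]


statements = build_audit_settings(
    "example_user_id",
    datetime(2023, 10, 5, 12, 0, tzinfo=timezone.utc),
)
```

Capturing the timestamp as early as possible in the request, and in UTC, keeps the log consistent no matter how long the request spends before it reaches the database.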

Also, we didn’t cover the subject of permissions. Obviously, you’ll need to grant the relevant permissions to whichever user will write to the audit log, but also remember it’s generally a bad idea to grant UPDATE or DELETE on it, since you probably don’t want anyone rewriting the history.

I hope this was helpful.
