Streamlining Oracle Advanced Queue to Transactional Event Queue Migration

Kalpesh Dusane
Oracle Developers
Published in
9 min readMar 21, 2024
Photo by Julia Craice on Unsplash

With the release of Oracle Database 23c, users now have access to the newly introduced PL/SQL package DBMS_AQMIGTOOL, providing a seamless solution for migrating their existing Advanced Queue (AQ) to Transactional Event Queue (TxEventQ) without any hassle. In this post, we’ll explore the details of migrating from AQ to TxEventQ, a task that has been made much simpler despite its inherent complexity.

Before delving into the details of migration, it’s essential to understand the background of Oracle message queuing systems. Oracle Database has incorporated queuing functionality for many years, dating back to Oracle 8. The legacy disk-based implementation, known as Oracle Advanced Queuing (AQ), contrasts with the newer in-memory, system-partitioned implementation, Oracle Transactional Event Queue (TxEventQ). TxEventQ is optimized for higher throughput and lower latency and offers transactional semantics with durability, making it an ideal choice for developing event-driven microservices and workflows.

Need for Migration tool?

The absence of a user-friendly API for migrating to TxEventQ without disrupting existing queues necessitates the need for a migration tool. The feature disparities between AQ and TxEventQ and data representation changes make manual migration complex and prone to downtime. Oracle Database 23c addresses this challenge with the DBMS_AQMIGTOOL PL/SQL package, streamlining the migration process and enabling a seamless transition to TxEventQ without downtime or extensive manual intervention.

Example Walkthrough: Migrating from AQ to TxEventQ

In this example walkthrough, we begin by creating a user and an advanced queue with a JSON payload type. We then proceed to explore the migration procedure within the DBMS_AQMIGTOOL package, which is essential for transitioning our queue to TxEventQ. Without delay, let's dive right in!

Step 1: Preparing the database

To get started, I grabbed a new Oracle Database 23c instance (base DB) on Oracle Cloud, opened up the SQL Worksheet in Database Actions, and created a user called aq_user. As the ADMIN user, I ran the following commands:

DROP USER aq_user CASCADE;
CREATE USER aq_user IDENTIFIED BY SomePassword; -- that's not the real password!
GRANT CONNECT, RESOURCE TO aq_user;
GRANT UNLIMITED TABLESPACE TO aq_user;
GRANT EXECUTE ON DBMS_AQ TO aq_user;
GRANT EXECUTE ON DBMS_AQADM TO aq_user;
GRANT EXECUTE ON DBMS_AQMIGTOOL TO aq_user;

The only additional grant we need is to provide the EXECUTE privilege on the DBMS_AQMIGTOOL package to our user aq_user.

Step 2: Setting up the AQ

Of course, I needed an AQ to work with, so I logged on to SQL Worksheet as my new aq_user user, then performed SET serveroutput ON and created a queue called json_queue with these commands:

BEGIN
SYS.DBMS_AQADM.CREATE_QUEUE_TABLE(queue_table => 'json_queue_table',
queue_payload_type => 'JSON',
sort_list => 'priority,enq_time');
SYS.DBMS_AQADM.CREATE_QUEUE(queue_name => 'json_queue',
queue_table => 'json_queue_table');
SYS.DBMS_AQADM.START_QUEUE(queue_name => 'json_queue');
END;
/

For simplicity, we’re demonstrating a single consumer queue, but it also functions with multi-consumer queues.

You can verify whether our queue has been created by querying the user_queues view.

SQL> SELECT name || ' -> ' || queue_category FROM user_queues WHERE queue_type = 'NORMAL_QUEUE' ORDER BY name;

NAME||'->'||QUEUE_CATEGORY
--------------------------------------------------------------------------------
JSON_QUEUE -> Classic Queue

The output displays Classic Queue, which is equivalent to AQ.

I have created a wrapper procedure customized for enqueue-dequeue operations specific to our JSON payload. So, we’ll call this procedure whenever we need to enqueue or dequeue data into our json_queue.

CREATE or REPLACE PROCEDURE enqueue_json (qname in VARCHAR2, data in VARCHAR2)
IS
enqueue_options DBMS_AQ.enqueue_options_t;
message_properties DBMS_AQ.message_properties_t;
msg_id RAW(16);
payload JSON;
value VARCHAR2(200) := '{ "tkey" : "' || data ||'" }';
BEGIN
SELECT JSON(value) INTO payload FROM dual;

SYS.DBMS_AQ.ENQUEUE(
queue_name => qname,
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => payload,
msgid => msg_id);
END;
/

CREATE or REPLACE PROCEDURE dequeue_json (qname in VARCHAR2)
IS
dequeue_options DBMS_AQ.dequeue_options_t;
message_properties DBMS_AQ.message_properties_t;
payload JSON;
msg_id RAW(16);
data VARCHAR2(200);
BEGIN
dequeue_options.navigation := DBMS_AQ.FIRST_MESSAGE;
dequeue_options.wait := DBMS_AQ.NO_WAIT;

SYS.DBMS_AQ.DEQUEUE(
queue_name => qname,
dequeue_options => dequeue_options,
message_properties => message_properties,
payload => payload,
msgid => msg_id);

SELECT JSON_VALUE(payload, '$.tkey') INTO data FROM dual;
dbms_output.put_line('Payload value - ' || data);
END;
/

As evident, I’m converting data from VARCHAR2 to JSON data type using SELECT JSON and retrieving it post-dequeuing using JSON_VALUE. I've also used NO_WAIT and FIRST_MESSAGE navigation for dequeue options. Additionally, while this demonstration is simplified, users have the flexibility to perform enqueue-dequeue operations using their preferred client-side languages like JAVA or Python. Both AQ and TxEventQ offer polyglot support.

-- enqueue 10 messages
DECLARE
cnt NUMBER := 10;
BEGIN
FOR indx IN 1 .. cnt LOOP
enqueue_json('json_queue', 'val - ' || TO_CHAR(indx));
END LOOP;
END;
/

I’m enqueuing the payload in JSON format as {"tkey":"val - 1"}.

Step 3: Initiating Migration

The migration process doesn’t involve direct data (message) transfer between AQ and TxEventQ.

Instead, we’ve introduced two primary APIs: INIT_MIGRATION, which kick-starts the migration by creating an interim TxEventQ using AQ's metadata, and COMMIT_MIGRATION, which finalizes the process by dropping AQ and renaming the TxEventQ back to AQ's name. This ensures that user applications remain unchanged.

Before delving into the migration procedures, it’s recommended to utilize the CHECK_MIGRATION_TO_TXEVENTQ procedure provided in the DBMS_AQMIGTOOL package. This optional step helps identify any features the user uses that are not supported in TxEventQ. Performing this check before starting the migration is highly recommended.

DECLARE
migration_report sys.TxEventQ_MigReport_Array := sys.TxEventQ_MigReport_Array();
BEGIN
SYS.DBMS_AQMIGTOOL.CHECK_MIGRATION_TO_TXEVENTQ(
cqschema => 'aq_user',
cqname => 'json_queue',
migration_report => migration_report);

dbms_output.put_line('-------------migration_report--------');
dbms_output.put_line('Number of unsupported features found in json_queue are : ' || migration_report.COUNT);
END;
/
-------------migration_report--------
Number of unsupported features found in json_queue are : 0

Now that no unsupported features are found, the user can proceed with the migration. If any unsupported features are detected in queue metadata or messages, you can check the guidelines for workarounds. Although this check is also internally performed in the INIT_MIGRATION API, it's helpful to review the list, if present, before initiating the migration.

To execute the INIT_MIGRATION procedure, you must provide the schema and queue name. Additionally, by using SYS.DBMS_AQMIGTOOL.SESSION for the 'ordering' parameter, we create the default number of event streams (typically 5, based on the initialization parameter _aq_init_shards), as opposed to the default value SYS.DBMS_AQMIGTOOL.GLOBAL, which generates only a single event stream to preserve global message-level ordering. While TxEventQ doesn't guarantee message ordering between event streams, it ensures session-level ordering. So, if the application doesn't need global message ordering, having more event streams can improve throughput. Other parameters are filled by default; the full definition is here.

BEGIN
SYS.DBMS_AQMIGTOOL.INIT_MIGRATION(cqschema => 'aq_user',
cqname => 'json_queue',
ordering => SYS.DBMS_AQMIGTOOL.SESSION);
END;
/

This procedure initiates migration by creating an interim TxEventQ with an identical queue configuration as AQ and the same name, appended with the default suffix M. To confirm if the interim TxEventQ is created, query the user_queues view.

SQL> SELECT name || ' -> ' || queue_category FROM user_queues WHERE queue_type = 'NORMAL_QUEUE' ORDER BY name;

NAME||'->'||QUEUE_CATEGORY
--------------------------------------------------------------------------------
JSON_QUEUE -> Classic Queue
JSON_QUEUE_M -> Transactional Event Queue

Note: INIT_MIGRATION is a non-blocking procedure, allowing it to be executed concurrently with online workloads. It operates independently on each queue, enabling multiple queues to be migrated simultaneously.

Step 4: How do enqueue-dequeues work?

You might wonder how enqueue-dequeue operations function after initiating the migration.

The question naturally arises: Are there any modifications required in the enqueue-dequeue calls?

The answer is NO. Users do not need to alter the application for enqueue-dequeue operations.

  • Enqueue Request Behaviour

There’s no need to modify the enqueue call. All enqueue requests following INIT_MIGRATION will be internally routed to an interim TxEventQ.

Enqueue Behaviour
SQL> -- enqueue 10 messages
SQL> DECLARE
2 cnt NUMBER := 10;
3 BEGIN
4 FOR indx IN 1 .. cnt LOOP
5 enqueue_json('json_queue', 'val (after INIT) - ' || TO_CHAR(indx+10));
6 END LOOP;
7 END;
8 /

PL/SQL procedure successfully completed.

SQL> COMMIT;

Commit complete.

To distinguish between the two sets of messages, I’ve added (after INIT) in the value field of the JSON payload.

  • Dequeue Request Behaviour

No adjustments are needed in the dequeue call. Dequeue requests will first exhaust all messages from AQ. Only after AQ is empty will messages be dequeued from the interim TxEventQ. As depicted in the figure, these two scenarios are illustrated as case 1 and case 2.

Dequeue Behaviour in migration (after INIT_MIGRATION)
SQL> -- dequeue 15 messages
SQL> DECLARE
2 cnt NUMBER := 15;
3 BEGIN
4 FOR indx IN 1 .. cnt LOOP
5 dequeue_json('json_queue');
6 END LOOP;
7 END;
8 /
Payload value - val - 1
Payload value - val - 2
Payload value - val - 3
Payload value - val - 4
Payload value - val - 5
Payload value - val - 6
Payload value - val - 7
Payload value - val - 8
Payload value - val - 9
Payload value - val - 10
Payload value - val (after INIT) - 11
Payload value - val (after INIT) - 12
Payload value - val (after INIT) - 13
Payload value - val (after INIT) - 14
Payload value - val (after INIT) - 15

PL/SQL procedure successfully completed.

SQL> COMMIT;

Commit complete.

As evident, once AQ is empty, we start dequeuing messages from the interim TxEventQ, as indicated by the payload value. Furthermore, no modification is required in the dequeue call, allowing us to dequeue messages that were enqueued after executing the INIT_MIGRATION procedure.

Step 5: Can we check how many messages are left to be processed in our AQ?

Indeed, we can check how many messages are left to be processed in our AQ.

Before completing the migration, ensure that AQ is empty; otherwise, an exception will be raised.

To facilitate this, the DBMS_AQMIGTOOL package offers a CHECK_MIGRATED_MESSAGES procedure. This procedure enables users to verify how many messages are left to be processed in both the AQ and interim TxEventQ.

DECLARE
migrated_q_msg_cnt number := 0;
aq_msg_cnt number := 0;
BEGIN
SYS.DBMS_AQMIGTOOL.CHECK_MIGRATED_MESSAGES(
cqschema => 'aq_user',
cqname => 'json_queue',
txeventq_migrated_message => migrated_q_msg_cnt,
cq_pending_messages => aq_msg_cnt);
dbms_output.put_line('AQ ready state msg count: ' || aq_msg_cnt);
dbms_output.put_line('Migrated TxEventQ msg count: ' || migrated_q_msg_cnt);
END;
/
AQ ready state msg count: 0
Migrated TxEventQ msg count: 5

As observed, AQ is empty, while TxEventQ still has 5 messages awaiting dequeuing.

Step 6: Can you modify your AQ queue configuration during the migration process?

No, you cannot. Limitations are imposed on AQ while the queue is migrating to prevent inconsistencies between AQ and TxEventQ configurations. Therefore, any attempt by the user to execute DDL operations on AQ during this period will result in an error indicating that the queue is undergoing migration.

SQL> EXECUTE SYS.DBMS_AQADM.ALTER_QUEUE (queue_name => 'json_queue', retry_delay => 5);
BEGIN SYS.DBMS_AQADM.ALTER_QUEUE (queue_name => 'json_queue', retry_delay => 5); END;

*
ERROR at line 1:
ORA-25606: This operation is restricted because migration is ongoing on the
AQ_USER.JSON_QUEUE queue.


SQL> EXECUTE SYS.DBMS_AQADM.STOP_QUEUE (queue_name => 'json_queue');
BEGIN SYS.DBMS_AQADM.STOP_QUEUE (queue_name => 'json_queue'); END;

*
ERROR at line 1:
ORA-25606: This operation is restricted because migration is ongoing on the
AQ_USER.JSON_QUEUE queue.

Step 7: Finalizing the Migration

Once you run COMMIT_MIGRATION, the migration process wraps up. It removes the AQ and switches the interim TxEventQ back to the original AQ name.

BEGIN
SYS.DBMS_AQMIGTOOL.COMMIT_MIGRATION(cqschema => 'aq_user',
cqname => 'json_queue');
END;
/

In case of unexpected errors during the execution of migration procedures, such as INIT_MIGRATION or COMMIT_MIGRATION, it's recommended to utilize the RECOVER_MIGRATION procedure instead of attempting the same procedure again. To revert to the previous setup and cancel the migration, users can use the CANCEL_MIGRATION procedure.

We can confirm the success of the COMMIT_MIGRATION execution by verifying the user_queues view.

SQL> SELECT name || ' -> ' || queue_category FROM user_queues WHERE queue_type = 'NORMAL_QUEUE' ORDER BY name;

NAME||'->'||QUEUE_CATEGORY
--------------------------------------------------------------------------------
JSON_QUEUE -> Transactional Event Queue

Additionally, we must verify if we can dequeue the messages enqueued before executing the COMMIT_MIGRATION call.

SQL> EXECUTE enqueue_json('json_queue', 'val (after COMMIT) - 21');

PL/SQL procedure successfully completed.

SQL> COMMIT;

Commit complete.

SQL>
SQL> -- dequeue 6 messages
SQL> DECLARE
2 cnt NUMBER := 6;
3 BEGIN
4 FOR indx IN 1 .. cnt LOOP
5 dequeue_json('json_queue');
6 END LOOP;
7 END;
8 /
Payload value - val (after INIT) - 16
Payload value - val (after INIT) - 17
Payload value - val (after INIT) - 18
Payload value - val (after INIT) - 19
Payload value - val (after INIT) - 20
Payload value - val (after COMMIT) - 21

PL/SQL procedure successfully completed.

As evident, we can perform enqueue-dequeue operations as expected without any need for application changes or disruptions.

Note: The migration procedures within the DBMS_AQMIGTOOL package support rolling upgrades and Oracle GoldenGate replication. Enqueuing and dequeuing requests proceed without interruption as these migration procedures are non-blocking and can be executed during ongoing tasks. Watch the demonstration in the YouTube video below.

Demo of AQ to TxEventQ Online Migration Tool

The demonstrated migration is just one option (INTERACTIVE) among four supported modes: AUTOMATIC, INTERACTIVE, OFFLINE, and ONLY_DEFINITION. Each mode serves different user needs during the transition from AQ to TxEventQ. Further details on each mode can be found here.

Summary

And there you have it! We’ve successfully migrated from AQ to TxEventQ smoothly using two non-blocking procedures.

The online migration tool demonstrated above offers these benefits:

  • User-friendly interface for intuitive operation
  • Seamlessly executes migration without causing any disruptions or downtime
  • Facilitates adoption of TxEventQ without manual intervention or application changes
  • Provides flexibility in configuring the migration process

Further Readings

--

--

Kalpesh Dusane
Oracle Developers

Senior Member Of Technical Staff at Oracle | CSE, IIT Bombay