Streamlining Oracle Advanced Queue to Transactional Event Queue Migration
With the release of Oracle Database 23c, users now have access to the newly introduced PL/SQL package DBMS_AQMIGTOOL, providing a seamless solution for migrating their existing Advanced Queue (AQ) to Transactional Event Queue (TxEventQ) without any hassle. In this post, we’ll explore the details of migrating from AQ to TxEventQ, a task that has been made much simpler despite its inherent complexity.
Before delving into the details of migration, it’s essential to understand the background of Oracle message queuing systems. Oracle Database has incorporated queuing functionality for many years, dating back to Oracle 8. The legacy disk-based implementation, known as Oracle Advanced Queuing (AQ), contrasts with the newer in-memory, system-partitioned implementation, Oracle Transactional Event Queue (TxEventQ). TxEventQ is optimized for higher throughput and lower latency and offers transactional semantics with durability, making it an ideal choice for developing event-driven microservices and workflows.
Need for Migration tool?
The absence of a user-friendly API for migrating to TxEventQ without disrupting existing queues necessitates the need for a migration tool. The feature disparities between AQ and TxEventQ and data representation changes make manual migration complex and prone to downtime. Oracle Database 23c addresses this challenge with the DBMS_AQMIGTOOL PL/SQL package, streamlining the migration process and enabling a seamless transition to TxEventQ without downtime or extensive manual intervention.
Example Walkthrough: Migrating from AQ to TxEventQ
In this example walkthrough, we begin by creating a user and an advanced queue with a JSON payload type. We then proceed to explore the migration procedure within the DBMS_AQMIGTOOL
package, which is essential for transitioning our queue to TxEventQ. Without delay, let's dive right in!
Step 1: Preparing the database
To get started, I grabbed a new Oracle Database 23c instance (base DB) on Oracle Cloud, opened up the SQL Worksheet in Database Actions, and created a user called aq_user
. As the ADMIN
user, I ran the following commands:
DROP USER aq_user CASCADE;
CREATE USER aq_user IDENTIFIED BY SomePassword; -- that's not the real password!
GRANT CONNECT, RESOURCE TO aq_user;
GRANT UNLIMITED TABLESPACE TO aq_user;
GRANT EXECUTE ON DBMS_AQ TO aq_user;
GRANT EXECUTE ON DBMS_AQADM TO aq_user;
GRANT EXECUTE ON DBMS_AQMIGTOOL TO aq_user;
The only additional grant we need is to provide the
EXECUTE
privilege on theDBMS_AQMIGTOOL
package to our useraq_user
.
Step 2: Setting up the AQ
Of course, I needed an AQ to work with, so I logged on to SQL Worksheet as my new aq_user
user, then performed SET serveroutput ON
and created a queue called json_queue
with these commands:
BEGIN
SYS.DBMS_AQADM.CREATE_QUEUE_TABLE(queue_table => 'json_queue_table',
queue_payload_type => 'JSON',
sort_list => 'priority,enq_time');
SYS.DBMS_AQADM.CREATE_QUEUE(queue_name => 'json_queue',
queue_table => 'json_queue_table');
SYS.DBMS_AQADM.START_QUEUE(queue_name => 'json_queue');
END;
/
For simplicity, we’re demonstrating a single consumer queue, but it also functions with multi-consumer queues.
You can verify whether our queue has been created by querying the user_queues
view.
SQL> SELECT name || ' -> ' || queue_category FROM user_queues WHERE queue_type = 'NORMAL_QUEUE' ORDER BY name;
NAME||'->'||QUEUE_CATEGORY
--------------------------------------------------------------------------------
JSON_QUEUE -> Classic Queue
The output displays Classic Queue
, which is equivalent to AQ.
I have created a wrapper procedure customized for enqueue-dequeue operations specific to our JSON payload. So, we’ll call this procedure whenever we need to enqueue or dequeue data into our json_queue
.
CREATE or REPLACE PROCEDURE enqueue_json (qname in VARCHAR2, data in VARCHAR2)
IS
enqueue_options DBMS_AQ.enqueue_options_t;
message_properties DBMS_AQ.message_properties_t;
msg_id RAW(16);
payload JSON;
value VARCHAR2(200) := '{ "tkey" : "' || data ||'" }';
BEGIN
SELECT JSON(value) INTO payload FROM dual;
SYS.DBMS_AQ.ENQUEUE(
queue_name => qname,
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => payload,
msgid => msg_id);
END;
/
CREATE or REPLACE PROCEDURE dequeue_json (qname in VARCHAR2)
IS
dequeue_options DBMS_AQ.dequeue_options_t;
message_properties DBMS_AQ.message_properties_t;
payload JSON;
msg_id RAW(16);
data VARCHAR2(200);
BEGIN
dequeue_options.navigation := DBMS_AQ.FIRST_MESSAGE;
dequeue_options.wait := DBMS_AQ.NO_WAIT;
SYS.DBMS_AQ.DEQUEUE(
queue_name => qname,
dequeue_options => dequeue_options,
message_properties => message_properties,
payload => payload,
msgid => msg_id);
SELECT JSON_VALUE(payload, '$.tkey') INTO data FROM dual;
dbms_output.put_line('Payload value - ' || data);
END;
/
As evident, I’m converting data from VARCHAR2
to JSON
data type using SELECT JSON
and retrieving it post-dequeuing using JSON_VALUE
. I've also used NO_WAIT
and FIRST_MESSAGE
navigation for dequeue options. Additionally, while this demonstration is simplified, users have the flexibility to perform enqueue-dequeue operations using their preferred client-side languages like JAVA or Python. Both AQ and TxEventQ offer polyglot support.
-- enqueue 10 messages
DECLARE
cnt NUMBER := 10;
BEGIN
FOR indx IN 1 .. cnt LOOP
enqueue_json('json_queue', 'val - ' || TO_CHAR(indx));
END LOOP;
END;
/
I’m enqueuing the payload in JSON format as {"tkey":"val - 1"}
.
Step 3: Initiating Migration
The migration process doesn’t involve direct data (message) transfer between AQ and TxEventQ.
Instead, we’ve introduced two primary APIs:
INIT_MIGRATION
, which kick-starts the migration by creating an interim TxEventQ using AQ's metadata, andCOMMIT_MIGRATION
, which finalizes the process by dropping AQ and renaming the TxEventQ back to AQ's name. This ensures that user applications remain unchanged.
Before delving into the migration procedures, it’s recommended to utilize the CHECK_MIGRATION_TO_TXEVENTQ
procedure provided in the DBMS_AQMIGTOOL
package. This optional step helps identify any features the user uses that are not supported in TxEventQ. Performing this check before starting the migration is highly recommended.
DECLARE
migration_report sys.TxEventQ_MigReport_Array := sys.TxEventQ_MigReport_Array();
BEGIN
SYS.DBMS_AQMIGTOOL.CHECK_MIGRATION_TO_TXEVENTQ(
cqschema => 'aq_user',
cqname => 'json_queue',
migration_report => migration_report);
dbms_output.put_line('-------------migration_report--------');
dbms_output.put_line('Number of unsupported features found in json_queue are : ' || migration_report.COUNT);
END;
/
-------------migration_report--------
Number of unsupported features found in json_queue are : 0
Now that no unsupported features are found, the user can proceed with the migration. If any unsupported features are detected in queue metadata or messages, you can check the guidelines for workarounds. Although this check is also internally performed in the INIT_MIGRATION
API, it's helpful to review the list, if present, before initiating the migration.
To execute the INIT_MIGRATION
procedure, you must provide the schema and queue name. Additionally, by using SYS.DBMS_AQMIGTOOL.SESSION
for the 'ordering
' parameter, we create the default number of event streams (typically 5, based on the initialization parameter _aq_init_shards
), as opposed to the default value SYS.DBMS_AQMIGTOOL.GLOBAL
, which generates only a single event stream to preserve global message-level ordering. While TxEventQ doesn't guarantee message ordering between event streams, it ensures session-level ordering. So, if the application doesn't need global message ordering, having more event streams can improve throughput. Other parameters are filled by default; the full definition is here.
BEGIN
SYS.DBMS_AQMIGTOOL.INIT_MIGRATION(cqschema => 'aq_user',
cqname => 'json_queue',
ordering => SYS.DBMS_AQMIGTOOL.SESSION);
END;
/
This procedure initiates migration by creating an interim TxEventQ with an identical queue configuration as AQ and the same name, appended with the default suffix M
. To confirm if the interim TxEventQ is created, query the user_queues
view.
SQL> SELECT name || ' -> ' || queue_category FROM user_queues WHERE queue_type = 'NORMAL_QUEUE' ORDER BY name;
NAME||'->'||QUEUE_CATEGORY
--------------------------------------------------------------------------------
JSON_QUEUE -> Classic Queue
JSON_QUEUE_M -> Transactional Event Queue
Note:
INIT_MIGRATION
is a non-blocking procedure, allowing it to be executed concurrently with online workloads. It operates independently on each queue, enabling multiple queues to be migrated simultaneously.
Step 4: How do enqueue-dequeues work?
You might wonder how enqueue-dequeue operations function after initiating the migration.
The question naturally arises: Are there any modifications required in the enqueue-dequeue calls?
The answer is NO. Users do not need to alter the application for enqueue-dequeue operations.
- Enqueue Request Behaviour
There’s no need to modify the enqueue call. All enqueue requests following INIT_MIGRATION
will be internally routed to an interim TxEventQ.
SQL> -- enqueue 10 messages
SQL> DECLARE
2 cnt NUMBER := 10;
3 BEGIN
4 FOR indx IN 1 .. cnt LOOP
5 enqueue_json('json_queue', 'val (after INIT) - ' || TO_CHAR(indx+10));
6 END LOOP;
7 END;
8 /
PL/SQL procedure successfully completed.
SQL> COMMIT;
Commit complete.
To distinguish between the two sets of messages, I’ve added (after INIT)
in the value field of the JSON payload.
- Dequeue Request Behaviour
No adjustments are needed in the dequeue call. Dequeue requests will first exhaust all messages from AQ. Only after AQ is empty will messages be dequeued from the interim TxEventQ. As depicted in the figure, these two scenarios are illustrated as case 1 and case 2.
SQL> -- dequeue 15 messages
SQL> DECLARE
2 cnt NUMBER := 15;
3 BEGIN
4 FOR indx IN 1 .. cnt LOOP
5 dequeue_json('json_queue');
6 END LOOP;
7 END;
8 /
Payload value - val - 1
Payload value - val - 2
Payload value - val - 3
Payload value - val - 4
Payload value - val - 5
Payload value - val - 6
Payload value - val - 7
Payload value - val - 8
Payload value - val - 9
Payload value - val - 10
Payload value - val (after INIT) - 11
Payload value - val (after INIT) - 12
Payload value - val (after INIT) - 13
Payload value - val (after INIT) - 14
Payload value - val (after INIT) - 15
PL/SQL procedure successfully completed.
SQL> COMMIT;
Commit complete.
As evident, once AQ is empty, we start dequeuing messages from the interim TxEventQ, as indicated by the payload value. Furthermore, no modification is required in the dequeue call, allowing us to dequeue messages that were enqueued after executing the INIT_MIGRATION
procedure.
Step 5: Can we check how many messages are left to be processed in our AQ?
Indeed, we can check how many messages are left to be processed in our AQ.
Before completing the migration, ensure that AQ is empty; otherwise, an exception will be raised.
To facilitate this, the DBMS_AQMIGTOOL
package offers a CHECK_MIGRATED_MESSAGES
procedure. This procedure enables users to verify how many messages are left to be processed in both the AQ and interim TxEventQ.
DECLARE
migrated_q_msg_cnt number := 0;
aq_msg_cnt number := 0;
BEGIN
SYS.DBMS_AQMIGTOOL.CHECK_MIGRATED_MESSAGES(
cqschema => 'aq_user',
cqname => 'json_queue',
txeventq_migrated_message => migrated_q_msg_cnt,
cq_pending_messages => aq_msg_cnt);
dbms_output.put_line('AQ ready state msg count: ' || aq_msg_cnt);
dbms_output.put_line('Migrated TxEventQ msg count: ' || migrated_q_msg_cnt);
END;
/
AQ ready state msg count: 0
Migrated TxEventQ msg count: 5
As observed, AQ is empty, while TxEventQ still has 5 messages awaiting dequeuing.
Step 6: Can you modify your AQ queue configuration during the migration process?
No, you cannot. Limitations are imposed on AQ while the queue is migrating to prevent inconsistencies between AQ and TxEventQ configurations. Therefore, any attempt by the user to execute DDL operations on AQ during this period will result in an error indicating that the queue is undergoing migration.
SQL> EXECUTE SYS.DBMS_AQADM.ALTER_QUEUE (queue_name => 'json_queue', retry_delay => 5);
BEGIN SYS.DBMS_AQADM.ALTER_QUEUE (queue_name => 'json_queue', retry_delay => 5); END;
*
ERROR at line 1:
ORA-25606: This operation is restricted because migration is ongoing on the
AQ_USER.JSON_QUEUE queue.
SQL> EXECUTE SYS.DBMS_AQADM.STOP_QUEUE (queue_name => 'json_queue');
BEGIN SYS.DBMS_AQADM.STOP_QUEUE (queue_name => 'json_queue'); END;
*
ERROR at line 1:
ORA-25606: This operation is restricted because migration is ongoing on the
AQ_USER.JSON_QUEUE queue.
Step 7: Finalizing the Migration
Once you run COMMIT_MIGRATION
, the migration process wraps up. It removes the AQ and switches the interim TxEventQ back to the original AQ name.
BEGIN
SYS.DBMS_AQMIGTOOL.COMMIT_MIGRATION(cqschema => 'aq_user',
cqname => 'json_queue');
END;
/
In case of unexpected errors during the execution of migration procedures, such as
INIT_MIGRATION
orCOMMIT_MIGRATION
, it's recommended to utilize theRECOVER_MIGRATION
procedure instead of attempting the same procedure again. To revert to the previous setup and cancel the migration, users can use theCANCEL_MIGRATION
procedure.
We can confirm the success of the COMMIT_MIGRATION
execution by verifying the user_queues
view.
SQL> SELECT name || ' -> ' || queue_category FROM user_queues WHERE queue_type = 'NORMAL_QUEUE' ORDER BY name;
NAME||'->'||QUEUE_CATEGORY
--------------------------------------------------------------------------------
JSON_QUEUE -> Transactional Event Queue
Additionally, we must verify if we can dequeue the messages enqueued before executing the COMMIT_MIGRATION
call.
SQL> EXECUTE enqueue_json('json_queue', 'val (after COMMIT) - 21');
PL/SQL procedure successfully completed.
SQL> COMMIT;
Commit complete.
SQL>
SQL> -- dequeue 6 messages
SQL> DECLARE
2 cnt NUMBER := 6;
3 BEGIN
4 FOR indx IN 1 .. cnt LOOP
5 dequeue_json('json_queue');
6 END LOOP;
7 END;
8 /
Payload value - val (after INIT) - 16
Payload value - val (after INIT) - 17
Payload value - val (after INIT) - 18
Payload value - val (after INIT) - 19
Payload value - val (after INIT) - 20
Payload value - val (after COMMIT) - 21
PL/SQL procedure successfully completed.
As evident, we can perform enqueue-dequeue operations as expected without any need for application changes or disruptions.
Note: The migration procedures within the
DBMS_AQMIGTOOL
package support rolling upgrades and Oracle GoldenGate replication. Enqueuing and dequeuing requests proceed without interruption as these migration procedures are non-blocking and can be executed during ongoing tasks. Watch the demonstration in the YouTube video below.
The demonstrated migration is just one option (INTERACTIVE
) among four supported modes: AUTOMATIC
, INTERACTIVE
, OFFLINE
, and ONLY_DEFINITION
. Each mode serves different user needs during the transition from AQ to TxEventQ. Further details on each mode can be found here.
Summary
And there you have it! We’ve successfully migrated from AQ to TxEventQ smoothly using two non-blocking procedures.
The online migration tool demonstrated above offers these benefits:
- User-friendly interface for intuitive operation
- Seamlessly executes migration without causing any disruptions or downtime
- Facilitates adoption of TxEventQ without manual intervention or application changes
- Provides flexibility in configuring the migration process
Further Readings
- If you’re looking for more information, you can check out the official migration tool user guide.
- To learn about all the procedures available in the
DBMS_AQMIGTOOL
package, you can read its detailed documentation in the Oracle official documentation for theDBMS_AQMIGTOOL
Package. - YouTube Video Demonstration: Elevating Your Event Queues-The Easy Path from Advanced Queue to Transactional Event Queue Migration