Amazon Aurora’s Read/Write Capability Enhancement with Apache ShardingSphere-Proxy

Originally posted on the Amazon AWS official blog

1. Introduction

Amazon Aurora is a relational database management system (RDBMS) developed by AWS (Amazon Web Services). Aurora gives you the performance and availability of commercial-grade databases with full MySQL and PostgreSQL compatibility. In terms of performance, Aurora MySQL and Aurora PostgreSQL deliver up to 5x the throughput of stock MySQL and up to 3x the throughput of stock PostgreSQL, respectively, on similar hardware.

In terms of scalability, Aurora brings enhancements and innovations in both storage and compute, supporting horizontal and vertical scaling. Aurora supports up to 128 TB of storage and dynamically scales the storage layer in 10 GB increments. On the compute side, Aurora supports scalable configurations with multiple read replicas: a cluster can have up to 15 Aurora Replicas in addition to the primary instance, and its multi-master architecture supports up to four read/write nodes. Its Serverless architecture allows vertical scaling that typically completes in under a second, while Global Database enables a single database cluster to span multiple AWS Regions with low latency.

Aurora already provides great scalability as user data volumes grow. Can it handle even more data and support even more concurrent access? One option is to use sharding across multiple underlying Aurora clusters. To this end, this blog and the ones that follow provide a reference for choosing between Proxy-based and JDBC-based sharding.

This post focuses on how to use ShardingSphere-Proxy, an open source sharding middleware, to build database clusters, covering aspects such as sharding, read/write splitting, and dynamic configuration.

2. Introducing ShardingSphere-Proxy

Apache ShardingSphere is an ecosystem of open source distributed database solutions, including JDBC, Proxy and Sidecar (under planning) products that can be deployed either independently or together. ShardingSphere-Proxy is positioned as a transparent database proxy. It is released under the Apache 2.0 license, and its latest version, 5.1.0, supports MySQL and PostgreSQL. It is transparent to applications and compatible with MySQL/PostgreSQL clients: the mysql command line tool and MySQL Workbench can both connect to ShardingSphere-Proxy directly.

The ShardingSphere-Proxy layer can connect to different databases, which can be homogeneous or heterogeneous. Users can define sharding or read/write splitting rules for the underlying databases in two ways: 1) static configuration with YAML files; 2) the enhanced DistSQL language provided by ShardingSphere. Because DistSQL supports creating rules dynamically without restarting the Proxy itself, it is the focus of ShardingSphere-Proxy's future development.

Whether a database proxy provides a connection pool to handle concurrent user access is another factor to consider. When ShardingSphere-Proxy adds a data source and initializes it, it configures a Hikari connection pool for the database. HikariCP is a widely used connection pool with low performance overhead and is the default connection pool in Spring Boot. ShardingSphere-Proxy's connection pool lets users configure the maximum number of connections, the maximum idle time, and cache-related settings. Besides Hikari, ShardingSphere-Proxy also supports other connection pools.

Syntax compatibility with existing SQL is also a key factor when evaluating a database proxy, as it determines whether application code must change. Taking MySQL as an example, ShardingSphere supports most MySQL syntax, with a few exceptions such as OPTIMIZE TABLE, resource group management, user creation, and GRANT permission management. For details, please refer to ShardingSphere's latest documentation.

Next I will share my experimental tests of ShardingSphere-Proxy in front of Aurora, covering several aspects: 1) sharding; 2) dynamic scaling; 3) read/write splitting; 4) multi-table joins; 5) failover recovery.

3. Setting up the environment

3.1 Creating an Aurora DB cluster

First, create three Aurora MySQL clusters by following the User Guide for Aurora Cluster Creation. The instance class is db.r5.2xlarge, and each cluster has one write node and one read node.

3.2 Building ShardingSphere-Proxy

Start an EC2 instance in the same Availability Zone as Aurora, using the r5.8xlarge instance type, and install ShardingSphere-Proxy on it.

3.2.1 Downloading installation package

Download the binary installation package and decompress it. Use the latest version, 5.1.0, which has better DistSQL support.

wget https://dlcdn.apache.org/shardingsphere/5.1.0/apache-shardingsphere-5.1.0-shardingsphere-proxy-bin.tar.gz
tar -xvf apache-shardingsphere-5.1.0-shardingsphere-proxy-bin.tar.gz

ShardingSphere-Proxy ships with the PostgreSQL JDBC driver in its lib directory but not the MySQL driver. Since the clusters here are Aurora MySQL, the MySQL JDBC driver needs to be copied into the lib directory.

wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/5.1.47/mysql-connector-java-5.1.47.jar
cp mysql-connector-java-5.1.47.jar apache-shardingsphere-5.1.0-shardingsphere-proxy-bin/lib/

3.2.2 Configuring the server side of the Proxy

In the root directory of ShardingSphere-Proxy there is a configuration directory, conf, which contains the file server.yaml. This file configures ShardingSphere-Proxy itself as a proxy service, including user authority, properties, and metadata storage. The following example configures user authority, a few properties, and metadata stored in ZooKeeper in cluster mode.

rules:
  - !AUTHORITY
    users:                      # user name and password for accessing the Proxy
      - root@%:root
      - sharding@:sharding
    provider:                   # controls the user's login permissions to the schema
      type: ALL_PRIVILEGES_PERMITTED
  - !TRANSACTION                # transaction type: local, XA two-phase, or BASE flexible transactions
    defaultType: XA
    providerType: Atomikos

props:                          # specific property configuration
  max-connections-size-per-query: 1
  proxy-hint-enabled: true      # the default value is false

mode:                           # metadata can be stored in memory, standalone, or cluster mode
  type: Cluster
  repository:
    type: ZooKeeper             # both ZooKeeper and etcd are supported
    props:
      namespace: shardingproxy
      server-lists: localhost:2181
      retryIntervalMilliseconds: 500
      timeToLiveSeconds: 60
      maxRetries: 3
      operationTimeoutMilliseconds: 500
  overwrite: false

3.3 Starting Proxy

The start and stop scripts are directly under bin in the root directory of ShardingSphere-Proxy, and runtime logs are written to the logs directory. Start (or stop) the Proxy:

bin/start.sh
bin/stop.sh

3.4 Verifying connections

Unless configured otherwise, ShardingSphere-Proxy uses port 3307 by default. Log in to the Proxy with the user name and password configured in Section 3.2.2, using the MySQL command line tool on EC2. Note that there is no database yet, because we did not pre-configure a data source in the YAML configuration file.

[ec2-user@ip-111-22-3-123 bin]$ mysql -h 127.0.0.1 -uroot --port 3307 -proot
Welcome to the MariaDB monitor.  Commands end with ; or \g.
Your MySQL connection id is 1
Server version: 5.7.22-ShardingSphere-Proxy 5.1.0
Copyright (c) 2000, 2018, Oracle, MariaDB Corporation Ab and others.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
MySQL [(none)]> show databases;
Empty set (0.01 sec)

4. Testing functions

4.1 Using DistSQL to define sharding rules and test data sharding

This section verifies the basic sharding capability of ShardingSphere. ShardingSphere-Proxy supports two methods for creating sharding rules and read/write splitting rules: YAML and DistSQL. DistSQL extends the SQL syntax and supports creating data sources and creating and modifying sharding rules online, which is more flexible. This article only covers the DistSQL approach.

4.1.1 Creating a database

Connect to ShardingSphere-Proxy and create a database, which serves as the logical distributed database.

MySQL [(none)]> create database distsql_sharding_db;
Query OK, 0 rows affected (0.90 sec)

Create a database on each Aurora cluster; these will be connected as data sources. Here rshard1, rshard2, and rshard3 are aliases I defined for connecting to the three Aurora clusters.

alias rshard1='mysql -h $dbname -u$username -p$password'
[ec2-user@ip-111-22-3-123 bin]$ rshard1 -e "create database dist_ds;"
[ec2-user@ip-111-22-3-123 bin]$ rshard2 -e "create database dist_ds;"
[ec2-user@ip-111-22-3-123 bin]$ rshard3 -e "create database dist_ds;"

4.1.2 Creating a data source

Run the following DistSQL statements in ShardingSphere-Proxy to create 3 data sources corresponding to the 3 Aurora clusters:

MySQL [distsql_sharding_db]> add resource ds_0(url="jdbc:mysql://aurora-2-07-7-shard1.cluster-12345678.us-east-1.rds.amazonaws.com:3306/dist_ds?serverTimezone=UTC&useSSL=false",user=admin,password=12345678);
Query OK, 0 rows affected (0.03 sec)
MySQL [distsql_sharding_db]> add resource ds_1(url="jdbc:mysql://aurora-2-07-7-shard2.cluster-12345678.us-east-1.rds.amazonaws.com:3306/dist_ds?serverTimezone=UTC&useSSL=false",user=admin,password=12345678);
Query OK, 0 rows affected (0.06 sec)
MySQL [distsql_sharding_db]> add resource ds_2(url="jdbc:mysql://aurora-2-07-7-shard3.cluster-12345678.us-east-1.rds.amazonaws.com:3306/dist_ds?serverTimezone=UTC&useSSL=false",user=admin,password=12345678);
Query OK, 0 rows affected (0.05 sec)

4.1.3 Creating a sharding rule

Here I’ll specify the sharding rule for the t_order table. Note that the table name used in the sharding rule must match the name of the table to be created later. The specific rule is: shard the table across the three underlying data sources (Aurora clusters) by hashing order_id, partitioning it into six shards. In addition, an automatic key generation strategy is used for order_id, based on the snowflake algorithm. ShardingSphere supports two distributed primary key generation strategies: UUID and SNOWFLAKE. A primary key generated by the snowflake algorithm is a 64-bit integer whose binary form contains four parts, from high bits to low bits: a 1-bit sign, a 41-bit timestamp, a 10-bit worker ID, and a 12-bit sequence number. Run the following DistSQL statement in ShardingSphere-Proxy to establish the sharding rule:

MySQL [distsql_sharding_db]> CREATE SHARDING TABLE RULE t_order(
    -> RESOURCES(ds_0,ds_1, ds_2),
    -> SHARDING_COLUMN=order_id,
    -> TYPE(NAME=hash_mod,PROPERTIES("sharding-count"=6)),
    -> KEY_GENERATE_STRATEGY(COLUMN=order_id,TYPE(NAME=snowflake))
    -> );
Query OK, 0 rows affected (0.02 sec)
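To make the snowflake layout described above concrete, here is a small Python sketch (my own illustration, not part of ShardingSphere) that splits a generated key into its four parts. The epoch offset that the generator adds to the timestamp is an implementation detail, so the timestamp below is shown as a raw bit value rather than a wall-clock time.

# Decode a 64-bit snowflake-style key into its four parts:
# 1 sign bit | 41 timestamp bits | 10 worker-ID bits | 12 sequence bits.
def decode_snowflake(key):
    sequence = key & 0xFFF                       # lowest 12 bits
    worker_id = (key >> 12) & 0x3FF              # next 10 bits
    timestamp = (key >> 22) & ((1 << 41) - 1)    # next 41 bits (epoch-relative)
    sign = (key >> 63) & 0x1                     # highest bit, always 0
    return {"sign": sign, "timestamp": timestamp,
            "worker_id": worker_id, "sequence": sequence}

# Example with one of the keys generated later in Section 4.1.5:
print(decode_snowflake(708700161915748353))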

4.1.4 Creating a table

The table creation statements are the same as common MySQL CREATE TABLE statements. Run the following statement in ShardingSphere-Proxy to create the table:

MySQL [distsql_sharding_db]> CREATE TABLE `t_order` (
    ->   `order_id` bigint NOT NULL,
    ->   `user_id` int NOT NULL,
    ->   `status` varchar(45) DEFAULT NULL,
    ->   PRIMARY KEY (`order_id`)
    -> ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
Query OK, 0 rows affected (0.22 sec)

Check the status of the table in ShardingSphere-Proxy.

MySQL [distsql_sharding_db]> show tables;
+-------------------------------+------------+
| Tables_in_distsql_sharding_db | Table_type |
+-------------------------------+------------+
| t_order                       | BASE TABLE |
+-------------------------------+------------+
1 row in set (0.00 sec)

Connect to each of the three Aurora clusters to see whether the sub-tables were created automatically. You can see that two sub-tables were created on each underlying cluster, six in total, numbered t_order_0 through t_order_5.

[ec2-user@ip-111-22-3-123 bin]$ rshard1 -Ddist_ds -e "show tables;"
+-------------------+
| Tables_in_dist_ds |
+-------------------+
| t_order_0         |
| t_order_3         |
+-------------------+
[ec2-user@ip-111-22-3-123 bin]$ rshard2 -Ddist_ds -e "show tables;"
+-------------------+
| Tables_in_dist_ds |
+-------------------+
| t_order_1         |
| t_order_4         |
+-------------------+
[ec2-user@ip-111-22-3-123 bin]$ rshard3 -Ddist_ds -e "show tables;"
+-------------------+
| Tables_in_dist_ds |
+-------------------+
| t_order_2         |
| t_order_5         |
+-------------------+

4.1.5 Inserting and finding data

Insert and query data through ShardingSphere-Proxy; both operations succeed. Run the following statements in ShardingSphere-Proxy:

MySQL [distsql_sharding_db]> insert into t_order(user_id, status) values (1, 'ok');
Query OK, 1 row affected (0.01 sec)
MySQL [distsql_sharding_db]> insert into t_order(user_id, status) values (2, 'abc');
Query OK, 1 row affected (0.00 sec)
MySQL [distsql_sharding_db]> insert into t_order(user_id, status) values (3, 'abc');
Query OK, 1 row affected (0.01 sec)
MySQL [distsql_sharding_db]> insert into t_order(user_id, status) values (4, 'abc');
Query OK, 1 row affected (0.00 sec)
MySQL [distsql_sharding_db]> insert into t_order(user_id, status) values (5, 'abc');
Query OK, 1 row affected (0.01 sec)
MySQL [distsql_sharding_db]> insert into t_order(user_id, status) values (6, 'abc');
Query OK, 1 row affected (0.00 sec)
MySQL [distsql_sharding_db]> insert into t_order(user_id, status) values (7, 'abc');
Query OK, 1 row affected (0.01 sec)
MySQL [distsql_sharding_db]> insert into t_order(user_id, status) values (8, 'abc');
Query OK, 1 row affected (0.00 sec)
MySQL [distsql_sharding_db]> insert into t_order(user_id, status) values (9, 'abc');
Query OK, 1 row affected (0.00 sec)
MySQL [distsql_sharding_db]> select * from t_order;
+--------------------+---------+--------+
| order_id           | user_id | status |
+--------------------+---------+--------+
| 708700161915748353 |       2 | abc    |
| 708700161995440128 |       5 | abc    |
| 708700169725542400 |       9 | abc    |
| 708700161877999616 |       1 | ok     |
| 708700161936719872 |       3 | abc    |
| 708700162041577472 |       7 | abc    |
| 708700161970274305 |       4 | abc    |
| 708700162016411649 |       6 | abc    |
| 708700162058354689 |       8 | abc    |
+--------------------+---------+--------+
9 rows in set (0.01 sec)

Look at the data inserted into the sub-tables on each Aurora cluster. You can see that the 9 records inserted through the Proxy have been split across the 6 underlying sub-tables. Because order_id is generated by the snowflake algorithm and the data volume is small, the distribution across sub-tables is uneven.

[ec2-user@ip-111-22-3-123 bin]$ rshard1 -Ddist_ds -e "select * from t_order_0;"[ec2-user@ip-111-22-3-123 bin]$ rshard1 -Ddist_ds -e "select * from t_order_3;"+--------------------+---------+--------+| order_id           | user_id | status |+--------------------+---------+--------+| 708700161915748353 |       2 | abc    |+--------------------+---------+--------+[ec2-user@ip-111-22-3-123 bin]$ rshard2 -Ddist_ds -e "select * from t_order_1;"[ec2-user@ip-111-22-3-123 bin]$ rshard2 -Ddist_ds -e "select * from t_order_4;"+--------------------+---------+--------+| order_id           | user_id | status |+--------------------+---------+--------+| 708700161995440128 |       5 | abc    || 708700169725542400 |       9 | abc    |+--------------------+---------+--------+[ec2-user@111-22-3-123 bin]$ rshard3 -Ddist_ds -e "select * from t_order_2;"+--------------------+---------+--------+| order_id           | user_id | status |+--------------------+---------+--------+| 708700161877999616 |       1 | ok     || 708700161936719872 |       3 | abc    || 708700162041577472 |       7 | abc    |+--------------------+---------+--------+[ec2-user@ip-111-22-3-123 bin]$ rshard3 -Ddist_ds -e "select * from t_order_5;"+--------------------+---------+--------+| order_id           | user_id | status |+--------------------+---------+--------+| 708700161970274305 |       4 | abc    || 708700162016411649 |       6 | abc    || 708700162058354689 |       8 | abc    |+--------------------+---------+--------+</code></pre></div>

The above experiments show that ShardingSphere-Proxy can create a logical database, connect data sources, create sharding rules, automatically create sub-tables on the underlying databases when a logical table is created, and perform query distribution and aggregation.

4.2 Dynamic scaling verification (online scaling and sharding)

In this section we’ll verify whether ShardingSphere-Proxy can change sharding rules dynamically.

ShardingSphere-Proxy supports online changes to sharding rules. However, if sub-tables have already been created under the previous rule, no new sub-tables are created when the number of shards increases, and no existing sub-tables are deleted when the number of shards decreases. So you need to manually create the new sub-tables and migrate data on the underlying database clusters.

Increase the number of shards for the table in Section 4.1 from 6 to 9. The sharding rule changes successfully, but subsequent queries report errors because the new sub-tables have not been created yet. Run the following DistSQL on ShardingSphere-Proxy:

MySQL [distsql_sharding_db]> alter SHARDING TABLE RULE t_order(
    -> RESOURCES(ds_0,ds_1, ds_2),
    -> SHARDING_COLUMN=order_id,
    -> TYPE(NAME=hash_mod,PROPERTIES("sharding-count"=9)),
    -> KEY_GENERATE_STRATEGY(COLUMN=order_id,TYPE(NAME=snowflake))
    -> );
Query OK, 0 rows affected (0.01 sec)
MySQL [distsql_sharding_db]> select * from t_order;
ERROR 1146 (42S02): Table 'dist_ds.t_order_6' doesn't exist

If the corresponding sub-tables are created on the underlying clusters, queries on ShardingSphere-Proxy no longer report errors. Connect to the three Aurora clusters and manually create the sub-tables.

[ec2-user@ip-111-22-3-123 bin]$ rshard1 -Ddist_ds -e "create table t_order_6(order_id bigint not null, user_id int not null, status varchar(45) default null, primary key(order_id)) engine=innodb default charset=utf8mb4; "[ec2-user@ ip-111-22-3-123 bin]$ rshard2 -Ddist_ds -e "create table t_order_7(order_id bigint not null, user_id int not null, status varchar(45) default null, primary key(order_id)) engine=innodb default charset=utf8mb4; "[ec2-user@ip-111-22-3-123 bin]$ rshard3 -Ddist_ds -e "create table t_order_8(order_id bigint not null, user_id int not null, status varchar(45) default null, primary key(order_id)) engine=innodb default charset=utf8mb4; "

The Proxy no longer reports an error when querying the whole logical table. Run the following SQL on ShardingSphere-Proxy:

MySQL [distsql_sharding_db]> select * from t_order;
+--------------------+---------+--------+
| order_id           | user_id | status |
+--------------------+---------+--------+
| 708700161915748353 |       2 | abc    |
| 708700161995440128 |       5 | abc    |
| 708700169725542400 |       9 | abc    |
| 708700161877999616 |       1 | ok     |
| 708700161936719872 |       3 | abc    |
| 708700162041577472 |       7 | abc    |
| 708700161970274305 |       4 | abc    |
| 708700162016411649 |       6 | abc    |
| 708700162058354689 |       8 | abc    |
+--------------------+---------+--------+
9 rows in set (0.01 sec)

When new data is inserted, it is mapped to sub-tables according to the new sharding rule. Preview the query plan of the statement on ShardingSphere-Proxy:

MySQL [distsql_sharding_db]> preview insert into t_order values(7, 100, 'new');
+------------------+---------------------------------------------+
| data_source_name | sql                                         |
+------------------+---------------------------------------------+
| ds_1             | insert into t_order_7 values(7, 100, 'new') |
+------------------+---------------------------------------------+
1 row in set (0.00 sec)
MySQL [distsql_sharding_db]> insert into t_order values(7, 100, 'new');
Query OK, 1 row affected (0.00 sec)

Log in to the Aurora sub-cluster to view the sub-table; the data has been inserted successfully.

[ec2-user@ip-111-22-3-123 bin]$ rshard2 -Ddist_ds -e "select * from t_order_7;"+----------+---------+--------+| order_id | user_id | status |+----------+---------+--------+|        7 |     100 | new    |+----------+---------+--------+

Let’s look at reducing the number of shards online. If the number of shards is reduced, for example to 3, the existing data in the table is not migrated, and only part of the data is returned when the whole table is queried. Run the following DistSQL and SQL statements on ShardingSphere-Proxy:

MySQL [distsql_sharding_db]> alter SHARDING TABLE RULE t_order(
    -> RESOURCES(ds_0,ds_1, ds_2),
    -> SHARDING_COLUMN=order_id,
    -> TYPE(NAME=hash_mod,PROPERTIES("sharding-count"=3)),
    -> KEY_GENERATE_STRATEGY(COLUMN=order_id,TYPE(NAME=snowflake))
    -> );
Query OK, 0 rows affected (0.02 sec)
MySQL [distsql_sharding_db]> select * from t_order;
+--------------------+---------+--------+
| order_id           | user_id | status |
+--------------------+---------+--------+
| 708700161877999616 |       1 | ok     |
| 708700161936719872 |       3 | abc    |
| 708700162041577472 |       7 | abc    |
+--------------------+---------+--------+
3 rows in set (0.00 sec)

After the above verification, our conclusion is that ShardingSphere-Proxy’s sharding rules can be changed online, but the creation of sub-tables and the redistribution of data need to be done manually.
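As a rough illustration of the manual redistribution work, the Python sketch below (reusing the hashing assumption from the routing sketch in Section 4.1) reports which of the rows inserted earlier would have to move to a different sub-table when the shard count changes from 6 to 9. It only computes the mapping; it is not a migration tool.

# Which existing t_order rows would need to move when sharding-count
# changes from 6 to 9? Assumes the same hash as the earlier routing sketch.
OLD_COUNT, NEW_COUNT = 6, 9

def java_long_hash(v):
    h = (v ^ (v >> 32)) & 0xFFFFFFFF
    return h - (1 << 32) if h >= (1 << 31) else h

def table_index(order_id, count):
    return abs(java_long_hash(order_id)) % count

# The nine order_id values generated in Section 4.1.5:
order_ids = [708700161915748353, 708700161995440128, 708700169725542400,
             708700161877999616, 708700161936719872, 708700162041577472,
             708700161970274305, 708700162016411649, 708700162058354689]

for oid in order_ids:
    old_idx, new_idx = table_index(oid, OLD_COUNT), table_index(oid, NEW_COUNT)
    if old_idx != new_idx:
        print("%d: t_order_%d -> t_order_%d" % (oid, old_idx, new_idx))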

4.3 Testing binding tables and broadcast tables

This section verifies ShardingSphere-Proxy’s support for multi-table joins. Although operations in OLTP databases are generally simple, there are cases that involve joins across multiple tables. ShardingSphere-Proxy optimizes multi-table joins with binding tables and broadcast tables. If two tables are bound and the join uses the sharding key, the join can be pushed down to the shards. Broadcast tables allow fast joins between large and small tables by copying the small table to every node.

4.3.1 Binding tables

ShardingSphere-Proxy can bind two tables through the CREATE SHARDING BINDING TABLE RULES statement in DistSQL. Here, the t_order table from Section 4.1 and a newly created table, t_order_item, are used as an example.

Connect to ShardingSphere-Proxy and run the following DistSQL and SQL statements.

MySQL [distsql_sharding_db]> CREATE SHARDING TABLE RULE t_order_item(
    -> RESOURCES(ds_0,ds_1, ds_2),
    -> SHARDING_COLUMN=order_id,
    -> TYPE(NAME=hash_mod,PROPERTIES("sharding-count"=6)));
Query OK, 0 rows affected (0.04 sec)
MySQL [distsql_sharding_db]> CREATE TABLE `t_order_item` (
    ->   `order_id` bigint NOT NULL,
    ->   `item_id` int NOT NULL,
    ->   `name` varchar(45) DEFAULT NULL,
    ->   PRIMARY KEY (`item_id`)
    -> ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
Query OK, 0 rows affected (0.08 sec)

After creating the binding rule, we can preview the join plan and see that the join is pushed down to joins between the corresponding sub-tables. Run the following statements on ShardingSphere-Proxy:

MySQL [distsql_sharding_db]> CREATE SHARDING BINDING TABLE RULES (t_order,t_order_item);
Query OK, 0 rows affected (0.04 sec)
MySQL [distsql_sharding_db]> preview select * from t_order, t_order_item where t_order.order_id=t_order_item.order_id;
+------------------+-------------------------------------------------------------------------------------------+
| data_source_name | sql                                                                                       |
+------------------+-------------------------------------------------------------------------------------------+
| ds_0             | select * from t_order_0, t_order_item_0 where t_order_0.order_id=t_order_item_0.order_id |
| ds_0             | select * from t_order_3, t_order_item_3 where t_order_3.order_id=t_order_item_3.order_id |
| ds_1             | select * from t_order_1, t_order_item_1 where t_order_1.order_id=t_order_item_1.order_id |
| ds_1             | select * from t_order_4, t_order_item_4 where t_order_4.order_id=t_order_item_4.order_id |
| ds_2             | select * from t_order_2, t_order_item_2 where t_order_2.order_id=t_order_item_2.order_id |
| ds_2             | select * from t_order_5, t_order_item_5 where t_order_5.order_id=t_order_item_5.order_id |
+------------------+-------------------------------------------------------------------------------------------+
6 rows in set (0.01 sec)

4.3.2 Broadcast tables

Each broadcast table has a complete copy in every database. Broadcast tables are declared with CREATE SHARDING BROADCAST TABLE RULES.

MySQL [distsql_sharding_db]> CREATE SHARDING BROADCAST TABLE RULES (t_user);
Query OK, 0 rows affected (0.03 sec)
MySQL [distsql_sharding_db]> create table t_user (user_id int, name varchar(100));
Query OK, 0 rows affected (0.04 sec)

Log in to each Aurora cluster to view the created tables. Unlike the sub-tables of a sharded table, whose names end with a shard number, the broadcast table in each database has the same name as the logical table itself.

[ec2-user@ip-111-22-3-123 bin]$ rshard1 -D dist_ds -e "show tables like '%user%';"+----------------------------+| Tables_in_dist_ds (%user%) |+----------------------------+| t_user                     |+----------------------------+[ec2-user@ip-111-22-3-123 bin]$ rshard2 -D dist_ds -e "show tables like '%user%';"+----------------------------+| Tables_in_dist_ds (%user%) |+----------------------------+| t_user                     |+----------------------------+[ec2-user@ip-111-22-3-123 bin]$ rshard3 -D dist_ds -e "show tables like '%user%';"+----------------------------+| Tables_in_dist_ds (%user%) |+----------------------------+| t_user                     |+----------------------------+

Joins between the broadcast table and other tables in ShardingSphere-Proxy are executed as local joins on each data source.

MySQL [distsql_sharding_db]> preview select * from t_order, t_user where t_order.user_id=t_user.user_id;
+------------------+-------------------------------------------------------------------------+
| data_source_name | sql                                                                     |
+------------------+-------------------------------------------------------------------------+
| ds_0             | select * from t_order_0, t_user where t_order_0.user_id=t_user.user_id |
| ds_0             | select * from t_order_3, t_user where t_order_3.user_id=t_user.user_id |
| ds_1             | select * from t_order_1, t_user where t_order_1.user_id=t_user.user_id |
| ds_1             | select * from t_order_4, t_user where t_order_4.user_id=t_user.user_id |
| ds_2             | select * from t_order_2, t_user where t_order_2.user_id=t_user.user_id |
| ds_2             | select * from t_order_5, t_user where t_order_5.user_id=t_user.user_id |
+------------------+-------------------------------------------------------------------------+

The above experiments verify that ShardingSphere-Proxy supports joins between two binding tables as well as joins between broadcast tables and sharded tables. ShardingSphere-Proxy also has a federation feature that supports joins between two unbound sharded tables, but the feature is not yet mature; it deserves attention in the future.

4.4 Read/write splitting verification

In this section we’ll verify ShardingSphere-Proxy’s support for read/write splitting. As business grows, splitting write and read loads across different database nodes effectively increases the processing capacity of the entire database cluster. Aurora provides a reader/writer endpoint for writes and strongly consistent reads, and a read-only endpoint for reads that do not need strong consistency. Aurora’s replication latency between the writer and readers is within single-digit milliseconds, much lower than MySQL’s binlog-based logical replication, so a large share of the read load can be directed to the read-only endpoint.

The read/write splitting feature provided by ShardingSphere-Proxy further encapsulates Aurora’s reader/writer endpoint and read-only endpoint. Users can connect directly to the Proxy endpoint and get automatic read/write splitting. ShardingSphere-Proxy handles special cases as follows: 1) within the same thread and the same database connection, if there is a write operation, subsequent read operations are read from the primary database; 2) a hint can be used to forcibly send read requests to the write node (primary database). The first of the three Aurora clusters will be used to verify ShardingSphere-Proxy’s read/write splitting capability.

4.4.1 Viewing Aurora cluster’s read/write endpoint and read-only endpoint

The Aurora cluster has two endpoints: a writer endpoint and a reader endpoint.

4.4.2 Creating a database in the Aurora cluster

Connect to the Aurora cluster and run the following statement (rdbw is an alias, defined like rshard1 earlier, for connecting to the cluster):

[ec2-user@ip-111-22-3-123 ~]$ rdbw -e "create database wr_ds;"

4.4.3 Configuring data source

Create data sources on ShardingSphere-Proxy. The data source for write operations will be directed to Aurora’s read/write endpoint, while the data source for read operations will be directed to Aurora’s read-only endpoint.

Note: when using domain names, ShardingSphere-Proxy only supports creating data sources with a URL, not with HOST and PORT parameters. Create the logical database distsql_rwsplit_db on ShardingSphere-Proxy and add the data sources to it:

MySQL [(none)]> create database distsql_rwsplit_db;
Query OK, 0 rows affected (0.02 sec)
MySQL [(none)]> use distsql_rwsplit_db;
Database changed
MySQL [distsql_rwsplit_db]> add resource write_ds(url="jdbc:mysql://aurora-2-07-7-shard1.cluster-12345678.us-east-1.rds.amazonaws.com:3306/wr_ds?serverTimezone=UTC&useSSL=false",user=admin,password=12345678), read_ds(url="jdbc:mysql://aurora-2-07-7-shard1.cluster-ro-12345678.us-east-1.rds.amazonaws.com:3306/wr_ds?serverTimezone=UTC&useSSL=false",user=admin,password=12345678);
Query OK, 0 rows affected (0.08 sec)

4.4.4 Configuring read/write splitting rule

Create the read/write splitting rule to send write requests to the write data source and read requests to the read data source. Unlike a sharding rule, where RULE is followed by a table name, the RULE here is followed by the name of a data source, and it applies to all tables created in the database. Run the following DistSQL statement on ShardingSphere-Proxy:

MySQL [distsql_rwsplit_db]> CREATE READWRITE_SPLITTING RULE wr_ds (
    -> WRITE_RESOURCE=write_ds,
    -> READ_RESOURCES(read_ds),
    -> TYPE(NAME=random)
    -> );
Query OK, 0 rows affected (0.36 sec)

4.4.5 Creating a table

Create a normal table, using the same statement as an ordinary MySQL CREATE TABLE. Run the following SQL statement on ShardingSphere-Proxy:

MySQL [distsql_rwsplit_db]> create table wr_table (a int, b int, c varchar(20));
Query OK, 0 rows affected (0.17 sec)

4.4.6 Checking whether read/write splitting is implemented

Run the following statements on ShardingSphere-Proxy to view the query plan and see which underlying data source each statement is sent to. You can see that the write request is routed to the write data source and the read request is routed to the read data source.

MySQL [distsql_rwsplit_db]> preview insert into wr_table values(1,1,'ab');
+------------------+---------------------------------------+
| data_source_name | sql                                   |
+------------------+---------------------------------------+
| write_ds         | insert into wr_table values(1,1,'ab') |
+------------------+---------------------------------------+
1 row in set (0.10 sec)
MySQL [distsql_rwsplit_db]> preview select * from wr_table;
+------------------+------------------------+
| data_source_name | sql                    |
+------------------+------------------------+
| read_ds          | select * from wr_table |
+------------------+------------------------+
1 row in set (0.02 sec)

Run a script and verify the behavior with the Aurora cluster's monitoring metrics. The script is a loop that runs 1000 times, each iteration inserting one record and then counting the total number of records in the table.

[ec2-user@ip-111-22-3-123 shardingproxy]$ cat testReadWrite.sh
#!/bin/bash
n=1
while [ $n -le 1000 ]
do
    mysql -h 127.0.0.1 -uroot --port 3307 -proot -Ddistsql_rwsplit_db -e "insert into wr_table values($n,$n,'ok');"
    mysql -h 127.0.0.1 -uroot --port 3307 -proot -Ddistsql_rwsplit_db -e "select count(*) from wr_table;"
    let n++
done

Checking the read and write latency of the write node and the read node in the Aurora cluster, you can see that write latency appears only on the write node and read latency appears only on the read node, which indicates that the read/write splitting rule takes effect.

Although the replication latency between Aurora’s write node and read node is very low, within single-digit milliseconds, some applications still have strong consistency requirements, i.e. a read immediately after a write must see that write. In this case, you can force read requests to be sent to the write node. ShardingSphere-Proxy supports this with hints.

First, add a proxy-hint-enabled: true attribute to conf/server.yaml. Then, in the connection, explicitly set the readwrite_splitting hint source value to write to force routing to the write node. The default rule can be restored by setting the value back to auto or by clearing the hint. The readwrite_splitting hint source takes effect at the session level.

Run the following statements in turn on ShardingSphere-Proxy. You can see that by default the read request is sent to the read node; after the readwrite_splitting hint source is set to write, it is sent to the write node; and after the source is set back to auto, reads go to the read node again.

MySQL [distsql_rwsplit_db]> preview select count(*) from wr_table;
+------------------+-------------------------------+
| data_source_name | sql                           |
+------------------+-------------------------------+
| read_ds          | select count(*) from wr_table |
+------------------+-------------------------------+
1 row in set (0.01 sec)
MySQL [distsql_rwsplit_db]> set readwrite_splitting hint source = write;
Query OK, 0 rows affected (0.00 sec)
MySQL [distsql_rwsplit_db]> preview select count(*) from wr_table;
+------------------+-------------------------------+
| data_source_name | sql                           |
+------------------+-------------------------------+
| write_ds         | select count(*) from wr_table |
+------------------+-------------------------------+
1 row in set (0.01 sec)
MySQL [distsql_rwsplit_db]> set readwrite_splitting hint source = auto;
Query OK, 0 rows affected (0.00 sec)
MySQL [distsql_rwsplit_db]> preview select count(*) from wr_table;
+------------------+-------------------------------+
| data_source_name | sql                           |
+------------------+-------------------------------+
| read_ds          | select count(*) from wr_table |
+------------------+-------------------------------+
1 row in set (0.00 sec)

Another way, which avoids editing the YAML file, is to set the two variables proxy_hint_enabled and readwrite_splitting hint source directly in DistSQL.

MySQL [distsql_rwsplit_db]> set variable proxy_hint_enabled=true;
Query OK, 0 rows affected (0.01 sec)
MySQL [distsql_rwsplit_db]> set readwrite_splitting hint source = write;
Query OK, 0 rows affected (0.01 sec)
MySQL [distsql_rwsplit_db]> preview select * from wr_table;
+------------------+------------------------+
| data_source_name | sql                    |
+------------------+------------------------+
| write_ds         | select * from wr_table |
+------------------+------------------------+
1 row in set (0.00 sec)
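The same session-level hint can also be driven from application code rather than a SQL console. Below is a minimal pymysql sketch, assuming ShardingSphere-Proxy is listening locally on port 3307 with the root/root account from Section 3.2.2 and that hinting is enabled as described above; it is an illustration, not production code.

# Force one read to the write data source from application code,
# then switch back to automatic read/write splitting.
import pymysql

conn = pymysql.connect(host="127.0.0.1", port=3307, user="root",
                       password="root", database="distsql_rwsplit_db")
try:
    with conn.cursor() as cur:
        cur.execute("set readwrite_splitting hint source = write")  # read-your-writes
        cur.execute("select count(*) from wr_table")                # routed to write_ds
        print(cur.fetchone())
        cur.execute("set readwrite_splitting hint source = auto")   # back to read_ds
finally:
    conn.close()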

The above experiments verify that ShardingSphere-Proxy has good read/write splitting ability. They cover the scenario where a single underlying Aurora cluster is used for read/write splitting. ShardingSphere-Proxy also supports sharding and read/write splitting at the same time: for example, if three Aurora clusters are allocated and each needs to provide read/write splitting, we can simply use the data source name defined by the read/write splitting rule (wr_ds in Section 4.4.4) as one of the data sources specified by the sharding rule (ds_0, ds_1, ds_2 in Section 4.1.3).

4.5 Failover verification

This section verifies ShardingSphere-Proxy’s sensitivity to Aurora cluster failover. In the event of a writer/reader switchover in an Aurora cluster, it would be ideal if the Proxy could detect the switch dynamically and connect to the new primary database. Again, this experiment uses the first Aurora cluster.

The test script is as follows. It continuously connects to the write node and sends update requests at 1-second intervals.

[ec2-user@ip-111-22-3-123 shardingproxy]$ cat testFailover.sh
#!/bin/bash
while true
do
    mysql -h 127.0.0.1 -uroot --port 3307 -proot -Ddistsql_rwsplit_db -e "update wr_table set c='failover' where a = 1;"
    now=$(date +"%T")
    echo "update done: $now"
    sleep 1
done

Run the script, then choose Action -> Failover on the write node of the Aurora cluster in the RDS console. This starts an automatic switchover between Aurora’s write node and read node. During the switchover, the reader/writer endpoint and read-only endpoint of the cluster remain the same, but the underlying nodes they map to change.

By observing the events of the Aurora cluster, you can see that the failover completes in about 30 seconds.

Unfortunately, the application connects through ShardingSphere-Proxy, which cannot automatically detect the underlying IP change, so the running script keeps throwing errors:

ERROR 1290 (HY000) at line 1: The MySQL server is running with the --read-only option so it cannot execute this statement
update done: 15:04:04
ERROR 1290 (HY000) at line 1: The MySQL server is running with the --read-only option so it cannot execute this statement
update done: 15:04:05
ERROR 1290 (HY000) at line 1: The MySQL server is running with the --read-only option so it cannot execute this statement
update done: 15:04:06

The same error occurs when you connect to the Proxy directly from the MySQL command line.

MySQL [distsql_rwsplit_db]> update wr_table set c="failover" where a =2;ERROR 1290 (HY000): The MySQL server is running with the --read-only option so it cannot execute this statement

The reason is that when an Aurora failover occurs, the mapping between the reader/writer endpoint and the IP address changes, but ShardingSphere's connection pool is not updated to the new IP address. As a workaround, we can recreate the data source so that ShardingSphere-Proxy points to the new write node. Although the definition of the data source itself does not change, re-running the alter resource operation rebuilds the data source and re-resolves the endpoint to its new IP address, after which ShardingSphere-Proxy runs successfully.

MySQL [distsql_rwsplit_db]> alter resource write_ds(url="jdbc:mysql://aurora-2-07-7-shard1.cluster-12345678.us-east-1.rds.amazonaws.com:3306/wr_ds?serverTimezone=UTC&useSSL=false",user=admin,password=12345678), read_ds(url="jdbc:mysql://aurora-2-07-7-shard1.cluster-ro-12345678.us-east-1.rds.amazonaws.com:3306/wr_ds?serverTimezone=UTC&useSSL=false",user=admin,password=12345678);
Query OK, 0 rows affected (0.05 sec)
MySQL [distsql_rwsplit_db]> update wr_table set c="failover" where a =2;
Query OK, 1 row affected (0.01 sec)

There are two ways to apply this workaround: the application can explicitly re-issue the statement above whenever it receives the read-only error (a minimal client-side sketch follows), or we can detect the Aurora failover event and reset the data source automatically. To reduce the impact on the application, the latter is preferable: since Aurora failover events can be subscribed to, we can write a Lambda function that explicitly calls alter resource when a failover is detected.
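A minimal Python sketch of the client-side variant, assuming pymysql and the read-only error code 1290 seen above; the ALTER_RESOURCE text is a placeholder for the full statement shown earlier in this section.

# Client-side variant (sketch): on the read-only error (MySQL error 1290)
# that appears after an Aurora failover, re-issue alter resource and retry once.
import pymysql

# Placeholder: use the full alter resource statement from earlier in Section 4.5.
ALTER_RESOURCE = 'alter resource write_ds(...), read_ds(...);'

def update_with_failover_retry(conn, sql):
    try:
        with conn.cursor() as cur:
            cur.execute(sql)
    except pymysql.MySQLError as e:
        if e.args[0] != 1290:            # some other error: re-raise
            raise
        with conn.cursor() as cur:       # refresh the endpoint-to-IP mapping
            cur.execute(ALTER_RESOURCE)
            cur.execute(sql)             # retry the original statement once
    conn.commit()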

The general idea is: RDS passes event notifications to an SNS topic via an event subscription, the SNS topic triggers the Lambda function, and the Lambda function connects to ShardingSphere-Proxy and runs the alter resource DistSQL statement.

The specific steps are as follows:

4.5.1 Creating an SNS topic

Follow the SNS Creation Guide to create an SNS topic. Open the SNS dashboard, click Create topic, and select the Standard type. Leave the other options at their defaults or adjust them as needed.

4.5.2 Creating an event subscription for the Aurora cluster

On the RDS Event subscriptions page, click Create event subscription. In the dialog that pops up, select the SNS topic created in the previous step as the Target and Cluster as the Source type. Then select the Aurora cluster to watch and choose Failover as the event category.

4.5.3 Creating the Lambda function

Since the Lambda function calls ShardingSphere-Proxy deployed on EC2 inside the VPC, it must be bound to a role that is permitted to run Lambda in the VPC, i.e. one carrying the AWSLambdaVPCAccessExecutionRole policy. Following the IAM Role document, create a role (here named failoverlambda) and attach the AWSLambdaVPCAccessExecutionRole policy to it.

Next, follow the Lambda documentation to create the Lambda function.

Once you have created the Lambda function, add a trigger, specifying SNS as the source and the SNS topic created in Section 4.5.1.

4.5.4 Programming the Lambda function

import os
import json
import pymysql

# connect to ShardingSphere-Proxy to reset the data source
def resetDataSource():
    db = pymysql.connect(host='111.22.3.123', user='root', password='root',
                         port=3307, database='distsql_rwsplit_db')
    cur = db.cursor()
    SQL = "alter resource write_ds(url=\"jdbc:mysql://aurora-2-07-7-shard1.cluster-12345678.us-east-1.rds.amazonaws.com:3306/wr_ds?serverTimezone=UTC&useSSL=false\",user=admin,password=12345678), read_ds(url=\"jdbc:mysql://aurora-2-07-7-shard1.cluster-ro-12345678.us-east-1.rds.amazonaws.com:3306/wr_ds?serverTimezone=UTC&useSSL=false\",user=admin,password=12345678);"
    print(SQL)
    cur.execute(SQL)
    result = cur.fetchall()
    for x in result:
        print(x)
    db.close()

def lambda_handler(event, context):
    wholeMessage = event['Records'][0]['Sns']['Message']
    print("whole message: " + wholeMessage)
    wholeMessageInJson = json.loads(wholeMessage)
    eventMessage = wholeMessageInJson['Event Message']
    print("event message: " + eventMessage)
    isFailover = eventMessage.startswith('Completed failover to DB instance')
    if isFailover == True:
        print("Failover completed! " + eventMessage)
        resetDataSource()
    return {
        'statusCode': 200,
        'body': 'Lambda Invocation Successful!'
    }

The Lambda function is written in Python and accesses ShardingSphere-Proxy over the MySQL protocol, so the pymysql library needs to be bundled with it. The specific steps are as follows:

1) Install pymysql on Linux. Taking an Amazon Linux virtual machine as an example, it is installed to the default directory /home/ec2-user/.local/lib/python3.7/site-packages/pymysql.

2) Copy the pymysql directory to the temporary directory /tmp.

3) Write the Lambda function and store it in the file lambda_function.py.

4) Package it: zip -r lambda.zip pymysql lambda_function.py

5) Upload the package from the console, either via S3 or directly from the local machine.

4.5.5 Setting the security group of the EC2 instance where ShardingSphere-Proxy runs

Since Lambda needs to access ShardingSphere-Proxy inside the VPC and ShardingSphere-Proxy runs on port 3307, the corresponding security group must be configured to allow access to port 3307 from within the same VPC, following the security group configuration document.

4.5.6 Failover testing

Repeat the earlier failover test: run testFailover.sh, then manually trigger a failover of the Aurora cluster from the RDS console. This time testFailover.sh keeps producing stable output and no longer reports read-only errors.

update done: 13:44:44
update done: 13:44:47
update done: 13:45:00
update done: 13:45:01
update done: 13:45:17

Check the Lambda function's log in CloudWatch, and you will see that it was invoked successfully.

The above experiments verify ShardingSphere-Proxy's sensitivity to Aurora cluster failover. Although ShardingSphere-Proxy does not handle the failover well on its own, the data source can be reset explicitly by monitoring Aurora cluster events and triggering the Lambda function, so failover handling is achieved by combining ShardingSphere-Proxy and Aurora.

5. Conclusion

This post explains how to enhance Aurora's sharding and read/write splitting capabilities with the database middleware ShardingSphere-Proxy.

ShardingSphere-Proxy has a built-in connection pool and strong support for MySQL syntax, and it excels at sharding and read/write splitting. It supports joins between tables that share the same sharding rules as well as joins between small (broadcast) tables and large tables, which basically meets the needs of OLTP scenarios. In terms of dynamic sharding, ShardingSphere-Proxy supports online changes to sharding rules, but users must manually create the sub-tables and migrate data in the underlying Aurora clusters, which requires a certain amount of work. In terms of failover, ShardingSphere-Proxy does not integrate with Aurora out of the box, but the Aurora failover event can be used, as shown in this post, to reset the data source explicitly through a Lambda function, which makes ShardingSphere-Proxy aware of Aurora cluster failover.

In general, the middleware product ShardingSphere-Proxy is a good match for Aurora clusters and further improves their read and write capabilities. It is well documented and open source, and is recommended to readers who are considering sharding with Aurora. We will continue this series with blogs about ShardingSphere-JDBC and other middleware.

Author

Ma Lili

A database solution architect at AWS, with over 10 years of experience in the database industry. Lili has been involved in the R&D of the Hadoop/Hive NoSQL database, enterprise-level database DB2, distributed data warehouse Greenplum/Apache HAWQ and Amazon’s cloud native database.
