Real-time replication of data from MySQL

Arun Thulasidharan
Hevo Data Engineering
6 min read · Jul 23, 2020

At Hevo, we support real-time, reliable, and lossless data replication from a variety of sources, including databases (MySQL, Postgres, MongoDB, DynamoDB, etc.) and enterprise solutions (Salesforce, AppsFlyer, Mixpanel, etc.). MySQL is one of the most popular and commonly used databases that we support. Hevo uses MySQL binary logs to replicate data from MySQL in real time and at scale.

What is a binary log

Binary logs, or binlogs, are a set of append-only log files that contain events representing changes made to table data (inserts, updates, deletes, etc.) as well as changes such as table/database creation and schema updates. MySQL uses them for two purposes:

  • Replication: MySQL replicates data to slave servers using binlogs. Slaves read the binlogs from the master and apply the change events to their tables to stay in sync with the master.
  • Recovery: When recovering from a backup, the changes made to the database after the backup was taken are re-applied to the tables using the binary log change events.

Binlogs are the most reliable way to replicate data from MySQL: the approach is fast and guarantees zero data loss.

How Hevo does binlog replication

Hevo taps into the MySQL replication stream of client servers using the shyiko library (mysql-binlog-connector-java), which reads the binlogs and deserializes the events into event objects (e.g. UpdateRowsEventData, WriteRowsEventData, DeleteRowsEventData, TableMapEventData). These data events are then converted into Hevo's internal data objects and published to a Kafka topic, whose consumers eventually sync the events to the destination.
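As a rough sketch (not Hevo's actual code), tapping into the replication stream with this library looks roughly like the following; the host, credentials, and the publish step are placeholders:

```java
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.EventData;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;

public class BinlogListener {

    public static void main(String[] args) throws Exception {
        // Illustrative connection details; the client connects like a replica
        // and streams binlog events from the server.
        BinaryLogClient client =
                new BinaryLogClient("mysql-host", 3306, "repl_user", "repl_password");

        client.registerEventListener(event -> {
            EventData data = event.getData();
            if (data instanceof TableMapEventData) {
                // Metadata (table_id -> database/table) preceding the row events.
                TableMapEventData tableMap = (TableMapEventData) data;
                System.out.println("Table map: " + tableMap.getDatabase() + "." + tableMap.getTable());
            } else if (data instanceof WriteRowsEventData) {
                // Each row arrives as an array of column values, without column names.
                ((WriteRowsEventData) data).getRows().forEach(row -> publish("insert", row));
            } else if (data instanceof UpdateRowsEventData) {
                ((UpdateRowsEventData) data).getRows()
                        .forEach(rowPair -> publish("update", rowPair.getValue()));
            } else if (data instanceof DeleteRowsEventData) {
                ((DeleteRowsEventData) data).getRows().forEach(row -> publish("delete", row));
            }
        });

        client.connect(); // blocks and keeps streaming
    }

    // Placeholder for converting the raw row into an internal data object and
    // publishing it to a Kafka topic.
    private static void publish(String op, java.io.Serializable[] row) {
        System.out.println(op + ": " + java.util.Arrays.toString(row));
    }
}
```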

Challenges with this solution

  1. Keeping table schemas in sync

A change event (insert/update/delete) in the MySQL binlogs does not contain column names or other column metadata (such as column types, column lengths, or primary keys). As a result, Shyiko delivers each row in the event as a plain array of values. To convert this array into a valid data object that can be replicated, Hevo needs the schema of the table as it was at the time the change was made.

Hevo solves this problem by persisting table schemas against binlog positions whenever the schema of a table changes. When replication first starts, the current table schema is fetched from the MySQL server, and the schema is then kept in sync by listening to the alter/create statements in the binlog. We use JSqlParser to parse the DDL statements and update the schema accordingly. Since schema versions are stored against specific binlog positions, the schema corresponding to an earlier position can also be fetched and applied if we decide to move the replication offset back to a previous binlog position.
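A simplified sketch of that DDL-parsing step, assuming a hypothetical SchemaStore that versions schemas by binlog position; Hevo's real handling covers far more statement types and edge cases:

```java
import java.util.List;
import net.sf.jsqlparser.parser.CCJSqlParserUtil;
import net.sf.jsqlparser.statement.Statement;
import net.sf.jsqlparser.statement.alter.Alter;
import net.sf.jsqlparser.statement.alter.AlterExpression;
import net.sf.jsqlparser.statement.create.table.ColumnDefinition;
import net.sf.jsqlparser.statement.create.table.CreateTable;

public class DdlHandler {

    // Hypothetical interface that keeps table schemas versioned by binlog position.
    public interface SchemaStore {
        void saveNewTable(String table, List<ColumnDefinition> columns, String binlogFile, long position);
        void applyAlter(String table, AlterExpression change, String binlogFile, long position);
    }

    private final SchemaStore schemaStore;

    public DdlHandler(SchemaStore schemaStore) {
        this.schemaStore = schemaStore;
    }

    // Called for DDL statements seen in binlog QUERY events.
    public void onDdl(String sql, String binlogFile, long binlogPosition) throws Exception {
        Statement statement = CCJSqlParserUtil.parse(sql);

        if (statement instanceof CreateTable) {
            // New table: record its full schema at this binlog position.
            CreateTable create = (CreateTable) statement;
            schemaStore.saveNewTable(create.getTable().getName(),
                    create.getColumnDefinitions(), binlogFile, binlogPosition);
        } else if (statement instanceof Alter) {
            // Existing table: apply each ADD/DROP/MODIFY expression to the stored
            // schema and record the new version against this binlog position.
            Alter alter = (Alter) statement;
            for (AlterExpression expression : alter.getAlterExpressions()) {
                schemaStore.applyAlter(alter.getTable().getName(), expression,
                        binlogFile, binlogPosition);
            }
        }
    }
}
```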

This approach has the following limitations:

  • We cannot move the replication offset to a position that was written before the pipeline was created.
  • All alter/create statements need to be handled to keep the schemas in sync. Doing this exhaustively is very difficult, as SQL syntax keeps evolving across MySQL versions.
  • We are evaluating whether we can leverage the TABLE_MAP_EVENT in the binlogs to get the table schema corresponding to each change and mitigate these limitations.

2. Pipeline resumption in the middle of a transaction

TABLE_MAP_EVENT is the event preceding a change event and contains metadata about the table that is about to be changed: a table_id and the table metadata corresponding to that table_id. Every change event in the binary logs carries the table_id to indicate which table schema the event corresponds to, and Shyiko uses it to deserialize the change events (a short sketch of the fields this event exposes follows the list below). Some interesting facts about table_id that are not well documented:

  • table_id is an in-memory global counter in MySQL and gets reset if the server is restarted.
  • table_ids are not unique per table even when the server is not restarted; they can change when the table schema is altered or even when columns are renamed. Essentially, a table_id represents a particular table schema, not the table itself.
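For reference, a rough sketch of what the deserialized TableMapEventData exposes in the shyiko library (column type codes, per-column metadata, and nullability, but not column names):

```java
import com.github.shyiko.mysql.binlog.event.TableMapEventData;

public class TableMapInspector {

    // Logs the metadata available from a deserialized TABLE_MAP_EVENT.
    public static void inspect(TableMapEventData tableMap) {
        // table_id: an in-memory counter on the server, tied to a schema version,
        // not a stable identifier for the table.
        System.out.println("table_id: " + tableMap.getTableId());
        System.out.println("table:    " + tableMap.getDatabase() + "." + tableMap.getTable());
        // Column type codes and per-column metadata (e.g. lengths), but no column names.
        System.out.println("types:    " + java.util.Arrays.toString(tableMap.getColumnTypes()));
        System.out.println("metadata: " + java.util.Arrays.toString(tableMap.getColumnMetadata()));
        System.out.println("nullable: " + tableMap.getColumnNullability());
    }
}
```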

It is not necessary for every change event to be preceded by a TABLE_MAP_EVENT. Within a transaction, a set of change events can be grouped together and share a single TABLE_MAP_EVENT. For example, in the binary log events for a transaction starting at position 645, there is a single TABLE_MAP_EVENT (at position 719) for the whole transaction.

Shyiko keeps table map events in an in-memory map as it encounters them in the binary logs and uses them to deserialize the subsequent change events. It is possible for Hevo to stop a poll without reading all the events of a transaction and store the offset corresponding to the last read binlog position. When the next poll starts, we pass this offset to Shyiko, which tries to read events from that position and fails to deserialize them, as it does not have the table map event corresponding to those changes.

To solve this, Hevo persists the table map events to a persistent store via a write-through cache, along with the binlog positions where they were encountered (to handle the case where table_ids are reset when the server restarts). These persisted table map events are then supplied on demand to the Shyiko deserializer if it fails to find them in its in-memory map.
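A simplified sketch of such a write-through cache; TableMapStore here is a hypothetical interface standing in for Hevo's actual persistent store:

```java
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

public class TableMapCache {

    // Hypothetical persistent store; the real implementation is not described here.
    public interface TableMapStore {
        void save(String binlogFile, long binlogPosition, TableMapEventData tableMap);
        Optional<TableMapEventData> load(long tableId, String binlogFile, long binlogPosition);
    }

    private final Map<Long, TableMapEventData> inMemory = new ConcurrentHashMap<>();
    private final TableMapStore store;

    public TableMapCache(TableMapStore store) {
        this.store = store;
    }

    // Called for every TABLE_MAP_EVENT read from the binlog: write through to the
    // persistent store, keyed by the binlog position where it was seen (table_ids
    // alone are not stable across server restarts or schema changes).
    public void onTableMapEvent(String binlogFile, long binlogPosition, TableMapEventData tableMap) {
        inMemory.put(tableMap.getTableId(), tableMap);
        store.save(binlogFile, binlogPosition, tableMap);
    }

    // Called when deserialization needs a table map that is not in memory, e.g.
    // when a poll resumes in the middle of a transaction.
    public Optional<TableMapEventData> lookup(long tableId, String binlogFile, long binlogPosition) {
        TableMapEventData cached = inMemory.get(tableId);
        if (cached != null) {
            return Optional.of(cached);
        }
        return store.load(tableId, binlogFile, binlogPosition);
    }
}
```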

What about existing data

When a pipeline is created, Hevo starts replicating data from the latest known binlog position, which captures all updates made from the time replication starts.

Now, what about data that is already present in the database?

The existing data in the tables is ingested using historical load tasks (one per table), which replicate the historical data incrementally using an auto-detected unique column or a column of the user's choice. For example, an auto-incrementing id or a primary key is an ideal candidate for pulling historical data from a table. We will write about how we achieve this in detail in a later post.
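As a rough illustration (not Hevo's actual implementation), incremental keyset pagination over an auto-incrementing id could look like this; the table and column names are placeholders:

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class HistoricalLoad {

    // Pulls rows in batches ordered by a unique, monotonically increasing column
    // ("id" here), so the task can resume from the last value it has replicated.
    public static void run(Connection connection, long startAfterId, int batchSize) throws Exception {
        long lastId = startAfterId;
        while (true) {
            try (PreparedStatement statement = connection.prepareStatement(
                    "SELECT * FROM orders WHERE id > ? ORDER BY id LIMIT ?")) {
                statement.setLong(1, lastId);
                statement.setInt(2, batchSize);
                try (ResultSet rows = statement.executeQuery()) {
                    int count = 0;
                    while (rows.next()) {
                        lastId = rows.getLong("id");
                        publish(rows); // placeholder: convert and push to the destination
                        count++;
                    }
                    if (count < batchSize) {
                        return; // reached the end of the existing data
                    }
                }
            }
        }
    }

    private static void publish(ResultSet row) {
        // Placeholder for converting the row into Hevo's internal format.
    }
}
```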

Some interesting numbers

The binlog-based solution has been in production for a couple of years, and more than 50 customers have created over 600 binlog replication pipelines on our platform in that time.

Currently, we process close to 60 billion binlog events per month across our shared and private environments combined. In terms of throughput from a single pipeline (which connects to one database server), we replicate close to 1.5M records/minute, or 25K records/second, from one of our most write-heavy MySQL servers.

What lies ahead

  • As mentioned above, we are evaluating the use of TABLE_MAP_EVENT to figure out the schema/metadata (column names, column types, primary keys, etc.) for a particular change event, which could help us overcome some of our current limitations, simplify the process, and make it more reliable.
  • We are planning optimizations around historical loads that will enable them to scale seamlessly across tens of thousands of tables and millions of columns across those tables.
