How to deal with MySQL binlog using Golang, with examples

Artem Zheltak
6 min read · Aug 31, 2018


Hello, my name is Artem, and I work as a Golang developer. Our team has spent a lot of time taming the MySQL binlog, so here is the shortest way to integrate with it, with no hidden traps left behind. The full example code is at the end.

Why do we need it?

Our main product contains high-load modules in which every database query pushes the user further away from the result. The usual advice is to use caching. That sounds good, but when should the cache be invalidated? So let the data itself tell us when it was updated.

MySQL has a beautiful thing called master-slave replication. Our daemon can pretend to be a slave and receive the data via the binlog. The binlog must be set to row format. It will contain all database commands; commands inside a transaction are recorded only after it is committed. Once the current file reaches the size limit (1 GB by default), MySQL starts a new one, and every new file gets an incremented number appended to its name.

More information here: https://mariadb.com/kb/en/library/binary-log/ or here: https://dev.mysql.com/doc/refman/8.0/en/binary-log.html
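
Before going further, it's worth verifying that the server is configured the way our daemon expects. A minimal sketch of such a check, assuming the go-sql-driver/mysql driver and the same credentials used throughout this article:

package main

import (
    "database/sql"
    "fmt"
    "log"

    _ "github.com/go-sql-driver/mysql"
)

func main() {
    db, err := sql.Open("mysql", "root:root@tcp(127.0.0.1:3306)/")
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    // log_bin must be ON and binlog_format must be ROW for everything below to work.
    rows, err := db.Query("SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format')")
    if err != nil {
        log.Fatal(err)
    }
    defer rows.Close()
    for rows.Next() {
        var name, value string
        if err := rows.Scan(&name, &value); err != nil {
            log.Fatal(err)
        }
        fmt.Printf("%s = %s\n", name, value)
    }
}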

There will be two parts in this article:

  1. How to process new data arriving in the binlog.
  2. How to set up the parser and extend it.

Part 1. Make it run as soon as we can.

We will use the library https://github.com/siddontang/go-mysql/ to work with the binlog.

Let's connect to a new channel (canal is the library's term for it). We will need the ROW format for the binlog: https://mariadb.com/kb/en/library/binary-log-formats/.

func binLogListener() {
    c, err := getDefaultCanal()
    if err != nil {
        log.Fatal(err)
    }
    coords, err := c.GetMasterPos()
    if err != nil {
        log.Fatal(err)
    }
    c.SetEventHandler(&binlogHandler{})
    c.RunFrom(coords)
}

func getDefaultCanal() (*canal.Canal, error) {
    cfg := canal.NewDefaultConfig()
    cfg.Addr = fmt.Sprintf("%s:%d", "127.0.0.1", 3306)
    cfg.User = "root"
    cfg.Password = "root"
    cfg.Flavor = "mysql"
    cfg.Dump.ExecutionPath = "" // leave empty to skip the initial mysqldump phase

    return canal.NewCanal(cfg)
}

Now let's make a wrapper:

type binlogHandler struct {
    canal.DummyEventHandler // dummy handler from the external lib
    BinlogParser            // our custom helper
}

func (h *binlogHandler) OnRow(e *canal.RowsEvent) error { return nil }
func (h *binlogHandler) String() string                 { return "binlogHandler" }

BinlogParser — https://github.com/JackShadow/go-binlog-example/blob/master/src/parser.go

Then we should make it more useful by adding logic to the OnRow() method:

func (h *binlogHandler) OnRow(e *canal.RowsEvent) error {
    var n int // starting value
    var k int // step
    switch e.Action {
    case canal.DeleteAction:
        return nil // not covered in this example
    case canal.UpdateAction:
        n = 1
        k = 2
    case canal.InsertAction:
        n = 0
        k = 1
    }
    for i := n; i < len(e.Rows); i += k {
        key := e.Table.Schema + "." + e.Table.Name
        switch key {
        case User{}.SchemaName() + "." + User{}.TableName():
            /*
                Real data parsing
            */
        }
    }
    return nil
}

The main point of this wrapper is to decompose the received data. On an update the data arrives as pairs of entries: the first row contains the initial data, the second the updated one. And since we also want to support multi-row inserts and updates, for an UPDATE we have to take every second entry, while for an INSERT we take every row. The n and k variables handle this stepping.
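
To make the stepping concrete, here is a toy illustration that uses plain slices instead of the real canal.RowsEvent (the data is invented for the example):

package main

import "fmt"

func main() {
    // Rows of a single UPDATE statement that changed two users:
    // old and new versions of each row arrive interleaved.
    rows := [][]interface{}{
        {1, "Jack"}, // old version of row 1
        {1, "John"}, // new version of row 1
        {2, "Ann"},  // old version of row 2
        {2, "Anna"}, // new version of row 2
    }
    n, k := 1, 2 // UPDATE: start at the second entry and step over the pairs
    for i := n; i < len(rows); i += k {
        fmt.Printf("old: %v -> new: %v\n", rows[i-1], rows[i])
    }
}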

Let's make a model to receive the data from the binlog. We will use it to load data from rows. Column names are provided in the annotations:

type User struct {
    Id      int       `gorm:"column:id"`
    Name    string    `gorm:"column:name"`
    Status  string    `gorm:"column:status"`
    Created time.Time `gorm:"column:created"`
}

func (User) TableName() string {
    return "User"
}

func (User) SchemaName() string {
    return "Test"
}

MySQL table structure:

CREATE TABLE Test.User (
    id      INT AUTO_INCREMENT PRIMARY KEY,
    name    VARCHAR(40) NULL,
    status  ENUM ("active", "deleted") DEFAULT "active",
    created TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL ON UPDATE CURRENT_TIMESTAMP
) ENGINE = InnoDB;

It's time to replace the placeholder with real code:

user := User{}
h.GetBinLogData(&user, e, i)

And that's all: for every new row we will get its data in the user variable. Let's print it to make the output prettier:

if e.Action == canal.UpdateAction {
    oldUser := User{}
    h.GetBinLogData(&oldUser, e, i-1)
    fmt.Printf("User %d is updated from name %s to name %s\n", user.Id, oldUser.Name, user.Name)
} else {
    fmt.Printf("User %d is created with name %s\n", user.Id, user.Name)
}

Good news, everyone: the code is ready to run its "Hello, binlog world":

func main() {
    go binLogListener()
    // placeholder for your handsome code
    time.Sleep(2 * time.Minute)
    fmt.Print("Thx for watching")
}

Add and update users:

INSERT INTO Test.User (`id`, `name`) VALUES (1, "Jack");
UPDATE Test.User SET name = "John" WHERE id = 1;

The result is:

User 1 is created with name Jack
User 1 is updated from name Jack to name John

This code works with the binlog and parses new rows: when data arrives from the table we are watching, it is parsed into the struct and the result is printed. I haven't yet covered the data parser (BinlogParser), which hides the model hydration logic.

Part 2. As Cobb mentioned, we need to go deeper.

The hidden part of the parser is based on reflection. We hydrated the model with data by using the method:

h.GetBinLogData(&user, e, i)

It works with simple data types:

bool
int
float64
string
time.Time

and it can also parse structs from JSON.
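
For instance, a model touching every one of those simple types might look like this (an illustrative struct, not one from the repo):

type Example struct {
    Active  bool      `gorm:"column:active"`
    Count   int       `gorm:"column:count"`
    Ratio   float64   `gorm:"column:ratio"`
    Title   string    `gorm:"column:title"`
    Created time.Time `gorm:"column:created"`
}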

If you need more types, or you just want to understand how binlog parsing works, the best way is to extend the parsed types on your own.

Let's take the int type as an example:

type User struct {
    Id int `gorm:"column:id"`
}

We will get the type name via reflection. The method parseTagSetting just makes annotations easier to use:

element := User{} // in the general case we receive an interface{}, but here we start with the model itself
v := reflect.ValueOf(element)
s := reflect.Indirect(v)
t := s.Type()
num := t.NumField()
for k := 0; k < num; k++ {
    parsedTag := parseTagSetting(t.Field(k).Tag)
    columnName, ok := parsedTag["COLUMN"]
    if !ok || columnName == "COLUMN" {
        continue
    }
    name := s.Field(k).Type().Name()
    switch name {
    case "int":
        // here we deal with the incoming row
    }
}

We got the name of the type, and knowing it we can set the field's value via reflection:

// from the reflect package:
func (v Value) SetInt(x int64) { /* ... */ }
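
Here is a minimal self-contained sketch of that mechanism; the value 42 is invented for the illustration:

package main

import (
    "fmt"
    "reflect"
)

type User struct {
    Id int `gorm:"column:id"`
}

func main() {
    u := User{}
    // Fields are only settable when we start from a pointer and dereference it.
    s := reflect.Indirect(reflect.ValueOf(&u))
    s.FieldByName("Id").SetInt(42) // SetInt takes an int64 and works for any signed int kind
    fmt.Println(u.Id)              // 42
}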

Parsing annotation helper (borrowed from the gorm lib):

func parseTagSetting(tags reflect.StructTag) map[string]string {
    setting := map[string]string{}
    for _, str := range []string{tags.Get("sql"), tags.Get("gorm")} {
        tags := strings.Split(str, ";")
        for _, value := range tags {
            v := strings.Split(value, ":")
            k := strings.TrimSpace(strings.ToUpper(v[0]))
            if len(v) >= 2 {
                setting[k] = strings.Join(v[1:], ":")
            } else {
                setting[k] = k
            }
        }
    }
    return setting
}
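
A quick check of what the helper returns for the Id field of our User model (assuming everything sits in one package):

field, _ := reflect.TypeOf(User{}).FieldByName("Id")
fmt.Println(parseTagSetting(field.Tag)) // map[COLUMN:id]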

SetInt takes an int64 parameter. Let's create the method which will transform the data from the row into int64:

func (m *BinlogParser) intHelper(e *canal.RowsEvent, n int, columnName string) int64 {
    columnId := m.getBinlogIdByName(e, columnName)
    if e.Table.Columns[columnId].Type != schema.TYPE_NUMBER {
        return 0
    }
    // the value may arrive as any integer type, so normalize it to int64
    switch value := e.Rows[n][columnId].(type) {
    case int8:
        return int64(value)
    case int32:
        return int64(value)
    case int64:
        return value
    case int:
        return int64(value)
    case uint8:
        return int64(value)
    case uint16:
        return int64(value)
    case uint32:
        return int64(value)
    case uint64:
        return int64(value)
    case uint:
        return int64(value)
    }
    return 0
}

Everything here looks logical except the getBinlogIdByName() method.

This trivial helper is needed so that we can work with a column's name instead of its id (a sketch of it follows the list below), which gives us the following:

  • we take the column name from the gorm annotation;
  • no corrections are needed when columns are added at the beginning or in the middle of the table;
  • it's more convenient to work with a field name than with "column number 3".
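
Here is that sketch; I assume the helper simply scans the table's column metadata (the real implementation lives in parser.go in the repo):

func (m *BinlogParser) getBinlogIdByName(e *canal.RowsEvent, name string) int {
    for id, column := range e.Table.Columns {
        if column.Name == name {
            return id // column ids follow the table definition order
        }
    }
    panic(fmt.Sprintf("There is no column %s in table %s.%s", name, e.Table.Schema, e.Table.Name))
}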

Finally, we will add the handler:

s.Field(k).SetInt(m.intHelper(e, n, columnName))

Two more examples to go

ENUM: here we get the value as an index, so for our table the status "active" arrives as 1. In most cases we need the string representation of the enum, not its id, and we can get it from the field description. An important hint: MySQL numbers enum members starting from 1, while the array of values in the description starts from 0, so a binlog value of 1 points to the zeroth element of the description.

The enum parser might look like this:

func (m *BinlogParser) stringHelper(e *canal.RowsEvent, n int, columnName string) string {
    columnId := m.getBinlogIdByName(e, columnName)
    if e.Table.Columns[columnId].Type == schema.TYPE_ENUM {
        values := e.Table.Columns[columnId].EnumValues // the enum's member names
        if len(values) == 0 || e.Rows[n][columnId] == nil {
            return ""
        }
        return values[e.Rows[n][columnId].(int64)-1] // value 1 points to values[0]
    }
    // plain string columns are handled further on; see parser.go in the repo
    return ""
}

I want to store JSON

It's a good idea, so why not? On the MySQL engine's side, JSON is just a string. We need to tell our parser that the data is serialized. To achieve this, we will add a custom gorm annotation, "fromJson".

Just an example of the different data we might need:

type JsonData struct {
    Int        int               `gorm:"column:int"`
    StructData TestData          `gorm:"column:struct_data;fromJson"`
    MapData    map[string]string `gorm:"column:map_data;fromJson"`
    SliceData  []int             `gorm:"column:slice_data;fromJson"`
}

type TestData struct {
    Test string `json:"test"`
    Int  int    `json:"int"`
}

We could write a pile of type conditions and it might work, but adding any new field type would break it. Searching Stack Overflow for answers leads to questions like "How can we unmarshal JSON into an unknown struct?" and replies like "No idea why you need that, but try…"

By converting the struct to an interface we will succeed:

if _, ok := parsedTag["FROMJSON"]; ok {
    newObject := reflect.New(s.Field(k).Type()).Interface()
    json := m.stringHelper(e, n, columnName)
    jsoniter.Unmarshal([]byte(json), &newObject)
    s.Field(k).Set(reflect.ValueOf(newObject).Elem().Convert(s.Field(k).Type()))
}
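
The same trick in isolation, using the standard encoding/json instead of jsoniter (an illustrative snippet, not code from the repo):

package main

import (
    "encoding/json"
    "fmt"
    "reflect"
)

type TestData struct {
    Test string `json:"test"`
    Int  int    `json:"int"`
}

func main() {
    var target struct{ StructData TestData }
    field := reflect.Indirect(reflect.ValueOf(&target)).Field(0)

    // Allocate a fresh *TestData without knowing the concrete type at compile time.
    newObject := reflect.New(field.Type()).Interface()
    if err := json.Unmarshal([]byte(`{"test":"a","int":5}`), newObject); err != nil {
        panic(err)
    }
    field.Set(reflect.ValueOf(newObject).Elem())

    fmt.Printf("%+v\n", target) // {StructData:{Test:a Int:5}}
}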

If you have a question, correction, or recommendation, you are welcome in the comments. Some clarification can also be found in the tests: https://github.com/JackShadow/go-binlog-example/blob/master/src/parser_test.go.
