一个事务中并发执行查询带来的问题

impopper-engineering
10 min readMar 12, 2019

--

from https://www.freewebmentor.com/2014/11/write-multiple-queries-using-mysql-transaction.html

1. 问题来源

简单来说就是企图在一个事务中开启多个协程执行并发查询(不要有这样的企图)。

接下来这篇文章将尝试深入探讨产生 busy buffer 的根本原因。

2. 为什么会出现 busy buffer

事务执行的语句的返回的 Rows 如果没有关闭,那么在事务里再次执行其他语句,会返回 busy buffer,这样也对,但有些肤浅,也不够准确。
- 我们所使用的 database/sql 提供的是一个数据库的抽象,具体负责连接数据库、收发包的则是 go-sql-driver/mysql。当在一个事务中执行一条 exec 语句时,客户端通过 mysqlConn 向 db 发送 CommandPacket,具体的执行流程是:tx.ExecContext -> tx.db.execDC-> ctxDriverExec -> mc.Exec(Execer) -> mc.writeCommandPacketStr(mysqlConn)。
- 向 mysqlConn 连接中的 buffer中写数据之前,需要预先申请空间,多出的4个字节为头部。一个 mysqlConn 连接中在同一时间只有一个 buffer 可以用,如果 buffer 被占用,就会返回 ErrBusyBuffer,实际上就是 busy buffer

// 来自:go-sql-driver/mysql@1.4.0/packets.go#L419
func (mc *mysqlConn) writeCommandPacketStr(command byte, arg string) error {
// Reset Packet Sequence
mc.sequence = 0
pktLen := 1 + len(arg)
data := mc.buf.takeBuffer(pktLen + 4)
if data == nil {
// cannot take the buffer. Something must be wrong with the connection
errLog.Print(ErrBusyBuffer)
return errBadConnNoWrite
}
// Add command byte
data[4] = command
// Add arg
copy(data[5:], arg)
// Send CMD packet
return mc.writePacket(data)
}
// 来自:go-sql-driver/mysql@1.4.0/buffer.go#L112
// takeBuffer returns a buffer with the requested size.
// If possible, a slice from the existing buffer is returned.
// Otherwise a bigger buffer is made.
// Only one buffer (total) can be used at a time.(一次仅仅只能有一个 buffer 被使用)
func (b *buffer) takeBuffer(length int) []byte {
// 如果 buffer 有值,就会返回 nil (即不能分配 buffer)
if b.length > 0 {
return nil
}
// test (cheap) general case first
if length <= defaultBufSize || length <= cap(b.buf) {
return b.buf[:length]
}
if length < maxPacketSize {
b.buf = make([]byte, length)
return b.buf
}
return make([]byte, length)
}

那么 buffer 什么时候清空,然后 buffer.length 重新等于 0,可以执行新的查询呢?把 buffer 里面的内容全部读出来就可以了,也就是调用 readPacket()

// 来自:go-sql-driver/mysql@1.4.0/packets.go#L27
// Read packet to buffer ‘data’
func (mc *mysqlConn) readPacket() ([]byte, error) {
var prevData []byte
for {
// read packet header
data, err := mc.buf.readNext(4)
//…校验
// read packet body [pktLen bytes]
data, err = mc.buf.readNext(pktLen)
//…其他判断
prevData = append(prevData, data…)
}
}
// 来自:go-sql-driver/mysql@1.4.0/buffer.go#L94
func (b *buffer) readNext(need int) ([]byte, error) {
if b.length < need {
// refill
if err := b.fill(need); err != nil {
return nil, err
}
}

offset := b.idx
b.idx += need
b.length -= need
return b.buf[offset:b.idx], nil
}

3. 为什么不关闭 Rows会出现 busy buffer

首先,这里的 Rows 是执行 query 返回的结果,当我们执行 rows, err := tx.Query("SELECT 1") 拿到 Rows 时,数据还在 buffer 里面,并没有读出来。
这时如果有其他协程在这个事务里执行语句,要向 db 发包,就会出现 busy buffer,因为在一个事务里,只有一个连接可以用
前面提到,把数据从 buffer 中读出来就可以重新利用这一条 mysqlConn 发包,那么把 Rows 中的数据读出来还会出现这个问题吗?是可以的,加上一句 for rows.Next(){rows1.Scan()}就不会出现 busy buffer 了。

// 来自:go-sql-driver/mysql@1.4.0/rows.go
func (rows *textRows) Next(dest []driver.Value) error {
if mc := rows.mc; mc != nil {
if err := mc.error(); err != nil {
return err
}
// Fetch next row from stream
// 这里会调用 mc.readPacket() 将 buffer 里的数据读出来。
return rows.readRow(dest)
}
return io.EOF
}

当然关闭也是可以的,我们看一下关闭 rows 后 buffer 会怎样。会由 mysqlConn 来负责处理没有读的 buffer,会调用 mc.readUntilEOF() 将 buffer 里的内容全部读出来,直到 EOF。mc.discardResults() 也会调用 mc.readUntilEOF(),而 mc.readUntilEOF() 则会调用 mc.readPacket()

// 来自:go-sql-driver/mysql@1.4.0/rows.go
func (rows *mysqlRows) Close() (err error) {
if f := rows.finish; f != nil {
f()
rows.finish = nil
}
mc := rows.mc
if mc == nil {
return nil
}
if err := mc.error(); err != nil {
return err
}
// Remove unread packets from stream
if !rows.rs.done {
err = mc.readUntilEOF()
}
if err == nil {
if err = mc.discardResults(); err != nil {
return err
}
}
rows.mc = nil
return err
}

当然了,打开的资源是要关闭的,因此只执行 for rows.Next(){rows1.Scan()}是不够的,还要关闭 rows.

4. 为什么不要在一个事务中执行并发的查询?

首先不要用并发的协程去操作同一个连接,实际上上面的分析也说明了,如果你这么做了,go-sql-driver/mysql 很有可能会返回给你一个 busy buffer

// 来自 src/database/sql/driver.go
// Conn is a connection to a database. It is not used concurrently
// by multiple goroutines.
//
// Conn is assumed to be stateful.
type Conn interface {
// Prepare returns a prepared statement, bound to this connection.
Prepare(query string) (Stmt, error)
// Close invalidates and potentially stops any current
// prepared statements and transactions, marking this
// connection as no longer in use.
//
// Because the sql package maintains a free pool of
// connections and only calls Close when there’s a surplus of
// idle connections, it shouldn’t be necessary for drivers to
// do their own connection caching.
Close() error
// Begin starts and returns a new transaction.
//
// Deprecated: Drivers should implement ConnBeginTx instead (or additionally).
Begin() (Tx, error)
}

一个事务只有一个连接,当调用 db.Begin() 开启一个事务时,sql.DB 会从连接池中取出一个连接或者创建一个新的连接分配给这个事务。

5. 可以在一个事务中执行并发的更新吗?

是可以的,因为更新或者插入的操作调用的是 Exec()db返回的 result 在这里就直接从 buffer 中读出来了。

// 来自:go-sql-driver/mysql@1.4.0/connection.go
// Internal function to execute commands
func (mc *mysqlConn) exec(query string) error {
// Send command
if err := mc.writeCommandPacketStr(comQuery, query); err != nil {
return mc.markBadConn(err)
}
// Read Result
resLen, err := mc.readResultSetHeaderPacket()
if err != nil {
return err
}
if resLen > 0 {
// columns
if err := mc.readUntilEOF(); err != nil {
return err
}
// rows
if err := mc.readUntilEOF(); err != nil {
return err
}
}
return mc.discardResults()
}

by kippa

--

--