Use ETCD Service Discovery in test infra

Huang Luohua
test-go-where
Published in
5 min readNov 25, 2019

Recently I gave a sharing session to my team about ETCD service discovery. It can also be used in your test infra, to shield the infra from changes to the test servers.

Below is the architecture. (I also used MySQL, Kafka and ES for other task requirements; you can ignore them.)

Think about a test infra requirement:

  • You are doing API automation testing. You want to verify not only the response but also that the *.log files on the server contain the correct logging info. Say a request fails: the response will be a generic 5xx error code from the server, and you want to confirm that the log contains something like ‘balabala duplicated. Transaction xxx’ — so you can be sure it really failed because of that specific error.
  • Your servers are deployed in a Docker/microservices environment in which IPs may keep changing.

In this scenario you can use ETCD to achieve the goal:

  • Firstly, start a very basic TCP server on every server instance. The TCP server listens for requests that contain a finding-text and returns the log entries that contain it. The logic is very simple: it accepts a finding-text, greps the log file for it, and returns the matching log entries to the client.
// startFileParseServer listens on the given TCP port and answers log-search
// requests against file. Each client sends a search text (read with a single
// Read of at most 1024 bytes); the server writes back every line of file that
// contains that text, with no separator between lines, and then closes the
// connection.
//
// A Listen or Accept failure is sent on eChan (declared outside this
// snippet), after which the function returns.
func startFileParseServer(port string, file string) {
	ln, err := net.Listen("tcp", ":"+port)
	if err != nil {
		eChan <- err
		return // no listener was created, so there is nothing to close or serve
	}
	defer ln.Close()

	for {
		conn, err := ln.Accept()
		if err != nil {
			eChan <- err
			return // conn is unusable; do not fall through to Read
		}
		// One goroutine per client so a slow client cannot stall the others.
		go serveLogSearch(conn, file)
	}
}

// serveLogSearch handles a single client: it reads the search text from conn,
// scans file line by line and writes each matching line back to conn. The
// connection is always closed before returning, which is what tells the
// client that the answer is complete.
func serveLogSearch(conn net.Conn, file string) {
	defer conn.Close()

	buf := make([]byte, 1024) // Read the incoming connection into the buffer.
	n, err := conn.Read(buf)
	if err != nil {
		log.Println("Error reading:", err.Error())
		if n == 0 {
			// No search text at all: an empty text would match every line.
			return
		}
	}

	needle := string(getValidByte(buf[:n]))
	log.Println("#" + needle + "#")

	readFile, err := os.Open(file)
	if err != nil {
		// A missing or unreadable file fails this request only, not the server.
		log.Printf("failed to open file: %s", err)
		return
	}
	defer readFile.Close()

	fileScanner := bufio.NewScanner(readFile)
	fileScanner.Split(bufio.ScanLines)
	for fileScanner.Scan() {
		l := fileScanner.Text()
		log.Println(l)
		if !strings.Contains(l, needle) {
			continue
		}
		if _, err := io.WriteString(conn, l); err != nil {
			log.Println("Error writing:", err.Error())
			return
		}
	}
	if err := fileScanner.Err(); err != nil {
		log.Printf("failed to scan file: %s", err)
	}
}

// getValidByte returns a new slice holding the bytes of src with every zero
// byte removed.
// Adapted from https://studygolang.com/articles/2911
func getValidByte(src []byte) []byte {
	var strBuf []byte
	for _, v := range src {
		if v != 0 {
			strBuf = append(strBuf, v)
		}
	}
	return strBuf
}
  • Secondly, register your TCP servers with ETCD
package serviceregistry

import (
"context"
"crypto/rand"
"errors"
cli "git.garena.com/shopee/qa/deep/searchtools/redistool/util"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/mvcc/mvccpb"
"log"
"math/big"
errUtil "xxx/component/err"
configFactory "xxx/config"
"time"
)
var (
conf = configFactory.Config()
tcpUploadServices = make(map[string]string)
tcpSearchServices = make(map[string]string)
warn = “[WARN]SvcR” cli *clientv3.Client
) // cli *clientv3.Client, keyPrefix string
// init connects the package-level etcd client to conf.Etcd.Url with a
// five-second dial timeout, terminating the process via log.Fatal when the
// client cannot be created, and then loads the services stored under the
// upload and search prefixes.
func init() {
	log.Println("Initializing Service Registry...")

	etcdConfig := clientv3.Config{
		Endpoints:   []string{conf.Etcd.Url},
		DialTimeout: 5 * time.Second,
	}

	var err error
	cli, err = clientv3.New(etcdConfig)
	if err != nil {
		log.Fatal(err)
	}

	// Upload services are loaded first, then search services.
	prefixes := []string{
		conf.Etcd.Etcd_tcp_upload_prefix_tcp,
		conf.Etcd.Etcd_tcp_search_prefix_tcp,
	}
	for _, p := range prefixes {
		initServices(p)
	}
}
// initServices reads every key stored in etcd under prefix and records it in
// the service map that belongs to that prefix (upload or search). Keys found
// under any other prefix are only logged.
//
// A failed lookup is logged and leaves the maps untouched.
func initServices(prefix string) {
	resp, err := cli.Get(context.Background(), prefix, clientv3.WithPrefix())
	if err != nil {
		log.Println(err)
		return // resp must not be used when the lookup failed
	}

	// Pick the destination map once; it stays nil for an unknown prefix.
	var services map[string]string
	switch prefix {
	case conf.Etcd.Etcd_tcp_upload_prefix_tcp:
		services = tcpUploadServices
	case conf.Etcd.Etcd_tcp_search_prefix_tcp:
		services = tcpSearchServices
	}

	for _, kv := range resp.Kvs {
		if services != nil {
			services[string(kv.Key)] = string(kv.Value)
		}
		log.Println("*** Available Service:" + string(kv.Key) + string(kv.Value))
	}
}
// Watch subscribes to etcd changes under prefix and mirrors them into the
// service map that belongs to that prefix: a PUT stores the key's value, a
// DELETE removes the key. It blocks until the watch channel is closed.
//
// NOTE(review): on return Watch closes the package-level etcd client, which
// is shared with every other Watch call and with initServices — confirm that
// closing it when a single watcher ends is intended.
// NOTE(review): the service maps are written here without synchronization;
// see fetchServer, which reads them.
func Watch(prefix string) {
	defer func() {
		if err := cli.Close(); err != nil {
			log.Printf("%s Somehow failed to close the etcd client v3 conn handler: %v. Ignoring...\n", warn, err)
		}
	}()

	// Pick the destination map once; it stays nil for an unknown prefix.
	var services map[string]string
	switch prefix {
	case conf.Etcd.Etcd_tcp_upload_prefix_tcp:
		services = tcpUploadServices
	case conf.Etcd.Etcd_tcp_search_prefix_tcp:
		services = tcpSearchServices
	}

	log.Printf("Service Registry is listening for services %s.* updates...", prefix)
	rch := cli.Watch(context.Background(), prefix, clientv3.WithPrefix())
	for n := range rch {
		// Surface watch failures instead of letting the loop end silently.
		if err := n.Err(); err != nil {
			log.Printf("%s watch on %s reported an error: %v\n", warn, prefix, err)
			continue
		}
		for _, ev := range n.Events {
			key := string(ev.Kv.Key)
			value := string(ev.Kv.Value)
			switch ev.Type {
			case mvccpb.PUT:
				if services != nil {
					services[key] = value
				}
				log.Println("*** Service is PUT: " + key + "#" + value)
			case mvccpb.DELETE:
				delete(services, key) // no-op when services is nil
				log.Println("*** Service is DELETED: " + key + "#" + value)
			}
		}
	}
}
// Resolver returns the value of one registered service for prefix, chosen by
// fetchServer from the upload or the search service map. Any other prefix
// yields an error built from SERVER_ERR_UNSUPPORTED_OPERATION.
func Resolver(prefix string) (string, error) {
	switch prefix {
	case conf.Etcd.Etcd_tcp_upload_prefix_tcp:
		return fetchServer(tcpUploadServices)
	case conf.Etcd.Etcd_tcp_search_prefix_tcp:
		return fetchServer(tcpSearchServices)
	default:
		unsupported := errUtil.Lookup(errUtil.SERVER_ERR_UNSUPPORTED_OPERATION)
		return "", errors.New(unsupported)
	}
}
// fetchServer picks one entry of services uniformly at random and returns its
// value. It returns an error built from SERVER_ERR_SERVER_UNAVAILABLE when
// services is empty, and the error from the random source if drawing the
// index fails.
func fetchServer(services map[string]string) (string, error) {
	// TODO: can implement a real strategy here, say a weighted round robin algorithm
	if len(services) == 0 {
		return "", errors.New(errUtil.Lookup(errUtil.SERVER_ERR_SERVER_UNAVAILABLE))
	}

	i, err := rand.Int(rand.Reader, big.NewInt(int64(len(services))))
	if err != nil {
		return "", err
	}

	// Skip that many entries and stop at the next one, so exactly one
	// service is picked and logged.
	skip := i.Int64()
	for k, v := range services {
		if skip > 0 {
			skip--
			continue
		}
		log.Printf("*** Picked up [%s]:[%s] to serve", k, v)
		return v, nil
	}

	// Only reachable if the map shrank while it was being iterated.
	return "", errors.New(errUtil.Lookup(errUtil.SERVER_ERR_SERVER_UNAVAILABLE))
}
  • Thirdly, write a simple load balancer (LB) that provides a single endpoint for your test infra to consume
package main

import (
"io"
"log"
"net"
configFactory "xxxx/config"
"xxxxx/serviceregistry"
"sync"
"time"
)
// conf is the configuration returned by configFactory.Config().
var conf = configFactory.Config()

// warn prefixes the warning messages logged by the load balancer.
var warn = "[WARN]LB"
func handleConnection(in net.Conn, prefix string) {
dest, err := serviceregistry.Resolver(prefix)
if err != nil {
log.Println(err.Error())
_ = in.Close() // inform client return
}
log.Printf("Connecting to service: %s...\n", dest)
out, err := net.Dial("tcp", dest)
defer func() {
err1 := in.Close()
err2 := out.Close()
if err != nil || err2 != nil {
log.Printf("%s: unable to close tcp for %s. Ignoring...\n", warn, dest)
} else {
log.Printf("Closed tcp for %s.\n", dest)
} // try to recover from it - when there are many requests the io.Copy randomly will crash due to NPE
if r = recover(); r != nil {
log.Printf("%s: while tried to recover from: %s", warn, r)
}
}()
eChan := make(chan error, 2)
cp := func(dst io.Writer, src io.Reader) {
_, err := io.Copy(dst, src)
eChan <- err
}
go cp(out, in)
go cp(in, out)
err = <-eChan
if err != nil && err != io.EOF {
log.Printf("%s: %s\n", warn, err)
return
}
}
func init() {
go serviceregistry.Watch(conf.Etcd.Etcd_tcp_upload_prefix_tcp)
go serviceregistry.Watch(conf.Etcd.Etcd_tcp_search_prefix_tcp) // await service registry initialization
time.Sleep(2 * time.Second)
}
// main runs two load balancers concurrently — one for the upload services and
// one for the search services — and returns once both have stopped.
func main() {
	var wg sync.WaitGroup
	wg.Add(2)

	go func() {
		defer wg.Done() // lets main finish when this balancer stops
		loadBalance(conf.Upload.Protocol, conf.Upload.Host, conf.Upload.Port, conf.Etcd.Etcd_tcp_upload_prefix_tcp)
	}()
	go func() {
		defer wg.Done()
		loadBalance(conf.Tcpsearch.Protocol, conf.Tcpsearch.Host, conf.Tcpsearch.Port, conf.Etcd.Etcd_tcp_search_prefix_tcp)
	}()

	wg.Wait()
}
// loadBalance listens on HOST:PORT using Protocol and hands every accepted
// connection to handleConnection on its own goroutine, passing prefix so the
// handler can resolve a backend service.
//
// It returns when the listener cannot be created or when Accept fails; both
// errors are logged.
func loadBalance(Protocol string, HOST string, PORT string, prefix string) {
	l, err := net.Listen(Protocol, HOST+":"+PORT)
	if err != nil {
		log.Println(err)
		return
	}
	defer func() {
		if err := l.Close(); err != nil {
			log.Printf("%s: unable to shutdown server. Ignoring…\n", warn)
		}
	}()

	log.Printf("Load Balancer is serving services %s.* updates…", prefix)
	for {
		c, err := l.Accept()
		if err != nil {
			// Only a failed Accept stops the balancer; successful
			// connections are served below.
			log.Println(err)
			return
		}
		go handleConnection(c, prefix)
	}
}
  • Lastly, your test infra/test code can simply connect to the LB to grep logs from the remote servers
// GetDeductServerInfoLog sends finder to the load balancer and returns the
// text it answers with, read up to the first newline or until the connection
// is closed, whichever comes first.
//
// It returns "" when the load balancer cannot be reached, when the request
// cannot be sent, or when nothing is received; the cause is logged.
func GetDeductServerInfoLog(finder string) string {
	// connect to this socket
	// Placeholder address: replace it with your load balancer endpoint and port.
	conn, err := net.Dial("tcp", "<YOUR_LB_ENDPOINT>:<PORT>")
	if err != nil {
		log.Print("GetDeductServerInfoLog: " + err.Error())
		return ""
	}
	defer conn.Close()

	if _, err := conn.Write([]byte(finder)); err != nil {
		log.Print("GetDeductServerInfoLog: " + err.Error())
		return ""
	}

	// ReadString hands back whatever arrived before an error, so a reply
	// that ends without a newline is still returned.
	message, err := bufio.NewReader(conn).ReadString('\n')
	if err != nil && message == "" {
		log.Print("GetDeductServerInfoLog: " + err.Error())
		return ""
	}
	log.Print("GetDeductServerInfoLog: " + message)
	return message
}

Originally published at http://luohuahuang.org on November 25, 2019.

--

--