Inner Pod Communication in Pipedrive Using Streaming Unix Domain Sockets

Marek Martins
Pipedrive R&D Blog
Published in
16 min readApr 18, 2023
Generated by Microsoft Bing Image Creator

Introduction

In Pipedrive, we use Consul for service discovery. We’ve written a wrapper library called Diplomat that provides the necessary interfaces. An issue occurs with libraries when they need updating. There are various reasons for this, running the gamut from minor bugs to when somebody needs to swap out Consul for something else. When there are hundreds of microservices available, it takes time and effort.

One possible way to solve this problem is to move the changeable part from the library to the sidecar. The library contains only a static part of the API communication layer with the sidecar. The sidecar is just another container in a Kubernetes pod.

At Pipedrive, we’ve also built a sidecar called Spock. You can configure the Diplomat library by either calling Consul directly or using the Spock sidecar for service discovery. In the case of Spock, it takes care of calls to Consul.

We had a latency problem with the Spock sidecar in services with heavier traffic. Replacing the current implementation with a custom one helped reduce latency by more than 200% — but more on that later.

In the next paragraph, I’ll quickly describe all the terms so it’s easier to follow the upcoming chapters.

Glossary

  • Diplomat — A library in Pipedrive for making service discovery directly within Consul or through the Spock sidecar
  • Container — A building block of the Kubernetes pod and unit of software that packages code and its dependencies, usually running as one process in the pod
  • Consul — A free, open-source service networking platform developed by HashiCorp
  • gRPC — An open-source remote procedure call framework
  • MTU — Maximum transmission unit, the largest packet or frame size that can be sent in a packet or frame-based network
  • Pod — A collection of one or more containers and the smallest unit of a Kubernetes application
  • Sidecar — A separate container that runs alongside an application container in a Kubernetes pod
  • Spock — A Kubernetes sidecar in Pipedrive that’s responsible for making service discovery calls to different backends, for example, Consul
  • TCP — Transmission control protocol is a standard that defines how we establish and maintain network conversation by which applications can exchange data (considered reliable)
  • UDP — User datagram protocol is a communication protocol used across the internet for especially time-sensitive transmissions (considered unreliable)
  • UDS — Unix domain socket aka IPC (Inter-process communication) socket is a data communications endpoint for exchanging data between processes (considered reliable)

The problem

Our end goal was to migrate all services to Spock for service discovery. In the beginning, this went fine. We started with smaller, less critical services. Once we started to migrate bigger services with a bigger load, we began to see issues. Bigger services latency increased considerably, as well as CPU time used. It was bad enough that we reverted the change and didn’t migrate bigger services to Spock.

You can read more about how we choked our Kubernetes NodeJS services here. This article explains how we run our services in Kubernetes, manage their resources, how to make the most of them and what to look out for.

To understand the root cause, we measured latency on the Spock side, (how fast Spock itself processes requests internally) and it looked fine — for cached responses, it took mostly under 100 microseconds. So, we worked out that it must be something on the Diplomat, communication or networking side. Communication between Diplomat and Spock used the gRPC protocol, which is known to be low-latency and fast.

We tried to tune all possible knobs on the gRPC client library. Nothing helped.

Sidenote: most services in Pipedrive are written in NodeJS. Spock is written in Golang. All the big services we had issues with are also written in NodeJS.

In addition, we measured sidecar communication latency with the services written in NodeJS using Diplomat and Spock. This was roughly 600 microseconds in ideal conditions. For services making a lot of service discovery calls, this is too much. The CPU load was also noticeable. This latency was lowest when the Kubernetes cluster was under a light load. When Kubernetes nodes were under a heavy load, latency increased to 1500 microseconds or more.

We also checked the latency of clients written in Golang and BOOM. Here, latency was around 200 microseconds with a negligible CPU overhead. With a high Kubernetes load, it went up to 800 microseconds. Implementing details in the NodeJS gRPC library seemed heavy enough for our use case for services with high requests throughput.

The solution

We tried to change library behavior with all possible knobs, but nothing helped. Therefore, we decided to drop gRPC altogether and test our own simple protocol on top of unix domain sockets using only the standard library for NodeJS runtime.

There are multiple unix domain socket types:

  1. Streaming sockets, which operate like TCP
  2. Datagram sockets, which work like UDP sockets
  3. Sequence packet sockets, which combine elements of both

It was a pretty easy choice to make: we chose streaming sockets. Firstly, because it’s the only one that NodeJS supports out of the box via the standard library; Golang supports all three. Another more important reason is that streaming sockets operate like TCP — meaning it’s very easy to swap it out using TCP/IP instead of UDS (unix domain sockets) if needed. For example, if one day, we decide to move the Spock sidecar to a separate service, we can do it with very few code changes:

  1. On the Diplomat side, when creating a network connection, instead of specifying a path socket file for where to connect, we specify a port and optional host:
    net.createConnection({ path: sockPath }); -> net.createConnection({ host: 1.2.3.4’, port: 8765});
  2. On the Spock side, it means changing net.Listen parameters:
    net.Listen(“unix”, socketPath) -> net.Listen(“tcp”, hostPortAddr)

The streaming Unix domain socket works like TCP without the associated acknowledgments, checksums, flow control, etc. So, in a sidecar, it makes sense to use it to reduce overheads as much as possible.

As a side note, gRPC can also use UDS, but when we tested it, we didn’t see any change in latency compared to TCP/IP.

Data Protocol

When we started to use a streaming UDS, we needed a protocol or agreed data structure for communication between the Diplomat library and Spock sidecar. We tried to stay as simple as possible for the first approach (which was also our last, because it worked so well). We came up with something like this:

  1. We decided to use JSON for the data format, as it’s well-supported in both languages without needing external libraries.
  2. Potentially, it has a higher cost in terms of both CPU and size compared to Protobuf, but we wanted to see if this would actually be a problem.
  3. As it’s a streaming protocol, we need to know how each previous message ends and the next one begins. We decided to use line breaks as message separation. Again, this was because it’s supported in both languages and works well with serialized JSON payloads. Both languages have a straightforward way to read a stream of bytes until some line separates byte(s), which is a default line break.
  4. We also need to solve the issue of how to map requests to responses. For this, we added a special field called id, which is just a simple counter. The counter is calculated at the Diplomat library side and library function blocks. It doesn’t resolve Promise until it gets a response with the same id or reaches a timeout.
  5. In addition, there are two fields:
    a. func: basically says which function to call in Spock (as a REST analogy, think of this as a route to call)
    b. data: what data is sent to function (as a REST analogy, this could be request body or query parameters)
  6. For the end request, the payload looked something like this:
    {
    "id": 1,
    "func": "getService",
    "data": {
    "service": "barista"
    }
    }

    This example is for service discovery and responds with an IP address and port using what consumers can connect to the barista service.
    An example response:
    {
    "id": 1,
    "data": {
    "host": "1.2.3.4",
    "port": 8765
    }
    }

Volumes in Kubernetes for enabling IPC

As we use Kubernetes and Unix Domain Sockets, we need a way to share UDS between containers. This can be done using Kubernetes volumes.

First of all, we need to define a new volume in the Kubernetes manifest:

volumes:
- name: uds
emptyDir:
medium: Memory

Here, we specify volume medium type as memory, meaning any other storage isn’t used. We do this to maximize performance and minimize latency. This data only lives in context when both the main service and sidecar containers are running and information is exchanged, so memory medium type is a perfect fit.

Now we only need to mount this volume in both containers — main service and sidecar:

volumeMounts:
- name: uds
mountPath: /uds
readOnly: false

readOnly must be false, as we also write there when sending data over UDS. Although there’s no need to specify it in the manifest as false, it’s the default value for this property.

Startup race condition between the sidecar and service container

How do you retry if the sidecar isn’t ready yet? In the pod startup, all containers start simultaneously, and there’s no startup order guaranteed between them. Therefore, we must take care of it in the Diplomat library code. Multiple things can be done here. The first thing partly overlaps with the next section, managing a dropping UDS connection. If, for some reason, the connection to the Unix domain socket fails — for example, it doesn’t yet exist — then eventually, a close event handler is called. If it’s not a graceful shutdown, it should attempt to connect back.

Also, because sidecar is responsible for creating a UDS file, we can wait until it exists on service startup in the container by creating a function that doesn’t return before a file is created and a UDS connection is established. In addition, a timeout can be added — after exceeding it. First, the timeout error is returned and logged. Secondly, process.exit(1); is called so Kubernetes can try to restart the service container.

As service discovery is a hard dependency, even if we didn’t timeout in code, it would work, as Kubernetes will eventually force restart the container itself.

A potential pseudo code for waiting when UDS file is there without timeout handling could look something like this:

const waitUdsListenerReady = async () => {
while (true) {
try {
await fs.access(sockPath, fs.constants.R_OK | fs.constants.W_OK);
break;
} catch (e) {
// waits 1 second
// production ready code it could be dynamic
// like Math.min(2 ** ATTEMPT_NUMBER, 5000)
await sleep(1000);
}
}
};

Managing a dropping UDS connection

So far, this has been extremely stable for us. We could say it’s so stable that we shouldn’t even care if the UDS connection drops. Actually, we should care, as we never know when or why this might happen — we want to write software that’s robust and works reliably (also when edge cases occur). Even if we’ve tested it often and the problem never appeared, we should still cover all possibilities.

In short, when a close event happens in the Diplomat library, it should connect back, unless it’s a graceful shutdown of the pod. Here’s an example pseudocode:

// when connection ends
// does not matter if with unexpected error or not
client.on('close', (hadError) => {
// only connect back if graceful shutdown not in progress
if (gracefulShutdownInProgress) {
return;
}
// if closed with error then backoff for a second before connecting back
// can happen on startup if sock file is not there yet
if (hadError) {
setTimeout(createClient, 1000);
} else {
// reconnect to sidecar
createClient();
}
});

How to not close sidecar too early at a graceful shutdown

Another issue we need to take care of is graceful shutdown. When a graceful shutdown occurs, Kubernetes sends a SIGTERM signal to all containers in the pod at once. This may cause a problem where the sidecar shuts down earlier than the main service container and the main service may still need the sidecar at that moment.

As a UDS connection is a “persistent connection”, we can keep track of how many connections are opened on the Spock sidecar side. Now, if a graceful shutdown occurs and the service containers close, the connections count should drop to zero. Only then, if it drops to zero, we exit gracefully from the sidecar container. If a malfunctioning main service doesn’t shut down gracefully, the Kubernetes grace period takes care of it. By default, this is 30 seconds, which means Kubernetes will kill the container if it hasn’t been gracefully exited by that time. This can be changed in the Kubernetes manifest by specifying terminationGracePeriodSeconds property.

Here’s a pseudo code of how listener may look:

// using atomic to track active connections
var activeConnections atomic.Int32
// channel used for tracking if graceful shutdown can be proceed
var allClientsDisconnected chan struct{}
// flag to determine if graceful shutdown is running
var shuttingDown atomic.Bool

_ = os.Remove(socketAddr)
listener, err := net.Listen("unix", socketAddr)
if err != nil {
log.Fatal(err)
}

if err := os.Chmod(socketAddr, os.ModeSocket|0666); err != nil {
log.Fatal(err)
}

go func() {
<-allClientsDisconnected
listener.Close()
}()

for {
conn, err := listener.Accept()
if err != nil {
log.Println(err)
continue
}

activeConnections.Add(1)

go func() {
defer conn.Close()

scanner := bufio.NewScanner(conn)

for scanner.Scan() {
_, err = conn.Write(append(process(scanner.Bytes()), '\n'))
if err != nil {
log.Println(err)
}
}

if err := scanner.Err(); err != nil {
log.Println(err)
}

curActiveConnections := activeConnections.Add(-1)

// checking if graceful shutdown is in progress
// checking if it was last closed connection
if shuttingDown.Load() && curActiveConnections == 0 {
close(allClientsDisconnected)
}
}()
}

The function called when a SIGTERM signal is received may look something like this:

func WaitAllDisconnected() {
// minimum graceful shutdown time
time.Sleep(10 * time.Second)

// indicates that graceful shutdown is running
shuttingDown.Store(true)

// if connections are already closed
if activeConnections.Load() == 0 {
return
}

// waiting until main service container is exiting
<-allClientsDisconnected
}

In more production-ready code, all variables like shuttingDown, activeConnections and others should be part of the struct that keeps full state.

Client-side timeout

We can never trust a service or sidecar entirely from a consumer perspective. To prevent a consumer or main service from waiting too long, we added a timeout mechanism to the Diplomat library for all calls to the sidecar. After a certain period, without an answer from the sidecar, an error is thrown.

A sendRequest function example pseudocode that also handles client-side timeout:

const sendRequest = (func, data, timeout = SPOCK_DEFAULT_REQUEST_TIMEOUT_MS) => {
// id overflow is not handled here and it's highly unlikely to happen if maxRequests is big enough
// could just throw an error if that happens or backoff for some time to retry again
currentId = (currentId + 1) % maxRequests;

return new Promise((resolve, reject) => {
// send data to sidecar
client.write(
`${JSON.stringify({
id: currentId,
func,
data,
})}\n`,
);

// capture the variable from closure
((currentId) => {
// this object is used to track inflight requests
// used for resolving/rejecting responses
requests[currentId] = {
resolve,
reject,
// client side timeout for request
// will be cleared when answer comes before
timeout: setTimeout(() => {
requests[currentId].reject(new Error('Request to Spock timed out'));
// cleanup used request socket in array
// using null as we are using fixed length arrays for less allocations
requests[currentId] = null;
}, timeout),
};
})(currentId);
});
};

Results

By using this solution, latency dropped to around 0.1–0.2 ms. Even when Kubernetes nodes were under a heavy load, it was around 0.6 ms. Also, the heavier the load, the bigger the difference between the old and new solution.

Old vs. new solution in terms of latency:

+----------------+--------------+--------------+-----------------+
| | Old solution | New solution | Difference |
+----------------+--------------+--------------+-----------------+
| Node load low | 0.6 ms | 0.2 ms | ~0.4 ms / ~300% |
+----------------+--------------+--------------+-----------------+
| Node load high | 1.8 ms | 0.6 ms | ~1.2 ms / ~300% |
+----------------+--------------+--------------+-----------------+

Latency has a linear correlation to node load. The new solution outperforms the old one by roughly 300%.

Full sidecar example implementation

package uds

import (
"bufio"
"encoding/json"
"errors"
"log"
"net"
"os"
"sync/atomic"
"time"
)

// request/response structure
// response case Function is just empty
type Message struct {
Id uint32 `json:"id"` // id - counter
Function string `json:"func,omitempty"` // function name
Data json.RawMessage `json:"data"` // data to send function
}

type UdsServer struct {
activeConnections atomic.Int32
shuttingDown atomic.Bool
allClientsDisconnected chan struct{}
}

func New() *UdsServer {
us := UdsServer{
allClientsDisconnected: make(chan struct{}),
}

return &us
}

func (us *UdsServer) StartServer(socketAddr string) {
// let's cleanup just for case
// helps if container crashes or is killed for some reason and file is already there
_ = os.Remove(socketAddr)
listener, err := net.Listen("unix", socketAddr)
if err != nil {
log.Fatalf("Failed to start UDS listener at %q: %v\n", socketAddr, err)
}

if err := os.Chmod(socketAddr, os.ModeSocket|0666); err != nil {
log.Fatalf("Failed to change UDS socket permissions: %v\n", err)
}

// wait until service container has finished on graceful shutdown
go func() {
<-us.allClientsDisconnected
listener.Close()
}()

log.Printf("UDS listener started at %s\n", socketAddr)

for {
conn, err := listener.Accept()
if err != nil {
// normal closing case at graceful shutdown when all clients disconnected
// break out of loop
if errors.Is(err, net.ErrClosed) {
break
}

if nErr, ok := err.(net.Error); ok && nErr.Timeout() {
// log error when timeout happens in case of connecting to client
log.Printf("Connection timeout happened in establishing UDS connection %v\n", nErr)
} else {
// log unknown error
log.Printf("Failed to establish UDS connection %v\n", err)
}

// continue to accept next connection
continue
}

// client connected, incrementing active connections count
us.activeConnections.Add(1)

go func() {
defer conn.Close()

scanner := bufio.NewScanner(conn)

for scanner.Scan() {
_, err = conn.Write(append(process(scanner.Bytes()), '\n'))
if err != nil {
log.Printf("Writing back to client failed with error: %v\n", err)
}
}

if err := scanner.Err(); err != nil {
if !errors.Is(err, net.ErrClosed) {
log.Printf("UDS scanner stopped with error: %v\n", err)
}
}

log.Printf("UDS client disconnected from %s\n", conn.RemoteAddr().String())

// client disconnected, decrementing active connections count
activeConnections := us.activeConnections.Add(-1)

// checking if graceful shutdown is in progress
// checking if it was last closed connection
if us.shuttingDown.Load() && activeConnections == 0 {
close(us.allClientsDisconnected)
}
}()

log.Printf("UDS client connected from %s\n", conn.RemoteAddr().String())
}

log.Printf("UDS listener closed at %s\n", socketAddr)
}

// indicates that graceful shutdown started
// if connection closes then should gracefully end
func (us *UdsServer) WaitAllDisconnected() {
// minimum graceful shutdown time
time.Sleep(10 * time.Second)

// indicates that graceful shutdown is running
us.shuttingDown.Store(true)

// if connections are already closed
if us.activeConnections.Load() == 0 {
return
}

// waiting until main service container is exiting
<-us.allClientsDisconnected
}

// errors we do not handle here for sake of simpler example
func process(in []byte) []byte {
req := Message{}
_ = json.Unmarshal(in, &req)

funct := req.Function
var res []byte

switch funct {
case "examplefunc1":
// here call function processor
// res = processExampleFunc1(req.Data)
// and so on for each func
}

// we are reusing request struct for response
req.Function = ""
req.Data = res

out, _ := json.Marshal(req)

return out
}

Full client library example implementation

const fs = require('fs');
const net = require('net');
const readline = require('readline');
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));
// maximum tolerable inflight requests number
const maxRequests = Math.pow(2, 17) - 1; // 131071
// used for tracking inflight requests
const requests = Array.from({ length: maxRequests });
const sockPath = '/uds/sidecar.sock';
const SIDECAR_DEFAULT_REQUEST_TIMEOUT_MS = 1000;

// tracking for current ID, autoincrement
let currentId = 0;
// client is cached here
// behaves as singleton
let client;
// line reader is cached here
let rl;
// keeps track if graceful shutdown is already in progress
let gracefulShutdownInProgress = false;

// listen SIGTERM event to mark graceful shutdown in progress
process.on('SIGTERM', () => {
gracefulShutdownInProgress = true;
});

const waitUdsListenerReady = async () => {
while (true) {
try {
// check that socket file is there with read and write access
// correct rights are set by sidecar after socket file listener is setup
fs.accessSync(sockPath, fs.constants.R_OK | fs.constants.W_OK);
break;
} catch (e) {
// waits 1 second
// production ready code it could be dynamic
// like Math.min(2 ** ATTEMPT_NUMBER, 5000)
await sleep(1000);
}
}
};

// creates line reader on client
const createLineReader = () => {
rl = readline.createInterface({
input: client,
crlfDelay: Infinity,
});

// if full line is read from incoming stream then line event is emitted
rl.on('line', (line) => {
let res;

try {
res = JSON.parse(line);
} catch (e) {
console.warn(`Sidecar sent data not in JSON format (line=${line})`);
return;
}

const id = res.id;
// retrieves cached request object by id
const request = requests[id];

// meaning request already timed out and is removed from object
if (!request) {
console.warn(`Timed out sidecar request returned (${id})`);
return;
}

// in case or error reject
if (res.error) {
request.reject(res.error);
} else {
request.resolve(res.data);
}

// clear timeout timer
clearTimeout(request.timeout);
// cleanup used request socket in array
// using null as we are using fixed length arrays for less allocations
requests[id] = null;
});
};

const sendRequest = (func, data, timeout = SIDECAR_DEFAULT_REQUEST_TIMEOUT_MS) => {
// id overflow is not handled here and it's highly unlikely to happen if maxRequests is big enough
// could just throw an error if that happens or backoff for some time to retry again
currentId = (currentId + 1) % maxRequests;

return new Promise((resolve, reject) => {
// send data to sidecar
client.write(
`${JSON.stringify({
id: currentId,
func,
data,
})}\n`,
);

// capture the variable from closure
((currentId) => {
// this object is used to track inflight requests
// used for resolving/rejecting responses
requests[currentId] = {
resolve,
reject,
// client side timeout for request
// will be cleared when answer comes before
timeout: setTimeout(() => {
requests[currentId].reject(new Error('Request to sidecar timed out'));
// cleanup used request socket in array
// using null as we are using fixed length arrays for less allocations
requests[currentId] = null;
}, timeout),
};
})(currentId);
});
};

const createClient = async () => {
// waits when unix domain socket file is there with correct rights
await waitUdsListenerReady();

client = net.createConnection({
path: sockPath,
});

client.on('error', (err) => {
console.warn(`Sidecar UDS connection got error: ${err.toString()}`);
});

client.on('ready', () => {
// if client is ready setup line reader for incoming stream
createLineReader();
});

client.on('end', () => {
// if client ended then close properly line reader to not leak
rl.close();
});

// when connection ends
// does not matter if with unexpected error or not
client.on('close', (hadError) => {
// only connect back if graceful shutdown not in progress
if (gracefulShutdownInProgress) {
return;
}
// if closed with error then backoff for a second before connecting back
// can happen on startup if sock file is not there yet
if (hadError) {
setTimeout(createClient, 1000);
} else {
// reconnect to sidecar
createClient();
}
});

// returns function what can be used sending requests to sidecar
return sendRequest;
};

// behaves as singleton
module.exports = async () => {
// only creates client once
if (client) {
return sendRequest;
}

return await createClient();
};

Conclusion

In our case, using custom implementation justified itself. We were able to get communication between containers fast enough that there was no longer a bottleneck, even for services with heavier loads that make lots of service discovery calls.

Nevertheless, there’s always room for improvement, and if we want better performance in the future and even lower latencies, a few things we can try are:

  1. Instead of JSON, we could use Protobuf. This requires less space and is faster to serialize and deserialize. If we want to go a step further, we could implement our own binary protocol, which would probably be even quicker than a general-purpose protocol like Protobuf as we can tune it precisely to our use case.
  2. Using UDP instead of TCP (in context of UDS it means using Datagram sockets). This requires a lot of extra work as we need to make it reliable ourselves and implement some kind of acknowledgement mechanism, retransmitting packets, avoiding fragmentation by keeping the payload size under MTU (maximum transmission unit) value, building some kind of session management for connections and much more. However, it may be worth it if we want to go that last extra mile to squeeze out everything possible. Saying that, I think that for 99% of use-cases, TCP is more than adequate.

--

--

Marek Martins
Pipedrive R&D Blog

Passionate software developer who loves to dive into the low-level details of software performance and optimization.