Worker pattern in Golang for data ETL

Tiểu Đông Tà
Life of a Senior Data Engineer
5 min read · Jul 5, 2020

Chapter I

Today I am going to share an interesting issue that many of us see pretty frequently.

As you work with data, a very common task is to extract some badly formatted CSV file and load it somewhere. Let's say, into a database.

As any sane engineer would, instead of doing it manually (which you could, but that would hurt your engineer's pride so much), you decide to write a script to handle the job. Fairly simple, you tell your product owner:

“Hi Tom, it's a piece of cake, I can make it happen in a few hours.”

This is how you, and I, plan to get it done:

Write a small script to load a CSV file into a database. How hard can it be?

And then our story begins. There is one thing you forgot to check. It turns out that your CSV file is not a normal CSV: it is a huge one, dumped in a slapdash manner by a department you never knew existed. The file contains hundreds of thousands, or even millions, of lines. You cannot even open it properly, and you start to wonder:

“How on earth did that guy manage to dump a CSV file that big?”

Trust me, he may well have spent his weekend clicking retry/try again just to get that CSV file for you.

And now your script takes 8 hours to execute, and still does not complete. All of a sudden, something happens, and you have to execute your pipeline again.

“But I have 16 GB of RAM, how the **** could that happen?”

You, your face, your emotion when that simple task cannot be done in time

Chapter II

Now, most of the time your code looks as simple as this: you read the CSV file, iterate through it row by row, and write the data to the database, possibly transforming it along the way.
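The article's own listing is not included in the text, so here is a minimal sketch of that synchronous version; the function name insertRow and the in-memory CSV are invented for illustration:

```go
package main

import (
	"encoding/csv"
	"fmt"
	"strings"
)

// insertRow stands in for the real database write. In a real script
// this would be an INSERT statement; here we just format the row.
func insertRow(record []string) string {
	return fmt.Sprintf("INSERT (%s)", strings.Join(record, ", "))
}

func main() {
	// A tiny in-memory CSV standing in for the huge file.
	data := "id,name\n1,alpha\n2,beta\n"
	reader := csv.NewReader(strings.NewReader(data))

	records, err := reader.ReadAll()
	if err != nil {
		panic(err)
	}
	// Skip the header, then process rows one by one: row 2 cannot
	// start until row 1 has finished its database write.
	for _, record := range records[1:] {
		fmt.Println(insertRow(record))
	}
}
```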

The catch here is that you are using a single thread to do it. In other words, your program is essentially synchronous: the 2nd line of the CSV has to wait until the 1st line finishes its processing, the 3rd line has to wait for the 2nd, and so on.

In fact, the rows in your CSV are independent, which means line number 1,000,000 can be processed before line number 1 without changing the nature of the job.

Now, naturally, to speed it up, you would do something like this:

Multi-processing write to database

You realize that reading from the CSV is amazingly fast; it is the database write that is I/O-bound and time-consuming. So instead of writing each row sequentially, you spawn a separate process or thread to handle each write operation. That means multiple rows can be processed and written to the database at the same time.
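In Go, the cheapest way to sketch this idea is a goroutine per row (goroutines rather than OS processes, which is how Go expresses concurrency). The writeToDB helper is invented for illustration, since the article's listing is not in the text:

```go
package main

import (
	"fmt"
	"sync"
)

// writeToDB simulates the slow, I/O-bound database insert.
func writeToDB(row string) string {
	return "wrote: " + row
}

func main() {
	rows := []string{"alpha", "beta", "gamma"}

	var wg sync.WaitGroup
	for _, row := range rows {
		wg.Add(1)
		// One goroutine per row: the writes now overlap instead of
		// queuing behind one another.
		go func(r string) {
			defer wg.Done()
			fmt.Println(writeToDB(r))
		}(row)
	}
	wg.Wait() // block until every write has finished
}
```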

Bravo. Welcome to the world of concurrent programming.

Chapter III

For this task, let’s use Golang, as this language is designed to handle such things.

First, let’s create an interface called Worker

The Worker interface has only one method, as well as one single purpose: to execute a task. In our case, for simplicity, we pass it a string.
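The original listing is not reproduced in the text, so this is a reconstruction from the description; the method name executeTask follows the article's prose, the rest (including the throwaway echoWorker used to show the shape) is assumption:

```go
package main

import "fmt"

// Worker is the article's interface: one method, one single purpose.
type Worker interface {
	executeTask(message string)
}

// echoWorker is a throwaway implementation, only here to show how a
// concrete type satisfies the interface.
type echoWorker struct {
	last string // records the last message, for demonstration
}

func (e *echoWorker) executeTask(message string) {
	e.last = message
	fmt.Println("echo:", message)
}

func main() {
	var w Worker = &echoWorker{}
	w.executeTask("hello")
}
```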

Next, we create a WorkerPool. This pool is the collection of workers available to process a task.

The WorkerPool has a slice to store the available workers, or more precisely, pointers to workers (as we are going to see in the implementation).

When given a job, the WorkerPool picks a random worker from its pool and executes the job in a separate goroutine.

This means your heavy workload, whatever it is, is executed in goroutines that do not block each other. This is the secret sauce that accelerates your program.

Now, depending on the kind of work you want to do, you may create different types of worker. Just remember that the WorkerPool stores its pool against the Worker interface, so as long as your type satisfies the Worker interface, you should be fine.

Here, I create just a simple worker that prints out the message. It implements the executeTask method of the Worker interface.

If you have read up to this line, you may be wondering about the variable

wg *sync.WaitGroup

As mentioned above, your task is executed in a separate goroutine, so somehow you must have a mechanism to be notified when the execution terminates. To make it happen, you have three main choices in Golang: mutexes, channels, and WaitGroups.

In this example, for every task about to execute, I increment the WaitGroup by 1.

wp.wg.Add(1)

And for every task that completes, I notify the WaitGroup of its termination.

defer wg.Done()

For details about synchronizing state between goroutines, you can read this nice article:

https://medium.com/mindorks/https-medium-com-yashishdua-synchronizing-states-using-mutex-vs-channel-in-go-25e646c83567

Putting things together, we can execute the code. In this example, I create an array containing the 3 words “alpha”, “beta”, and “gamma” to represent the workload.

You may choose the number of workers as you wish. It should be big enough to make a difference, but sometimes too many is just too many. Note that the gain in time is not a linear function of the worker count, so you have to figure it out by experiment.

It is going to give you output like:

Worker’s id 0 , executing task, message is gamma
Worker’s id 2 , executing task, message is beta
Worker’s id 2 , executing task, message is alpha

Conclusion

Golang gives you the power to use the worker pattern in concurrent programming. This pattern in Golang is very powerful, just like Golang itself: without setting up a complex queue engine, you can level up your ETL scripts and data processing.

Let's thank this cute little guy for his contribution.
