You no superhero, queue your tasks

Sakib Sami
Published in LiveKlass
4 min read · Apr 26, 2018

Marvel Studios’ latest movie, Avengers: Infinity War, is being released today. Maybe you are excited, but I am a DC fan.

Well, that’s fine, but the problem is that neither you nor I am a superhero, and we are not going to save the world. Oh wait, I have to save my system.

It’s not who I am underneath, but what I do that defines me.

Let’s say you are developing a blogging system, and whenever a post is created you have to send an email to all the subscribers.

How will you handle this?

Pull the subscribers from the database and fire off a mail to all of them at once?

Oh, what an easy solution!

But wait, what if there are thousands of users? What if a mail fails?

One solution is to queue each of them as a task.

Let me show you how.

The implementation below is based on:

  1. Golang
  2. Machinery
  3. RabbitMQ

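Step 1:

// NotifySubscriber is the task handler; args[0] is the subscriber's email address.
func NotifySubscriber(args ...string) error {
	fmt.Printf("Notifying to : %s\n", args[0])
	// Demo only: fail addresses starting with "a" or "c" so the retry is visible.
	if strings.HasPrefix(args[0], "a") || strings.HasPrefix(args[0], "c") {
		// Ask Machinery to retry this task in 10 seconds.
		return tasks.NewErrRetryTaskLater("forced error to requeue task", 10*time.Second)
	}
	return nil
}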

Step 2:

// Machinery configuration to connect to RabbitMQ
var cfg = &config.Config{
	Broker:        "amqp://batman:batman@localhost:5672/",
	DefaultQueue:  "machinery_tasks",
	ResultBackend: "amqp://batman:batman@localhost:5672/",
	AMQP: &config.AMQPConfig{
		Exchange:     "machinery_exchange",
		ExchangeType: "direct",
		BindingKey:   "machinery_task",
	},
}

Step 3:

// Create the Machinery server, which connects to RabbitMQ using cfg
server, err = machinery.NewServer(cfg)
if err != nil {
	// do something with the error
}
// Register the task by name so workers know which function to run
err = server.RegisterTask(TaskNotifySubscriber, tasks.NotifySubscriber)
if err != nil {
	// do something with the error
}

Step 4:

// Spawn a worker that consumes tasks with a concurrency of 10
go func() {
	worker := server.NewWorker("yo_no_superhero", 10)
	// Launch blocks while the worker is consuming, hence the goroutine
	err := worker.Launch()
	if err != nil {
		// do something with the error
	}
}()

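Step 5:

// Assume these are the email addresses of subscribers
emails := []string{"a@email.com", "b@email.com", "c@email.com", "d@email.com", "e@email.com"}

for _, email := range emails {
	// One task per subscriber; the email address is the only argument
	signature := &t.Signature{
		Name: TaskNotifySubscriber,
		Args: []t.Arg{
			{
				Type:  "string",
				Value: email,
			},
		},
	}
	// Send the task to the queue; SendTask also returns an error worth checking
	if _, err := server.SendTask(signature); err != nil {
		// do something with the error
	}
}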

Results:

Notifying to : a@email.com
WARNING: 2018/04/27 00:44:57 worker.go:199 Task task_9d055fca-86de-47d8-be78-c315856625f3 failed. Going to retry in 10 seconds.
Notifying to : b@email.com
INFO: 2018/04/27 00:44:57 worker.go:222 Processed task task_1b59daf3-dbff-4f4a-a6b3-4c24014d4859. Results = []
Notifying to : c@email.com
WARNING: 2018/04/27 00:44:58 worker.go:199 Task task_33f32c8a-f249-47e6-b58e-cc386111dc82 failed. Going to retry in 10 seconds.
Notifying to : d@email.com
INFO: 2018/04/27 00:44:58 worker.go:222 Processed task task_e4bf444f-dce2-4c11-b91f-af5400069b55. Results = []
Notifying to : e@email.com
INFO: 2018/04/27 00:44:58 worker.go:222 Processed task task_41024a4e-0d65-4914-bed9-4dd6dff69d0f. Results = []
Notifying to : a@email.com
WARNING: 2018/04/27 00:45:07 worker.go:199 Task task_9d055fca-86de-47d8-be78-c315856625f3 failed. Going to retry in 10 seconds.
Notifying to : c@email.com
WARNING: 2018/04/27 00:45:08 worker.go:199 Task task_33f32c8a-f249-47e6-b58e-cc386111dc82 failed. Going to retry in 10 seconds.

So, what is happening here?

Step 1 creates the function that is responsible for processing a task. Just to demonstrate the re-queue process, it returns a retry error with a 10-second delay for addresses starting with "a" or "c", so those tasks are executed again 10 seconds later.
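To make the retry idea concrete, here is a minimal standard-library sketch of a handler that signals "try me again later" through a dedicated error type. The `retryLater` type and the `retryDelaySeconds` helper are hypothetical stand-ins invented for this illustration; they are not Machinery's own types, which the Step 1 code uses through `tasks.NewErrRetryTaskLater`.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
	"time"
)

// retryLater is a hypothetical error type carrying the delay before a retry.
type retryLater struct {
	msg     string
	retryIn time.Duration
}

func (e retryLater) Error() string { return e.msg }

// notify mimics the demo handler: addresses starting with "a" or "c" always fail.
func notify(email string) error {
	if strings.HasPrefix(email, "a") || strings.HasPrefix(email, "c") {
		return retryLater{msg: "forced error to requeue task", retryIn: 10 * time.Second}
	}
	return nil
}

// retryDelaySeconds reports whether err asks for a retry and after how many seconds.
func retryDelaySeconds(err error) (int, bool) {
	var rl retryLater
	if errors.As(err, &rl) {
		return int(rl.retryIn / time.Second), true
	}
	return 0, false
}

func main() {
	for _, email := range []string{"a@email.com", "b@email.com"} {
		if secs, retry := retryDelaySeconds(notify(email)); retry {
			fmt.Printf("%s: requeue, retry in %d seconds\n", email, secs)
		} else {
			fmt.Printf("%s: done\n", email)
		}
	}
}
```

The point of a dedicated error type is that the consumer can tell "failed, try again in N seconds" apart from any other outcome and act on it.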

Step 2 initializes the configuration used to connect to the task queue server.

Step 3 establishes a connection to the task queue server, which is RabbitMQ, and registers the task. server.RegisterTask takes two parameters: the task name (a string) and the function that will process the task.
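If you wonder why a task is registered by name, the rough idea is that only the name and arguments travel through the queue, and the worker looks up the matching function when a message arrives. Below is a deliberately simplified sketch of such a name-to-function lookup. The `registry` type, its methods, and the task name "notify_subscriber" are all made up for illustration; this is not how Machinery is implemented internally.

```go
package main

import "fmt"

// taskFunc is the shape of a task handler in this sketch.
type taskFunc func(args ...string) error

// registry is a hypothetical map from task name to handler.
type registry map[string]taskFunc

// register stores the handler under the given name.
func (r registry) register(name string, fn taskFunc) { r[name] = fn }

// dispatch finds the handler for name and runs it with args.
func (r registry) dispatch(name string, args ...string) error {
	fn, ok := r[name]
	if !ok {
		return fmt.Errorf("task %q is not registered", name)
	}
	return fn(args...)
}

func main() {
	r := registry{}
	r.register("notify_subscriber", func(args ...string) error {
		fmt.Println("Notifying to :", args[0])
		return nil
	})

	// A registered name runs its handler; an unknown name yields an error.
	fmt.Println(r.dispatch("notify_subscriber", "b@email.com"))
	fmt.Println(r.dispatch("unknown_task"))
}
```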

Step 4 spawns a worker that processes up to 10 tasks concurrently from the queue. Any registered task will be consumed by this worker.
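To picture what "10 concurrent consumers on one queue" means, here is a small in-memory sketch using only goroutines and a channel. The channel stands in for the RabbitMQ queue, and `runWorkers` is a hypothetical helper written for this example; a real broker adds persistence and delivery acknowledgements that this sketch does not have.

```go
package main

import (
	"fmt"
	"sync"
)

// runWorkers starts `concurrency` goroutines that drain the queue and
// returns the number of tasks processed once the queue is closed and empty.
func runWorkers(queue <-chan string, concurrency int, handle func(string)) int {
	var (
		wg        sync.WaitGroup
		mu        sync.Mutex
		processed int
	)
	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Each goroutine pulls the next available task until the queue closes.
			for task := range queue {
				handle(task)
				mu.Lock()
				processed++
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return processed
}

func main() {
	emails := []string{"a@email.com", "b@email.com", "c@email.com", "d@email.com", "e@email.com"}

	queue := make(chan string, len(emails))
	for _, email := range emails {
		queue <- email
	}
	close(queue)

	n := runWorkers(queue, 10, func(email string) {
		fmt.Println("Notifying to :", email)
	})
	fmt.Println("processed:", n)
}
```

The order of the "Notifying to" lines can differ between runs, because the goroutines race for the next task.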

Step 5 finally sends some tasks to be processed. You can also queue a task with an initial delay.
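For the initial delay, Machinery's task signature has an `ETA` field that takes a pointer to a time before which the task should not run. The sketch below only computes such a timestamp with the standard library; `etaFromNow` is a hypothetical helper, and the assignment to the signature is shown as a comment because it depends on the `signature` variable from Step 5.

```go
package main

import (
	"fmt"
	"time"
)

// etaFromNow returns a pointer to the UTC time delaySeconds from now.
func etaFromNow(delaySeconds int) *time.Time {
	eta := time.Now().UTC().Add(time.Duration(delaySeconds) * time.Second)
	return &eta
}

func main() {
	eta := etaFromNow(30)

	// With the Step 5 code you would then set, before calling SendTask:
	//   signature.ETA = eta
	fmt.Println("task should not run before:", eta.Format(time.RFC3339))
}
```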

Results: looking at the output, the tasks for a@email.com and c@email.com failed and are being re-queued. The tasks for the other addresses executed successfully, so they are not re-queued.

So now your tasks are processed from a queue, and if one fails to execute you can re-queue it to be processed later. Even if the server goes down, RabbitMQ will store these tasks, and they will be processed once you bring it back up.

We-no-superhero, yet we can still save our system. And don’t you dare process everything at once; that may bring your system down and leave you heartbroken ;)

Source code : https://github.com/s4kibs4mi/you-no-superhero

Machinery : https://github.com/RichardKnop/machinery

RabbitMQ : https://www.rabbitmq.com/

Sakib Sami
LiveKlass

Senior Software Engineer @ Twilio | Entrepreneur | Tech Ninja