A Glimpse Inside Workers In Ballerina

Natasha Wijesekare
Ballerina Swan Lake Tech Blog
8 min readDec 25, 2018

Ballerina provides built-in support for concurrency. In simple terms, concurrency is when you are doing more than one thing at the same time. Concurrency should not be confused with parallelism. Concurrency is when many sequences of operations are executed in intersecting/overlapping periods of time. Programs can be designed as independent processes collectively working together in a specific composition with concurrency.

Workers in Ballerina create new concurrent flows. Worker actions that include: ‘receive’, ‘ send’, ‘synchronous send’, ‘wait’ and ‘flush’ offer flow control and allow communication between concurrent flows. The execution model of Ballerina is based on ‘strands’ that are lightweight execution flows. Strands are represented by futures in the language. These strands can run in parallel and are scheduled by the runtime.

Workers in Ballerina

Workers in Ballerina allow developers to assign their tasks to parallel running execution. Workers can be declared within functions and resources. By default, a function has an unnamed default worker. This default worker in each function will be executed in the same strand as the caller function. Apart from this implicit default worker, functions can have any number of named workers. These workers are peers of each other.

The worker execution starts precisely after invoking the particular function, remote function, or resource that encloses the worker(s). Stopping the execution of a function is independent of stopping the execution of its named workers. Returning from a named worker will terminate that worker and does not cause a return from the function. The return declarations used by functions are used for the workers who default to a nil return (not returning anything ()).

import ballerina/io;public function main() {
io:println("Starting worker execution...");

// Named worker w1
worker w1 {
int i = 0;
int sum = 0;
foreach var i in 1...10 {
sum += i + i;
}
io:println("Sum calculated by worker W1 -> " + sum);
}
// Named worker w2
worker w2 returns int {
int i = 0;
int sum = 0;
foreach var i in 1...20 {
sum += i + 2;
}
io:println("Sum calculated by worker W2 -> " + sum);
return sum;
}

io:println("Finishing worker execution...");
}

The workers inside a ‘fork’ block are peers of each other. These workers can only communicate (send and receive only ‘anydata’ variables) with each other but not with any other workers outside. So workers created inside a ‘fork’ block will have the same behavior as the workers created inside a function body. The ‘fork’ block will allow the developer to start any number of parallel workers at once. Workers are visible outside the ‘fork’ as futures.

public function main() {

fork {
worker w1 returns (int, string) {
int num = 100;
string s = "Hello World";
return (num, s);
}

worker w2 returns string {
string msg = "Hello I'm Natasha!!!";
return msg;
}
}
}

Futures in Ballerina

Futures are workers that can be waited for outside the function that starts the worker.

import ballerina/io;public function main() {   
// Named worker w1
worker w1 {
int i = 0;
int sum = 0;
foreach var i in 1...10 {
sum += i + i;
}
io:println("Sum calculated by worker W1 -> " + sum);
}
// Wait for worker w1 to finish
io:println("Waiting for worker w1 to finish executing...");
wait w1;
}

Futures are returned by operations that are executed in an asynchronous manner. This future can be used to get the result of the asynchronous function call (if available), check the running status of an execution or cancel the execution if needed.

public function main() {
// Call the `add()` function, that in asynchronous mode
future<int> f1 = start countInfinity();
// Check if the function call is done/finished
io:println(f1.isDone());
// Check if the asynchronous execution is cancelled
io:println(f1.isCancelled());
// Cancel the asynchronous execution.
boolean cancelled = f1.cancel();
io:println(cancelled);
}
function add(int a, int b) returns int {
return a + b;
}

Worker Actions in Ballerina

Workers communicate with each other by sending and receiving messages. These messages should be of ‘anydata’ type.

anydata type is equivalent to () | int | float | decimal | string | (anydata|error)[] | map<anydata|error> | xml | table

The messages are exchanged over a communication channel that is visible to the developer. Following are the worker actions provided by Ballerina.

Receive Action

Receive action is used by workers to receive/retrieve messages sent to the particular worker. All send and receive actions are validated by Ballerina at compile time to ensure that there is a matching receive in a worker for every message sent to that worker. Receive action will wait if the data is not available. If an error happens in the sending working while the receiving worker is waiting for a message to be received, then the receive action will propagate the error.

Send Action (Asynchronous Send)

Send action is used by workers to send messages asynchronously to other workers. These messages should be of ‘anydata’ type.

Synchronous Send Action

Synchronous send action is used by workers to send messages synchronously to other workers. These messages should be of ‘anydata’ type. Unlike asynchronous send, synchronous send will wait until the message is delivered. Synchronous send will return nil if the message was successful to the receiver or will return an error or panic based on the receiving worker’s state.

Sample code snippets to depict these actions are given below:

  1. Send and receive actions between 2 named workers
public function main() {
worker w1 {
int a = 10;
// Send messages to worker `w2`. This message contains a
// variable of 'int' type.
a -> w2;
// Receive a message from worker `w2`. This message contains a
// 'string' type value.
string msg = <- w2;
}
worker w2 {
int a = 0;
// Receive a message from worker `w1`. This message contains a
// 'string' type value.
a = <- w1;

string s = "Hello World!!";
// Send messages to worker `w1`. This message contains a
// variable of 'string' type.
s -> w1;
}
}

2. Synchronous send and receive actions between 2 named workers

public function main() {
worker w1 {
int a = 10;
// Synchronous send to worker 'w2'. Worker 'w1' will wait until
// 'w2' receives the message.
var fRes = a ->> w2;

// Synchronous send to worker 'w2' will return nil if the
// message was delivered successfully.
() sRes = a ->> w2;
}

worker w2 {
int b = 0;
// Receive messages from worker 'w2'. The messages received
// contains 'int' type value.
b = <- w1;
b = <- w1;
}
}

3. Interactions between the default worker and named workers

public function main() {
worker w1 {
string m = "";
// Receive messages from the default worker
m = <- default;

string v = "Hello from worker w1";
// Send messages to the default worker
v -> default;
}

// Send messages to worker 'w1'
"Hello from the default worker" -> w1;

// Receive messages from worker 'w1'
string result = <- w1;
}

Wait Action

Wait action can be used to wait for one or more futures to complete. The return values of the futures waited for are available including any errors that were raised during their execution. Waiting for a future and waiting for a worker have the same concept. If a future that is waited for panics, then the wait action will also panic.

As of now, wait action supports three semantics:

  1. Wait for one: This will wait for the given future to complete. The value returned by the future will be taken as the result.
public function main() {
worker w1 returns int {
int x = 50;
return x + 1;
}
// Wait for worker 'w1' to complete
int result = wait w1;
}

2. Wait for any: This will wait for any one of the given futures to complete. The value returned by the future that finishes first will be taken as the result.

import ballerina/runtime;public function main() {
worker w1 returns int {
int i = 4;
// Sleep for 100 milli seconds
runtime:sleep(100);
return i * i;
}
worker w2 returns string {
string msg = "Hello Natasha!! Good morning"
return msg;
}

// Wait for any one of the given workers to complete
// Result will be the return value of the worker that finishes
// first. Here worker 'w2' will finish first since a
// runtime:sleep is added in worker 'w1' to delay its execution.
int|string result = wait w1 | w2;
}

3. Wait for all: This will wait for all the given futures to complete. The values returned from each future will be taken together as the result as a record or map.

public function main() {
worker w1 returns int {
int x = 30;
return x;
}

worker w2 returns int {
int y = 15;
int g = y + 1;
return g;
}

// Wait for all the given workers 'w1' and 'w2' to complete
// The result of the 2 workers will be assigned to a map.
// with the key as the worker names if a key is not provided
// explicitly.
map<int> result = wait {w1, w2};
}
public function main() {
worker w1 returns int {
int x = 100;
return x;
}

worker w2 returns string {
string s = "Hello Natasha!! How are you?"
return s;
}

// Wait for all the given workers 'w1' and 'w2' to complete
// The result of the 2 workers will be assigned to a record with
// the field name as the worker name if a field name is not
// provided.
record{ int w1; string w2; } result = wait {w1, w2};
}

Flush Action

Flush action is used by workers to check if all messages sent to a given worker is successfully delivered i.e. it will wait until all messages are sent from the current worker to the given worker. If the receiving worker fails with an error or panics, then the error will be propagated to the waiting strand.

public function main() {
worker w1 {
int a = 10;
a -> w2;
a + 10 -> w2;

// Flush all messages sent to worker 'w2'. Worker 'w1' will
// stop here until all messages are sent or for a failure in
// 'w2'.
error? result = flush w2;

a -> w2;

// Flush all messages sent to all workers from worker 'w1'.
// Worker 'w1' will stop here until all messages are sent to
// the workers or for any failures by the workers.
error? result = flush;
}

worker w2 {
int b;
b = <- w1;
b = <- w1;

b = <- w1;
}

worker w3 {
int a = <- w1;
}
}

So this was just a brief overview of workers and their actions in Ballerina. Further examples can be found here. You can try these out by downloading ballerina.

Now as for how to end with a bang? Well, that’s another blog post for another time :) :) Till we meet again :) :)

--

--