Temporal: Getting Started in Java

Dushant Singh
CARS24 Engineering Blog
8 min read · Nov 1, 2022

Learn the what, when, and how.

Introduction

In this tutorial, we’ll cover the following:

  • Introduction
  • Installation
  • tctl (Temporal CLI)
  • Getting Started in Java
  • External Interaction with Workflow
  • Await Workflow
  • Custom Search Attributes

Introduction
Temporal is an open-source, code-first orchestration framework. Because workflows are defined programmatically, in ordinary code, most developers will find it easy and familiar to work with.
It provides SDKs for several programming languages:

  • Java
  • Go
  • PHP
  • Python
  • TypeScript
  • JavaScript

It supports long-running workflows and can scale to millions of concurrent executions.

Installation
For a development environment, Temporal can be installed easily using Docker.

  1. Install Docker and docker-compose.

On macOS:

1. Download the Docker.dmg file for your chipset from here.
2. Double-click Docker.dmg to open the installer, then drag the Docker icon to the Applications folder.
3. Double-click Docker.app in the Applications folder to start Docker.
4. The Docker menu displays the Docker Subscription Service Agreement window. Select Accept to continue. Docker Desktop starts after you accept the terms.

Refer here for other operating systems.

2. Clone the temporalio/docker-compose repository.

git clone https://github.com/temporalio/docker-compose.git

3. Change the directory for the project.

cd docker-compose

4. Run the Docker instance.

The directory contains multiple docker-compose YAML files that can be used to run Temporal with predefined configurations.

temporal docker-compose files with components

For now, use the following command to run the Temporal Docker image with the default configuration:

docker-compose up

The default configuration uses PostgreSQL as the persistence layer, together with Elasticsearch.
To start the Temporal server with MySQL and Elasticsearch instead, pass the matching compose file:

docker-compose -f docker-compose-mysql-es.yml up

Remember to use Docker only for the development environment. For production, refer here.

Result: You should now have the Temporal Cluster running at http://127.0.0.1:7233 and the Temporal Web UI at http://127.0.0.1:8088 (check the port in your compose file, as it may change between versions).

Temporal web UI

tctl

The Temporal CLI (tctl) is a command-line tool for interacting with a Temporal Cluster. It lets us perform various operations that the Web UI does not support. Important operations supported by tctl are:

  1. Creating a Namespace

A namespace is to Temporal what a database is to a DBMS.
Namespaces allow multiple applications to run on the same Temporal cluster, each in an isolated environment.

tctl --namespace custom namespace register

2. Custom Search Attributes

Temporal allows us to query workflows by custom fields, which are served from the underlying Elasticsearch instance. Before a custom attribute can be set, it must be defined in Temporal with a valid data type.

tctl admin cluster add-search-attributes --name custom --type Text

Here, "custom" is the name of an attribute of type "Text". Temporal supports the following types:

  1. Bool (backed by boolean in Elasticsearch)
  2. Text (backed by text in Elasticsearch)
  3. DateTime (backed by date in Elasticsearch)
  4. Double (backed by scaled_float in Elasticsearch)
  5. Int (backed by long in Elasticsearch)
  6. Keyword (backed by keyword in Elasticsearch)

Use the following command to list the existing search attributes:

tctl admin cluster get-search-attributes

3. Reset Workflows

There may be cases where we want to re-run a workflow from a specific point. Internally, Temporal terminates the current workflow execution and starts a new one that retains the events recorded before the reset point.

tctl workflow reset --wid <workflow_id> --event_id <event_id> --reason <reset reason>

The following information is required:

  1. workflow_id: the ID of the workflow to reset. It can be found in the Temporal Web UI.
  2. event_id: the ID of the event from which to re-run the workflow. The event must be one of the following types:
    1. WorkflowTaskCompleted
    2. WorkflowTaskFailed
    3. WorkflowTaskTimedOut
  3. reason: the reason for resetting the workflow execution.

Getting Started in Java

  • Add the Temporal SDK dependency to your project (Maven).
<dependency>
    <groupId>io.temporal</groupId>
    <artifactId>temporal-sdk</artifactId>
    <version>1.11.0</version>
</dependency>
  • The next step is to create Activities. In Temporal, all non-deterministic operations must be performed inside an activity.
    Non-deterministic operations include API calls, random number generation, and so on.
    Temporal invokes an activity method once and stores its result in the underlying database. When it later replays the workflow's events, it reuses the stored result instead of invoking the activity again, which keeps the Workflow deterministic.
@ActivityInterface
public interface CustomActivity {
    @ActivityMethod
    Map<String, String> fetchCustomMap();
}

Next, define an implementation of your activity interface.

@Service
public class CustomActivityImpl implements CustomActivity {

    @Override
    public Map<String, String> fetchCustomMap() {
        // perform any non-deterministic operation here, such as an API call
        return new HashMap<>();
    }
}
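
To make the record-and-replay idea concrete, here is a toy model in plain Java. It does not use the Temporal SDK: ReplaySketch, runActivity, replay, and the in-memory history list are hypothetical names chosen for illustration, and the real server persists a much richer event history than a list of strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Toy model of record-and-replay. Not the Temporal SDK: every name here is hypothetical.
final class ReplaySketch {
    private final List<String> history = new ArrayList<>(); // stands in for the persisted event history
    private int cursor = 0;   // index of the next activity result in the current pass
    int invocations = 0;      // how many times an activity body actually executed

    // Returns the recorded result if this step already ran; otherwise runs the activity and records it.
    String runActivity(Supplier<String> activity) {
        if (cursor < history.size()) {
            return history.get(cursor++);   // replay: reuse the stored result
        }
        invocations++;
        String result = activity.get();     // first execution: perform the side effect
        history.add(result);
        cursor++;
        return result;
    }

    // Starts a new pass over the same history, as happens when a workflow is replayed.
    void replay() {
        cursor = 0;
    }
}
```

If runActivity is called with a supplier that returns a random UUID, followed by replay() and a second runActivity call, both calls return the same string and invocations stays at 1. That is the property that lets workflow code remain deterministic even though the activity itself is not.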
  • The next step is to define our Workflow. Workflows in Temporal can be long-running and must be deterministic (multiple executions of the same instance must produce the same output).
    Note: Thread.sleep(), Object.wait(), System.currentTimeMillis(), and similar calls must not be used inside workflow code because they are non-deterministic. The SDK provides deterministic counterparts such as Workflow.sleep() and Workflow.currentTimeMillis(). Also avoid global or static variables that other workflows can modify.

Let’s begin by defining our Workflow interface. Here, CustomRequest is a custom class that carries the input to the Workflow.

// Workflow interface
@WorkflowInterface
public interface CustomWorkflow {
    @WorkflowMethod
    void start(CustomRequest request); // return type matches the implementation
}

Next, define the implementation of the Workflow.

import io.temporal.activity.ActivityOptions;
import io.temporal.common.RetryOptions;
import io.temporal.workflow.Workflow;
import java.time.Duration;
import java.util.Map;

public class CustomWorkflowImpl implements CustomWorkflow {

    private CustomActivity customActivity;

    @Override
    public void start(CustomRequest request) {
        // workflow code
        System.out.println("workflow execution start");
        initialize();
        Map<String, String> activityResult = customActivity.fetchCustomMap(); // (3)
        System.out.println("activity result " + activityResult);
        System.out.println("workflow execution ends");
    }

    /* Initialize all activity stubs. */
    private void initialize() {
        RetryOptions retryOptions = RetryOptions.newBuilder()
                .setInitialInterval(Duration.ofSeconds(1))
                .setMaximumInterval(Duration.ofSeconds(5000))
                .setBackoffCoefficient(1)
                .setMaximumAttempts(5)
                .build(); // (1)

        ActivityOptions options = ActivityOptions.newBuilder()
                .setStartToCloseTimeout(Duration.ofSeconds(60))
                .setRetryOptions(retryOptions)
                .build(); // (2)

        customActivity = Workflow.newActivityStub(CustomActivity.class, options);
    }
}
  1. Retry parameters for activity methods.
  2. Activity options for activity methods. The start-to-close timeout is the maximum time the workflow waits for a single attempt of the activity to complete.
  3. Activity method invocation.
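
The effect of the retry parameters is easier to see with numbers. The sketch below computes the wait before each retry in plain Java, assuming the usual exponential-backoff rule: each interval is the previous one multiplied by the backoff coefficient, capped at the maximum interval. RetrySchedule and delays are hypothetical helpers, not part of the Temporal SDK, and the server's actual scheduling may differ in detail.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of the Temporal SDK.
final class RetrySchedule {
    // Returns the wait before each retry. maximumAttempts counts the first try,
    // so there are at most maximumAttempts - 1 retries.
    static List<Duration> delays(Duration initialInterval, Duration maximumInterval,
                                 double backoffCoefficient, int maximumAttempts) {
        List<Duration> result = new ArrayList<>();
        double nextMillis = initialInterval.toMillis();
        for (int retry = 1; retry < maximumAttempts; retry++) {
            // Cap the interval at the configured maximum.
            long capped = (long) Math.min(nextMillis, maximumInterval.toMillis());
            result.add(Duration.ofMillis(capped));
            nextMillis *= backoffCoefficient;
        }
        return result;
    }
}
```

With the values used in this article (initial interval of 1 second, coefficient 1, 5 attempts), the sketch yields four retries spaced 1 second apart, so the 5000-second maximum interval never comes into play. A coefficient of 2 with a 5-second cap would give waits of 1, 2, 4, and 5 seconds.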
  • Continue by creating a WorkflowClient, which is used to start Workflows.
@Bean
public WorkflowServiceStubs workflowServiceStubs() {
    return WorkflowServiceStubs.newInstance(
            WorkflowServiceStubsOptions.newBuilder()
                    .setTarget("127.0.0.1:7233") // (1)
                    .build());
}

@Bean
public WorkflowClient workflowClient(WorkflowServiceStubs workflowServiceStubs) {
    return WorkflowClient.newInstance(
            workflowServiceStubs,
            WorkflowClientOptions.newBuilder()
                    .setNamespace("default") // (2)
                    .build());
}
  1. Host and port where the Temporal Server is running.
  2. Namespace to be used for the project (new namespaces can be created using tctl, as described above).
  • The next step is to create a WorkerFactory. The workers it creates are responsible for executing workflows and activities, so every Workflow and Activity must be registered with a worker.
@Bean
public WorkerFactory workerFactory(WorkflowClient workflowClient, CustomActivityImpl customActivity) {
    WorkerFactory workerFactory = WorkerFactory.newInstance(workflowClient);
    WorkflowImplementationOptions workflowImplementationOptions =
            WorkflowImplementationOptions.newBuilder()
                    .setFailWorkflowExceptionTypes(NullPointerException.class) // (1)
                    .build();

    // create the worker
    Worker worker = workerFactory.newWorker("queue"); // (2)

    // register workflow implementation classes with the worker
    worker.registerWorkflowImplementationTypes(
            workflowImplementationOptions, MainImpl.class, SecondaryImpl.class); // (3)

    // register activity instances with the worker
    worker.registerActivitiesImplementations(customActivity); // (4)

    workerFactory.start();
    return workerFactory;
}
  1. Exception types on which the workflow should fail. Use these where no further processing is possible.
  2. Task queue on which the worker listens for events.
  3. All Workflow implementation classes must be registered.
  4. All Activity implementations must be registered.

Note: We register classes for Workflows but objects for Activities, because Activities in Temporal are assumed to be stateless, whereas Workflows can be stateful.

Worker threads run in the background, listen to specific task queues for events, and act on them accordingly. Actions can be performed on Workflows or Activities. If the specified Workflow or Activity is not registered with the worker, the message is ignored.
We can create multiple workers as required.

  • Now that we have created all the required elements, it’s time to test our code by starting the workflow. This can be done in response to client input through an API, or on server startup.
    For simplicity, we start it on server startup.
@Component
@AllArgsConstructor
@Slf4j
public class StartupListener {

    private final WorkflowClient client;

    @EventListener
    public void onApplicationEvent(ApplicationStartedEvent startedEvent) {
        WorkflowOptions workflowOptions = WorkflowOptions.newBuilder()
                .setWorkflowId("custom_workflow_1") // (1)
                .setTaskQueue("queue")
                .setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(2).build())
                .build();

        CustomRequest customRequest = new CustomRequest();
        try {
            CustomWorkflow customWorkflow = client.newWorkflowStub(CustomWorkflow.class, workflowOptions);
            WorkflowClient.start(customWorkflow::start, customRequest); // (2)
        } catch (WorkflowExecutionAlreadyStarted e) { // (3)
            log.info("workflow already running");
        }
    }
}
  1. Workflow ID for the current execution. No two running executions in a namespace can share the same ID.
  2. Starts the workflow execution asynchronously. Synchronous execution is also possible by invoking the workflow method directly on the stub, like this:
    customWorkflow.start(customRequest);
  3. WorkflowExecutionAlreadyStarted is thrown if a workflow with the same ID is already in progress.

Result: Visit the Temporal Web UI at http://127.0.0.1:8088 (check the port in your compose file, as it may change between versions) to see the workflow execution status.

Workflow execution status on Temporal Web UI

Here,

workflow_id = workflow ID specified when starting the execution

run_id = unique ID assigned to each execution by Temporal

name = unique name assigned by Temporal to each implementation class

status = execution status

start time = execution start time

end time = execution end time

Click on an execution's run_id to see its details.

Event History for execution

External Interaction with Workflow

A Workflow can communicate with other Workflows or with clients. This can be achieved using:

  1. Queries
  2. Signals

Queries

A Query is a synchronous operation used to get the state of a Workflow Execution. First, define the query method in your workflow interface:

@QueryMethod
String details();

Provide its implementation in the workflow Implementation class.

Now a running workflow can be queried through a stub bound to its workflow ID:

CustomWorkflow customWorkflow = client.newWorkflowStub(CustomWorkflow.class, "custom_workflow_1");
String queryResult = customWorkflow.details();

Here, client = WorkflowClient Object

Signals

A Signal is a message sent to a running Workflow Execution. Signals can be sent to Workflow Executions from a Temporal Client or from another Workflow Execution. First, define a signal method in your workflow interface:

@SignalMethod
void update(CustomSignal signal);

Provide its implementation in the workflow Implementation class.
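
As a rough illustration of what the query and signal implementations might look like, the sketch below keeps the workflow state in a field: the signal handler mutates it and the query handler only reads it. It is plain Java so that it runs without a Temporal server. In a real project these methods would live in the workflow implementation class, and the status field of CustomSignal is a hypothetical example, since this article does not define that class.

```java
// Hypothetical payload class; the article does not specify its fields.
final class CustomSignal {
    final String status;

    CustomSignal(String status) {
        this.status = status;
    }
}

// Stand-in for the workflow implementation class, without Temporal annotations.
final class WorkflowStateSketch {
    private String status = "STARTED"; // state owned by a single workflow execution

    // Would implement the @SignalMethod: changes state and returns nothing.
    public void update(CustomSignal signal) {
        this.status = signal.status;
    }

    // Would implement the @QueryMethod: read-only, does not modify state.
    public String details() {
        return "status=" + status;
    }
}
```

The split matters: a signal is allowed to change what the workflow does next, while a query should only report the current state.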

Now the workflow can be signaled as follows:

CustomSignal signal = new CustomSignal(); //custom class for passing arguments
WorkflowStub custom_workflow = client.newUntypedWorkflowStub("custom_workflow_1");
custom_workflow.signal("update", signal);

Here, client = WorkflowClient Object

Await Workflow

We can make a workflow execution wait for a certain interval or until some condition becomes true.

This is extremely useful when an external signal is required to proceed further. It can be achieved as follows:

Workflow.await(Duration.ofSeconds(60), () -> wakeUp);

Here, the workflow waits at most 60 seconds for wakeUp to become true and then continues with the next statement. wakeUp is typically a field of the workflow class that a signal method sets.
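
The semantics of a timed condition wait can be sketched in plain Java as follows. This is only an analogy, not the Temporal API and not something to place inside workflow code (it relies on Object.wait() and System.nanoTime(), which workflows must avoid). AwaitSketch, signalWakeUp, and await are hypothetical names.

```java
import java.time.Duration;

// Plain-Java analogy of "wait until a condition holds or a timeout expires".
// Not the Temporal API and not valid workflow code: all names are hypothetical.
final class AwaitSketch {
    private final Object lock = new Object();
    private boolean wakeUp = false;

    // Plays the role of a signal handler: flips the flag and wakes any waiter.
    void signalWakeUp() {
        synchronized (lock) {
            wakeUp = true;
            lock.notifyAll();
        }
    }

    // Returns true if the flag became true in time, false if the timeout expired first.
    boolean await(Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        synchronized (lock) {
            while (!wakeUp) {
                long remainingMillis = (deadline - System.nanoTime()) / 1_000_000;
                if (remainingMillis <= 0) {
                    return false; // timed out before the condition held
                }
                try {
                    lock.wait(remainingMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                    return false;
                }
            }
            return true; // condition held before the deadline
        }
    }
}
```

The point of the sketch is the two exits: the wait ends as soon as the condition holds, without sitting out the full timeout, and otherwise ends when the timeout elapses, so the caller can branch on which of the two happened.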

Custom Search Attributes

Custom search attributes must first be defined in the Temporal cluster using tctl, as mentioned above. After that, we can set their values for each execution instance.

Let's first define a custom attribute user of type Text.

tctl admin cluster add-search-attributes --name user --type Text

Now let's set its value from inside the workflow execution. The value can be updated at any time during the execution.

Map<String, Object> attributes = new HashMap<>();
attributes.put("user", "bob");
Workflow.upsertSearchAttributes(attributes);

Conclusion

Temporal lets us design resilient workflows quickly. Being language-agnostic and developer-friendly, it is a great choice for an orchestration framework. It also provides a testing library for writing unit tests for our workflows.

Author: Aviral Ahuja
