Skip to main content

Writing Workers with the Go SDK

A worker is responsible for executing a task. Operator and System tasks are handled by the Conductor server, while user defined tasks needs to have a worker created that awaits the work to be scheduled by the server for it to be executed.

Worker framework provides features such as polling threads, metrics and server communication.

Design Principles for Workers

Each worker embodies design pattern and follows certain basic principles:

  1. Workers are stateless and do not implement a workflow specific logic.
  2. Each worker executes a very specific task and produces well-defined output given specific inputs.
  3. Workers are meant to be idempotent (or should handle cases where the task that partially executed gets rescheduled due to timeouts etc.)
  4. Workers do not implement the logic to handle retries etc, that is taken care by the Conductor server.

Creating Task Workers

Task worker is implemented using a function that confirms to the following function

type ExecuteTaskFunction func(t *Task) (interface{}, error)

Worker returns a struct as the output of the task execution. The struct MUST be serializable to a JSON map. If an error is returned, the task is marked as FAILED

Task worker that returns a struct

//TaskOutput struct that represents the output of the task execution
type TaskOutput struct {
Keys []string
Message string
Value float64

//SimpleWorker function accepts Task as input and returns TaskOutput as result
//If there is a failure, error can be returned and the task will be marked as FAILED
func SimpleWorker(t *model.Task) (interface{}, error) {
taskResult := &TaskOutput{
Keys: []string{"Key1", "Key2"},
Message: "Hello World",
Value: rand.ExpFloat64(),
return taskResult, nil

Controlling execution for long-running tasks

For the long-running tasks you might want to spawn another process/routine and update the status of the task at a later point and complete the execution function without actually marking the task as COMPLETED. Use TaskResult struct that allows you to specify more fined grained control.

Here is an example of a task execution function that returns with IN_PROGERSS status asking server to push the task again in 60 seconds.

func LongRunningTaskWorker(t *model.Task) (interface{}, error) {
taskResult := model.NewTaskResult(t)
taskResult.OutputData = map[string]interface{}{}

//Keep the status as IN_PROGRESS
taskResult.Status = task_result_status.IN_PROGRESS
//Time after which the task should be sent back to worker
taskResult.CallbackAfterSeconds = 60
return taskResult, nil

Starting Workers

TaskRunner interface is used to start the workers, which takes care of polling server for the work, executing worker code and updating the results back to the server.

apiClient := client.NewAPIClient(

taskRunner := worker.NewTaskRunnerWithApiClient(apiClient)
//Start polling for a task by name "simple_task", with a batch size of 1 and 1 second interval
//Between polls if there are no tasks available to execute
taskRunner.StartWorker("simple_task", examples.SimpleWorker, 1, time.Second*1)
//Add more StartWorker calls as needed


Task Management APIs

Get Task Details

task, err := executor.GetTask(taskId)

Updating the Task result outside the worker implementation

Update task by id

output :=  &TaskOutput{
Keys: []string{"Key1", "Key2"},
Message: "Hello World",
Value: rand.ExpFloat64(),
executor.UpdateTask(taskId, workflowInstanceId, task_result_status.COMPLETED, ouptut)

Update task by Reference Name

output :=  &TaskOutput{
Keys: []string{"Key1", "Key2"},
Message: "Hello World",
Value: rand.ExpFloat64(),
executor.UpdateTaskByRefName("task_ref_name", workflowInstanceId, task_result_status.COMPLETED, ouptut)

Worker Metrics

We use Prometheus to collect metrics. When enabled the worker starts an HTTP server which is used to publish metrics, which can be hooked up to a prometheus server to scrap and collect metrics.

Starting metrics collection

//Start a go routine.  The default settings  exposes port 2112 on /metrics endpoint
go ProvideMetrics(settings.NewDefaultMetricsSettings())

Worker SDK collects the following metrics:

task_poll_errorClient error when polling for a task queuetaskType, includeRetries, status
task_execute_errorExecution errortaskType
task_update_errorTask status cannot be updated back to servertaskType
task_poll_counterIncremented each time polling is donetaskType
task_poll_timeTime to poll for a batch of taskstaskType
task_execute_timeTime to execute a tasktaskType
task_result_sizeRecords output payload size of a tasktaskType

Metrics on client side supplements the one collected from server in identifying the network as well as client side issues.

Next: Create and Execute Workflows