
Quickstart 2: Write Workers

In Conductor, tasks are executed using a worker-queue architecture. System tasks are serviced by Conductor workers, while custom tasks are serviced by the workers that you create. In this quickstart, you will learn how to write your own workers that will execute custom tasks.

Decoupled by design

The worker code contains your task logic, which is decoupled from both the task definition (number of retries, rate limits) and the workflow-specific task configuration (inputs from other tasks, optionality).

Worker deployment

Conductor workers can run in a cloud-native environment or on-premises. Like any other application, workers can be deployed in a container, in a VM, or on bare metal.

For the purpose of this quickstart, you will run the worker from your own machine.

Quickstart overview

  1. Create task worker(s) that poll for scheduled tasks at regular intervals.
  2. Create and register task definitions for these workers.
  3. Add the custom task to the workflow definition.
  4. Grant execution permission to the worker.

Before you begin

Ensure that you have prepared your tools and access.

Create a worker application

Create a new project for your worker application, keeping it separate from your workflow client.

You can create a worker by writing a Python function and annotating it with the @worker_task decorator.

from conductor.client.worker.worker_task import worker_task

@worker_task(task_definition_name='greetings')
def greetings(name: str) -> str:
    return f'Hello, {name}'
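
Because the task logic is an ordinary Python function, it can be checked locally before any Conductor server is involved. The following is a minimal sketch that repeats the greeting logic without the decorator so that it runs with the standard library only. The name greetings_logic is hypothetical and is not part of the SDK.

```python
# Hypothetical stand-in for the body of the `greetings` worker.
# The @worker_task decorator is left out so this runs without the SDK.
def greetings_logic(name: str) -> str:
    return f'Hello, {name}'


if __name__ == '__main__':
    # Quick local check of the task logic.
    print(greetings_logic('Conductor'))
```

Keeping the logic this small makes it straightforward to cover with unit tests before the worker is wired up to poll for tasks.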

A worker can take inputs that are primitives (str, int, float, bool, and so on) or complex data classes. Here is an example worker that uses a dataclass as part of the worker input.

from conductor.client.worker.worker_task import worker_task
from dataclasses import dataclass

@dataclass
class OrderInfo:
    order_id: int
    sku: str
    quantity: int
    sku_price: float


@worker_task(task_definition_name='process_order')
def process_order(order_info: OrderInfo) -> str:
    return f'order: {order_info.order_id}'
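
To try a dataclass-based worker locally with a sample payload, it can help to build the dataclass from a plain dict by hand. The sketch below is an illustration only: it does not show how the SDK converts task input, and the helper names order_info_from_dict and order_total are hypothetical.

```python
from dataclasses import dataclass, fields


@dataclass
class OrderInfo:
    order_id: int
    sku: str
    quantity: int
    sku_price: float


def order_info_from_dict(payload: dict) -> OrderInfo:
    """Build an OrderInfo from a dict, ignoring keys the dataclass does not declare."""
    known = {f.name for f in fields(OrderInfo)}
    return OrderInfo(**{k: v for k, v in payload.items() if k in known})


def order_total(order_info: OrderInfo) -> float:
    """Hypothetical task logic: total price for the order line."""
    return order_info.quantity * order_info.sku_price


if __name__ == '__main__':
    sample = {'order_id': 1001, 'sku': 'ABC-1', 'quantity': 3,
              'sku_price': 2.5, 'note': 'not a declared field'}
    order = order_info_from_dict(sample)
    print(order.order_id, order_total(order))
```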

Workers use a polling mechanism (with a long poll) to check for any available tasks from the server periodically. The startup and shutdown of workers are handled by the conductor.client.automator.task_handler.TaskHandler class.

from conductor.client.automator.task_handler import TaskHandler
from conductor.client.configuration.configuration import Configuration

def main():
    api_config = Configuration()

    task_handler = TaskHandler(
        workers=[],
        configuration=api_config,
        scan_for_annotated_workers=True,
        import_modules=['greetings']  # import workers from this module - leave empty if all the workers are in the same module
    )

    # Start worker polling
    task_handler.start_processes()

    # Stop the workers when the application is ready to shut down
    task_handler.stop_processes()


if __name__ == '__main__':
    main()
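
To make the polling idea concrete, here is a toy, in-memory sketch of a poll-and-execute loop. It is not the SDK's implementation: TaskHandler does the real polling against the Conductor server, while this example uses a standard-library queue as a stand-in for the server. The function poll_and_execute and its parameters are hypothetical.

```python
import queue


def poll_and_execute(task_queue, worker_fn, poll_timeout_s=0.1, max_empty_polls=2):
    """Toy polling loop: wait for a task, run the worker, collect the result.

    Stops after `max_empty_polls` consecutive polls return no task.
    """
    results = []
    empty_polls = 0
    while empty_polls < max_empty_polls:
        try:
            # Blocks for up to poll_timeout_s, loosely mimicking a long poll.
            task_input = task_queue.get(timeout=poll_timeout_s)
        except queue.Empty:
            empty_polls += 1
            continue
        empty_polls = 0
        results.append(worker_fn(**task_input))
    return results


if __name__ == '__main__':
    q = queue.Queue()
    q.put({'name': 'Ada'})
    q.put({'name': 'Grace'})
    print(poll_and_execute(q, lambda name: f'Hello, {name}'))
```

The blocking get with a timeout plays the role of the long poll: the worker waits briefly for work instead of issuing rapid empty requests.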

Add worker task to a workflow

All worker tasks need to be registered to the Conductor server before they can be added to a workflow. Let’s add a worker to a workflow and give it a test run:

  1. Register the task by adding its definition in Conductor.
  2. Add a Worker task to a workflow.

A. Code

Register the task definition to Conductor.

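from conductor.client.http.models.task_def import TaskDef

# The name must match the task definition name used in your worker code.
task_def = TaskDef(
    name="PYTHON_TASK",
    description="Python Task Example",
    input_keys=["a", "b"]
)
# metadata_client is assumed to be an existing metadata client instance.
metadata_client.register_task_def(task_def)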

Add the Worker task to your workflow.

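from conductor.client.workflow.task.simple_task import SimpleTask

# workflow and metadata_client are assumed to exist already.
workflow >> SimpleTask("simple_task", "simple_task_ref_2")
updated_workflow_def = workflow.to_workflow_def()
metadata_client.update_workflow_def(updated_workflow_def, True)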

B. Orkes Platform

  1. Register the task definition to Conductor.
    1. In the left navigation menu, go to Definitions > Task.
    2. Select (+) Define task.
    3. Enter the Name for the task, which must match the task definition name in your worker code.
    4. Select Save > Confirm Save.
  2. Add the Worker task to your workflow.
    1. In the left navigation menu, go to Definitions > Workflow and select the workflow to which you want to add the task.
    2. In the visual workflow editor, select the (+) icon to add a new task. There are two ways to add a worker task:
      • Search for your task using its task name and select it.
      • Add a Worker Task (Simple) and enter the task name in Task Definition.
    3. On the top right, select Save > Confirm.

Grant execution permission to worker

Finally, your worker application requires programmatic access to the Conductor server. This can be done by creating an application account for your worker application.

To grant execution permission to the worker:

  1. In Orkes Platform, go to Access Control > Applications and create a new application.
  2. Enable the Worker application role, which allows the application to poll and update tasks.
  3. Generate the application access key and set the Key ID and Key Secret in your project environment variables.
export CONDUCTOR_SERVER_URL=<SERVER_URL> # e.g., https://play.orkes.io/api
export CONDUCTOR_AUTH_KEY=<KEY_ID>
export CONDUCTOR_AUTH_SECRET=<KEY_SECRET>
  4. Grant Execute permission to the application.
    1. Under Permissions, select Add permission.
    2. Select the Task tab and then your worker task.
    3. Enable the Execute toggle.
    4. Select Add Permissions.

The application account can now execute the worker task.
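
Before launching the worker, it can be useful to confirm that the three environment variables from the steps above are actually set in the current shell. The following is a minimal sketch using only the standard library. The helper name missing_conductor_vars is hypothetical, and the check only tests for presence, not for valid credentials.

```python
import os

# Variable names taken from the export commands in this quickstart.
REQUIRED_VARS = ('CONDUCTOR_SERVER_URL', 'CONDUCTOR_AUTH_KEY', 'CONDUCTOR_AUTH_SECRET')


def missing_conductor_vars(env=None):
    """Return the required variable names that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]


if __name__ == '__main__':
    missing = missing_conductor_vars()
    print('Missing:', ', '.join(missing) if missing else 'none')
```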

Launch the worker

Launch the worker to begin polling the Conductor server. The method depends on your language and project configuration.

Example

python3 main.py

When you run the workflow with the Worker task, the task should run to completion. Learn how to deploy the workflow in the next quickstart.