Using Workers

A worker is a piece of code responsible for executing a task. In Conductor, workers can be implemented in any language and deployed anywhere. Using a custom worker involves three stages:

  1. Create a worker project in your preferred language.
  2. Set up the worker.
  3. Use the worker task in a workflow.

Creating a worker

To create a worker, you can use Conductor’s polyglot SDKs. These SDKs contain features that make it easier to create workers, such as polling threads, metrics, and server communication.

Best practices for writing workers

In Conductor, each worker should embody the microservice design pattern and follow these basic principles:

  • Defined I/O—Each worker executes a specific task and produces a well-defined output, given specific inputs.
  • Statelessness—Workers are stateless and do not implement workflow-specific logic, like deciding what task comes next.
  • Idempotency—Workers are meant to be idempotent and should handle cases where partially-executed tasks get rescheduled for completion.
  • Decoupled implementation—Workers do not implement the logic to handle retries, timeouts, and other implementation details, which are handled by the Conductor server.
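As a rough illustration of these principles, the sketch below shows a plain Python function written the way a worker body might be. The function name `charge_customer`, the `ledger` argument, and the result fields are hypothetical and not part of any Conductor SDK. The ledger stands in for an external system of record, so the worker itself holds no state between runs.

```python
def charge_customer(order_id, amount_cents, ledger):
    """Defined I/O: takes an order ID and an amount, returns a result dict.

    `ledger` is a hypothetical stand-in for an external store (a database or
    a payment provider's idempotency-key support).
    """
    # Idempotency: if this order was already charged (for example, the task
    # was rescheduled after a partial run), return the recorded result
    # instead of charging again.
    if order_id in ledger:
        return ledger[order_id]

    result = {"order_id": order_id, "charged_cents": amount_cents, "status": "CHARGED"}
    ledger[order_id] = result
    # No retry loop and no "what runs next" logic here: in Conductor those
    # concerns belong to the server and the workflow definition.
    return result


ledger = {}
first = charge_customer("order-1", 500, ledger)
second = charge_customer("order-1", 500, ledger)  # simulated rescheduled run
print(first == second, len(ledger))  # prints: True 1
```

Running the function twice with the same input yields the same output and a single recorded charge, which is the property that makes rescheduling safe.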

Step 1: Set up your worker project

Begin by creating a project to run workers for your custom task.

For Python projects, it is a good practice to use a virtual environment.

# Using venv
python -m venv myProject
source myProject/bin/activate

# Using virtualenv
virtualenv myProject
source myProject/bin/activate

Step 2: Get Conductor SDK

Install the Conductor SDK for your preferred language.

python3 -m pip install conductor-python

Step 3: Write your worker logic

In general, workers can be instantiated from classes that implement the Worker interface, or that are annotated using a WorkerTask annotation or decorator, depending on the language.

All workers must have a task definition name, which will be used for reference during task polling and execution. The task definition name may be different from the name of the function or method that the worker is responsible for.

A worker in your project may look like this:

Create a Python worker by adding a @worker_task decorator to a function.

from conductor.client.worker.worker_task import worker_task

@worker_task(task_definition_name='myTask')
def greet(name: str) -> str:
    return f'Hello {name}'

Step 4: Run your worker

Start the workers using the SDK-provided interface. The interface polls the server for work, executes the worker code, and reports the results back to the server.

Run the Python worker by calling a TaskHandler.

from conductor.client.automator.task_handler import TaskHandler
from conductor.client.configuration.configuration import Configuration

task_handler = TaskHandler(configuration=Configuration())
task_handler.start_processes()
task_handler.join_processes()
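To make the poll, execute, and update cycle more concrete, here is a simplified sketch of what such a loop might look like. It is not the SDK's implementation. `FakeServer`, `run_worker`, and the task dictionary layout are hypothetical stand-ins so that the example runs without a Conductor cluster.

```python
from collections import deque


class FakeServer:
    """Hypothetical in-memory stand-in for the Conductor server."""

    def __init__(self, tasks):
        self.queue = deque(tasks)   # tasks waiting in a scheduled state
        self.results = {}           # task id -> reported status and output

    def poll(self, task_name):
        # Hand out the next scheduled task of this type, or None when idle.
        for task in list(self.queue):
            if task["taskDefName"] == task_name:
                self.queue.remove(task)
                return task
        return None

    def update(self, task_id, status, output):
        self.results[task_id] = {"status": status, "output": output}


def greet(name):
    return f"Hello {name}"


def run_worker(server, task_name, func):
    """Poll until no work is left. A real poller would sleep and keep polling."""
    while (task := server.poll(task_name)) is not None:
        try:
            output = func(**task["input"])
            server.update(task["id"], "COMPLETED", {"result": output})
        except Exception as exc:
            # The worker only reports the failure; retries are the server's job.
            server.update(task["id"], "FAILED", {"error": str(exc)})


server = FakeServer([
    {"id": "t1", "taskDefName": "myTask", "input": {"name": "Ada"}},
    {"id": "t2", "taskDefName": "myTask", "input": {}},  # missing required input
])
run_worker(server, "myTask", greet)
print(server.results["t1"])
print(server.results["t2"]["status"])
```

The first task completes with its greeting, while the second is reported as failed because its input is missing. The loop looks up work by task definition name (`myTask`), not by the Python function name (`greet`).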

Full worker example: https://github.com/conductor-oss/python-worker-container/tree/main

For more information on creating workers in your preferred language, refer to the SDK guides.

Setting up the worker

To use the worker in a workflow, you should register the worker task to the Conductor server, set up authorized access, and launch your worker project.

The worker task cannot begin execution until the worker is connected to the Conductor server. If the workflow is run, the task will remain in the Scheduled status until the worker comes online to service the task.

Register worker task

All worker tasks should be registered to the Conductor server, which is done by creating a task definition. The task definition contains configuration options for failure handling and expected input/output payloads. Tasks can be defined in the UI, through the API, or with an SDK.

To define a worker task:

  1. Go to your Orkes Conductor cluster.
  2. In the left navigation menu, go to Definitions > Task.
  3. Select (+) Define task.
  4. Enter the task details, such as the rate limits, retry settings, timeout settings, and expected inputs and outputs.
    The Name must match the task name defined previously in your code.

  5. Select Save > Confirm Save.
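For readers who define tasks through the API or an SDK instead of the UI, the task details above correspond to a JSON document. The sketch below builds one in Python. The field names follow the open-source Conductor task definition schema as commonly documented, and every value is illustrative, so verify both against your cluster before relying on them.

```python
import json

# Illustrative task definition; all values are assumptions for the example.
task_definition = {
    "name": "myTask",                  # must match task_definition_name in the worker code
    "description": "Greets a user by name",
    "retryCount": 3,                   # retry settings
    "retryLogic": "FIXED",
    "retryDelaySeconds": 5,
    "timeoutSeconds": 60,              # timeout settings
    "responseTimeoutSeconds": 30,      # kept below timeoutSeconds
    "timeoutPolicy": "TIME_OUT_WF",
    "rateLimitPerFrequency": 10,       # rate limits
    "rateLimitFrequencyInSeconds": 1,
    "inputKeys": ["name"],             # expected inputs
    "outputKeys": ["result"],          # expected outputs
}

print(json.dumps(task_definition, indent=2))
```

The `name` field is the link between this definition and the worker: it is the same string passed as `task_definition_name` in the worker code.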

Set up authorized access

In Orkes Conductor, an application account with a Worker role type enables workers to authenticate and authorize against the Conductor server. To set up authorized access, you need to add the worker to an application and grant it Execute permission.

Note: For well-defined access controls, your worker application should be kept separate from your workflow client application. Learn more about proper application separation.

To set up authorized access:

  1. Configure an application account.
    1. Go to your Orkes Conductor cluster.
    2. In the left navigation menu, go to Access Control > Applications.
    3. Create a new application or select an application to which you will be adding your worker.
      Ensure that the application role has Worker enabled.

  2. Get the application access key for your worker project.
    1. Under Access Keys, select Create access key and store your credentials securely.
    2. Set the Key ID and Key Secret in your project.
  3. Grant Execute permission to the application.
    1. Under Permissions, select Add permission.
    2. Select the Task tab and then your worker task.
    3. Enable the Execute toggle.
    4. (If task-to-domain is used) In Domain, enter the domain name used in your worker code.
    5. Select Add Permissions.

The application account can now execute the worker task.
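One common way to set the Key ID and Key Secret in a project is through environment variables, so that credentials stay out of source control. The sketch below is a minimal helper that checks the settings are present before the worker starts. The variable names `CONDUCTOR_SERVER_URL`, `CONDUCTOR_AUTH_KEY`, and `CONDUCTOR_AUTH_SECRET` are assumed here based on the convention used by the Python SDK, so confirm the exact names in your SDK guide. The helper `load_worker_settings` is hypothetical.

```python
import os

# Assumed variable names; check your SDK's documentation for the names it reads.
REQUIRED_VARS = ("CONDUCTOR_SERVER_URL", "CONDUCTOR_AUTH_KEY", "CONDUCTOR_AUTH_SECRET")


def load_worker_settings(env=os.environ):
    """Return the connection settings, or raise if any are missing or empty."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing settings: {', '.join(missing)}")
    return {name: env[name] for name in REQUIRED_VARS}


# Usage with placeholder values (never hard-code real secrets):
example_env = {
    "CONDUCTOR_SERVER_URL": "https://example.invalid/api",
    "CONDUCTOR_AUTH_KEY": "my-key-id",
    "CONDUCTOR_AUTH_SECRET": "my-key-secret",
}
settings = load_worker_settings(example_env)
print(sorted(settings))
```

Failing fast on a missing value tends to be easier to diagnose than an authentication error that only surfaces once the worker begins polling.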

Launch the worker

Launch the worker to begin polling the Conductor server. The specific method depends on your language and project configuration.

Example

python3 worker.py

Using the worker task

All custom worker tasks are denoted as Simple tasks in Conductor. To use a worker task, add it to a workflow definition.

Add to workflow

To add a worker task to a workflow:

  1. Go to your Orkes Conductor cluster.
  2. In the left navigation menu, go to Definitions > Workflow.
  3. Select or create a workflow.
  4. In the visual workflow editor, select the (+) icon to add a new task. There are two ways to add a worker task:
    • Search for your task using its task name and select it to add it to the workflow.
    • Add a Worker Task (Simple) and enter the task name in Task Definition.
  5. Configure the task, such as its inputs, caching, and optionality.
  6. On the top right, select Save > Confirm.
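The workflow built in the visual editor is stored as a JSON definition. As a rough sketch, a workflow containing a single worker task might look like the structure below. The workflow name `greetings_workflow` and the reference name `greet_ref` are hypothetical, and the field names follow the open-source Conductor workflow schema as commonly documented.

```python
import json

# Illustrative workflow definition with one worker (Simple) task.
workflow_definition = {
    "name": "greetings_workflow",      # hypothetical workflow name
    "version": 1,
    "schemaVersion": 2,
    "tasks": [
        {
            "name": "myTask",                  # task definition name registered earlier
            "taskReferenceName": "greet_ref",  # must be unique within the workflow
            "type": "SIMPLE",                  # custom worker tasks are Simple tasks
            # Map the workflow input called "name" to the worker's "name" argument.
            "inputParameters": {"name": "${workflow.input.name}"},
        }
    ],
}

print(json.dumps(workflow_definition, indent=2))
```

Here `name` ties the step to the worker's task definition, while `taskReferenceName` identifies this particular use of the task inside the workflow.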

Run workflow

Run the workflow to ensure that your worker has started successfully. If not, return to the previous steps and verify that all details have been entered correctly, such as:

  • Server URL, Key ID, and Key Secret—Set in your worker project.
  • Execute permissions—Added for the worker task in your application account.
  • (if applicable) Domain—The domain in your code matches the domain used during workflow execution and in the application permissions.

Advanced topics