Using Workers
A worker is a piece of code responsible for executing a task, which is custom logic that can be deployed anywhere. In Conductor, workers can be implemented in any language.
To create a worker task, you can use Conductor’s polyglot SDKs. These SDKs contain features that make it easier to create workers, such as polling threads, metrics, and server communication. Conductor maintains a registry of worker tasks. A task must be registered before being used in a workflow. This can be done by creating a Task Definition.
Best practices for writing workers
In Conductor, each worker should embody the microservice design pattern and follow these basic principles:
- Defined I/O—Each worker executes a specific task and produces a well-defined output, given specific inputs.
- Statelessness—Workers are stateless and do not implement workflow-specific logic, like deciding what task comes next.
- Idempotency—Workers are meant to be idempotent and should handle cases where partially-executed tasks get rescheduled for completion.
- Decoupled implementation—Workers do not implement the logic to handle retries, timeouts, and other implementation details, which are handled by the Conductor server.
Using workers in Conductor
In Conductor, you can use workers in two ways: the UI approach or the SDK-only approach. This guide will explore the UI approach.
To use a worker in Conductor (UI approach):
- Create a worker to run a task.
- Define the task in Conductor.
- Add the task to a workflow.
- Add worker to an application account and grant permissions for execution.
- Launch the worker to start.
Step 1: Create a worker task
Begin by creating a worker and defining the logic for your task. It is possible to use one worker to service multiple tasks.
Prerequisites
Ensure that you have installed the Conductor SDK for your chosen language.
- Python
- JavaScript
pip3 install conductor-python
npm i @io-orkes/conductor-javascript
Example worker task
Here is an example of a task that generates a random product inventory number. In this example, a domain is used to route the task to the correct worker.
- Python
- Java
- JavaScript
@worker_task(task_definition_name='get-inventory', domain='john')
def get_inventory(product_id: str) -> dict:
return dict(product_id = product_id, inventory = randrange(10))
@Service
public class Worker {
@WorkerTask(value = "get-inventory", domain = "john")
public Map getInventory(Map<String, String> input) {
return Map.of(
"product_id", input.get("product_id"),
"inventory", ThreadLocalRandom.current().nextInt(1, 11)
);
}
}
const {
orkesConductorClient, TaskManager,
} = require("@io-orkes/conductor-javascript");
const getInventory = {
taskDefName: 'get-inventory',
domain: 'john',
execute: async ({ inputData }) => ({
outputData: {
product_id: inputData.product_id,
inventory: 1 + Math.floor(Math.random() * 10)
},
status: "COMPLETED",
}),
};
orkesConductorClient({
keyId: process.env.CONDUCTOR_AUTH_KEY,
keySecret: process.env.CONDUCTOR_AUTH_SECRET,
serverUrl: process.env.CONDUCTOR_SERVER_URL,
}).then((client) =>
new TaskManager(client, [ getInventory ]).startPolling()
);
For more information on creating workers in your preferred language, refer to the SDK guides.
Step 2: Define the task in Conductor
All worker tasks must be defined in Conductor before they can be used in a workflow. Tasks can be defined on Orkes Platform or using an API.
To define a worker task:
- On Orkes Platform
- Using API
- Go to your Conductor cluster on Orkes Platform.
- In the left navigation menu, go to Definitions > Task.
- Select (+) Define task.
- Enter the task details, such as the rate limits, retry settings, timeout settings, and expected inputs and outputs. <br> The Name must match the task name defined previously in your code.
- Select Save > Confirm Save.
Refer to the Create Task Definition API.
Step 3: Add the task to a workflow
Once the task is defined, it can be added to a workflow definition.
To add a worker task to a workflow:
- On Orkes Platform
- Using API
- Go to your Conductor cluster on Orkes Platform.
- In the left navigation menu, go to Definitions > Workflow.
- Select or create a workflow.
- In the visual workflow editor, select the (+) icon to add a new task. There are two ways to add a worker task:
- Search for your task using its task name and select to add it to the workflow.
- Add a Worker Task (Simple) and enter the task name in Task Definition.
- Configure the task, such as its inputs, caching, and optionality.
- On the top right, select Save > Confirm.
Refer to the Create Workflow Definition or Update Workflow Definition APIs.
The task cannot begin execution until the worker is connected to the Conductor server. If the workflow is run, the custom task will remain in the Scheduled state until the worker comes online to service the task. This is achieved by creating the appropriate application accounts and permissions for the worker.
Step 4: Add worker to an application account
In Orkes, an application account with a Worker role type enables workers to authenticate and authorize against the Conductor server. To start a worker, you need to add the worker to an application account and grant Execute permission to the application.
To add worker to application account:
- Go to the application account.
- Go to your Conductor cluster on Orkes Platform.
- In the left navigation menu, go to Access Control > Applications.
- Select an application that you will be adding your worker to. Otherwise, create an application. <br> Ensure that the application role has Worker enabled.
- Get the application access key for your worker task.
- Under Access Keys, select Create access key and store your credentials securely.
- Set the Key ID and Key Secret in your environment variables.
export CONDUCTOR_SERVER_URL=<SERVER_URL> // eg: https://play.orkes.io/api
export CONDUCTOR_AUTH_KEY=<KEY_ID>
export CONDUCTOR_AUTH_SECRET=<KEY_SECRET>
- Grant Execute permission to the application.
- Under Permissions, select Add permission.
- Select the Task tab and then your worker task.
- Enable the Execute toggle.
- (If Task to Domain is used) In Domain, enter the domain name used in your worker code.
- Select Add Permissions.
The application account can now execute the worker task.
For more information on application accounts, refer to Enabling Security via Applications.
Step 5: Launch the worker
You must launch the worker project before it can start executing the custom task.
To launch the worker: Start your project using the respective method of your language and editor.
- Python
- Java
- JavaScript
python3 worker.py
gradle bootRun // using Gradle
mvn spring-boot:run // using Maven
node index.js
Run the workflow to ensure that it starts up successfully. If not, return to the previous steps and verify that all details have been entered correctly, including:
- Server URL, Key ID, and Key Secret— Added to your environment variables.
- Execute permissions—Added for the worker task in your application account.
- (if applicable) Domain—The domain in your code matches the domain in the run mapping and application permissions.
// run mapping during workflow execution
{ "get-inventory": "john" }