Routing Tasks (Task-to-domain)
In Conductor, tasks are executed by workers, and the Conductor server manages task distribution through a task queue. The task-to-domain feature allows tasks to be routed to specific sets of workers based on domain labels. This gives you finer control over task execution, enabling you to assign tasks to workers with specific capabilities, permissions, or configurations.
What is task-to-domain?
Task-to-domain is a concept that allows you to route task executions to a specific pool of workers via domain mapping. The domain is an arbitrary string that can be freely defined so that you can split your task traffic by application, client type, and so on.
To better understand the concept of routing tasks, consider an example task, send_email
. By default, the task queue distributes this task to all available workers, as shown:
To route this task to a specific group of workers, you can assign it a domain when triggering the workflow. The task is then routed only to workers configured for that domain.
Here, the send_email
task has two additional sets of worker instances that listen to specific domain-based tasks. For example, all the tasks triggered with domain dedicated_for_app_x
are sent to the workers configured with domain dedicated_for_app_x
.
The task-to-domain feature is useful in the following scenarios:
- Ensuring that the task is routed to a worker with the appropriate permissions.
- Load balancing or prioritizing some tasks with a set of dedicated workers.
- Implementing unique task-related configuration by domain, such as retry policy.
- Debugging a task with a worker deployed on a local machine or a worker running a different version of the code.
While these use cases can be achieved by creating separate task definitions, task-to-domain is more flexible. For example, in production environments, instead of creating new task definitions, you can use the same task definition while customizing routing based on the domain.
Routing tasks using task-to-domain
To successfully route a task by domain:
- Configure workers to poll for tasks mapped to a specific domain.
- When triggering the workflow, ensure
taskToDomain
is mapped to the correct domain.
Worker configuration
Workers must be configured to listen for tasks mapped with a specific domain. Below are examples demonstrating how to set up workers to poll for tasks in the test
domain across various programming languages:
- Java
- Python
- JavaScript
- Typescript
- Clojure
The following table shows the order of precedence when initializing the task domain for a worker. Suppose a system property is set according to the table below. In that case, it takes priority over initializing the taskToDomain map or passing the domain as an argument when using annotations. If ${TASK_NAME}
is replaced by all
in the system property name, then all workers will pick up that task domain.
Description | PropertyName | Example |
---|---|---|
System property by taskName | conductor.worker.${TASK_NAME}.domain | conductor.worker.taskName.domain=test |
System property for all workers | conductor.worker.all.domain | conductor.worker.all.domain=test |
Class TaskRunner constructor param | taskToDomain | taskToDomain=Map.of("taskName", "test") |
Annotation @WorkerTask constructor param | domain | @WorkerTask(value="taskName", domain="test") |
Code example for TaskRunner
:
Map<String, String> taskToDomains = new HashMap<>();
taskToDomains.put("taskName", "test");
Map<String, Integer> taskThreadCount = new HashMap<>();
TaskRunnerConfigurer.Builder builder = new TaskRunnerConfigurer.Builder(taskClient, workers);
TaskRunnerConfigurer taskRunner = builder.withTaskToDomain(taskToDomains).build();
Code example for @WorkerTask
:
@WorkerTask(value="taskName", domain="test")
public TaskResult sendAnnotatedTaskDomain(Task task) {
TaskResult result = new TaskResult(task);
// Populate result here
return result;
}
In the example above, we map the task taskName
to the domain test
. Only workers configured to poll for the test
domain will execute the task when the workflow is triggered. See the complete code here.
# Function Worker
def execute(task: Task) -> TaskResult:
task_result = TaskResult(
task_id=task.task_id,
workflow_instance_id=task.workflow_instance_id,
worker_id='your_custom_id'
)
task_result.add_output_data('worker_style', 'function')
task_result.status = TaskResultStatus.COMPLETED
return task_result
# Class Worker
class SimpleWorker(WorkerInterface):
def execute(self, task: Task) -> TaskResult:
task_result = self.get_task_result_from_task(task)
task_result.add_output_data('worker_style', 'class')
task_result.status = TaskResultStatus.COMPLETED
return task_result
def get_polling_interval_in_seconds(self) -> float:
return 0.4
# Overriding it for specifying the DOMAIN of class workers
def get_domain(self) -> str:
return "test"
def startTaskRunnerWorkers():
configuration = Configuration(
authentication_settings=AuthenticationSettings(
key_id='key',
key_secret='secret'
),
server_api_url='https://developer.orkescloud.com/api',
debug=True
)
workers = [
Worker(
task_definition_name='task_1',
execute_function=execute,
poll_interval=0.25,
domain="test" # specifying DOMAIN for function workers
),
SimpleWorker(task_definition_name="task_2")
]
In this example, we configure the SimpleWorker
to listen for tasks with the test
domain by implementing the get_domain()
method. See additional examples.
// You can specify on the worker
export const userInfoWorker = () => {
return {
domain: "myDomain",
taskDefName: GET_USER_INFO,
execute: async ({ inputData }) => {
const userId = inputData?.userId;
return {
outputData: {
email: `${userId}@email.com`,
phoneNumber: "555-555-5555",
},
status: "COMPLETED",
};
},
};
};
// or on the Poller Option
new TaskManager(
client,
workers,
{
logger: console,
options: { concurrency: 5, pollInterval: 100, domain: "domain" },
}
);
// *Note* worker domain has precedence over the domain passed in the poller
// You can specify on the worker
export const userInfoWorker = (): ConductorWorker => {
return {
domain:"myDomain",
taskDefName: GET_USER_INFO,
execute: async ({ inputData }) => {
const userId = inputData?.userId;
return {
outputData: {
email: `${userId}@email.com`,
phoneNumber: "555-555-5555",
},
status: "COMPLETED",
};
},
};
}
// or on the Poller Option
new TaskManager(
client,
workers,
{
logger: console,
options: { concurrency: 5, pollInterval: 100, domain: "domain" },
}
)
// *Note* worker domain has precedence over the domain passed in the poller
(runner-executer-for-workers options [worker] 1 {:domain 'some-domain'})
Workflow configuration
When you start a workflow, you can specify which tasks must run on which domains.
- Using API
- Using Conductor UI
Use the Start Workflow Execution API to trigger the workflow by providing the taskToDomain
as the input payload.
POST /api/workflow/{name}
For running a workflow from the Conductor UI, define the following task-to-domain mapping:
{
"task_x": "test"
}
RBAC configuration
While configuring groups or applications in Conductor, you can add granular permissions to access specific resources. This includes granting permission to specific domains and allowing applications or groups to execute all tasks under that domain by eliminating the need to configure access for individual tasks.
To enable domain permissions:
- Go to Access Control > Applications/Groups in the left menu on your Conductor cluster.
- Select your application or group.
- Scroll down to Permissions, and select (+) Add Permission.
- Under the Domain tab, select (+) Add, and enter the domain name.
- Enable the Execute toggle.
- Select Add Permissions.
The application/group can now execute all tasks under the specified domain.
Fallback task-to-domain
A fallback domain is a secondary or backup domain that the system will use if the primary domain fails or is unreachable. These domains can only be specified when triggering a workflow, as clients polling for tasks can use only one domain at a time.
Conductor tracks the last polling time for each worker. When assigning tasks, it first checks if any active workers are available for the primary domain. If no active workers are found, the Conductor tries the next domain in the fallback sequence.
- A worker is considered active if the polled time is within the active threshold, which defaults to 10 seconds.
- Workers do not poll when they are busy doing work and resume polling after completing their tasks.
- The active threshold can be adjusted using the configuration field
conductor.app.activeWorkerLastPollTimeout
. This applies to all worker tasks, so extending the duration slows down the fallback response behavior across all tasks. - The domain of a task is determined at the time when the task is scheduled. Therefore, a domain worker becoming available after a task is scheduled will not change the domain of the already scheduled task.
A fallback mapping for `task_x’ is as follows:
{
"task_x": "test,fallback,NO_DOMAIN"
}
In this configuration,
- Conductor first assigns the task to workers in the
test
domain if available. - If no workers are active in the
test
domain, it tries thefallback
domain. - If neither
test
norfallback
have active workers, the task is assigned toNO_DOMAIN
.
NO_DOMAIN
is a generic keyword for workers with no domain.- Always use
NO_DOMAIN
as the final fallback option. - If
NO_DOMAIN
is not included, the task falls back to subsequent domains. If it reaches an inactive domain, it remains there indefinitely until workers for that domain become active. - Use the
*
token to apply domains for all tasks. This can be overridden by providing task-specific mappings along with*
.
Example
Using fallback domain
"taskToDomain": {
"*": "mydomain",
"task-a": "NO_DOMAIN",
"task-b": "abc, NO_DOMAIN",
"task-c": "someInactiveDomain1, someInactiveDomain2"
}
Here,
- The
task-a
is routed to theNO_DOMAIN
queue, meaning it doesn't have an assigned domain. - The
task-b
is routed first to theabc
domain if available, or otherwise to the default domain (NO_DOMAIN
). - The
task-c
is routed to thesomeInactiveDomain1
and then to thesomeInactiveDomain2
, but these are inactive, so they may not be processed in these domains. - All other tasks in this workflow are routed to
mydomain
.