Using Event Handlers
Event handlers in Orkes Conductor listen for messages from external systems (like Kafka, SQS, or AMQP) and trigger workflow actions based on configurable conditions. You can use them to:
- Start a new workflow
- Terminate a workflow
- Complete a task
- Fail a task
- Update variables
Configuring event handlers
Before configuring an event handler, ensure you’ve integrated the appropriate message broker with your Conductor cluster.
To configure an event handler:
- Go to Definitions > Event Handler from the left menu on your Conductor cluster.
- Select + Define event handler.
- Enter the event handler parameters:
Parameter | Description | Required/ Optional |
---|---|---|
name | A unique name for the event handler. | Required. |
description | A description of the event handler. | Optional. |
event | The event queue sink in the format: message-broker-type:integration-name:topic/queue-name. Where,
Warning While configuring via Conductor UI, the Event field only lists message broker integrations configured in your cluster. Select the required integration and append the topic name or queue name. Failure to do so may result in execution errors, as the payload won't have a valid destination. | Required. |
condition | An ECMAScript (JavaScript) function to control message processing. The function should return true for the message to be processed. Learn more about filtering events. | Required. |
evaluatorType | The type of evaluator for the condition. Currently supports javascript . | Required. |
actions | An array of actions to perform. Each action requires specific input parameters. Supported actions include: | Required. |
active | Whether the event handler is enabled. Set to true to enable the event handler or false to disable it. | Required. |
- Select Save > Confirm Save.
Example JSON Schema
{
"name": "sample-event-handler",
"description": "Sample event handler",
"event": "message-type:integration-name:queue/topic-name",
"evaluatorType": "javascript",
"condition": "true",
"actions": [
// action payload
],
"active": true
}
Configuring conditions to filter events
Use ECMAScript expressions to filter events based on their payload. Use ${$}
to reference the entire payload.
Example
Given the following payload:
{
"fileType": "AUDIO",
"version": 3,
"metadata": {
"length": 300,
"codec": "aac"
}
}
The following expressions can be used in condition with the indicated results:
Expression | Result |
---|---|
$.version > 1 | true |
$.version > 10 | false |
$.metadata.length == 300 | true |
Configuring actions
Multiple actions can be triggered based on the events received. Below are the actions and their configuration details.
Complete Task
Mark a task as complete using one of the following methods:
- Using
workflowId
andTaskRefName
. - Using
taskId
.
Example Payload
- Using workflowId and TaskRefName
- Using taskId
{
"name": "sample-event-handler",
"description": "Sample event handler",
"event": "kafka:sampleConfig:sampleName",
"evaluatorType": "javascript",
"condition": "true",
"actions": [
{
"action": "complete_task",
"expandInlineJSON": false,
"complete_task": {
"workflowId": "${workflowId}",
"taskRefName": "${taskReferenceName}",
"output": {
"ket": "value"
}
}
}
]
}
{
"name": "sample-event-handler",
"description": "Sample event handler",
"event": "kafka:sampleConfig:sampleName",
"evaluatorType": "javascript",
"condition": "true",
"actions": [
{
"action": "complete_task",
"expandInlineJSON": false,
"complete_task": {
"taskId": "${taskId}",
"output": {
"key": "value"
}
}
}
]
}
Parameters
Parameter | Description | Required/ Optional |
---|---|---|
actions. action | The action to be triggered on receiving events. Set this to complete_task . | Required. |
actions. complete_task. workflowId | The ID of the workflow that contains the task to be completed. | Required if using workflowId and taskRefName method. |
actions. complete_task. taskRefName | The reference name of the task to be marked as completed. | Required if using workflowId and taskRefName method. |
actions. complete_task. taskId | The task execution ID of the task to be marked as completed. | Required if using taskId method. |
actions. complete_task. output | The output data to be sent along with the completion. Can be string, number, boolean, null, or object/array. | Optional. |
Terminate Workflow
Terminates a running workflow.
Example Payload
{
"name": "sample-event-handler",
"description": "Sample event handler",
"event": "kafka:sampleConfig:sampleName",
"evaluatorType": "javascript",
"condition": "true",
"actions": [
{
"action": "terminate_workflow",
"expandInlineJSON": false,
"terminate_workflow": {
"workflowId": "${event.payload.workflowId)",
"terminationReason": "A termination reason"
}
}
]
}
Parameters
Parameter | Description | Required/ Optional |
---|---|---|
actions. action | The action to be triggered on receiving events. Set this to terminate_workflow . | Required. |
actions. terminate_workflow. workflowId | The ID of the workflow to be terminated. | Required. |
actions. terminate_workflow. terminationReason | The reason for termination. | Required. |
Update Variables
Updates variables in a running workflow. Useful for controlling inputs in a long-running workflow.
Example Payload
{
"name": "sample-event-handler",
"description": "Sample event handler",
"event": "kafka:sampleConfig:sampleName",
"evaluatorType": "javascript",
"condition": "true",
"actions": [
{
"action": "update_workflow_variables",
"expandInlineJSON": false,
"update_workflow_variables": {
"workflowId": "${targetWorkflowId}",
"appendArray": true,
"variables": {
"key": "value"
}
}
}
]
}
Parameters
Parameter | Description | Required/ Optional |
---|---|---|
actions. action | The action to be triggered on receiving events. Set this to update_workflow_variables . | Required. |
actions. update_workflow_variables. workflowId | The ID of the workflow whose variables need to be updated. | Required. |
actions. update_workflow_variables. appendArray | If set to true , all list (array) variables in the workflow are appended with new values instead of being replaced. This can be used to collect data from a series of events into a single workflow. | Optional. |
actions. update_workflow_variables. variables | The variables to be updated in the workflow. Can be string, number, boolean, null, or object/array. | Required. |
Fail Task
Marks a task as failed using one of the following methods:
- Using
workflowId
andTaskRefName
. - Using
taskId
.
Example Payload
- Using workflowId and TaskRefName
- Using taskId
{
"name": "sample-event-handler",
"description": "Sample event handler",
"event": "kafka:sampleConfig:sampleName",
"evaluatorType": "javascript",
"condition": "true",
"actions": [
{
"action": "fail_task",
"expandInlineJSON": false,
"fail_task": {
"workflowId": "${workflowId}",
"taskRefName": "${taskReferenceName}",
"output": {
"key": "value"
}
}
}
]
}
{
"name": "sample-event-handler",
"description": "Sample event handler",
"event": "kafka:sampleConfig:sampleName",
"evaluatorType": "javascript",
"condition": "true",
"actions": [
{
"action": "fail_task",
"expandInlineJSON": false,
"fail_task": {
"taskId": "${taskId}",
"output": {
"key": "value"
}
}
}
]
}
Parameters
Parameter | Description | Required/ Optional |
---|---|---|
actions. action | The action to be triggered on receiving events. Set this to fail_task . | Required. |
actions. complete_task. workflowId | The ID of the workflow that contains the task to be marked as failed. | Required if using workflowId and taskRefName method. |
actions. complete_task. taskRefName | The reference name of the task to be marked as failed. | Required if using workflowId and taskRefName method. |
actions. complete_task. taskId | The task execution ID of the task to be marked as failed. | Required if using taskId method. |
actions. complete_task. output | The output data to be sent along with the completion. Can be string, number, boolean, null, or object/array. | Optional. |
Start Workflow
Starts a new workflow instance.
Example Payload
{
"name": "sample-event-handler",
"description": "Sample event handler",
"event": "kafka:sampleConfig:sampleName",
"evaluatorType": "javascript",
"condition": "true",
"actions": [
{
"action": "start_workflow",
"start_workflow": {
"name": "workflow-name",
"version": "1",
"correlationId": "1234",
"idempotencyKey": "xxxxxx",
"input": {
"key": "value"
},
"taskToDomain": {
"key": "value"
},
"idempotencyStrategy": "RETURN_EXISTING"
},
"expandInlineJSON": false
}
]
}
Parameters
Parameter | Description | Required/ Optional |
---|---|---|
actions. action | The action to be triggered on receiving events. Set this to start_workflow . | Required. |
actions. start_workflow. name | The name of the workflow to be executed. This workflow should have a pre-existing definition in Conductor. | Required. |
actions. start_workflow. version | The version of the workflow to be executed. If unspecified, the latest version will be used. | Required. |
actions. start_workflow. correlationId | A unique identifier for the workflow execution, used to correlate the current workflow instance with other workflows. | Optional. |
actions. start_workflow. idempotencyKey | A unique, user-generated key to prevent duplicate workflow executions. Idempotency data is retained for the life of the workflow execution. | Optional. |
actions. start_workflow. idempotencyStrategy | The idempotency strategy for handling duplicate requests. Supported values:
| Required if idempotencyKey is used. |
actions. start_workflow. input | The input data to be passed to the new workflow. Can be string, number, boolean, null, or object/array. | Optional. |
actions. start_workflow. taskToDomain | A mapping of task reference names to domain-specific values to route the task to defined workers. | Optional. |