Event

The Event task is used to publish events into eventing systems. It supports various message brokers, including AMQP, Amazon MSK, AWS SQS, Azure Service Bus, Confluent Kafka, Apache Kafka, NATS Messaging, GCP Pub/Sub, and IBM MQ.

An Event task publishes a message to an event queue or topic. The specific eventing system used depends on the configured sink. The sink parameter defines the message broker type, integration name, and queue/topic name. The task execution payload is sent to this sink, and Conductor automatically appends additional system input parameters to the payload.

Prerequisites

Before adding an Event task to a workflow, integrate the required message broker with your Conductor cluster. The integration-name segment of the sink refers to this integration.

Task parameters

Configure these parameters for the Event task.

| Parameter | Description | Required/Optional |
| --------- | ----------- | ----------------- |
| sink | The event queue sink, in the format `message-broker-type:integration-name:topic/queue-name`, where: <br/>• message-broker-type—The message broker type where the payload is being sent. Supported types: amqp, sqs, azure, kafka, nats, gcp_pubsub, ibm_mq. <br/>• integration-name—The name of the integration added to the cluster. <br/>• topic/queue-name—The name of the queue or topic where the payload is being sent. | Required. |
| inputParameters | The input parameters for the Event task, which can be passed as dynamic inputs or fixed values. These parameters determine the payload sent to the event sink during task execution. | Optional. |
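
For example, assuming hypothetical integrations named sqs-prod, kafka-prod, and nats-prod, and a queue or topic named order-events, valid sink values would look like this:

```
sqs:sqs-prod:order-events
kafka:kafka-prod:order-events
nats:nats-prod:order-events
```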

The following generic configuration parameters can be applied to any task and are not specific to the Event task.

Schema parameters

You can enforce input/output validation for the task using the following parameters. Refer to Schema Validation for a full guide.

| Parameter | Description | Required/Optional |
| --------- | ----------- | ----------------- |
| taskDefinition.enforceSchema | Whether to enforce schema validation for task inputs and outputs. Set to true to enable validation. | Optional. |
| taskDefinition.inputSchema | The name and type of the input schema to be associated with the task. | Required if enforceSchema is set to true. |
| taskDefinition.outputSchema | The name and type of the output schema to be associated with the task. | Required if enforceSchema is set to true. |
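
For illustration, a task definition with schema validation enabled might look like the following sketch. The schema names are hypothetical and the exact field layout may differ; refer to Schema Validation for the authoritative format.

```json
{
  "name": "event_task",
  "enforceSchema": true,
  "inputSchema": { "name": "eventInputSchema", "type": "JSON", "version": 1 },
  "outputSchema": { "name": "eventOutputSchema", "type": "JSON", "version": 1 }
}
```
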
Other generic parameters

Here are other parameters for configuring the task behavior.

| Parameter | Description | Required/Optional |
| --------- | ----------- | ----------------- |
| optional | Whether the task is optional. If set to true, any task failure is ignored and the workflow continues, with the task status updated to COMPLETED_WITH_ERRORS. The task must still reach a terminal state; if it remains incomplete, the workflow waits for it to reach a terminal state before proceeding. | Optional. |
| asyncComplete | Whether the task is completed asynchronously. The default value is false. <br/>• false—Task status is set to COMPLETED upon successful execution. <br/>• true—Task status is kept as IN_PROGRESS until an external event marks it as complete. <br/>Available since v4.1.40. | Optional. |
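
For instance, an Event task that tolerates failure could be configured as follows. The integration and topic names are placeholders:

```json
{
  "name": "event",
  "taskReferenceName": "event_ref",
  "type": "EVENT",
  "sink": "kafka:my-integration:my-topic",
  "optional": true,
  "asyncComplete": false,
  "inputParameters": {}
}
```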

Additional system inputs to payload

Conductor automatically adds the following parameters to the payload. Ensure that these fields are not present in the payload, as they will be overwritten during execution.

  • workflowInstanceId–Workflow (execution) ID from where this event was sent.
  • workflowType–Name of the workflow definition.
  • workflowVersion–Version of the workflow definition.
  • correlationId–Correlation ID of the workflow execution.

Example

Given the following task configuration:

```json
{
  "name": "event_task",
  "taskReferenceName": "event_task_ref",
  "type": "EVENT",
  "sink": "kafka:integration-name:topic-name",
  "inputParameters": {
    "myKey": "myValue",
    "myNumber": 100
  }
}
```

The execution will produce the following input parameters:

```json
{
  "myKey": "myValue",
  "myNumber": 100,
  "workflowInstanceId": "967b19ae-10d1-4b23-b0e7-ae324524dac0",
  "workflowType": "my-workflow-name",
  "workflowVersion": 1,
  "correlationId": "fbdcafd2-69c7-4b75-8dd1-653e33d48746"
}
```

Task configuration

This is the task configuration for an Event task.

```json
{
  "name": "event",
  "taskReferenceName": "event_ref",
  "type": "EVENT",
  "sink": "messaging-type:integration-name:queue-or-topic-name",
  "inputParameters": {}
}
```

Task output

The task output mirrors the payload sent during execution, including system-appended parameters. For example:

```json
{
  "eventId": "13a6bdfa-e375-4e87-af46-8ba66572ad9c",
  "sink": "messaging-type:integration-name:queue-or-topic-name",
  "workflowType": "eventTest",
  "correlationId": null,
  "workflowVersion": 1,
  "_createdBy": "john.doe@acme.com",
  "workflowInstanceId": "2610b755-c147-11f0-92e1-ae47cf6bc1ce"
}
```
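
Downstream tasks can reference these output fields using Conductor's standard output expression syntax. For example, a hypothetical follow-up task could capture the published event's ID like this:

```json
{
  "name": "record_event_id",
  "taskReferenceName": "record_event_id_ref",
  "type": "SIMPLE",
  "inputParameters": {
    "publishedEventId": "${event_ref.output.eventId}"
  }
}
```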

Adding an Event task in UI

To add an Event task:

  1. In your workflow, select the (+) icon and add an Event task.
  2. In Sink, select the required integration and append the topic/queue name. Failure to do so may result in execution errors, as the payload won't have a valid destination.
  3. (Optional) Add any additional input parameters.

Adding event task

Examples

Here are some examples for using the Event task.

Using an Event task in a workflow

In this example, we will integrate AWS SQS with Orkes Conductor to publish messages to an SQS queue.

  1. Integrate AWS SQS with Orkes Conductor.
  2. Create a Workflow with an Event task.
  3. Run Workflow.
  4. Verify message delivery in AWS SQS.

Step 1: Integrate AWS SQS with Orkes Conductor

In this example, you integrate AWS SQS using the connection type “Access Key/Secret.” For this, you need the following credentials from your AWS account:

  • Region: The region of your AWS account. For example, us-east-1.
  • Account ID: The account ID available in your AWS account details.
  • Access Key and Access Secret: Create an IAM user with the AmazonSQSFullAccess permission (or a narrower custom policy, as sketched after this list), and then generate the access credentials.
  • SQS queue: A queue to receive events from Conductor.
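
If you prefer not to grant the broad AmazonSQSFullAccess policy, a narrower customer-managed policy along these lines may suffice. The exact set of actions Conductor requires is an assumption here, and the region, account ID, and queue name are placeholders:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "AllowConductorToPublishToQueue",
      "Effect": "Allow",
      "Action": [
        "sqs:SendMessage",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes"
      ],
      "Resource": "arn:aws:sqs:us-east-1:123456789012:my-conductor-queue"
    }
  ]
}
```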

Once you have these credentials, add the integration to your Conductor cluster.

  1. Go to Integrations from the left navigation menu on your Conductor cluster.
  2. Create a new integration for Amazon SQS using the credentials you retrieved.

AWS SQS sample integration

Step 2: Create a Workflow with an Event task

This step involves creating a workflow with an Event task. Here, you use the AWS SQS queue as the sink for the event.

For testing purposes, you can quickly build a workflow using the Conductor UI.

To create a workflow:

  1. Go to Definitions > Workflow, and select + Define Workflow.
  2. Add an Event task with the Sink sqs:<your-integration-name>:<your-queue-name>.

Here’s the complete workflow definition JSON:

```json
{
  "name": "eventTest",
  "description": "A workflow that sends workflow details to AWS SQS queue",
  "version": 1,
  "tasks": [
    {
      "name": "event",
      "taskReferenceName": "event_ref",
      "type": "EVENT",
      "sink": "sqs:<your-integration-name>:<your-queue-name>" // Replace with your integration and queue name
    }
  ],
  "schemaVersion": 2
}
```
  3. Save the workflow.

Step 3: Run Workflow

To run the workflow, select the Execute button from the workflow definition page.

Running workflow from Conductor UI

This takes you to the workflow execution page, where you can view the workflow status. Once the workflow is complete, you can view the workflow output payload sent to the SQS queue.

Workflow output passed into SQS queue

Step 4: Verify message delivery in AWS SQS

Next, verify the message delivery in the SQS console.

  1. In your Amazon SQS console, go to Amazon SQS > Queues > Select your queue.
  2. Select Send and receive messages.

Viewing the SQS queue in SQS console

  3. Scroll down to Receive messages and select Poll for messages.

Polling for messages from the queue

  4. The message ID appears, which is the eventId passed from Conductor. Select it to view the message body, which is the output of the Conductor workflow.

SQS message body received

  5. In the Attributes tab, you can view the task ID and the workflow execution ID.

Attribute containing the task and workflow execution ID

That’s it! You have sent a message to the SQS queue. You can extend this workflow to suit your use case by customizing the Event task’s input parameters.