
Event

The Event task is used to publish events into eventing systems. It supports various eventing models, including AMQP, Amazon MSK, AWS SQS, Azure Service Bus, Confluent Kafka, Apache Kafka, NATS Messaging, GCP Pub/Sub, and IBM MQ.

An Event task publishes a message to an event queue or topic. The specific eventing system used depends on the configured sink. The sink parameter defines the message broker type, integration name, and queue/topic name. The task execution payload is sent to this sink, and Conductor automatically appends additional system input parameters to the payload.

note

Prerequisite

Before using the Event task, integrate the required message broker with your Conductor cluster. The integration name is referenced in the task's sink.

Task parameters

Configure these parameters for the Event task.

  • sink (Required)—The event queue sink, in the format "Type : Config Name : Queue/Topic Name", where:
    • Type—The message broker type where the payload is being sent. Supported types:
      • amqp
      • sqs
      • azure
      • kafka
      • nats
      • gcppubsub
      • ibm_mq
    • Config Name—The name of the integration added to the cluster.
    • Queue/Topic Name—The name of the queue or topic where the payload is being sent.
    See the example sink values below.
  • inputParameters (Optional)—The input parameters for the Event task, which can be passed as variables or fixed values. These parameters determine the payload sent to the event sink during task execution.
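For example, the following are illustrative sink values; the integration and queue/topic names shown here are placeholders, not real resources:

"sink": "kafka:my-kafka-integration:orders-topic"
"sink": "sqs:my-sqs-integration:orders-queue"
"sink": "amqp:my-amqp-integration:orders-exchange"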

Additional system inputs to payload

Conductor automatically adds the following parameters to the payload. Ensure that these fields are not present in the payload, as they will be overwritten during execution.

  • workflowInstanceId–Workflow ID from where this event was sent.
  • workflowType–Name of the workflow definition.
  • workflowVersion–Version of the workflow definition.
  • correlationId–Correlation ID of the workflow execution.

Example

Given the following task definition:

{
  "name": "event_task",
  "taskReferenceName": "event_task_ref",
  "type": "EVENT",
  "sink": "kafka:integration-name:topic-name",
  "inputParameters": {
    "myKey": "myValue",
    "myNumber": 100
  }
}

The execution will produce the following input parameters:

{
  "myKey": "myValue",
  "myNumber": 100,
  "workflowInstanceId": "967b19ae-10d1-4b23-b0e7-ae324524dac0",
  "workflowType": "my-workflow-name",
  "workflowVersion": "1",
  "correlationId": "fbdcafd2-69c7-4b75-8dd1-653e33d48746"
}

Task configuration

This is the task configuration for an Event task.

{
  "name": "event",
  "taskReferenceName": "event_ref",
  "type": "EVENT",
  "sink": "messaging-type:integration-name:queue-or-topic-name",
  "inputParameters": {}
}

Task output

The task output mirrors the payload sent during execution, including system-appended parameters.
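For instance, for the event_task example above, the task output would repeat the same payload, including the system-appended fields (values are illustrative):

{
  "myKey": "myValue",
  "myNumber": 100,
  "workflowInstanceId": "967b19ae-10d1-4b23-b0e7-ae324524dac0",
  "workflowType": "my-workflow-name",
  "workflowVersion": "1",
  "correlationId": "fbdcafd2-69c7-4b75-8dd1-653e33d48746"
}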

Adding an Event task in UI

To add an Event task:

  1. In your workflow, select the (+) icon and add an Event task.
  2. In Sink, select the required integration and append the topic/queue name. Failure to do so may result in execution errors, as the payload won't have a valid destination.
  3. (Optional) Add any additional input parameters.

Adding event task

Examples

Here are some examples for using the Event task.

Using Event task in a workflow

In this example, we’ll integrate Confluent Kafka with Orkes Conductor to publish messages to a Kafka topic.

  1. Integrate Confluent Kafka with Orkes Conductor.
  2. Create an Event Handler in Conductor.
  3. Create a Workflow with an Event task.
  4. Run Workflow.

Step 1: Integrate Confluent Kafka with Orkes Conductor

Get the configuration credentials from the Kafka console and integrate Kafka as a message broker in the Conductor cluster.
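As a rough checklist (not an API payload; the values and field names below are placeholders for what a Confluent Kafka integration typically requires), gather details such as:

{
  "bootstrapServers": "pkc-xxxxx.us-east-1.aws.confluent.cloud:9092",
  "apiKey": "<confluent-api-key>",
  "apiSecret": "<confluent-api-secret>",
  "schemaRegistryUrl": "https://psrc-xxxxx.us-east-2.aws.confluent.cloud",
  "schemaRegistryApiKey": "<schema-registry-api-key>",
  "schemaRegistryApiSecret": "<schema-registry-api-secret>"
}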

Confluent Kafka sample integration

Once the integration is complete, grant the required permissions to other users in the cluster through user groups.

Step 2: Create an Event Handler in Orkes Conductor

Next, create an event handler for the added integration.

To create an event handler:

  1. From the left menu in the Conductor cluster, go to Definitions > Event Handlers.
  2. Select + Define event handler and configure the parameters.
  3. In the event field, specify the integration in the following format.
Type : Config Name : Queue/Topic Name

For Confluent Kafka:

  • Set the Type to kafka_confluent (default).
  • Config Name is the name of the integration created in the previous step.
  • Topic Name is the Kafka topic for publishing messages.

Sample Event Handler JSON

{
  "name": "event-handler-name",
  "event": "kafka_confluent:confluent-kafka-test:topic_0",
  "condition": "",
  "actions": [
    {
      "action": "start_workflow",
      "start_workflow": {
        "name": "http-sample-test",
        "version": 1,
        "correlationId": "",
        "input": {}
      },
      "expandInlineJSON": false
    }
  ],
  "active": true,
  "evaluatorType": "javascript"
}
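In the sample above, condition is left empty, so every message published to the topic triggers the action. As an illustrative sketch (the field name my_field3 is taken from the topic schema used later in this example), a JavaScript condition can filter events by payload content before starting the workflow:

"condition": "$.my_field3 == 'Some-Value-71gfy'"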
note

If you create the event handler in the Conductor UI, the drop-down in the Event field lists only the integrations added to the cluster. Select the required integration and make sure to append the topic name.

Save the definition, and click the play button to run it.

Creating and running an event handler definition

Step 3: Create a Workflow with an Event task

This step involves creating a workflow with an Event task. Here, we utilize the Kafka topic as a sink for the event.

For testing purposes, we can quickly build a workflow using Conductor UI.

To create a workflow:

  1. Go to Definitions > Workflow, and select + Define Workflow.

  2. Add an Event task with the Sink kafka_confluent:confluent-kafka-test:topic_0.

  3. In Input parameters, add the following parameters:

    • _schema—Set it to the schema subject name for the topic.
      To locate the schema subject name from the Confluent console:

      • Go to Home > Environment > [Choose your environment].
      • Under Schema Registry, find the subject name.

        Identifying subject name in Confluent Kafka

      • Add this subject name as the input parameter: "_schema": "topic_0-value"
    • Add all fields in the topic’s schema as input parameters as well.

      • Locate the schema for your topic by navigating to the Schema sub-tab from your topic and selecting Evolve schema.

        Identifying Schema parameters

        For example, the topic here is `topic_0`, which has the following schema (use a unique `name` and `namespace` to avoid conflicts):

        {
          "doc": "Sample schema to help you get started.",
          "fields": [
            {
              "doc": "The string is a unicode character sequence.",
              "name": "my_field3",
              "type": "string"
            }
          ],
          "name": "sampleRecord",
          "namespace": "com.mycorp.mynamespace",
          "type": "record"
        }

      Ensure that each field in the schema is mapped as an input parameter in the Event task.

      So, in this example, the input parameters (including the schema fields and the schema subject name) are as follows:

      "inputParameters": {
        "_schema": "topic_0-value",
        "my_field3": "Some-Value-71gfy"
      }

Here’s the complete workflow definition JSON:

{
  "name": "Confluent-Kafka-workflow",
  "description": "Sample Workflow for Confluent Kafka Integration",
  "version": 1,
  "tasks": [
    {
      "name": "event",
      "taskReferenceName": "event_ref",
      "inputParameters": {
        "_schema": "topic_0-value",
        "my_field3": "Some-Value-71gfy"
      },
      "type": "EVENT",
      "sink": "kafka_confluent:confluent-kafka-test:topic_0"
    }
  ],
  "schemaVersion": 2,
  "ownerEmail": "john.doe@acme.com"
}
  4. Save the workflow.

Step 4: Run Workflow

The workflow can be run using different methods. Here, we use the Start Workflow Execution API.

POST /api/workflow/{name}
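For instance, a minimal call for the sample workflow above might look like the following. The version query parameter is optional, and the request body is the workflow input map, left empty here because the workflow takes no input:

POST /api/workflow/Confluent-Kafka-workflow?version=1

{}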

Once started, you can track execution progress in Executions > Workflow in the Conductor UI.

After successful execution, verify the message's delivery in the Confluent portal.

  1. From your cluster details page, navigate to Topics in the left menu.
  2. In the Messages tab, verify that the message is consumed successfully.

    Verifying the consumed message from Confluent Kafka