
Enabling CDC (Change Data Capture) on Conductor Workflows

Change Data Capture (CDC) is a design pattern for tracking changes in the source data and replicating the changes to the target systems.

This document outlines Orkes Conductor’s CDC functionality, which enables sending workflow state updates to eventing systems like Kafka, AWS SQS, AMQP, etc.

The major steps for enabling CDC on Conductor workflows are:

  1. Add an eventing integration in Orkes Conductor.
  2. Configure CDC parameters in the Conductor workflow.
  3. Execute the workflow.
  4. Verify the changes in the eventing system.

Add Eventing Integration in Orkes Conductor

The first step in enabling workflow CDC is to add the required integration in Orkes Conductor.

Supported integrations include:

AMQP

Steps to integrate AMQP with Orkes Conductor.

Get Configuration Credentials from AMQP

Before beginning the integration process in Orkes Conductor, you must obtain specific configuration credentials from AMQP, such as protocol, username, password, host, port, and virtual host.

Refer to the official AMQP documentation on how to get these configuration parameters.

Integrating with AMQP as a Message Broker

Now that you have the required configuration credentials from AMQP, let’s integrate it with Orkes Conductor.

  1. Navigate to Integrations from the left menu on the Conductor cluster.
  2. Click + New integration from the top-right corner of your window.
  3. Under the Message Broker section, choose AMQP.
  4. Click +Add and provide the following parameters:

Integration configuration for AMQP

  • Integration Name - A name to identify your integration.
  • Protocol - The communication protocol to be used. It can be ‘amqp’ or ‘amqps’ (recommended for secure connections).
  • Username - The username to authenticate and authorize the connection.
  • Password - The password associated with the provided username.
  • Host - The hostname of the server where the message broker is running.
  • Port - The port number on the host where the message broker is running. The default port is 5672 for AMQP and 5671 for AMQPS.
  • Virtual Host - The virtual host namespace. In AMQP, a virtual host is a namespace that allows multiple messaging environments to coexist within a single broker.
  • Description - A description of the integration.
  5. You can toggle on the Active button to activate the integration instantly.
  6. Click Save.
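
Before saving, you can sanity-check the credentials from any AMQP client. Below is a minimal sketch using the Python pika library; the host, port, virtual host, and credentials are placeholders for the values you collected above.

import pika

# Placeholder values; substitute the credentials gathered above.
params = pika.ConnectionParameters(
    host="broker.example.com",
    port=5672,  # 5671 for amqps
    virtual_host="/",
    credentials=pika.PlainCredentials("my-user", "my-password"),
)

# Open and close a connection to confirm the broker accepts the credentials.
connection = pika.BlockingConnection(params)
print("Connection open:", connection.is_open)
connection.close()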

AWS SQS

Steps to integrate AWS SQS with Orkes Conductor.

Get Configuration Credentials from AWS SQS

Before beginning the integration process in Orkes Conductor, you must get specific configuration credentials from your Amazon SQS account.

  • AWS Account ID & region where the SQS is located.
  • Amazon Resource Name (ARN) to identify & access the queue. ARN is generally of the format arn:aws:sqs:region:account-id:queue-name.
  • External ID - When you assume a role belonging to another account in AWS, you need to provide the external ID, an ID that can be used in an IAM role trust policy to designate the person to assume the role. Learn more.
  • Access Key & Secret from AWS SQS account.

Refer to the AWS SQS official documentation on how to get these credentials.

Integrating with AWS SQS as a Message Broker

Now that you have the required configuration credentials from AWS SQS, let’s integrate it with Orkes Conductor.

  1. Navigate to Integrations from the left menu on the Conductor cluster.
  2. Click + New integration from the top-right corner of your window.
  3. Under the Message Broker section, choose AWS SQS.
  4. Click +Add and provide the following parameters:

Integration configuration for AWS SQS

  • Name - A name to identify your integration.
  • Connection Type - The required connection type. Depending on how the connection is to be established, it can take the following values:
    • Current Conductor Role - Choose this if you are using the current Conductor role to establish the connection.
    • Assume External Role - Choose this if you are assuming a role belonging to another AWS account. Learn more.
    • Access Key/Secret - Choose this if you are establishing the connection using an access key and secret.
  • Region - The valid AWS region where the SQS queue is located.
  • Account ID - Your AWS account ID. This field is optional. Note: If an ARN is not used for the “Sink” in the workflow definition, the account ID must be provided.
  • Role ARN - The Amazon Resource Name (ARN) required for setting up the connection. Applicable only if the Connection Type is Assume External Role.
  • External ID - If applicable, the external ID for assuming the role. Applicable only if the Connection Type is Assume External Role.
  • Access Key - The AWS access key. Applicable only if the Connection Type is Access Key/Secret.
  • Access Secret - The AWS access secret. Applicable only if the Connection Type is Access Key/Secret.
  • Description - A description of the integration.
  5. You can toggle on the Active button to activate the integration instantly.
  6. Click Save.
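
For the Assume External Role connection type, the external ID is supplied when the role is assumed. As a rough illustration of the underlying AWS mechanics (this is not Conductor code, and the role ARN and external ID below are hypothetical), a Python sketch using boto3:

import boto3

# Hypothetical values; substitute your own role ARN and external ID.
sts = boto3.client("sts")
resp = sts.assume_role(
    RoleArn="arn:aws:iam::123456789012:role/conductor-sqs-access",
    RoleSessionName="conductor-cdc-check",
    ExternalId="my-external-id",
)
creds = resp["Credentials"]

# The temporary credentials can then be used to access the queue.
sqs = boto3.client(
    "sqs",
    region_name="us-east-1",
    aws_access_key_id=creds["AccessKeyId"],
    aws_secret_access_key=creds["SecretAccessKey"],
    aws_session_token=creds["SessionToken"],
)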

Azure Service Bus

Steps to integrate Azure Service Bus with Orkes Conductor.

Get Configuration Credentials from Azure Service Bus

Before beginning the integration process in Orkes Conductor, you must get specific configuration credentials, such as the endpoint and namespace of the service bus.

Integrating with Azure Service Bus as a Message Broker

Now that you have the required configuration credentials from Azure Service Bus, let’s integrate it with Orkes Conductor.

  1. Navigate to Integrations from the left menu on the Conductor cluster.
  2. Click + New integration from the top-right corner of your window.
  3. Under the Message Broker section, choose Azure Service Bus.
  4. Click +Add and provide the following parameters:

Integration configuration for Azure Service Bus

  • Integration Name - A name to identify your integration.
  • Connection Type - The required connection type. It can take the following values:
    • Connection String
    • Password Less
  • Endpoint - The endpoint of the service bus. Refer to the previous section on how to get this. Applicable only if the Connection Type is Connection String.
  • Namespace - The namespace of the service bus. Refer to the previous section on how to get this. Applicable only if the Connection Type is Password Less.
  • Description - A description of the integration.
  5. You can toggle on the Active button to activate the integration instantly.
  6. Click Save.
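
The two connection types correspond to the two ways the official Azure SDKs authenticate. A minimal Python sketch, assuming placeholder namespace and connection-string values:

from azure.servicebus import ServiceBusClient
from azure.identity import DefaultAzureCredential

# Connection String: use the connection string from the Azure portal.
client = ServiceBusClient.from_connection_string(
    "Endpoint=sb://my-namespace.servicebus.windows.net/;"
    "SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=<key>"
)

# Password Less: authenticate with an Azure AD identity instead of a key.
client = ServiceBusClient(
    fully_qualified_namespace="my-namespace.servicebus.windows.net",
    credential=DefaultAzureCredential(),
)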

Kafka (Apache Kafka, Amazon MSK, Confluent Kafka)

Steps to integrate Kafka with Orkes Conductor.

Get Configuration Credentials from Apache Kafka

Before beginning the integration process in Orkes Conductor, you must obtain specific configuration credentials from the Kafka cluster, such as the Bootstrap server, Schema registry URL, and API keys.

The configuration steps vary depending on the type of Kafka cluster to be integrated.

Case - 1: Getting configuration credentials from self-managed Apache Kafka.

Refer to the official Apache Kafka documentation for setting up Apache Kafka locally. Get the bootstrap server and API keys & secrets for integrating with Conductor.

Case - 2: Getting configuration credentials from Confluent Kafka.

To obtain the API keys from Confluent Kafka:

  1. From the Confluent Cloud portal, choose the cluster to be integrated with Orkes Conductor and navigate to Cluster Overview > API Keys.
  2. Create a new key by clicking Create Key/+Add key and selecting the required access (Global access/Granular access).
  3. Note down the values for the key and secret.

Generating API Keys from Confluent Cloud

To get the Bootstrap server from Confluent Kafka:

  1. Navigate to Cluster Overview > Cluster Settings > Endpoints and copy the Bootstrap server.

Getting Bootstrap token from Confluent Cloud

  2. Then, navigate to Topics to see the list of topics on this cluster and identify the Topic name to use for this integration.

Topics in Confluent Cloud

To get the Schema Registry URL, API key, and secret (required only if you are integrating with a schema registry):

  1. Go to Clients > Add new client.
  2. Under the “Copy the configuration snippet for your clients” section, copy the Schema Registry URL and download the Schema Registry API Key. The downloaded file will contain the Schema Registry API key and secret.

Getting Schema Registry URL

Case - 3: Getting configuration credentials from Amazon MSK.

To get the Bootstrap server:

  1. Log in to the Amazon MSK console.
  2. Once logged in, the console lists all the clusters under the account for the current region.
  3. Choose your cluster, and click View client information on the cluster summary page. This gives the bootstrap broker and the Apache ZooKeeper connection string.

Refer to the official Amazon MSK documentation for more details.

Integrating with Apache Kafka as a Message Broker

Once you have the configuration credentials from the Kafka cluster, let’s integrate with Orkes Conductor.

  1. Navigate to Integrations from the left menu on the Conductor cluster.
  2. Click + New integration from the top-right corner of your window.
  3. Under the Message Broker section, choose Apache Kafka.
  4. Click +Add and provide the following parameters:

Integration configuration for Apache Kafka

  • Integration Name - A name to identify the integration.
  • Bootstrap Server - The bootstrap server of the Apache Kafka cluster.
  • Sending Protocol - The sending protocol for the integration. It can take two values:
    • String - Messages are sent as simple string data.
    • AVRO - Messages are serialized using AVRO. (Not supported for Amazon MSK clusters.)
    If you are integrating with a schema registry, choose AVRO as the sending protocol.
  • Connection Security - The security mechanism for connecting to the Kafka cluster. It can take the following values:
    • SASL_SSL / PLAIN - Secure connection using SASL (Simple Authentication and Security Layer) with SSL encryption. (Not supported for Amazon MSK clusters.)
    • SASL_SSL / SCRAM-SHA-256 / JKS - Secure connection using SASL with SCRAM-SHA-256 authentication and SSL encryption. (Not supported for Amazon MSK clusters.)
    • SASL_SSL / SCRAM-SHA-512 - Secure connection using SASL with SCRAM-SHA-512 authentication and SSL encryption. (Supported only for Amazon MSK clusters.)
    • PLAIN TEXT - Plain text connection without any encryption or authentication.
  • Username - If authentication is enabled (SASL_SSL), the username to authenticate with the Kafka cluster. For AVRO configuration, provide the API key copied previously as the username.
  • Password - The password associated with the username to authenticate the connection. For AVRO configuration, provide the API secret copied previously as the password.
  • Schema Registry URL - The Schema Registry URL from the Kafka console. Applicable only if the Sending Protocol is AVRO; not supported for Amazon MSK clusters.
  • Schema Registry Auth Type - The authentication mechanism for connecting to the schema registry. Applicable only if the Sending Protocol is AVRO; not supported for Amazon MSK clusters. It can be of the following types:
    • Password in URL
    • Schema Registry User Info (Key/Password)
    • NONE
  • Schema Registry API Key - The Schema Registry API key from the Kafka console. Applicable only if the Sending Protocol is AVRO; not supported for Amazon MSK clusters.
  • Schema Registry API Secret - The Schema Registry API secret from the Kafka console. Applicable only if the Sending Protocol is AVRO; not supported for Amazon MSK clusters.
  • Value Subject Name Strategy - The strategy for constructing the subject name under which the AVRO schema is registered in the schema registry. Applicable only for AVRO configuration; not supported for Amazon MSK clusters. It can take the following values:
    • io.confluent.kafka.serializers.subject.TopicNameStrategy
    • io.confluent.kafka.serializers.subject.RecordNameStrategy
    • io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
  • Truststore Type - If SSL encryption is enabled, the type of the trust store containing the CA certificates used to verify the Kafka broker’s SSL certificate. Not supported for Amazon MSK clusters. It can be of the following types:
    • NONE
    • JKS - Upload the Java JKS trust store file with CAs.
    • PEM - Upload the PEM certificate file.
  • Trust Store Password - If the trust store type is JKS, the password for the trust store. Not supported for Amazon MSK clusters.
  • Consumer Group ID - The consumer group ID from Kafka. This unique identifier helps manage message processing, load balancing, and fault tolerance within consumer groups.
  • Description - A description of the integration.
  5. You can toggle on the Active button to activate the integration instantly.
  6. Click Save.
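
To later verify messages on the topic, a consumer can connect with the same security settings. A minimal sketch using the Python kafka-python package, with placeholder bootstrap server, topic, and API key/secret matching the SASL_SSL / PLAIN option above:

from kafka import KafkaConsumer

# Placeholder values; use the bootstrap server, topic, and API key/secret
# gathered during the integration setup.
consumer = KafkaConsumer(
    "my-topic",
    bootstrap_servers="my-cluster.example.com:9092",
    security_protocol="SASL_SSL",
    sasl_mechanism="PLAIN",
    sasl_plain_username="MY_API_KEY",
    sasl_plain_password="MY_API_SECRET",
    auto_offset_reset="earliest",
)

# Print each message received on the topic.
for message in consumer:
    print(message.value)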

NATS Messaging

Steps to integrate NATS Messaging with Orkes Conductor.

Get Configuration Credentials from NATS Messaging

Pre-Requisites

You must set up NATS messaging before integrating with Orkes Conductor. Refer to the NATS Messaging official documentation for more details.

Get the following credentials from the NATS server: the server name and, if authentication is enabled, the username and password.

Integrating with NATS Messaging as a Message Broker

Once you have the required configuration credentials from NATS Messaging, let’s integrate with Orkes Conductor.

  1. Navigate to Integrations from the left menu on the Conductor cluster.
  2. Click + New integration from the top-right corner of your window.
  3. Under the Message Broker section, choose NATS Messaging.
  4. Click +Add and provide the following parameters:

Integration configuration for NATS Messaging

  • Integration Name - A name to identify your integration.
  • Server - The NATS server name to be integrated with Orkes Conductor.
  • Connection Type - The required connection type for the integration. It can take two values:
    • Default
    • Jetstream
  • Authentication Type - The required authentication type. You can opt for With Credentials or Without Credentials.
  • Username - The username for authentication. Required only if the Authentication Type is With Credentials.
  • Password - The password for authentication. Required only if the Authentication Type is With Credentials.
  • Description - A description of the integration.
  5. You can toggle on the Active button to activate the integration instantly.
  6. Click Save.
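
For reference, a NATS client connects with the same server and credential values. A minimal sketch using the Python nats-py package, with placeholder server address and credentials:

import asyncio
import nats

async def main():
    # Placeholder URL; embed the username and password only if the
    # integration uses the With Credentials authentication type.
    nc = await nats.connect("nats://my-user:my-password@nats.example.com:4222")
    print("Connected:", nc.is_connected)
    await nc.close()

asyncio.run(main())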

GCP Pub/Sub

Steps to integrate GCP Pub/Sub with Orkes Conductor.

Get Configuration Credentials from GCP Pub/Sub

Before beginning the integration process in Orkes Conductor, you must get specific configuration credentials, such as the project ID, subscription ID, and Service Account JSON, from the GCP console.

To get the project ID:

  1. Log in to the Google Cloud Console and create a project.
  2. Click the drop-down menu on the top left of the console to select your desired project.
  3. The Project ID will be displayed on the dashboard below the project name.

Get project ID from Google Cloud Console

Refer to the official documentation on creating and managing projects in GCP for more details.

To get the subscription ID:

  1. Go to the Pub/Sub section in the Cloud Console. (From the left menu navigation, go to Products & solutions > Categories - Analytics > Pub/Sub)
  2. From the left menu, click Subscriptions and choose the subscription you want to use, or create a new one.
  3. The Subscription ID will be displayed as shown below:

Get subscription ID from Google Cloud Console

To get the Service Account JSON:

  1. From the left menu, navigate to the IAM & Admin section.
  2. Select Service Accounts from the left menu.
  3. Click on an existing service account you want to use or create a new one.
  4. Under the Keys sub-tab, click Add Key.

Get Service Account JSON from Google Cloud Console

  5. Choose the option Create new key.
  6. Choose the key type as JSON and click Create to generate the JSON key.

Get Service Account JSON key from Google Cloud Console

Integrating with GCP Pub/Sub as a Message Broker

Once you have the required configuration credentials from GCP Pub/Sub, let’s integrate with Orkes Conductor.

  1. Navigate to Integrations from the left menu on the Conductor cluster.
  2. Click + New integration from the top-right corner of your window.
  3. Under the Message Broker section, choose GCP Pub Sub.
  4. Click +Add and provide the following parameters:

Integration configuration for GCP Pub Sub

  • Integration Name - A name to identify your integration.
  • Project ID - The project ID containing the topic. Refer to the previous section on how to get the project ID.
  • Subscription ID - The subscription ID. Refer to the previous section on how to get the subscription ID.
  • Upload Service Account JSON - Upload the Service Account JSON file, the key file containing the credentials for authenticating the Orkes Conductor cluster with the GCP Pub/Sub services. Refer to the previous section on how to generate the service account JSON.
  • Description - A description of the integration.
  5. You can toggle on the Active button to activate the integration instantly.
  6. Click Save.
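
Once the integration is saved, messages on the subscription can be inspected with the Google Cloud Pub/Sub client. A minimal Python sketch, assuming placeholder project and subscription IDs and that the service account JSON is available locally:

from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1

# Placeholder IDs; use the project and subscription configured above.
# Point GOOGLE_APPLICATION_CREDENTIALS at the service account JSON file,
# e.g. export GOOGLE_APPLICATION_CREDENTIALS=/path/to/key.json
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("my-project", "my-subscription")

def callback(message):
    print("Received:", message.data)
    message.ack()

# Listen for messages for 30 seconds, then stop.
future = subscriber.subscribe(subscription_path, callback=callback)
try:
    future.result(timeout=30)
except TimeoutError:
    future.cancel()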

IBM MQ

Steps to integrate IBM MQ with Orkes Conductor.

Get Configuration Credentials from IBM MQ

Before beginning the integration process, you must obtain certain configuration parameters from the IBM MQ console, such as hostname, port, queue manager, channel, etc.

You can get the hostname and port while setting up IBM MQ.

Refer to the official IBM documentation for more configuration details.

Integrating with IBM MQ as a Message Broker

Once you have the required configuration credentials from IBM MQ, let’s integrate with Orkes Conductor.

  1. Navigate to Integrations from the left menu on the Conductor cluster.
  2. Click + New integration from the top-right corner of your window.
  3. Under the Message Broker section, choose IBM MQ.
  4. Click +Add and provide the following parameters:

Integration configuration for IBM MQ

  • Integration Name - A name to identify your integration.
  • Host Name - The hostname or IP address of the IBM MQ server.
  • Port - The port number on which the IBM MQ server is configured to listen for incoming connections. The default port for IBM MQ is 1414, but it usually varies with the required connection.
  • Queue Manager - The queue manager to which Orkes Conductor will connect. The queue manager should already be configured in your IBM MQ environment. Check out IBM MQ’s official documentation on configuring queue managers.
  • Channel - IBM MQ uses channels to establish connections between clients and queue managers. Specify the channel name Conductor will use to communicate with IBM MQ.
  • User - The username to authenticate the connection with the IBM MQ server.
  • Password - The password associated with the username to authenticate the connection with the IBM MQ server.
  • Description - A description of the integration.
  5. You can toggle on the Active button to activate the integration instantly.
  6. Click Save.
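
The same parameters can be exercised from any MQ client. A minimal sketch using the Python pymqi package, with placeholder values for the queue manager, channel, host, and credentials:

import pymqi

# Placeholder values; substitute the queue manager, channel, host(port),
# and credentials from the IBM MQ console.
qmgr = pymqi.connect(
    "QM1",                  # queue manager
    "DEV.APP.SVRCONN",      # channel
    "mq.example.com(1414)", # conn_info: host(port)
    "app-user",             # user
    "app-password",         # password
)
print("Connected to queue manager")
qmgr.disconnect()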

Configuring CDC Parameters in Conductor Workflows

The next step is to configure CDC parameters in the workflow.

To set CDC parameters:

  1. Create a workflow definition.
  2. Set the following fields as specified:
    • Set "workflowStatusListenerEnabled" to true.
    • Set "workflowStatusListenerSink" to the integration added in the previous step.

For example, if AMQP is configured (with the integration name “amqp-test”) to send to a queue named “queue-name”, the sink becomes amqp:amqp-test:queue-name.

"workflowStatusListenerEnabled": true,
"workflowStatusListenerSink": "amqp:amqp-test:queue-name"

The Conductor UI also supports enabling this directly:

Enabling workflow status listener via UI

tip

The “Workflow listener sink” drop-down field lists the integrations added to the cluster. The topic or queue name must be added manually.

For example, if an AMQP integration is added with the name “amqp-test”, the drop-down shows:

Drop-down listing the integrations for workflow listener sink

Choose the integration and append the queue name so that the workflow listener sink is updated as follows:

"workflowStatusListenerSink": "amqp:amqp-test:queue-name"
  3. Save the workflow definition.

Execute Workflow

The next step is to execute the workflow. Workflows can be run in different ways, such as using SDKs, APIs, or Conductor UI.
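
For instance, a workflow can be started over Conductor’s REST API with a POST call. A minimal Python sketch using the requests library; the cluster URL, workflow name, input, and access token are placeholders (on Orkes clusters an authorization header is typically required):

import requests

# Placeholder values; substitute your cluster URL and workflow name.
CONDUCTOR_URL = "https://your-cluster.example.com/api"
workflow_name = "cdc-sample-workflow"

resp = requests.post(
    f"{CONDUCTOR_URL}/workflow/{workflow_name}",
    json={"someInput": "value"},              # workflow input
    headers={"X-Authorization": "<token>"},   # access token, if required
)
resp.raise_for_status()
print("Workflow execution ID:", resp.text)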

To run a workflow using Conductor UI:

  1. Click the Run Workflow button from the left menu.
  2. Choose the workflow name and version.
  3. Click Run Workflow at the top-right corner.
  4. Click on the workflow execution ID generated to view the execution.

Steps to run workflow from Conductor UI

The workflow execution begins, and upon any workflow state change, the details are sent to the configured eventing system. To be more specific, an event is triggered when the workflow state transitions from 'Running' to any other state.

Workflow in running state

Verifying Changes in Eventing Systems

The final step is to verify that the workflow status changes are reflected in the configured eventing systems.

The settings for each eventing system vary, so be sure to check under the configured topic or queue name.
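
For the AMQP example used throughout this document, a quick way to confirm delivery is to consume from the configured queue. A minimal sketch using the Python pika library, reusing the placeholder connection values from earlier and the queue name queue-name:

import pika

# Placeholder connection values; reuse the credentials from the integration.
params = pika.ConnectionParameters(
    host="broker.example.com",
    credentials=pika.PlainCredentials("my-user", "my-password"),
)
connection = pika.BlockingConnection(params)
channel = connection.channel()

def on_message(ch, method, properties, body):
    # Each workflow state change arrives as one message.
    print("Workflow state change:", body.decode())

channel.basic_consume(queue="queue-name", on_message_callback=on_message, auto_ack=True)
channel.start_consuming()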