Enabling CDC (Change Data Capture)
Change Data Capture (CDC) is a design pattern for tracking changes in the source data and replicating the changes to the target systems.
This document outlines Orkes Conductor’s CDC functionality, which enables sending workflow state updates to eventing systems like Kafka, AWS SQS, AMQP, etc.
The major steps for enabling CDC on Conductor workflow include:
- Add an eventing integration in Orkes Conductor.
- Configure CDC parameters in Conductor workflows.
- Execute workflows.
- Verify changes in the eventing systems.
Add Eventing Integration in Orkes Conductor
The first step in enabling workflow CDC is to add the required integration in Orkes Conductor.
Supported integrations include:
AMQP
Steps to integrate AMQP with Orkes Conductor.
Get Configuration Credentials from AMQP
Before beginning the integration process in Orkes Conductor, you must obtain specific configuration credentials from your AMQP broker, such as the protocol, username, password, host, port, and virtual host.
Refer to your AMQP broker's official documentation for how to get these configuration parameters.
Integrating with AMQP as a Message Broker
Once you have the required configuration credentials from AMQP, let’s integrate with Orkes Conductor.
- Navigate to Integrations from the left menu on the Conductor cluster.
- Click + New integration from the top-right corner of your window.
- Under the Message Broker section, choose AMQP.
- Click +Add and provide the following parameters:
Parameters | Description |
---|---|
Integration Name | A name to identify your integration. |
Protocol | Specify the communication protocol to be used. It can be ‘amqp’ or ‘amqps’ (Recommended for secure connections). |
Username | Specify the username to authenticate and authorize the connection. |
Password | Specify the password associated with the provided username. |
Host | The hostname of the server where the message broker is running. |
Port | The port number on the host where the message broker is running. The default port for AMQPS is 5671, and the default port for AMQP is 5672. |
Virtual Host | Specify the virtual host namespace. In AMQP, a virtual host is a namespace that allows multiple messaging environments to coexist within a single broker. |
Description | Provide a description of the integration. |
- You can toggle on the Active button to activate the integration instantly.
- Click Save.
AWS SQS
Steps to integrate AWS SQS with Orkes Conductor.
Get Configuration Credentials from AWS SQS
Before beginning the integration process in Orkes Conductor, you must get specific configuration credentials from your Amazon SQS account.
- AWS Account ID & region where the SQS is located.
- Amazon Resource Name (ARN) to identify & access the queue. ARN is generally of the format arn:aws:sqs:region:account-id:queue-name.
- External ID - When you assume a role belonging to another account in AWS, you must provide an external ID, an identifier that can be used in the IAM role's trust policy to designate who is allowed to assume the role.
- Access Key & Secret from AWS SQS account.
Refer to the AWS SQS official documentation on how to get these credentials.
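If you intend to use the external ID, note that it is enforced through the trust policy of the IAM role being assumed. A minimal sketch of such a trust policy is shown below; the trusted account ID and external ID are placeholder values you would substitute with your own:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": { "AWS": "arn:aws:iam::<trusted-account-id>:root" },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": { "sts:ExternalId": "<external-id>" }
      }
    }
  ]
}
```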
Integrating with AWS SQS as a Message Broker
Once you have the required configuration credentials from AWS SQS, let’s integrate with Orkes Conductor.
- Navigate to Integrations from the left menu on the Conductor cluster.
- Click + New integration from the top-right corner of your window.
- Under the Message Broker section, choose AWS SQS.
- Click +Add and provide the following parameters:
Parameters | Description |
---|---|
Name | Provide a name to identify your integration. |
Connection Type | Choose the required connection type, which determines how the connection is established, for example, Assume External Role or Access Key/Secret. The remaining fields apply depending on the chosen type. |
Region | Provide the valid AWS region where the SQS is located. |
Account ID | Provide your AWS Account ID. This field is optional. Note: If ARN is not used for the “Sink” in the workflow definition, the account ID should be used. |
Role ARN | Specify the Amazon Resource Name (ARN) required for setting up the connection. Note: This field is applicable only if the Connection Type is chosen as Assume External Role. |
External ID | If applicable, provide the external ID for assuming the role. Note: This field is applicable only if the Connection Type is chosen as Assume External Role. |
Access Key | Provide the AWS Access Key. Note: This field is applicable only if the Connection Type is chosen as Access Key/Secret. |
Access Secret | Provide the AWS Access Secret. Note: This field is applicable only if the Connection Type is chosen as Access Key/Secret. |
Description | Provide a description of the integration. |
- You can toggle on the Active button to activate the integration instantly.
- Click Save.
Azure Service Bus
Steps to integrate Azure Service Bus with Orkes Conductor.
Get Configuration Credentials from Azure Service Bus
Before beginning the integration process in Orkes Conductor, you must get specific configuration credentials, such as the endpoint and namespace of the service bus.
- Refer to the official documentation on how to get the namespace.
- The endpoint is part of the connection string of the service bus. Refer to the official documentation on how to get the connection string (containing the endpoint) of the service bus.
Integrating with Azure Service Bus as a Message Broker
Once you have the required configuration credentials from Azure Service Bus, let’s integrate with Orkes Conductor.
- Navigate to Integrations from the left menu on the Conductor cluster.
- Click + New integration from the top-right corner of your window.
- Under the Message Broker section, choose Azure Service Bus.
- Click +Add and provide the following parameters:
Parameters | Description |
---|---|
Integration Name | A name to identify your integration. |
Connection Type | Choose the required connection type. It can be Connection String or Password Less; the Endpoint or Namespace field below applies depending on the chosen type. |
Endpoint | Provide the endpoint of the service bus. Refer to the previous section on how to get this. Note: This field is applicable only if the Connection Type is chosen as Connection String. |
Namespace | Provide the namespace of the service bus. Refer to the previous section on how to get this. Note: This field is applicable only if the Connection Type is chosen as Password Less. |
Description | Provide a description of the integration. |
- You can toggle on the Active button to activate the integration instantly.
- Click Save.
Kafka (Apache Kafka, Amazon MSK, Confluent Kafka)
Steps to integrate Kafka with Orkes Conductor.
Get Configuration Credentials from Apache Kafka
Before beginning the integration process in Orkes Conductor, you must obtain specific configuration credentials from the Kafka cluster, such as the Bootstrap server, Schema registry URL, and API keys.
The configuration steps vary depending on the type of Kafka cluster to be integrated.
Case - 1: Getting configuration credentials from self-managed Apache Kafka.
Refer to the official Apache Kafka documentation for setting up Apache Kafka locally. Get the bootstrap server and API keys & secrets for integrating with Conductor.
Case - 2: Getting configuration credentials from Confluent Kafka.
To obtain the API keys from Confluent Kafka:
- From the Confluent Cloud portal, choose the cluster to be integrated with Orkes Conductor and navigate to Cluster Overview > API Keys.
- Create a new key by clicking Create Key/+Add key and selecting the required access (Global access/Granular access).
- Note down the values for the key and secret.
To get the Bootstrap server from Confluent Kafka:
- Navigate to Cluster Overview > Cluster Settings > Endpoints and copy the Bootstrap server.
- Then, navigate to Topics to see the list of topics on this cluster and identify the Topic name to use for this integration.
To get the Schema registry server, API key & secret (This is only required if you are integrating with a schema registry):
- Go to Clients > Add new client.
- Under the “Copy the configuration snippet for your clients” section, copy the Schema Registry URL and download the Schema Registry API Key. The downloaded file will contain the Schema Registry API key and secret.
Case - 3: Getting configuration credentials from Amazon MSK.
To get the Bootstrap server:
- Log in to the Amazon MSK console.
- Once logged in, you will see a table listing all the clusters under the account for the current region.
- Choose your cluster, and click View client information on the cluster summary page. This gives the bootstrap broker and the Apache ZooKeeper connection string.
Refer to the official Amazon MSK documentation for more details.
Integrating with Apache Kafka as a Message Broker
Once you have the configuration credentials from the Kafka cluster, let’s integrate with Orkes Conductor.
- Navigate to Integrations from the left menu on the Conductor cluster.
- Click + New integration from the top-right corner of your window.
- Under the Message Broker section, choose Apache Kafka.
- Click +Add and provide the following parameters:
Parameters | Description |
---|---|
Integration Name | A name to identify the integration. |
Bootstrap Server | Provide the bootstrap server of the Apache Kafka cluster. |
Sending Protocol | Choose the required sending protocol for the integration: String or AVRO (use AVRO when integrating with a schema registry). |
Connection Security | Choose the security mechanism for connecting to the Kafka cluster, for example, SASL_SSL for authenticated connections over SSL. |
Username | If authentication is enabled (SASL_SSL), provide the username to authenticate with the Kafka cluster. Note: For AVRO configuration, provide the API key copied previously as the username. |
Password | Provide the password associated with the username to authenticate the connection. Note: For AVRO configuration, provide the API secret copied previously as the password. |
Schema Registry URL | Provide the Schema Registry URL from the Apache Kafka console. Note: Applicable only when the sending protocol is AVRO. |
Schema Registry Auth Type | Specify the authentication mechanism for connecting to the schema registry. Note: Applicable only when the sending protocol is AVRO. |
Schema Registry API Key | Provide the Schema Registry API Key from the Kafka console. Note: Applicable only when the sending protocol is AVRO. |
Schema Registry API Secret | Provide the Schema Registry API Secret from the Kafka console. Note: Applicable only when the sending protocol is AVRO. |
Value Subject Name Strategy | Defines the strategy for constructing the subject name under which the AVRO schema is registered in the schema registry, for example, the standard Confluent strategies TopicNameStrategy, RecordNameStrategy, or TopicRecordNameStrategy. Applicable only when the sending protocol is AVRO. |
Truststore type | If SSL encryption is enabled, specify the type of the trust store containing the CA certificates used to verify the Kafka broker's SSL certificate, such as JKS. |
Trust Store Password | If the trust store type is JKS, provide the password for the trust store. Note: Not supported for Amazon MSK clusters. |
Consumer Group ID | Enter the Consumer Group ID from Kafka. This unique identifier helps manage message processing, load balancing, and fault tolerance within consumer groups. |
Description | Provide a description of the integration. |
- You can toggle on the Active button to activate the integration instantly.
- Click Save.
NATS Messaging
Steps to integrate NATS Messaging with Orkes Conductor.
Get Configuration Credentials from NATS Messaging
You must set up NATS messaging before integrating with Orkes Conductor. Refer to the NATS Messaging official documentation for more details.
Get the following credentials from the NATS server:
- Server name
- Username & password if you prefer to authenticate with credentials.
Integrating with NATS Messaging as a Message Broker
Once you have the required configuration credentials from NATS Messaging, let’s integrate with Orkes Conductor.
- Navigate to Integrations from the left menu on the Conductor cluster.
- Click + New integration from the top-right corner of your window.
- Under the Message Broker section, choose NATS Messaging.
- Click +Add and provide the following parameters:
Parameters | Description |
---|---|
Integration Name | A name to identify your integration. |
Server | Provide the NATS server name to be integrated with Orkes Conductor. |
Connection Type | Choose the required connection type for the integration. |
Authentication Type | Choose the required authentication type. You can opt for With Credentials or Without Credentials. |
Username | Provide the username for authentication. This field is required only if the Authentication Type is chosen as With Credentials. |
Password | Provide the password for authentication. This field is required only if the Authentication Type is chosen as With Credentials. |
Description | Provide a description of the integration. |
- You can toggle on the Active button to activate the integration instantly.
- Click Save.
GCP Pub Sub
Steps to integrate GCP Pub Sub with Orkes Conductor.
Get Configuration Credentials from GCP Pub Sub
Before beginning the integration process in Orkes Conductor, you must get specific configuration credentials such as project ID, subscription ID, and Service Account JSON from the GCP console.
To get the project ID:
- Log in to the Google Cloud Console and create a project.
- Click the drop-down menu on the top left of the console to select your desired project.
- The Project ID will be displayed on the dashboard below the project name.
Refer to the official documentation on creating and managing projects in GCP for more details.
To get the subscription ID:
- Go to the Pub/Sub section in the Cloud Console. (From the left menu navigation, go to Products & solutions > Categories - Analytics > Pub/Sub)
- From the left menu, click Subscriptions and choose the subscription you want to use, or create a new one.
- The Subscription ID is displayed in the subscriptions list.
To get the Service Account JSON:
- From the left menu, navigate to the IAM & Admin section.
- Select Service Accounts from the left menu.
- Click on an existing service account you want to use or create a new one.
- Under the Keys sub-tab, click Add Key.
- Choose the option Create new key.
- Choose the key type as JSON and click Create to generate the JSON key.
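The generated key downloads as a standard service account JSON file. Its general shape, with placeholder values, looks like the following; this is the file you will upload in the next section:

```json
{
  "type": "service_account",
  "project_id": "<your-project-id>",
  "private_key_id": "<private-key-id>",
  "private_key": "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----\n",
  "client_email": "<service-account-name>@<your-project-id>.iam.gserviceaccount.com",
  "client_id": "<client-id>",
  "auth_uri": "https://accounts.google.com/o/oauth2/auth",
  "token_uri": "https://oauth2.googleapis.com/token"
}
```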
Integrating with GCP Pub Sub as a Message Broker
Once you have the required configuration credentials from GCP Pub Sub, let’s integrate with Orkes Conductor.
- Navigate to Integrations from the left menu on the Conductor cluster.
- Click + New integration from the top-right corner of your window.
- Under the Message Broker section, choose GCP Pub Sub.
- Click +Add and provide the following parameters:
Parameters | Description |
---|---|
Integration Name | A name to identify your integration. |
Project ID | Provide the project ID containing the topic. Refer to the previous section on how to get the project ID. |
Subscription ID | Provide the subscription ID. Refer to the previous section on how to get the subscription ID. |
Upload Service Account JSON | Upload the Service Account JSON file, which is a key file containing the credentials for authenticating the Orkes Conductor cluster with the GCP Pub Sub services. Refer to the previous section on how to generate the service account JSON. |
Description | Provide a description of the integration. |
- You can toggle on the Active button to activate the integration instantly.
- Click Save.
IBM MQ
Steps to integrate IBM MQ with Orkes Conductor.
Get Configuration Credentials from IBM MQ
Before beginning the integration process, you must obtain certain configuration parameters from the IBM MQ console, such as hostname, port, queue manager, channel, etc.
You can get the host name & port while setting up IBM MQ.
Refer to the official IBM documentation for more configuration details.
Integrating with IBM MQ as a Message Broker
Once you have the required configuration credentials from IBM MQ, let’s integrate with Orkes Conductor.
- Navigate to Integrations from the left menu on the Conductor cluster.
- Click + New integration from the top-right corner of your window.
- Under the Message Broker section, choose IBM MQ.
- Click +Add and provide the following parameters:
Parameters | Description |
---|---|
Integration Name | A name to identify your integration. |
Host Name | The hostname or IP address of the IBM MQ server. |
Port | The port number on which the IBM MQ server is configured to listen for incoming connections. The default port for IBM MQ is 1414, but it may vary depending on your configuration. |
Queue Manager | Specify the queue manager to which Orkes Conductor will connect. The queue manager should already be configured in your IBM MQ environment. Check out IBM MQ’s official documentation on configuring a queue manager. |
Channel | IBM MQ uses channels to establish connections between clients and queue managers. Specify the channel name Conductor will use to communicate with IBM MQ. |
User | The username to authenticate the connection with the IBM MQ server. |
Password | The password associated with the username to authenticate the connection with the IBM MQ server. |
Description | Provide a description of the integration. |
- You can toggle on the Active button to activate the integration instantly.
- Click Save.
Configuring CDC Parameters in Conductor Workflows
The next step is to configure CDC parameters in the workflow.
To set CDC parameters:
- Create a workflow definition, either using the API or the Conductor UI. To use the Conductor UI:
  - Navigate to Definitions > Workflow on the Conductor cluster.
  - Click +Define Workflow.
  - Create a workflow by adding the required tasks.
- Set the following fields in the workflow definition:
  - Set "workflowStatusListenerEnabled" to true.
  - Set "workflowStatusListenerSink" to the integration added in the previous step.
For example, if an AMQP integration named “amqp-test” is configured with a queue named “queue-name”, the sink becomes amqp:amqp-test:queue-name:
"workflowStatusListenerEnabled": true,
"workflowStatusListenerSink": "amqp:amqp-test:queue-name"
The Conductor UI also supports setting this directly. The “Workflow listener sink” drop-down field lists the integrations added to the cluster; the topic or queue name must be appended manually. For example, if an AMQP integration named “amqp-test” has been added, choose it from the drop-down and append the queue name so that the workflow listener sink reads:
"workflowStatusListenerSink": "amqp:amqp-test:queue-name"
- Save the workflow definition.
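For reference, the snippet below is a minimal sketch of a complete workflow definition with the listener fields in place. The workflow name, task, and owner email are illustrative placeholders, and the sink assumes the “amqp-test” integration and “queue-name” queue from the example above:

```json
{
  "name": "cdc_example_workflow",
  "version": 1,
  "schemaVersion": 2,
  "ownerEmail": "user@example.com",
  "tasks": [
    {
      "name": "sample_task",
      "taskReferenceName": "sample_task_ref",
      "type": "SIMPLE"
    }
  ],
  "workflowStatusListenerEnabled": true,
  "workflowStatusListenerSink": "amqp:amqp-test:queue-name"
}
```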
Execute Workflow
The next step is to execute the workflow. Workflows can be run in different ways, such as using SDKs, APIs, or Conductor UI.
To run a workflow using Conductor UI:
- Click the Run Workflow button from the left menu.
- Choose the workflow name and version.
- Click Run Workflow at the top-right corner.
- Click on the workflow execution ID generated to view the execution.
The workflow execution begins, and upon any workflow state change, the details are sent to the configured eventing system. To be more specific, an event is triggered when the workflow state transitions from 'Running' to any other state.
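If you run workflows through the API instead, a start-workflow request typically carries the workflow name, version, and input. A minimal sketch of such a request body, assuming the illustrative cdc_example_workflow definition from the previous section, might look like this (see your cluster's API reference for the exact endpoint, typically POST /api/workflow):

```json
{
  "name": "cdc_example_workflow",
  "version": 1,
  "input": {
    "sampleKey": "sampleValue"
  }
}
```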
Verifying Changes in Eventing Systems
The final step is to verify that the workflow status changes are reflected in the configured eventing system. The verification steps differ for each system, so be sure to check the topic or queue name configured in the sink.
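As a purely illustrative sketch (the exact message schema depends on your Conductor version and the configured sink, and every field name below is an assumption), a state-change message published to the topic or queue would be expected to identify the workflow execution and its new status, along these lines:

```json
{
  "workflowId": "<workflow-execution-id>",
  "workflowType": "cdc_example_workflow",
  "version": 1,
  "status": "COMPLETED"
}
```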