Integrating Confluent Kafka with Orkes Conductor
This developer guide outlines the steps to integrate Confluent Kafka with Orkes Conductor. The integration lets you connect a Confluent Kafka cluster to Conductor to publish messages to and receive messages from queues/topics.
Note: The Confluent Kafka configuration is deprecated. Use Apache Kafka for all future configurations.
Get Configuration Credentials from Confluent Kafka
Before beginning the integration process in Orkes Conductor, you must get specific configuration credentials from Confluent Cloud, such as the bootstrap server, schema registry URL, and API keys.
To obtain the API Keys:
- From the Confluent Cloud portal, choose the cluster to be integrated with Orkes Conductor and navigate to Cluster Overview > API Keys.
- Create a new key by clicking Create Key/+Add key and selecting the required access (Global access/Granular access).
- Note down the values for the Key and Secret.
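The key and secret are what a Kafka client presents as SASL credentials when connecting to Confluent Cloud. As a quick sanity check on where these values end up, here is a minimal sketch of the client properties they map to (the property names follow standard Confluent/librdkafka conventions; all values are placeholders):

```python
# Sketch: assemble Confluent Cloud client properties from the API key/secret.
# Property names are the standard librdkafka/Confluent client keys; the key,
# secret, and bootstrap server below are placeholders, not real credentials.
def confluent_client_config(bootstrap_server: str, api_key: str, api_secret: str) -> dict:
    """Build the connection properties a Kafka client needs for Confluent Cloud."""
    return {
        "bootstrap.servers": bootstrap_server,
        "security.protocol": "SASL_SSL",   # Confluent Cloud requires TLS + SASL
        "sasl.mechanisms": "PLAIN",        # the API key/secret act as PLAIN credentials
        "sasl.username": api_key,
        "sasl.password": api_secret,
    }

config = confluent_client_config(
    "pkc-xxxxx.us-east-1.aws.confluent.cloud:9092",
    "MY_API_KEY",
    "MY_API_SECRET",
)
```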
To get the Bootstrap server:
- Navigate to Cluster Overview > Cluster Settings > Endpoints and copy the Bootstrap server.
- Then, navigate to Topics to see the list of topics on this cluster and identify the Topic name to use for this integration.
To get the Schema Registry server, API key, and secret (required only if you are integrating with a schema registry):
- Go to Clients > Add new client.
- Under the “Copy the configuration snippet for your clients” section, copy the Schema Registry URL and download the Schema Registry API key; the downloaded file contains both the API key and secret.
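The URL, key, and secret map onto the schema-registry properties a Confluent client uses. A minimal sketch (property names follow Confluent's serializer conventions; all values are placeholders):

```python
# Sketch: schema-registry connection properties built from the downloaded
# API key/secret. Property names follow Confluent's client conventions;
# the URL, key, and secret are placeholders.
def schema_registry_config(url: str, sr_key: str, sr_secret: str) -> dict:
    """Build schema-registry properties for a Confluent client."""
    return {
        "schema.registry.url": url,
        "basic.auth.credentials.source": "USER_INFO",
        "basic.auth.user.info": f"{sr_key}:{sr_secret}",  # key and secret joined by ':'
    }

sr = schema_registry_config(
    "https://psrc-xxxxx.us-east-2.aws.confluent.cloud",
    "SR_KEY",
    "SR_SECRET",
)
```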
Integrating with Confluent Kafka as a Message Broker
Now that you have the required configuration credentials from Confluent Kafka, let’s integrate it with Orkes Conductor.
- Navigate to Integrations from the left menu on the Orkes Conductor cluster.
- Click + New integration from the top-right corner.
- Under the Message Broker section, choose Confluent Kafka.
- Click +Add and provide the following parameters:
Parameters | Description |
---|---|
Integration Name | A name to identify the integration. |
Bootstrap Server | The bootstrap server for the Confluent Kafka cluster. Refer to the previous section on how to get the bootstrap server. |
Sending Protocol | Choose the required sending protocol for the integration. It can take two values: String (sends messages as plain strings) or AVRO (serializes messages against a schema in the schema registry). |
Connection Security | Choose the security mechanism for connecting to the Kafka cluster, e.g., SASL_SSL (authenticated, TLS-encrypted connections, as used by Confluent Cloud). |
Username | If authentication is enabled (SASL_SSL), provide the username to authenticate with the Kafka cluster. Note: For AVRO configuration, provide the API key copied previously as the username. |
Password | The password associated with the username to authenticate the connection. Note: For AVRO configuration, provide the API secret copied previously as the password. |
Schema Registry URL | The Schema Registry URL from the Confluent Kafka console. Refer to the previous section on how to get this. Note: This field is only applicable if the Sending Protocol is chosen as AVRO. |
Schema Registry Auth Type | The authentication mechanism for connecting to the schema registry, e.g., passing the credentials in the registry URL or supplying the Schema Registry API key & secret as user info. |
Schema Registry API Key | The Schema Registry API Key from the Confluent Kafka console. Refer to the previous section on how to get this. Note: This field is only applicable if the Sending Protocol is chosen as AVRO. |
Schema Registry API Secret | The Schema Registry API Secret from the Confluent Kafka console. Refer to the previous section on how to get this. Note: This field is only applicable if the Sending Protocol is chosen as AVRO. |
Value Subject Name Strategy | Defines the strategy for constructing the subject name under which the AVRO schema is registered in the schema registry. It can take the following values: TopicNameStrategy (the default; the subject is the topic name with a -value suffix), RecordNameStrategy (the subject is the record’s fully qualified name), or TopicRecordNameStrategy (the subject combines the topic and record names). |
Choose Trust Store file | If SSL encryption is enabled, upload the Java JKS trust store file with CAs. |
Trust Store Password | The password for the trust store file. |
Consumer Group ID | Enter the Consumer Group ID from Kafka. This unique identifier helps manage message processing, load balancing, and fault tolerance within consumer groups. |
Description | A description of the integration. |
- You can toggle on the Active button to activate the integration instantly.
- Click Save.
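Taken together, the table above amounts to one configuration object. The sketch below is purely illustrative; the field names are assumptions chosen for readability, not the actual Orkes API schema:

```python
# Illustrative only: the integration parameters from the table above collected
# into one object. Field names here are assumptions for illustration, not the
# actual Orkes Conductor API schema; all values are placeholders.
integration = {
    "name": "John-Test",                 # Integration Name
    "type": "kafka_confluent",           # Message broker type
    "configuration": {
        "bootstrapServer": "pkc-xxxxx.us-east-1.aws.confluent.cloud:9092",
        "sendingProtocol": "AVRO",
        "connectionSecurity": "SASL_SSL",
        "username": "MY_API_KEY",        # for AVRO, the API key copied earlier
        "password": "MY_API_SECRET",     # for AVRO, the API secret copied earlier
        "schemaRegistryUrl": "https://psrc-xxxxx.us-east-2.aws.confluent.cloud",
        "consumerGroupId": "conductor-group",
    },
    "active": True,
}
```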
Creating Event Handlers in Orkes Conductor
With the integration created, the next step is to create an event handler in Orkes Conductor. To do this:
- Navigate to Definitions > Event Handler from the left menu on the Orkes Conductor cluster.
- Click + Define event handler option from the top-right corner.
- Create an event handler with the following configurations:
Event Handler Parameters | Description |
---|---|
Name | A name to identify the event handler definition. |
Event | The event integration you have created, in the format Type : Config Name : Topic Name. Example: kafka_confluent:john-test:topic_0. Note: The drop-down automatically lists the integrations added to the Conductor cluster; choose one and append the topic name you want to publish/receive messages on. |
Condition | An ECMAScript expression to control message processing, if required. Check out the event handler documentation for more details. |
Actions | Choose the required actions to be carried out on receiving events from Kafka, such as starting a workflow (as in the sample below), completing a task, failing a task, or terminating a workflow. |
Active | Set to true or false; determines whether the event handler is running. |
A sample JSON for the event handler is as follows:
```json
{
  "name": "John-Test",
  "event": "kafka_confluent:John-Test:topic_0",
  "condition": "",
  "actions": [
    {
      "action": "start_workflow",
      "start_workflow": {
        "name": "http-sample-test",
        "version": 1,
        "correlationId": "",
        "input": {}
      },
      "expandInlineJSON": false
    }
  ],
  "active": true,
  "evaluatorType": "javascript"
}
```
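For automation, the same event handler can also be registered over Conductor's REST API. A sketch, assuming the standard Conductor event endpoint (POST /api/event); the server URL is a placeholder and authentication headers are omitted:

```python
import json
import urllib.request

# Sketch: registering the event handler programmatically. The endpoint
# (POST /api/event) is the standard Conductor event-handler resource; the
# server URL below is a placeholder and auth headers are omitted.
def register_event_handler(server: str, handler: dict) -> urllib.request.Request:
    """Build (but do not send) the HTTP request that registers an event handler."""
    return urllib.request.Request(
        f"{server}/api/event",
        data=json.dumps(handler).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )

handler = {
    "name": "John-Test",
    "event": "kafka_confluent:John-Test:topic_0",
    "actions": [{"action": "start_workflow",
                 "start_workflow": {"name": "http-sample-test", "version": 1}}],
    "active": True,
}
req = register_event_handler("https://your-cluster.orkesconductor.io", handler)
# urllib.request.urlopen(req) would send it to a live cluster
```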
RBAC - Governance on who can use Integrations
Once the integration is added, the next step is determining who can access these integrations.
The permissions can be granted to applications/groups within the Orkes Conductor cluster.
To provide explicit permission to Groups:
- From the left menu on your Orkes Conductor cluster, navigate to Access Control > Groups.
- Create a new group or choose an existing group.
- Under the Permissions section, click +Add Permission.
- From the Integrations sub-tab, choose the integration with required permissions.
- Click Add Permissions. This ensures that all the group members can access these integrations in their workflows.
Similarly, you can also provide permissions to applications.
Creating Workflow in Orkes Conductor
This step involves creating a workflow with an Event task in Orkes Conductor, using the Kafka topic as a sink for the event. You can quickly build the workflow from the UI in Orkes Conductor. To do this:
- Navigate to Definitions > Workflow, and click the + Define Workflow button.
- Create a workflow with an event task with the Sink in the format kafka_confluent:John-Test:topic_0, where “John-Test” is the integration name and “topic_0” is the topic to which Conductor should send/receive messages.
- Under the Input parameters, you need to provide the following parameters:
- _schema - Provide the schema subject name for the topic (e.g., the topic name with a -value suffix).
- To get the subject name, navigate to Home > Environment > Choose your environment on your Confluent console.
- Under the Schema Registry sub-tab, you can get the subject name.
- Provide this subject name as the input parameter “_schema” value while defining the workflow.
- Add all fields in your topic’s schema as the input parameters in the workflow definition.
- You can get your schema for the topic by navigating to the “Schema” subtab from your topic and clicking on “Evolve schema”.
- For example, the topic chosen here is topic_0, whose schema is as follows:
```json
{
  "doc": "Sample schema to help you get started.",
  "fields": [
    {
      "doc": "The string is a unicode character sequence.",
      "name": "my_field3",
      "type": "string"
    }
  ],
  "name": "sampleRecord",
  "namespace": "com.mycorp.mynamespace",
  "type": "record"
}
```
- Ensure that all the “fields” here are mapped into the workflow definition.
Ensure you use a unique “name” & “namespace” to avoid any conflicts.
In this example, the input parameters include the schema fields and the schema subject name. Here’s the JSON for the workflow:
```json
{
  "name": "John-test",
  "description": "Sample Workflow for Confluent Kafka Integration",
  "version": 1,
  "tasks": [
    {
      "name": "event",
      "taskReferenceName": "event_ref",
      "inputParameters": {
        "_schema": "topic_0-value",
        "my_field3": "Some-Value-71gfy"
      },
      "type": "EVENT",
      "sink": "kafka_confluent:John-Test:topic_0"
    }
  ],
  "schemaVersion": 2,
  "ownerEmail": "devrel@orkes.io"
}
```
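The mapping rule above (every schema field becomes an input parameter, plus _schema holding the subject name) can be sketched as:

```python
import json

# Sketch: derive the event task's inputParameters from the topic's AVRO
# schema, so every schema field is mapped in the workflow definition.
# The subject name ("topic_0-value" here) comes from the schema registry.
schema_json = """
{
  "fields": [{"name": "my_field3", "type": "string"}],
  "name": "sampleRecord",
  "namespace": "com.mycorp.mynamespace",
  "type": "record"
}
"""

def input_parameters_from_schema(schema: dict, subject: str) -> dict:
    """Map each schema field to an input parameter, plus the _schema subject."""
    params = {"_schema": subject}
    for field in schema["fields"]:
        params[field["name"]] = ""  # fill in a value or a ${...} expression
    return params

params = input_parameters_from_schema(json.loads(schema_json), "topic_0-value")
```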
Executing Workflow in Orkes Conductor
The workflow can be run using different methods. You can use the Run Workflow button for quick testing.
Upon successful execution, you can verify the message’s delivery through the Confluent Cloud portal. Navigate to the topic and check its messages to verify that the message was consumed successfully.
The action added in the event handler definition was to start the workflow http-sample-test. You can verify this from the Executions > Workflow page.
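Besides the Run Workflow button, a workflow can be started over Conductor's REST API. A sketch, assuming the standard Conductor endpoint (POST /api/workflow/{name}); the server URL is a placeholder and authentication is omitted:

```python
import json
import urllib.request

# Sketch: starting the workflow over Conductor's REST API instead of the UI.
# The endpoint (POST /api/workflow/{name}) is the standard Conductor workflow
# resource; the server URL is a placeholder and auth headers are omitted.
def start_workflow_request(server: str, name: str, wf_input: dict) -> urllib.request.Request:
    """Build (but do not send) the HTTP request that starts a workflow run."""
    return urllib.request.Request(
        f"{server}/api/workflow/{name}",
        data=json.dumps(wf_input).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )

req = start_workflow_request(
    "https://your-cluster.orkesconductor.io",
    "John-test",
    {"my_field3": "Some-Value-71gfy"},
)
# urllib.request.urlopen(req) would return the new workflow execution's ID
```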