
Integrating Confluent Kafka with Orkes Conductor

This developer guide includes the steps to integrate Confluent Kafka with Orkes Conductor. This integration lets you connect your Confluent Kafka cluster to Conductor to publish and receive messages from topics.

note

Confluent Kafka configuration is deprecated. Please use Apache Kafka for all future configurations.

Get Configuration Credentials from Confluent Kafka

Before beginning the integration process in Orkes Conductor, you must obtain specific configuration credentials from Confluent Cloud, such as the bootstrap server, schema registry URL, and API keys.

To obtain the API Keys:

  1. From the Confluent Cloud portal, choose the cluster to be integrated with Orkes Conductor and navigate to Cluster Overview > API Keys.
  2. Create a new key by clicking Create Key/+Add key and selecting the required access (Global access/Granular access).
  3. Note down the values for the Key and Secret.

Generating API Keys from Confluent Cloud

To get the Bootstrap server:

  1. Navigate to Cluster Overview > Cluster Settings > Endpoints and copy the Bootstrap server.

Getting Bootstrap server from Confluent Cloud

  2. Navigate to Topics to see the list of topics on this cluster and identify the topic name to use for this integration.

Topics in Confluent Cloud

To get the Schema Registry URL, API key & secret (required only if you are integrating with a schema registry):

  1. Go to Clients > Add new client.
  2. Under the “Copy the configuration snippet for your clients” section, copy the Schema Registry URL and download the Schema Registry API key. The downloaded file contains the Schema Registry API key and secret (a sample of the snippet's schema registry section is shown below).

Getting Schema Registry URL
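The configuration snippet is a standard Kafka client properties file. For reference, its schema registry section typically takes the following form; the URL and the key/secret placeholders here are illustrative, not real values:

# Schema registry portion of a typical Confluent client configuration snippet.
# All values below are placeholders.
schema.registry.url=https://psrc-xxxxx.region.provider.confluent.cloud
basic.auth.credentials.source=USER_INFO
basic.auth.user.info=<SCHEMA_REGISTRY_API_KEY>:<SCHEMA_REGISTRY_API_SECRET>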

Integrating with Confluent Kafka as a Message Broker

Now that you have the required configuration credentials from Confluent Kafka, let's integrate it with Orkes Conductor.

  1. Navigate to Integrations from the left menu on the Orkes Conductor cluster.
  2. Click + New integration from the top-right corner.
  3. Under the Message Broker section, choose Confluent Kafka.
  4. Click +Add and provide the following parameters:

Integration configuration for Confluent Kafka

  • Integration Name - A name to identify the integration.
  • Bootstrap Server - The bootstrap server for the Confluent Kafka cluster. Refer to the previous section on how to get it.
  • Sending Protocol - The sending protocol for the integration. It can take two values:
    • String - Messages are sent as simple string data.
    • AVRO - Messages are serialized using AVRO.
  If you are integrating with a schema registry, choose AVRO as the sending protocol.
  • Connection Security - The security mechanism for connecting to the Kafka cluster. It can take the following values:
    • SASL_SSL / PLAIN - Secure connection using SASL (Simple Authentication and Security Layer) with SSL encryption.
    • SASL_SSL / SCRAM-SHA-256 / JKS - Secure connection using SASL with SCRAM-SHA-256 authentication and SSL encryption.
  • Username - If authentication is enabled (SASL_SSL), the username to authenticate with the Kafka cluster. Note: For AVRO configuration, provide the API key copied previously as the username.
  • Password - The password associated with the username to authenticate the connection. Note: For AVRO configuration, provide the API secret copied previously as the password.
  • Schema Registry URL - The Schema Registry URL from the Confluent Kafka console. Refer to the previous section on how to get it. Note: This field applies only if the sending protocol is AVRO.
  • Schema Registry Auth Type - The authentication mechanism for connecting to the schema registry. It can be one of the following:
    • Password in URL
    • Schema Registry User Info (Key/Password)
    • NONE
  Note: This field applies only if the sending protocol is AVRO.
  • Schema Registry API Key - The Schema Registry API key from the Confluent Kafka console. Refer to the previous section on how to get it. Note: This field applies only if the sending protocol is AVRO.
  • Schema Registry API Secret - The Schema Registry API secret from the Confluent Kafka console. Refer to the previous section on how to get it. Note: This field applies only if the sending protocol is AVRO.
  • Value Subject Name Strategy - The strategy for constructing the subject name under which the AVRO schema is registered in the schema registry. It can take the following values:
    • io.confluent.kafka.serializers.subject.TopicNameStrategy
    • io.confluent.kafka.serializers.subject.RecordNameStrategy
    • io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
  Note: This field applies only for AVRO configuration.
  • Choose Trust Store file - If SSL encryption is enabled, upload the Java JKS trust store file with the CAs.
  • Trust Store Password - The password for the trust store file.
  • Consumer Group ID - The consumer group ID from Kafka. This unique identifier helps manage message processing, load balancing, and fault tolerance within consumer groups.
  • Description - A description of the integration.
  5. You can toggle on the Active button to activate the integration instantly.
  6. Click Save.
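If you are more familiar with raw Kafka client configuration, the integration fields above correspond roughly to the standard client properties shown below. This is a hypothetical mapping, assuming the SASL_SSL / PLAIN option with the AVRO sending protocol; every value is a placeholder:

# Bootstrap Server
bootstrap.servers=pkc-xxxxx.region.provider.confluent.cloud:9092

# Connection Security - SASL_SSL / PLAIN
security.protocol=SASL_SSL
sasl.mechanism=PLAIN

# Username / Password (the cluster API key and secret)
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<API_KEY>" password="<API_SECRET>";

# Consumer Group ID
group.id=<CONSUMER_GROUP_ID>

# Sending Protocol - AVRO
value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer

# Value Subject Name Strategy
value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicNameStrategy

# Schema Registry URL, auth type, and API key/secret
schema.registry.url=https://psrc-xxxxx.region.provider.confluent.cloud
basic.auth.credentials.source=USER_INFO
basic.auth.user.info=<SR_API_KEY>:<SR_API_SECRET>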

Creating Event Handlers in Orkes Conductor

The integration is now created successfully. The next step is to create an event handler in Orkes Conductor.

To do this:

  1. Navigate to Definitions > Event Handler from the left menu on the Orkes Conductor cluster.
  2. Click the + Define event handler button in the top-right corner.
  3. Create an event handler with the following configurations:

Configuring Event Handler for Confluent Kafka Integration

  • Name - A name to identify the event handler definition.
  • Event - The event integration you created, in the following format:

    Type : Config Name : Topic Name

    Example: kafka_confluent:john-test:topic_0

    Note: The drop-down automatically lists the integrations added to the Conductor cluster. Choose the required integration and append the name of the topic to which messages are to be published/received.
  • Condition - An ECMAScript expression to control message processing, if required. Check out the event handler documentation for more details.
  • Actions - The actions to be carried out on receiving events from Kafka. It can take the following values:
    • Complete Task
    • Terminate Workflow
    • Update Variables
    • Fail Task
    • Start Workflow
  Each type of action requires and supports a certain set of input parameters. Check out the event handler documentation for more details.
  • Active - Set this to true or false; it determines whether the event handler is running.

A sample JSON for the event handler is as follows:

{
  "name": "John-Test",
  "event": "kafka_confluent:John-Test:topic_0",
  "condition": "",
  "actions": [
    {
      "action": "start_workflow",
      "start_workflow": {
        "name": "http-sample-test",
        "version": 1,
        "correlationId": "",
        "input": {}
      },
      "expandInlineJSON": false
    }
  ],
  "active": true,
  "evaluatorType": "javascript"
}
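In this sample, the condition is empty, so every message on the topic triggers the action. If you need to filter messages, the condition field takes an ECMAScript expression over the incoming payload. As a minimal sketch, assuming the message payload carries a my_field3 attribute (the handler name and field value here are illustrative), a handler that only starts the workflow for a matching value could look like this:

{
  "name": "John-Test-Filtered",
  "event": "kafka_confluent:John-Test:topic_0",
  "condition": "$.my_field3 == 'Some-Value-71gfy'",
  "actions": [
    {
      "action": "start_workflow",
      "start_workflow": {
        "name": "http-sample-test",
        "version": 1
      }
    }
  ],
  "active": true,
  "evaluatorType": "javascript"
}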

RBAC - Governance on who can use Integrations

Once the integration is added, the next step is determining who can access these integrations.

The permissions can be granted to applications/groups within the Orkes Conductor cluster.

To provide explicit permission to Groups:

  1. From the left menu on your Orkes Conductor cluster, navigate to Access Control > Groups.
  2. Create a new group or choose an existing group.
  3. Under the Permissions section, click +Add Permission.
  4. From the Integrations sub-tab, choose the integration with required permissions.
  5. Click Add Permissions. This ensures that all the group members can access these integrations in their workflows.

Configuring RBAC for Confluent Kafka Integration

Similarly, you can also provide permissions to applications.

Creating Workflow in Orkes Conductor

This step involves creating a workflow with an event task in Orkes Conductor. Here, the Kafka topic is used as a sink for the event.

You can quickly build a workflow from the UI in Orkes Conductor.

To do this:

  1. Navigate to Definitions > Workflow, and click the + Define Workflow button.
  2. Create a workflow with an event task with the Sink in the format kafka_confluent:John-Test:topic_0, where “John-Test” is the integration name and “topic_0” is the topic to which Conductor should send/receive messages.

Event task in Orkes Conductor

  3. Under the Input parameters, provide the following parameters:
    • _schema - The subject name under which the topic's schema is registered. With Confluent's default TopicNameStrategy, this is the topic name suffixed with -value (for example, topic_0-value).
      • To get the subject name, navigate to Home > Environment > Choose your environment on your Confluent console.
      • Under the Schema Registry sub-tab, you can get the subject name.

        Identifying subject name in Confluent Kafka

      • Provide this subject name as the input parameter “_schema” value while defining the workflow.
  4. Add all the fields in your topic’s schema as input parameters in the workflow definition.
    • You can get the schema for the topic by navigating to the “Schema” sub-tab of your topic and clicking “Evolve schema”.

      Identifying Schema parameters

    • For example, the topic chosen here is topic_0, whose schema is as follows:
{
  "doc": "Sample schema to help you get started.",
  "fields": [
    {
      "doc": "The string is a unicode character sequence.",
      "name": "my_field3",
      "type": "string"
    }
  ],
  "name": "sampleRecord",
  "namespace": "com.mycorp.mynamespace",
  "type": "record"
}
    • Ensure that all the “fields” in the schema are mapped as input parameters in the workflow definition.
note

Ensure that you use a unique “name” & “namespace” to avoid any conflicts.

In this example, the input parameters are as follows (including the schema fields and the schema subject name):

Input Parameters for Confluent Kafka Event

Here’s the JSON for the workflow:

{
  "name": "John-test",
  "description": "Sample Workflow for Confluent Kafka Integration",
  "version": 1,
  "tasks": [
    {
      "name": "event",
      "taskReferenceName": "event_ref",
      "inputParameters": {
        "_schema": "topic_0-value",
        "my_field3": "Some-Value-71gfy"
      },
      "type": "EVENT",
      "sink": "kafka_confluent:John-Test:topic_0"
    }
  ],
  "schemaVersion": 2,
  "ownerEmail": "devrel@orkes.io"
}
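For reference, when this workflow executes, the event task publishes a message to topic_0 that is serialized against the registered schema. Decoded, the message value should look along these lines; the field value is simply the sample input from the workflow definition above:

{
  "my_field3": "Some-Value-71gfy"
}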

Executing Workflow in Orkes Conductor

The workflow can be run using different methods. You can use the Run Workflow button for quick testing, as shown in the image below:

Running workflow from Orkes Conductor UI
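Besides the UI, the workflow can also be started programmatically through the Conductor API. As a minimal sketch, a start-workflow request body (the same shape as the start_workflow action used in the event handler, typically sent as a POST to the /api/workflow endpoint on your Conductor cluster) might look like this:

{
  "name": "John-test",
  "version": 1,
  "correlationId": "",
  "input": {}
}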

Upon successful execution, you can verify the message delivery through the Confluent Cloud portal. Navigate to the topic and check for messages to verify that the message was delivered successfully.

Verifying the consumed message from Confluent Kafka

The action added in the event handler definition was to start the workflow http-sample-test. You can verify this from the Executions > Workflow page.

Starting workflow on consuming events