Tasks sending and receiving SQS messages
Amazon's Simple Queueing Service (SQS) is a handy way to send messages across systems. SQS support is included in Conductor but requires a few changes to the Conductor instance to work properly.
In this example, we will:
- Configure Conductor for SQS messaging
- Set up an SQS message queue at AWS
- Build a workflow with an EVENT task that sends messages to our SQS queue.
- Add an EVENT to Conductor that will complete a WAIT event in the same workflow.
In this way, this example follows the path of an SQS message from creation in Conductor to AWS and then back into Conductor.
Configuring Conductor for SQS
In your instance of Conductor, we will need to edit a few configurations:
- First edit
/server/src/main/resources/application.properties
(around line 60). The default queue type may be set to SQS, but we must enable the queues and establish which AWS account is to be used:
# Default event queue type to listen on for wait task
conductor.default-event-queue.type=sqs
conductor.event-queues.sqs.enabled=true
conductor.event-queues.sqs.authorized-accounts={your AWS account number}
- Second, in
annotations-processor/awssqs-event-queue/src/main/java/com/netflix/conductor/SQSEventQueueConfiguration.java
, add the following Bean:
@Bean
AWSCredentialsProvider createAWSCredentialsProvider() {
return new DefaultAWSCredentialsProviderChain();
}
AWS environmental variables
You'll need to set 4 AWS environmental variables on the machine running Conductor. From your AWS account, you can copy three of the credentials:
export AWS_ACCESS_KEY_ID="{key_id}"
export AWS_SECRET_ACCESS_KEY="{access_key}"
export AWS_SESSION_TOKEN="{token_value}}"
You'll also want to set your AWS region
export AWS_REGION = "{the region you are setting up the queue in}"
Creating an SQS Queue
In your AWS account, open Amazon SQS (search for it in the search bar) and choose Create queue
. Create a queue by entering a name and scrolling to the bottom to create queue
. Your new queue will appear in the queue list in a minute or so.
Building your workflow
Now we will build a workflow to send and receive the SQS messages. You can find the JSON on the Orkes Playground.
This workflow has two forks:
- On the left fork, there is just one task
task_1
. It is a WAIT task, waiting for a message from the SQS server.Note: for the SQS message to COMPLETE the task, we'll need to create a Conductor EVENT.
- The right fork has two tasks.
task_2
is also a WAIT task, but it just waits for 10s before completing and moving the workflow ahead. On the completion oftask_2
, the EVENT tasksend_sqs_messsage
fires.
The reason for the delay in task_2
is to ensure that task_1
is initialized and waiting for the event to come in before the SQS message is sent out.
Here's the definition of send_sqs_message
:
{
"name": "send_sqs_message",
"taskReferenceName": "send_sqs_message_ref",
"inputParameters": {},
"type": "EVENT",
"sink": "sqs:conductor_doug_testing"
}
When this fires, a message is sent to the conductor_doug_testing
SQS queue at AWS. We could send data on the inputParameters, if desired.
Here is the default payload sent to AWS in the SQS message
{
"workflowType":"event_testing",
"correlationId":null,
"workflowVersion":1,
"asyncComplete":false,
"workflowInstanceId":"266ac320-7179-4a11-908e-0758458037a1",
"sink":"sqs:conductor_doug_testing"
}
It is important to note that we have the workflow name and the workflowInstanceId for the specific invocation of the workflow.
We are almost there. If we run this workflow now - the right side of the FORK would complete, but there is no EVENT to tell Conductor how to complete task_1
.
Adding an Event
To add an event, use the POST /api/event/
endpoint. Here's the body used in this example:
{
"name": "complete_task_event",
"event": "sqs:conductor_doug_testing",
"actions": [
{
"action": "complete_task",
"complete_task": {
"workflowId": "${workflowInstanceId}",
"taskRefName": "task_1_ref",
"input": {
}
}
}
],
"active": true
}
We have named the event complete_task_event
and the event is a message from our SQS message queue sqs:conductor_doug_testing
. The action is complete_task
, and we want to complete task_ref_1 from the workflowInstanceId that is in the body of the SQS message.
Example
When we invoke the workflow:
Both task_1 and task_2 are IN_PROGRESS.
Once the WAIT poll task completes task_2, the SQS task fires and sends a message to AWS. You can poll for messages in your SQS queue (in your queue - click "send/receive messages" and there is a "poll for tasks" button) and see the message pass through:
This message triggers our EVENT, and the event COMPLETES task_1
.
There we have it. We have configured Conductor to send and receive SQS messages to our AWS instance. We built an EVENT task to send the messages and created an event listener to receive events from AWS and perform actions in our workflow (In this case, sending a COMPLETED status to a WAIT task.)