Building Workflows
In Conductor, all workflow definitions are stored as JSON. There are three ways to create a Conductor workflow:
- Workflow as code—Using the Conductor SDKs, define the workflow in your preferred language (Python, Java, JavaScript, C#, Go, Clojure) and register it to the Conductor server. Once registered, the JSON workflow definition is generated and saved.
- Workflow as JSON—Write the workflow in JSON and register it to the Conductor server using the /api/metadata/workflow endpoint.
- Visual workflow editor—Using the Conductor UI, build your workflow visually. The editor stores it as JSON under the hood.
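As a minimal sketch of the workflow-as-JSON route, the Python snippet below builds a small definition and prepares, without sending, a POST request to the /api/metadata/workflow endpoint. The server URL, workflow name, task name, and email address are placeholders assumed for illustration, and your cluster may also require authentication headers that are not shown here.

```python
import json
import urllib.request

# Assumed base URL of a Conductor server; replace with your own.
CONDUCTOR_URL = "http://localhost:8080"

# Hypothetical workflow with a single worker (SIMPLE) task.
workflow_def = {
    "name": "hello_workflow",
    "description": "Minimal example workflow",
    "version": 1,
    "schemaVersion": 2,
    "ownerEmail": "dev@example.com",
    "timeoutSeconds": 0,
    "tasks": [
        {
            "name": "say_hello",
            "taskReferenceName": "say_hello_ref",
            "type": "SIMPLE",
            "inputParameters": {"name": "${workflow.input.name}"},
        }
    ],
}


def build_register_request(definition: dict) -> urllib.request.Request:
    """Prepare the registration request; nothing is sent until urlopen is called."""
    return urllib.request.Request(
        url=f"{CONDUCTOR_URL}/api/metadata/workflow",
        data=json.dumps(definition).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_register_request(workflow_def)
# To register against a running server: urllib.request.urlopen(request)
```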
Static vs dynamic workflows
With Conductor, you can create workflows statically (ahead of time) or dynamically (at runtime).
Dynamic workflows are created using the workflow as code approach, allowing you to write your workflow and task workers in your preferred language and execute it immediately. This is useful in cases where workflows cannot be defined ahead of time, as the number of tasks and flow control depend on real-time factors.
To learn more about creating dynamic workflows, refer to the SDKs: Python, Java, JavaScript, C#, Go, and Clojure.
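To illustrate the idea, here is a rough sketch that assembles a workflow definition at runtime from a list of step names. It uses plain Python dictionaries rather than any SDK builder API, and the function name, workflow name, and step names are all hypothetical.

```python
def build_dynamic_workflow(steps: list[str]) -> dict:
    """Create a workflow definition whose tasks are chosen at runtime."""
    tasks = [
        {
            "name": step,  # hypothetical task names supplied by the caller
            "taskReferenceName": f"{step}_ref_{index}",  # unique per workflow
            "type": "SIMPLE",
            "inputParameters": {},
        }
        for index, step in enumerate(steps)
    ]
    return {
        "name": "dynamic_order_workflow",
        "version": 1,
        "schemaVersion": 2,
        "tasks": tasks,
    }


# The step list could come from a request payload or a database lookup.
workflow = build_dynamic_workflow(["validate_order", "charge_card", "ship_order"])
```

The number of tasks and their order are only known once the step list is available, which is the situation dynamic workflows are meant for.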
Tasks in workflows
A workflow definition consists of a collection of tasks and operators and specifies the order and execution of the defined tasks. Conductor provides a set of system tasks and operators, but you can also write your own custom worker tasks.
For control flow
The control structures and operations in your Conductor workflow are implemented as tasks. Here are the tasks available for managing the flow of execution:
- Conditional flow
  - Switch—Execute tasks conditionally, like an if…else… statement.
- Looping flow
  - Do While—Execute tasks repeatedly, like a do…while… statement.
- Parallel flows
  - Fork—Execute a static number of tasks in parallel.
  - Dynamic Fork—Execute a dynamic number of tasks in parallel.
  - Join—Join the forks after a Fork or Dynamic Fork before proceeding to the next task.
  - Start Workflow—Asynchronously start another workflow.
- Jumps or state changes in flow
  - Terminate—Terminate the current workflow, like a return statement.
  - Sub Workflow—Synchronously start another workflow, like a subroutine.
  - Terminate Workflow—Terminate another ongoing workflow.
  - Update Task—Update the status of another ongoing task.
- State querying
  - Get Workflow—Get the execution details of another ongoing workflow.
- Waits in flow
  - Wait—Pause the current workflow until a set time, duration, or signal is received.
  - Wait for Webhook—Pause the current workflow for an incoming webhook signal.
  - Human—Pause the current workflow for user input before proceeding to the next task.
- Dynamic tasks in flow
  - Dynamic—Execute a task dynamically, like a function pointer.
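The sketch below shows how two of these operators might look as task configurations, written as Python dictionaries that mirror the JSON. The field names (evaluatorType, expression, decisionCases, defaultCase, forkTasks, joinOn) reflect my understanding of the Switch, Fork, and Join configurations; check the Task Reference before relying on them. All task names are hypothetical.

```python
def simple_task(name: str, ref: str) -> dict:
    """Helper for a worker task placeholder used inside the branches."""
    return {"name": name, "taskReferenceName": ref, "type": "SIMPLE"}


# Switch: choose a branch based on the value of the "carrier" input.
switch_task = {
    "name": "route_by_carrier",
    "taskReferenceName": "route_by_carrier_ref",
    "type": "SWITCH",
    "evaluatorType": "value-param",
    "expression": "carrier",
    "inputParameters": {"carrier": "${workflow.input.carrier}"},
    "decisionCases": {
        "fedex": [simple_task("ship_via_fedex", "ship_via_fedex_ref")],
        "ups": [simple_task("ship_via_ups", "ship_via_ups_ref")],
    },
    "defaultCase": [simple_task("ship_via_default", "ship_via_default_ref")],
}

# Fork: run two branches in parallel, each branch being a list of tasks.
fork_task = {
    "name": "notify_fork",
    "taskReferenceName": "notify_fork_ref",
    "type": "FORK_JOIN",
    "forkTasks": [
        [simple_task("send_email", "send_email_ref")],
        [simple_task("send_sms", "send_sms_ref")],
    ],
}

# Join: wait for the last task of each branch before moving on.
join_task = {
    "name": "notify_join",
    "taskReferenceName": "notify_join_ref",
    "type": "JOIN",
    "joinOn": ["send_email_ref", "send_sms_ref"],
}
```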
For assigning variables
In general, variables are scoped to each task and passed along in the workflow as necessary. However, you can also handle variables or secrets at the workflow or global environment level.
- Set Variable—Create or update workflow variables.
- Update Secret—Create or update secrets in your Conductor cluster.
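As a small sketch, a Set Variable task and a later task that reads the variable could look like the following. The SET_VARIABLE type and the ${workflow.variables...} reference reflect my understanding of the feature; the task and variable names are made up for the example.

```python
# Store a workflow-level variable named "retryBudget".
set_variable_task = {
    "name": "set_retry_budget",
    "taskReferenceName": "set_retry_budget_ref",
    "type": "SET_VARIABLE",
    "inputParameters": {"retryBudget": 3},
}

# A later (hypothetical) worker task reads the variable through a dynamic input.
consumer_task = {
    "name": "process_order",
    "taskReferenceName": "process_order_ref",
    "type": "SIMPLE",
    "inputParameters": {"budget": "${workflow.variables.retryBudget}"},
}

tasks = [set_variable_task, consumer_task]
```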
For execution logic
In most common cases, you can make use of existing Conductor features instead of creating a custom worker from scratch. These include tasks for data transformation, user journeys, and LLM chaining.
Use Case | Task to Use |
---|---|
Call an API or HTTP endpoint | HTTP |
Poll an API or HTTP endpoint | HTTP Poll |
Publish or consume events | Event |
Clean or transform JSON data | JSON JQ |
Modify SQL databases | JDBC |
Execute JavaScript scripts | Inline |
Evaluate and retrieve data in spreadsheets | Business Rule |
Get authorized using a signed JWT | Get Signed JWT |
Orchestrate human input in the loop | Human |
Query data from Conductor Search API or Metrics | Query Processor |
Send alerts to Opsgenie | Opsgenie |
Retrieve text or JSON content from a URL | Get Document |
Generate text embeddings | Generate Embeddings |
Store text embeddings in a vector database | Store Embeddings |
Generate and store text embeddings in a vector database | Index Text |
Chunk, generate, and store text embeddings in a vector database | Index Document |
Retrieve data from a vector database | Get Embeddings |
Retrieve data from a vector database based on a search query | Search Index |
Generate text from an LLM based on a defined prompt | Text Complete |
Generate text from an LLM based on a user query and additional system/assistant instructions | Chat Complete |
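For example, calling an API and reshaping its response can be sketched with the HTTP and JSON JQ tasks instead of a custom worker. The configuration below is an assumption-laden illustration: the http_request wrapper, the response.body output path, and the queryExpression parameter follow my understanding of these tasks, and the URL and task names are placeholders.

```python
# Call a (placeholder) HTTP endpoint.
http_task = {
    "name": "fetch_user",
    "taskReferenceName": "fetch_user_ref",
    "type": "HTTP",
    "inputParameters": {
        "http_request": {
            "uri": "https://api.example.com/users/${workflow.input.userId}",
            "method": "GET",
        }
    },
}

# Keep only the fields the rest of the workflow needs.
jq_task = {
    "name": "pick_fields",
    "taskReferenceName": "pick_fields_ref",
    "type": "JSON_JQ_TRANSFORM",
    "inputParameters": {
        "user": "${fetch_user_ref.output.response.body}",
        "queryExpression": "{ id: .user.id, email: .user.email }",
    },
}

pipeline = [http_task, jq_task]
```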
Task definitions
Any system or custom task can be registered to the Conductor server as a task definition. Custom tasks must be registered before they can be used in a workflow.
Task definitions are also stored as JSON. A task definition specifies general implementation details, such as the expected task input and output keys, as well as failure-handling configurations, such as rate limits, retries, and timeouts. These parameters can be updated in real time without redeploying your application.
For more information on how to configure a task’s failure-handling policy, refer to Error Handling.
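A task definition along these lines might look like the sketch below. The field names are the ones I understand task definitions to use; the values, task name, and email address are illustrative only. The small check reflects my understanding that the response timeout is expected to be shorter than the overall task timeout.

```python
import json

# Hypothetical task definition for a custom worker task.
task_def = {
    "name": "charge_card",
    "description": "Charge the customer's card",
    "ownerEmail": "dev@example.com",
    "inputKeys": ["orderId", "amount"],
    "outputKeys": ["chargeId"],
    # Failure handling: retries, timeouts, and rate limits.
    "retryCount": 3,
    "retryLogic": "FIXED",
    "retryDelaySeconds": 10,
    "timeoutSeconds": 120,
    "timeoutPolicy": "TIME_OUT_WF",
    "responseTimeoutSeconds": 60,
    "rateLimitPerFrequency": 50,
    "rateLimitFrequencyInSeconds": 1,
}


def timeouts_are_consistent(definition: dict) -> bool:
    """Return True when the response timeout is shorter than the task timeout."""
    return definition["responseTimeoutSeconds"] < definition["timeoutSeconds"]


# Assumption: task definitions are registered as a JSON array.
payload = json.dumps([task_def])
```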
Task configuration
The task configuration is part of the workflow definition. It specifies the workflow-specific implementation details, like the task reference name, task type, task input parameters, and so on.
Although each task type has its own unique configuration, all tasks share several parameters in common.
Common configuration parameters
Parameter | Description | Required/Optional |
---|---|---|
name | Name of the task. The default value is the same as the task type. The name can be changed to something descriptive. To use a given task definition, the task name here must match the task definition name (case-sensitive). | Required. |
taskReferenceName | Reference name for the task. Must be a unique value in a given workflow. | Required. |
type | The task type. For example, HTTP, SIMPLE. | Required. |
inputParameters | Map of the task’s input parameters. | Depends on task type. |
optional | Whether the task is optional. If set to true, the workflow continues to the next task even if this task fails or remains incomplete. | Optional. |
asyncComplete | Whether the task is completed asynchronously. The default value is false. Supported values: true, false. | Optional. |
startDelay | The time in seconds to wait before making the task available for worker polling. The default value is 0. | Optional. |
onStateChange | Configuration for publishing an event or starting another task when this task status changes. | Optional. |
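As a sketch, a task configuration that uses several of the common parameters could look like this, together with a small helper that reports which required parameters are missing. The helper and the task names are hypothetical and not part of Conductor.

```python
# Parameters the table above marks as required for every task.
REQUIRED_KEYS = ("name", "taskReferenceName", "type")

task_config = {
    "name": "charge_card",  # must match the task definition name
    "taskReferenceName": "charge_card_ref",  # unique within the workflow
    "type": "SIMPLE",
    "inputParameters": {"orderId": "${workflow.input.orderId}"},
    "optional": False,
    "asyncComplete": False,
    "startDelay": 5,  # seconds before the task becomes available for polling
}


def missing_required(task: dict) -> list[str]:
    """List the required parameters that are absent from a task configuration."""
    return [key for key in REQUIRED_KEYS if key not in task]
```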
Task-specific configuration parameters
Refer to the Task Reference to learn more about the task configuration for each task type.
Passing data between tasks
Data can be passed from one task to another by using dynamic inputs in a task’s inputParameters. In Conductor, dynamic inputs are formatted as JSONPath expressions. To learn more about using dynamic variables, refer to Dynamic Task Inputs.
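To make the idea concrete, the sketch below wires a workflow input and an earlier task's output into a later task's inputParameters, then resolves the references with a toy resolver. The resolver is only an illustration of how a ${...} reference maps to a value; it is not Conductor's implementation and handles only simple dotted paths. Task and key names are hypothetical.

```python
import re


def resolve(expression: str, context: dict):
    """Toy resolver for "${a.b.c}" references; non-reference strings pass through."""
    match = re.fullmatch(r"\$\{([^}]+)\}", expression)
    if not match:
        return expression
    value = context
    for key in match.group(1).split("."):
        value = value[key]
    return value


# Execution state as it might look after the first task has completed.
context = {
    "workflow": {"input": {"orderId": "A-100"}},
    "lookup_order_ref": {"output": {"total": 42.5}},
}

# Dynamic inputs for a later task, referencing earlier data.
charge_inputs = {
    "orderId": "${workflow.input.orderId}",
    "amount": "${lookup_order_ref.output.total}",
}

resolved = {key: resolve(value, context) for key, value in charge_inputs.items()}
```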
Task reuse
Since task workers typically perform a unit of work as part of a larger workflow, Conductor’s infrastructure is built to enable task reusability out of the box. Once a task is defined in Conductor, it can be reused numerous times:
- In the same workflow, using different task reference names.
- Across various workflows.
When reusing tasks, consider the situations that a multi-tenant system faces. By default, all the work assigned to a worker goes into the same task queue. If there is a noisy neighbor in the ecosystem, your tasks may not be picked up quickly. You can address this by scaling up the number of workers to handle the task load, or by using task-to-domain to route the task load into separate queues.
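The sketch below reuses one task twice in the same workflow under different reference names, and shows a start request that routes that task to a separate domain. The taskToDomain field reflects my understanding of the start-workflow request; the workflow, task, and domain names are hypothetical, as is the duplicate-checking helper.

```python
from collections import Counter


def task(name: str, ref: str) -> dict:
    """Build a worker task configuration."""
    return {"name": name, "taskReferenceName": ref, "type": "SIMPLE"}


# The same task used twice, distinguished by its reference name.
workflow_tasks = [
    task("send_notification", "notify_customer_ref"),
    task("send_notification", "notify_warehouse_ref"),
]


def duplicate_reference_names(tasks: list[dict]) -> list[str]:
    """Return reference names that appear more than once in a workflow."""
    counts = Counter(t["taskReferenceName"] for t in tasks)
    return sorted(ref for ref, count in counts.items() if count > 1)


# Route this execution's send_notification tasks to a dedicated queue.
start_request = {
    "name": "order_workflow",
    "version": 1,
    "input": {"orderId": "A-100"},
    "taskToDomain": {"send_notification": "tenant_a"},
}
```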
Workflow parameters
The workflow definition contains configuration parameters that define the workflow behaviour, such as the list of task configurations, the workflow inputs and outputs, and the workflow failure policies.
Parameter | Description | Required/Optional |
---|---|---|
name | Workflow name. | Required. |
description | Workflow description. | Required. |
version | Workflow version. | Optional. |
tasks | Array of task configuration objects. | Required. |
inputParameters | Array of workflow input keys. | Optional. |
outputParameters | JSON template used to generate the workflow output. If unspecified, the workflow output will be the output of the last executed task. | Optional. |
enforceSchema | Whether to enforce input schema validation. Set to true to enable validation or false to disable. | Optional. |
inputSchema | The schema parameters to be used as input schema for the workflow definition. Learn more about creating and using schema. | Required if enforceSchema is set to true. |
schemaVersion | The current version of the Conductor schema. Must be 2. | Required. |
restartable | Whether the workflow can be restarted after completion. The default value is true. Set to false if restarting a completed workflow causes side effects. | Optional. |
workflowStatusListenerEnabled | Whether to enable status callback for workflow state changes. Learn more about enabling CDC. The default value is false. | Optional. |
workflowStatusListenerSink | The sink where workflow state changes are sent. | Required if workflowStatusListenerEnabled is set to true. |
ownerEmail | The email ID of the workflow creator. | Required. |
timeoutPolicy | The workflow timeout policy. | Optional. |
timeoutSeconds | The timeout, in seconds, after which the workflow will be set as TIMED_OUT if it hasn't reached a terminal state. Set to 0 for no timeouts. | Required. |
failureWorkflow | The compensation workflow to trigger upon failure of the current workflow execution. Useful for cleanup, notifications, or post actions. | Optional. |
rateLimitConfig | A map of the workflow rate limit configuration. | Optional. |
rateLimitKey | A unique identifier to group workflow executions for rate limiting. | Optional. |
concurrentExecLimit | The number of workflow executions that can run concurrently for a given key. | Optional. |
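Putting several of these parameters together, a workflow definition might be sketched as follows. The values, workflow and task names, and the ALERT_ONLY timeout policy value are illustrative assumptions; the helper simply checks for the parameters the table above marks as required.

```python
# Parameters marked as required in the table above.
REQUIRED_FIELDS = (
    "name", "description", "tasks", "schemaVersion", "ownerEmail", "timeoutSeconds",
)

workflow_def = {
    "name": "order_workflow",
    "description": "Charge and ship an order",
    "version": 1,
    "schemaVersion": 2,
    "ownerEmail": "dev@example.com",
    "inputParameters": ["orderId"],
    "tasks": [
        {
            "name": "charge_card",
            "taskReferenceName": "charge_card_ref",
            "type": "SIMPLE",
            "inputParameters": {"orderId": "${workflow.input.orderId}"},
        }
    ],
    # Shape the workflow output instead of using the last task's output.
    "outputParameters": {"chargeId": "${charge_card_ref.output.chargeId}"},
    "restartable": True,
    "timeoutPolicy": "ALERT_ONLY",
    "timeoutSeconds": 3600,
    "failureWorkflow": "order_cleanup_workflow",  # hypothetical compensation workflow
}


def missing_fields(definition: dict) -> list[str]:
    """List required workflow parameters that are absent from the definition."""
    return [field for field in REQUIRED_FIELDS if field not in definition]
```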
More on configuring workflow parameters
Refer to these topics to learn more:
- Configure a workflow status listener: Enabling CDC
- Configure a workflow’s failure-handling policy: Error Handling