Table of Contents

Share on:

Share on LinkedInShare on FacebookShare on Twitter
Playground Orkes

Ready to build reliable applications 10x faster?

PRODUCT

Orkes Operators: Branching and Conditionals

Karl Goeltner
Software Engineer
April 28, 2025
6 min read

Smart workflows don’t just follow a straight line—they make decisions.

Orkes Conductor provides a set of declarative operators that let your workflows respond to input, handle conditions, and dynamically route execution without hardcoding logic into your services.

In this article, we’ll walk through four core decision-making operators:

  • Switch for branching logic based on runtime conditions
  • Dynamic for generating tasks on the fly based on data
  • Set Variable for setting and updating internal state during execution
  • Terminate for gracefully ending workflows early when needed

With these operators, you can model flexible, maintainable decision paths that react to real-world complexity—all without writing imperative glue code.

Switch

The Switch task is a control flow operator that enables conditional branching within a workflow, similar to a switch-case or if-else statement in code. It evaluates an expression at runtime and routes execution to a matching case, with support for a default path if no match is found. This makes it ideal for directing workflow logic based on dynamic values such as user input, AI-powered decisions, or status flags.

Example implementation

Let’s say you're building an order processing workflow that needs to handle different shipping methods. Based on the shippingType selected by the user (standard, express, or overnight), the workflow should branch into a different fulfillment path. A Switch task can evaluate the shippingType and route the workflow accordingly.

Here’s the workflow visualized:

High-level diagram of the Switch workflow vs the actual workflow diagram in Conductor.
Workflow using a Switch operator.

Here’s the code snippet for creating the workflow in code:

def register_workflow(workflow_executor: WorkflowExecutor) -> ConductorWorkflow:
    # 1) Task to fetch order details
    fetch_order_details = HttpTask(
        task_ref_name="fetch_order_details",
        http_input={
            "uri": "https://jsonplaceholder.typicode.com/posts/${workflow.input.orderId}",
            "method": "GET",
            "headers": {
                "Content-Type": "application/json"
            }
        }
    )

    # 2) SetVariable tasks for each shipping method
    standard_shipping = SetVariableTask(task_ref_name="standard_shipping_var")
    standard_shipping.input_parameters.update({
        "selected_shipping_method": "standard"
    })

    express_shipping = SetVariableTask(task_ref_name="express_shipping_var")
    express_shipping.input_parameters.update({
        "selected_shipping_method": "express"
    })

    overnight_shipping = SetVariableTask(task_ref_name="overnight_shipping_var")
    overnight_shipping.input_parameters.update({
        "selected_shipping_method": "overnight"
    })

    # 3) Define the Switch task to route based on shippingType
    switch_shipping = SwitchTask(
        task_ref_name="switch_shipping_method",
        case_expression="${workflow.input.shippingType}"
    )
    switch_shipping \
        .switch_case("standard", [standard_shipping]) \
        .switch_case("express", [express_shipping]) \
        .switch_case("overnight", [overnight_shipping]) \
        .default_case([])

    workflow = ConductorWorkflow(
        name="order_processing_workflow",
        executor=workflow_executor
    )
    workflow.version = 1
    workflow.add(fetch_order_details)
    workflow.add(switch_shipping)

    workflow.register(True)

    return workflow

Check out the full sample code for the Switch Workflow.

Dynamic

The Dynamic task in Orkes Conductor enables workflows to determine and execute tasks at runtime based on input data, rather than defining them statically in the workflow definition. This is especially useful for cases where the number or type of tasks isn’t known in advance—such as processing a variable-length list of items or executing a dynamic sequence of operations. The task determines a llist of task(s) to be executed, allowing workflows to adapt their structure on the fly while remaining fully managed and observable within Conductor.

Example implementation

In a shipping workflow, the courier used to fulfill an order—such as FedEx or UPS—depends on the destination address, which isn’t known until runtime. A Dynamic task allows the workflow to decide which shipping task to execute on the fly, based on the result of a previous task that determines the best courier. For instance, if the shipping_info task outputs "ship_via_fedex", the dynamic task will execute the ship_via_fedex task; if it returns "ship_via_ups", the workflow dynamically routes to ship_via_ups. This eliminates hardcoded branching and makes the workflow adaptable to real-time decision-making.

Here’s the workflow visualized:

High-level diagram of the Dynamic workflow vs the actual workflow diagram in Conductor.
Workflow using a Dynamic operator.

Here’s the code snippet for creating the workflow in code:

def register_workflow(workflow_executor: WorkflowExecutor) -> ConductorWorkflow:
    # 1) Task to simulate courier selection
    shipping_info = HttpTask(
        task_ref_name="shipping_info_ref",
        http_input={
            "uri": "https://jsonplaceholder.typicode.com/posts",
            "method": "POST",
            "headers": {
                "Content-Type": "application/json"
            },
            "body": {
                "order_id": "${workflow.input.order_id}",
                "destination": "${workflow.input.address}"
            }
        }
    )

    # 2) Dynamic task that picks the shipping method based on workflow input
    dynamic_shipping = DynamicTask(
        dynamic_task="${workflow.input.shipping_service}",
        task_reference_name="dynamic_shipping"
    )

    workflow = ConductorWorkflow(
        name="Shipping_Flow",
        executor=workflow_executor
    )
    workflow.version = 1
    workflow.add(shipping_info)
    workflow.add(dynamic_shipping)
    workflow.register(overwrite=True)

    return workflow


# Worker for FedEx
@worker_task(task_definition_name="ship_via_fedex")
def ship_via_fedex() -> dict:
    print("📦 Shipping via FedEx")
    return {"status": "FedEx shipment created"}


# Worker for UPS
@worker_task(task_definition_name="ship_via_ups")
def ship_via_ups() -> dict:
    print("📦 Shipping via UPS")
    return {"status": "UPS shipment created"}

Check out the full sample code for the Dynamic Workflow.

Set Variable

The Set Variable task in Orkes Conductor is used to define or update variables within the scope of a workflow execution. It allows you to store values—such as computed results, flags, or counters—that can be referenced later by other tasks. This task is especially useful for tracking workflow state, managing intermediate data, or preparing inputs for conditional logic or dynamic tasks. Variables set using this task are stored in the workflow’s context and persist throughout its execution.

Example implementation

In an e-commerce order processing workflow, the Set Variable task is used to store intermediate values like the subtotal, discount, and tax during price calculation. For instance, after the calculate_subtotal task, a Set Variable task updates the subtotal variable, which is then modified by the apply_discount task and used in the final price calculation. These variables are passed to subsequent tasks like process_payment, ensuring the correct amount is charged.

Here’s the workflow visualized:

High-level diagram of the Set Variable workflow vs the actual workflow diagram in Conductor.
Workflow using a Set Variable operator.

Here’s the code snippet for creating the workflow in code:

def register_workflow(workflow_executor: WorkflowExecutor) -> ConductorWorkflow:
    # 1) Calculate subtotal
    calc_subtotal = SimpleTask(
        task_def_name="calculate_subtotal",
        task_reference_name="calculate_subtotal"
    )

    # 2) Store subtotal
    set_subtotal = SetVariableTask(task_ref_name="set_subtotal")
    set_subtotal.input_parameters.update({
        "subtotal": "${calculate_subtotal.output.subtotal}"
    })

    # 3) Apply discount
    apply_discount = SimpleTask(
        task_def_name="apply_discount",
        task_reference_name="apply_discount"
    )

    # 4) Store discounted price
    set_discounted_price = SetVariableTask(task_ref_name="set_discounted_price")
    set_discounted_price.input_parameters.update({
        "discounted_price": "${apply_discount.output.discounted_total}"
    })

    # 5) Process payment
    process_payment = SimpleTask(
        task_def_name="process_payment",
        task_reference_name="process_payment",
        input_parameters={
            "amount": "${workflow.variables.discounted_price}"
        }
    )

    workflow = ConductorWorkflow(
        name="ecommerce_order_processing",
        executor=workflow_executor
    )
    workflow.version = 1
    workflow.add(calc_subtotal)
    workflow.add(set_subtotal)
    workflow.add(apply_discount)
    workflow.add(set_discounted_price)
    workflow.add(process_payment)
    workflow.register(overwrite=True)

    return workflow


@worker_task(task_definition_name="calculate_subtotal")
def calculate_subtotal() -> dict:
    print("🧮 Calculating subtotal...")
    return {"subtotal": 200.0}


@worker_task(task_definition_name="apply_discount")
def apply_discount() -> dict:
    print("🏷️ Applying discount...")
    return {"discounted_total": 180.0}


@worker_task(task_definition_name="process_payment")
def process_payment(amount) -> dict:
    print(f"💰 Charging customer: {amount.input_data['amount']}")
    return {"status": "Payment successful"}

Check out the full sample code for the Set Variable Workflow.

Terminate

The Terminate task in Orkes Conductor allows you to immediately end a workflow, either successfully or with a failure, before all tasks are completed. This is useful for scenarios where certain conditions are met, such as an error or a manual intervention, that require halting the workflow early. The task can be configured with an optional message, like a termination reason, providing context. This helps ensure that workflows end gracefully, preventing unnecessary tasks from being executed when they are no longer needed.

Example implementation

In a shipping workflow, the Terminate task ends the workflow if an invalid shipping provider is provided. During the workflow, a decision is made to ship with a specific provider based on user input. If the input does not match the available options (e.g., FedEx or UPS), the workflow enters the default case of a Switch task and triggers a Terminate task, halting the workflow with a "FAILED" status and a relevant reason.

Here’s the workflow visualized:

High-level diagram of the Terminate workflow vs the actual workflow diagram in Conductor.
Workflow using a Terminate operator.

Here’s the code snippet for creating the workflow in code:

def register_shipping_workflow(workflow_executor: WorkflowExecutor) -> ConductorWorkflow:
    # 1) Decide based on shipping provider
    switch_provider = SwitchTask(
        task_ref_name="switch_provider",
        case_expression="${workflow.input.shipping_provider}"
    )

    # 2a) FedEx path
    fedex_task = SimpleTask(
        task_def_name="ship_with_fedex",
        task_reference_name="ship_with_fedex"
    )

    # 2b) UPS path
    ups_task = SimpleTask(
        task_def_name="ship_with_ups",
        task_reference_name="ship_with_ups"
    )

    # 3) Default path — Terminate the workflow if the provider is invalid
    terminate_task = TerminateTask(
        task_ref_name="terminate_invalid_provider",
        status=WorkflowStatus.FAILED,
        termination_reason="${workflow.input.termination_reason}"
    )

    # 4) Configure Switch task cases
    switch_provider.switch_case("FEDEX", [fedex_task])
    switch_provider.switch_case("UPS", [ups_task])
    switch_provider.default_case([terminate_task])

    workflow = ConductorWorkflow(
        name="shipping_workflow_with_validation",
        executor=workflow_executor
    )
    workflow.version = 1
    workflow.add(switch_provider)
    workflow.register(overwrite=True)

    return workflow


@worker_task(task_definition_name="ship_with_fedex")
def ship_with_fedex() -> dict:
    print("📦 Shipping with FedEx...")
    return {"status": "Shipped via FedEx"}


@worker_task(task_definition_name="ship_with_ups")
def ship_with_ups() -> dict:
    print("📦 Shipping with UPS...")
    return {"status": "Shipped via UPS"}

Check out the full sample code for the Terminate Workflow.

Wrap up

Branching and conditionals bring workflows to life, allowing them to adapt in real time based on input, state, or external data. With tasks like Switch, Dynamic, SetVariable, and Terminate, Conductor makes decision logic clear, declarative, and maintainable. No tangled logic, no hidden conditionals—just clean, visible control flow.

Next up:

Orkes Conductor is an enterprise-grade Unified Application Platform for process automation, API and microservices orchestration, agentic workflows, and more. Check out the full set of features, or try it yourself using our free Developer Playground.

Related Blogs

Control the Flow: Building Dynamic Workflows with Orkes Operators

Apr 28, 2025

Control the Flow: Building Dynamic Workflows with Orkes Operators