Dynamic Fork

The Dynamic Fork task is used when the number of forks must be determined at runtime. In a regular fork-join task, by contrast, the number of forks is fixed when the workflow is defined.

The tasks to be executed can be defined in two ways:

  1. Using an array to run a simple task, an HTTP task, or a sub-workflow.
    1. Use sub-workflows when more than one task must run in the fork per array item.
  2. Using input parameters.

Definitions - Using Arrays

With arrays, the dynamic fork runs one parallel execution of the task per array item. Think of this as Conductor’s equivalent of parallel stream processing in Java:

    arrayItems.stream().parallel().forEach(item -> process(item));

Here, each array item is passed to a method called process. Conductor lets us do the same and supports several kinds of process:

  1. Simple Task - When we need to run a simple custom worker task.
  2. HTTP Task - When we need to run the system HTTP workers.
  3. Sub Workflows - Use this when we want to run more than one task or a series of steps that can be a full-fledged complex flow.
  4. Other Conductor Task Types - This can also be used for other task types such as EVENT, WAIT, etc.

Running Simple Tasks using Dynamic Fork

Run a simple task for each of the inputs provided.

| Attribute | Description |
| --- | --- |
| forkTaskName | Specify the name of the simple task to execute. |
| forkTaskInputs | Array of inputs - a task will be executed for each input. |

In the example below, the task for the first array item is executed with the following input:

    {
      "inputText": "value1",
      "inputNumber": 1,
      "__index": 0
    }

Tip: The task input contains a value called __index, which the system inserts to represent the array index of the object.

Example:

    {
      "name": "dynamic_workflow_array_simple",
      "description": "Dynamic workflow array - run simple task",
      "tasks": [
        {
          "name": "dynamic_workflow_array_simple",
          "taskReferenceName": "dynamic_workflow_array_simple_ref",
          "inputParameters": {
            "forkTaskName": "update_fruit_list_task",
            "forkTaskInputs": [
              {
                "inputText": "value1",
                "inputNumber": 1
              },
              {
                "inputText": "value2",
                "inputNumber": 2
              },
              {
                "inputText": "value3",
                "inputNumber": 3
              }
            ]
          },
          "type": "FORK_JOIN_DYNAMIC"
        },
        {
          "name": "dynamic_workflow_array_simple_join",
          "taskReferenceName": "dynamic_workflow_array_simple_join_ref",
          "type": "JOIN"
        }
      ]
    }

We can also use simple values or a mix of complex and simple objects.

    [
      "apple", "orange", "kiwi"
    ]

When simple values are used, each value is passed under the key input, together with __index, which holds the element's index in the array.

    {
      "input": "apple", // Value
      "__index": 0 // Index of the element in the source array
    }
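
The expansion described above can be modeled in a few lines of Python. This is only an illustrative sketch of the documented behavior, not Conductor's implementation; the function name expand_fork_inputs is hypothetical.

```python
from typing import Any


def expand_fork_inputs(fork_task_inputs: list[Any]) -> list[dict[str, Any]]:
    """Model how each array element becomes the input of one forked task."""
    expanded = []
    for index, item in enumerate(fork_task_inputs):
        if isinstance(item, dict):
            task_input = dict(item)        # complex object: keys are passed through
        else:
            task_input = {"input": item}   # simple value: wrapped under the key "input"
        task_input["__index"] = index      # position of the element in the source array
        expanded.append(task_input)
    return expanded


# A mix of simple values and complex objects, as allowed above.
mixed = ["apple", {"inputText": "value2", "inputNumber": 2}]
per_task_inputs = expand_fork_inputs(mixed)
```

Each entry of per_task_inputs corresponds to the input that one forked task would receive.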

Running HTTP Tasks using Dynamic Fork

To run HTTP tasks, use the same parameters as for simple tasks, as shown above: set forkTaskName to HTTP, and provide the inputs that the HTTP task expects.

Tip: method defaults to GET and need not be specified if the HTTP call is a GET.

Example:

    {
      "name": "dynamic_workflow_array_http",
      "description": "Dynamic workflow array - run HTTP tasks",
      "tasks": [
        {
          "name": "dynamic_workflow_array_http",
          "taskReferenceName": "dynamic_workflow_array_http_ref",
          "inputParameters": {
            "forkTaskName": "HTTP",
            "forkTaskInputs": [
              {
                "uri": "https://orkes-api-tester.orkesconductor.com/get"
              },
              {
                "uri": "https://orkes-api-tester.orkesconductor.com/get",
                "method": "GET"
              }
            ]
          },
          "type": "FORK_JOIN_DYNAMIC",
          "dynamicForkTasksParam": "dynamicTasks",
          "dynamicForkTasksInputParamName": "dynamicTasksInput"
        },
        {
          "name": "dynamic_workflow_array_http_join",
          "taskReferenceName": "dynamic_workflow_array_http_join_ref",
          "type": "JOIN"
        }
      ]
    }
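
When the list of URLs comes from code, the definition above can be generated rather than written by hand. The following is a minimal sketch that mirrors the field names of the example; the helper build_http_fork and its prefix parameter are hypothetical, and nothing here calls a Conductor API.

```python
import json


def build_http_fork(urls, prefix="dynamic_workflow_array_http"):
    """Return the FORK_JOIN_DYNAMIC task and its JOIN for a list of URLs."""
    fork = {
        "name": prefix,
        "taskReferenceName": f"{prefix}_ref",
        "inputParameters": {
            "forkTaskName": "HTTP",
            # "method" is omitted, so each call relies on the default GET.
            "forkTaskInputs": [{"uri": url} for url in urls],
        },
        "type": "FORK_JOIN_DYNAMIC",
        "dynamicForkTasksParam": "dynamicTasks",
        "dynamicForkTasksInputParamName": "dynamicTasksInput",
    }
    join = {
        "name": f"{prefix}_join",
        "taskReferenceName": f"{prefix}_join_ref",
        "type": "JOIN",
    }
    return [fork, join]


urls = ["https://orkes-api-tester.orkesconductor.com/get"] * 2
workflow = {
    "name": "dynamic_workflow_array_http",
    "description": "Dynamic workflow array - run HTTP tasks",
    "tasks": build_http_fork(urls),
}
workflow_json = json.dumps(workflow, indent=2)  # always emits well-formed JSON
```

Serializing with json.dumps avoids hand-editing mistakes such as stray commas in the definition.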

Running Sub Workflows using Dynamic Fork

Run a sub-workflow for each of the inputs provided.

| Attribute | Description |
| --- | --- |
| forkTaskWorkflow | Specify the name of the sub-workflow to be executed. |
| forkTaskWorkflowVersion | Optional version of the workflow to run. |
| forkTaskInputs | Array of inputs - the sub-workflow will be executed once for each input. |

Note: When forkTaskWorkflow is present, Conductor treats the task as a dynamic fork that runs sub-workflows.

Example:

    {
      "name": "dynamic_workflow_array_sub_workflow",
      "description": "Dynamic workflow array - run sub workflow tasks",
      "tasks": [
        {
          "name": "dynamic_workflow_array_sub_workflow",
          "taskReferenceName": "dynamic_workflow_array_sub_workflow_ref",
          "inputParameters": {
            "forkTaskWorkflow": "extract_user",
            "forkTaskInputs": [
              {
                "input": "value1"
              },
              {
                "input": "value2"
              }
            ]
          },
          "type": "FORK_JOIN_DYNAMIC",
          "dynamicForkTasksParam": "dynamicTasks",
          "dynamicForkTasksInputParamName": "dynamicTasksInput"
        },
        {
          "name": "dynamic_workflow_array_sub_workflow_join",
          "taskReferenceName": "dynamic_workflow_array_sub_workflow_join_ref",
          "type": "JOIN"
        }
      ]
    }
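
Because forkTaskWorkflowVersion is optional, code that assembles the inputParameters may want to include it only when a version is pinned. A small sketch follows; the helper name is hypothetical, and passing the version as an integer is an assumption, since the table above does not state its type.

```python
from typing import Any, Optional


def sub_workflow_fork_params(
    workflow_name: str,
    inputs: list[dict[str, Any]],
    version: Optional[int] = None,  # assumption: integer version
) -> dict[str, Any]:
    """Build the inputParameters for a dynamic fork that runs sub-workflows."""
    params: dict[str, Any] = {
        "forkTaskWorkflow": workflow_name,  # presence of this key selects sub-workflow mode
        "forkTaskInputs": inputs,
    }
    if version is not None:
        params["forkTaskWorkflowVersion"] = version  # only set when a version is pinned
    return params


latest = sub_workflow_fork_params(
    "extract_user", [{"input": "value1"}, {"input": "value2"}]
)
pinned = sub_workflow_fork_params("extract_user", [{"input": "value1"}], version=1)
```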

Definitions - Using Input Parameters

    {
      "name": "dynamic",
      "taskReferenceName": "dynamic_ref",
      "inputParameters": {
        "dynamicTasks": "",
        "dynamicTasksInput": ""
      },
      "type": "FORK_JOIN_DYNAMIC",
      "dynamicForkTasksParam": "dynamicTasks",
      "dynamicForkTasksInputParamName": "dynamicTasksInput"
    }

  • A FORK_JOIN_DYNAMIC task can have only one task per fork. Use a sub-workflow if a fork needs multiple tasks.

Input Parameters

| Attribute | Description |
| --- | --- |
| dynamicForkTasksParam | Name of the input parameter that holds a JSON array listing the task to run in each fork. Each entry corresponds to a separate fork. |
| dynamicForkTasksInputParamName | Name of the input parameter that holds a JSON map whose keys are the taskReferenceName of each fork and whose values are the inputParameters for the corresponding task. |

The JOIN task runs after all the dynamic tasks and collects their outputs. All the dynamic tasks must complete before the JOIN completes the fork.

Output Parameters

| Attribute | Description |
| --- | --- |
| joinOn | The output configuration of the JOIN task used in conjunction with the FORK_JOIN_DYNAMIC task. The output of the JOIN task is a map whose keys are the task reference names of the joined tasks and whose values are the outputs of those tasks. |
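
Since the JOIN output is a map keyed by task reference name, a consumer that needs results in fork order has to look them up by name. The sketch below assumes the JOIN output has already been loaded into a Python dict; the function name and the "status" payloads are invented for illustration.

```python
from typing import Any


def outputs_in_fork_order(join_output: dict[str, Any], task_refs: list[str]) -> list[Any]:
    """Return the joined task outputs in the order the forks were defined."""
    missing = [ref for ref in task_refs if ref not in join_output]
    if missing:
        raise KeyError(f"no JOIN output for: {missing}")
    return [join_output[ref] for ref in task_refs]


# Hypothetical JOIN output; only the keys follow the naming used in this page.
join_output = {
    "image_convert_resize_png_200x200_1": {"status": "resized-200"},
    "image_convert_resize_png_300x300_0": {"status": "resized-300"},
}
refs = [
    "image_convert_resize_png_300x300_0",
    "image_convert_resize_png_200x200_1",
]
ordered = outputs_in_fork_order(join_output, refs)
```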

Examples

    {
      "name": "dynamic",
      "taskReferenceName": "dynamic_ref",
      "inputParameters": {
        "dynamicTasks": [
          {
            "name": "image_convert_resize",
            "taskReferenceName": "image_convert_resize_png_300x300_0"
            // ...
          },
          {
            "name": "image_convert_resize",
            "taskReferenceName": "image_convert_resize_png_200x200_1"
            // ...
          }
        ],
        "dynamicTasksInput": {
          "image_convert_resize_png_300x300_0": {
            "outputWidth": 300,
            "outputHeight": 300
          },
          "image_convert_resize_png_200x200_1": {
            "outputWidth": 200,
            "outputHeight": 200
          }
        }
      },
      "type": "FORK_JOIN_DYNAMIC",
      "dynamicForkTasksParam": "dynamicTasks",
      "dynamicForkTasksInputParamName": "dynamicTasksInput"
    }
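
In practice the two parameters are usually produced together by an upstream task, so that every taskReferenceName in dynamicTasks has a matching key in dynamicTasksInput. The following sketch generates both for a list of image sizes; build_resize_forks is a hypothetical helper, and the task fields elided with "// ..." in the example are left out here as well.

```python
def build_resize_forks(sizes, fmt="png"):
    """Build dynamicTasks and dynamicTasksInput for a list of (width, height) pairs."""
    dynamic_tasks = []
    dynamic_tasks_input = {}
    for i, (width, height) in enumerate(sizes):
        # The index suffix keeps reference names unique even if a size repeats.
        ref = f"image_convert_resize_{fmt}_{width}x{height}_{i}"
        dynamic_tasks.append({"name": "image_convert_resize", "taskReferenceName": ref})
        dynamic_tasks_input[ref] = {"outputWidth": width, "outputHeight": height}
    return dynamic_tasks, dynamic_tasks_input


dynamic_tasks, dynamic_tasks_input = build_resize_forks([(300, 300), (200, 200)])

# Confirm the pairing used in the example: one input entry per fork, unique names.
refs = [task["taskReferenceName"] for task in dynamic_tasks]
refs_match = set(refs) == set(dynamic_tasks_input) and len(refs) == len(set(refs))
```

Deriving both structures from the same loop keeps the reference names and the input keys from drifting apart.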