Skip to main content



import ""


func WaitForWorkflowCompletionUntilTimeout

func WaitForWorkflowCompletionUntilTimeout(executionChannel WorkflowExecutionChannel, timeout time.Duration) (workflow *model.Workflow, err error)

WaitForWorkflowCompletionUntilTimeout Helper method to wait on the channel until the timeout for the workflow execution to complete

type RunningWorkflow

type RunningWorkflow struct {
WorkflowId string
WorkflowExecutionChannel WorkflowExecutionChannel
Err error
CompletedWorkflow *model.Workflow

func NewRunningWorkflow

func NewRunningWorkflow(workflowId string, workflowExecutionChannel WorkflowExecutionChannel, err error) *RunningWorkflow

func (*RunningWorkflow) WaitForCompletionUntilTimeout

func (rw *RunningWorkflow) WaitForCompletionUntilTimeout(timeout time.Duration) (workflow *model.Workflow, err error)

type WorkflowExecutionChannel

type WorkflowExecutionChannel chan *model.Workflow

type WorkflowExecutor

type WorkflowExecutor struct {
// contains filtered or unexported fields

func NewWorkflowExecutor

func NewWorkflowExecutor(apiClient *client.APIClient) *WorkflowExecutor

NewWorkflowExecutor Create a new workflow executor

func (*WorkflowExecutor) DeleteQueueConfiguration

func (e *WorkflowExecutor) DeleteQueueConfiguration(queueConfiguration queue.QueueConfiguration) (*http.Response, error)

DeleteQueueConfiguration Delete queue configuration permanently from the system Returns nil if no error occurred

func (*WorkflowExecutor) ExecuteWorkflow

func (e *WorkflowExecutor) ExecuteWorkflow(startWorkflowRequest *model.StartWorkflowRequest, waitUntilTask string) (run *model.WorkflowRun, err error)

ExecuteWorkflow start a workflow and wait until the workflow completes or the waitUntilTask completes Returns the output of the workflow

func (*WorkflowExecutor) GetByCorrelationIds

func (e *WorkflowExecutor) GetByCorrelationIds(workflowName string, includeClosed bool, includeTasks bool, correlationIds ...string) (map[string][]model.Workflow, error)

GetByCorrelationIds Given the list of correlation ids, find and return workflows Returns a map with key as correlationId and value as a list of Workflows When IncludeClosed is set to true, the return value also includes workflows that are completed otherwise only running workflows are returned

func (*WorkflowExecutor) GetQueueConfiguration

func (e *WorkflowExecutor) GetQueueConfiguration(queueConfiguration queue.QueueConfiguration) (map[string]interface{}, *http.Response, error)

GetQueueConfiguration Get queue configuration if present Returns queue configuration if present

func (*WorkflowExecutor) GetTask

func (e *WorkflowExecutor) GetTask(taskId string) (task *model.Task, err error)

GetTask by task Id returns nil if no such task is found by the id

func (*WorkflowExecutor) GetWorkflow

func (e *WorkflowExecutor) GetWorkflow(workflowId string, includeTasks bool) (*model.Workflow, error)

GetWorkflow Get workflow execution by workflow Id. If includeTasks is set, also fetches all the task details. Returns nil if no workflow is found by the id

func (*WorkflowExecutor) GetWorkflowStatus

func (e *WorkflowExecutor) GetWorkflowStatus(workflowId string, includeOutput bool, includeVariables bool) (*model.WorkflowState, error)

GetWorkflowStatus Get the status of the workflow execution. This is a lightweight method that returns only overall state of the workflow

func (*WorkflowExecutor) MonitorExecution

func (e *WorkflowExecutor) MonitorExecution(workflowId string) (workflowMonitor WorkflowExecutionChannel, err error)

MonitorExecution monitors the workflow execution Returns the channel with the execution result of the workflow Note: Channels will continue to grow if the workflows do not complete and/or are not taken out

func (*WorkflowExecutor) Pause

func (e *WorkflowExecutor) Pause(workflowId string) error

Pause the execution of a running workflow. Any tasks that are currently running will finish but no new tasks are scheduled until the workflow is resumed

func (*WorkflowExecutor) PutQueueConfiguration

func (e *WorkflowExecutor) PutQueueConfiguration(queueConfiguration queue.QueueConfiguration) (*http.Response, error)

GetQueueConfiguration Create or update a queue configuration Returns nil if no error occurred

func (*WorkflowExecutor) ReRun

func (e *WorkflowExecutor) ReRun(workflowId string, reRunRequest model.RerunWorkflowRequest) (id string, error error)

ReRun a completed workflow from a specific task (ReRunFromTaskId) and optionally change the input Also update the completed tasks with new input (ReRunFromTaskId) if required

func (*WorkflowExecutor) RegisterWorkflow

func (e *WorkflowExecutor) RegisterWorkflow(overwrite bool, workflow *model.WorkflowDef) error

RegisterWorkflow Registers the workflow on the server. Overwrites if the flag is set. If the 'overwrite' flag is not set and the workflow definition differs from the one on the server, the call will fail with response code 409

func (*WorkflowExecutor) RemoveWorkflow

func (e *WorkflowExecutor) RemoveWorkflow(workflowId string) error

RemoveWorkflow Remove workflow execution permanently from the system Returns nil if no workflow is found by the id

func (*WorkflowExecutor) Restart

func (e *WorkflowExecutor) Restart(workflowId string, useLatestDefinition bool) error

Restart a workflow execution from the beginning with the same input. When called on a workflow that is not in a terminal status, this operation has no effect If useLatestDefinition is set, the restarted workflow fetches the latest definition from the metadata store

func (*WorkflowExecutor) Resume

func (e *WorkflowExecutor) Resume(workflowId string) error

Resume the execution of a workflow that is paused. If the workflow is not paused, this method has no effect

func (*WorkflowExecutor) Retry

func (e *WorkflowExecutor) Retry(workflowId string, resumeSubworkflowTasks bool) error

Retry a failed workflow from the last task that failed. When called the task in the failed state is scheduled again and workflow moves to RUNNING status. If resumeSubworkflowTasks is set and the last failed task was a sub-workflow the server restarts the subworkflow from the failed task. If set to false, the sub-workflow is re-executed.

func (e *WorkflowExecutor) Search(start int32, size int32, query string, freeText string) ([]model.WorkflowSummary, error)

Search searches for workflows

- Start: Start index - used for pagination

- Size: Number of results to return

- Query: Query expression.  In the format FIELD = 'VALUE' or FIELD IN (value1, value2)
Only AND operations are supported. e.g. workflowId IN ('a', 'b', 'c') ADN workflowType ='test_workflow'
AND startTime BETWEEN 1000 and 2000
Supported fields for Query are:workflowId,workflowType,status,startTime
- FreeText: Full text search. All the workflow input, output and task outputs upto certain limit (check with your admins to find the size limit)
are full text indexed and can be used to search

func (*WorkflowExecutor) SkipTasksFromWorkflow

func (e *WorkflowExecutor) SkipTasksFromWorkflow(workflowId string, taskReferenceName string, skipTaskRequest model.SkipTaskRequest) error

SkipTasksFromWorkflow Skips a given task execution from a current running workflow. When skipped the task's input and outputs are updated from skipTaskRequest parameter.

func (*WorkflowExecutor) StartWorkflow

func (e *WorkflowExecutor) StartWorkflow(startWorkflowRequest *model.StartWorkflowRequest) (workflowId string, err error)

StartWorkflow Start workflows Returns the id of the newly created workflow

func (*WorkflowExecutor) StartWorkflows

func (e *WorkflowExecutor) StartWorkflows(monitorExecution bool, startWorkflowRequests ...*model.StartWorkflowRequest) []*RunningWorkflow

StartWorkflows Start workflows in bulk Returns RunningWorkflow struct that contains the workflowId, Err (if failed to start) and an execution channel which can be used to monitor the completion of the workflow execution. The channel is available if monitorExecution is set

func (*WorkflowExecutor) Terminate

func (e *WorkflowExecutor) Terminate(workflowId string, reason string) error

Terminate a running workflow. Reason must be provided that is captured as the termination resaon for the workflow.

func (e *WorkflowExecutor) TerminateWithFailure(workflowId string, reason string, triggerFailureWorkflow bool) error

Terminate a running workflow. Reason must be provided that is captured as the termination resaon for the workflow. triggerFailureWorkflow is a boolean flag which when set to true will trigger failureWorkflow upon termination, if avaliable.

func (*WorkflowExecutor) UpdateTask

func (e *WorkflowExecutor) UpdateTask(taskId string, workflowInstanceId string, status model.TaskResultStatus, output interface{}) error

UpdateTask update the task with output and status.

func (*WorkflowExecutor) UpdateTaskByRefName

func (e *WorkflowExecutor) UpdateTaskByRefName(taskRefName string, workflowInstanceId string, status model.TaskResultStatus, output interface{}) error

UpdateTaskByRefName Update the execution status and output of the task and status

func (*WorkflowExecutor) WaitForRunningWorkflowsUntilTimeout

func (e *WorkflowExecutor) WaitForRunningWorkflowsUntilTimeout(timeout time.Duration, runningWorkflows ...*RunningWorkflow)

WaitForRunningWorkflowUntilTimeout Helper method to wait for running workflows until the timeout for the workflow execution to complete

type WorkflowMonitor

type WorkflowMonitor struct {
// contains filtered or unexported fields

func NewWorkflowMonitor

func NewWorkflowMonitor(workflowClient *client.WorkflowResourceApiService) *WorkflowMonitor