Skip to main content

Query Processor

The Query Processor task is used to execute queries across different data sources.

Conductor supports querying from two primary sources:

  • Conductor Search API—This query type retrieves workflow execution data from the Conductor Search API using various parameters.
  • Conductor Metrics (Prometheus)—This query type retrieves workflow and task performance metrics as well as system statistics through Prometheus.

Task parameters

Configure these parameters for the Query Processor task.

ParameterDescriptionRequired/ Optional
inputParameters. queryTypeThe type of query. Supported types:
  • CONDUCTOR_API—For querying using Conductor Search API.
  • metrics—For querying using Prometheus metrics.
Required
ParameterDescriptionRequired/ Optional
inputParameters. workflowNamesThe names of the workflows to query. Can be a string or an array.Optional.
inputParameters. correlationIdsThe correlation IDs of the workflows to query. Can be a string or an array.Optional.
inputParameters. statusesThe statuses of the workflows to query. Can be a string or an array. Supported values:
  • RUNNING
  • COMPLETED
  • FAILED
  • TIMED_OUT
  • TERMINATED
  • PAUSED
Optional.
inputParameters. startTimeFromThe beginning of the start time range for the query, in minutes from the current time. For example, setting this to 15 means the query will include data starting from 15 minutes ago.Optional.
inputParameters. startTimeToThe end of the start-time range for the query in minutes from the current time. Setting this to 0 means the query will include data up to the current time.Optional.
inputParameters. endTimeFromThe beginning of the end-time range for the query, measured in minutes from the current time.Optional.
inputParameters. endTimeToThe end of the end time range for the query, measured in minutes from the current time.Optional.
inputParameters. freeTextFree text search parameter.Optional.

The following are generic configuration parameters that can be applied to the task and are not specific to the Query Processor task.

Schema parameters

You can enforce input/output validation for the task using the following parameters. Refer to Schema Validation for a full guide.

ParameterDescriptionRequired/ Optional
taskDefinition.enforceSchemaWhether to enforce schema validation for task inputs/outputs. Set to true to enable validation.Optional.
taskDefinition.inputSchemaThe name and type of the input schema to be associated with the task.Required if enforceSchema is set to true.
taskDefinition.outputSchemaThe name and type of the output schema to be associated with the task.Required if enforceSchema is set to true.

Task configuration

This is the task configuration for a Query Processor task.

{
"name": "query_processor",
"taskReferenceName": "query_processor_ref",
"inputParameters": {
"workflowNames": [
"<WORKFLOW-NAME>"
],
"statuses": [
"<STATUS>"
],
"correlationIds": [],
"queryType": "CONDUCTOR_API",
"startTimeFrom": 15,
"endTimeFrom": 15,
"startTimeTo": 0,
"endTimeTo": 0,
"freeText": "your_input_here"
},
"type": "QUERY_PROCESSOR"
}

Task output

The Query Processor task will return the following parameters.

ParameterDescription
resultA key value map containing the workflow query details.
totalHitsTotal number of hits or results matching the query criteria.
countThe number of workflows returned in the current response.
workflowsUrlURL linking to the queried workflow executions in the Conductor UI, with specific query parameters.
workflowsAn array containing detailed information about each workflow returned by the query.

Adding a Query Processor task in UI

To add a Query Processor task:

  1. In your workflow, select the (+) icon and add a Query Processor task.
  2. Select the Query type as Conductor Search API or Conductor Metrics (Prometheus).
  3. For Conductor Search API, set the following parameters:
    • Workflow name
    • Correlation ids
    • Statuses
    • Start time from - to (in mins)
    • End time from - to (in mins)
    • Free text search
  4. For Conductor Metrics (Prometheus), set the following parameters:
    • PromQL code
    • Start time from - to (in mins)
    • Step

Adding Query Processor task

Examples

Here are some examples for using the Query Processor task.

Using CONDUCTOR_API

This example shows how to use the Query Processor task to retrieve completed executions of a workflow by querying the Conductor Search API.

You will build and run a workflow that queries completed executions of a particular workflow from the last 15 minutes.

Prerequisites
  • An existing workflow that needs to be queried.
  • At least one completed execution of the workflow within the last 15 minutes

Step 1 : Create a workflow in Orkes Conductor

  1. Go to Definitions > Workflow, from the left navigation menu on your Conductor cluster.
  2. Select + Define workflow.
  3. In the Code tab, paste the following code.
{
"name": "WorkflowTest",
"description": "Sample workflow",
"version": 1,
"tasks": [
{
"name": "query_processor",
"taskReferenceName": "query_processor_ref",
"inputParameters": {
"workflowNames": [
"<YOUR-WORKFLOW-NAME-TO-BE-MONITORED>"
],
"statuses": [
"COMPLETED"
],
"correlationIds": [],
"queryType": "CONDUCTOR_API",
"startTimeFrom": 15,
"startTimeTo": 0
},
"type": "QUERY_PROCESSOR"
}
],
"schemaVersion": 2
}
  1. Save the workflow.

Replace <YOUR-WORKFLOW-NAME-TO-BE-MONITORED> with the name of the workflow you want to query. In this example, the Query Processor task retrieves completed executions of WorkflowTest from the last 15 minutes.

Select Execute to run the workflow. When the workflow runs, the Query Processor task queries the Conductor Search API and returns execution details for matching workflows.

The output includes information such as:

  • Workflow ID
  • Execution status
  • Start and end times
  • Execution duration
  • Workflow inputs and outputs

Example using Conductor API method

Using METRICS

In this example, the Query Processor task searches the Prometheus for the number of workflows started within the past minute. The query resolution is set at an interval of 10 seconds.

Task Configuration

{
"name": "query_processor",
"taskReferenceName": "query_processor_ref",
"inputParameters": {
"metricsQuery": "workflow_start_request_seconds_count{workflowName=\"indexed_qna_slack\"}",
"metricsStart": "1",
"metricsEnd": "0",
"metricsStep": "10",
"queryType": "METRICS"
},
"type": "QUERY_PROCESSOR"
}

Example Output

The query returns a Prometheus time series response. The values array contains the queried metrics at each given timestamp.

{
"result": {
"data": {
"result": [
{
"metric": {
"container": "conductor",
"cluster_name": "someCluster",
"endpoint": "default-app-port",
"instance": "00.00.0.000:0000",
"pod": "somePod",
"__name__": "workflow_start_request_seconds_count",
"service": "conductor-app",
"namespace": "someNameSpace",
"workflowName": "indexed_qna_slack",
"job": "conductor-app"
},
"values": [
[1723110950, "1390"],
[1723110960, "1390"],
[1723110970, "1390"],
[1723110980, "1390"],
[1723110990, "1390"],
[1723111000, "1390"],
[1723111010, "1390"]
]
}
],
"resultType": "matrix"
},
"status": "success"
}
}

Using Query Processor task to monitor failure workflows