Skip to main content

Query Processor

The Query Processor task is used to execute queries across different data sources.

Conductor supports querying from two primary sources:

  • Conductor Search API—This query type retrieves workflow execution data from the Conductor Search API using various parameters.
  • Conductor Metrics (Prometheus)—This query type retrieves workflow and task performance metrics as well as system statistics through Prometheus.

Task parameters

Configure these parameters for the Query Processor task.

ParameterDescriptionRequired/ Optional
inputParameters. queryTypeThe type of query. Supported types:
  • CONDUCTOR_API—For querying using Conductor Search API.
  • metrics—For querying using Prometheus metrics.
Required
ParameterDescriptionRequired/ Optional
inputParameters. workflowNamesThe names of the workflows to query. Can be a string or an array.Optional.
inputParameters. correlationIdsThe correlation IDs of the workflows to query. Can be a string or array.Optional.
inputParameters. statusesThe statuses of the workflows to query. Can be a string or array.Optional.
inputParameters. startTimeFromThe beginning of the start time range for the query, in minutes from the current time. For example, setting this to 15 means the query will include data starting from 15 minutes ago.Optional.
inputParameters. startTimeToThe end of the start time range for the query in minutes from the current time. Setting this to 0 means the query will include data up to the current time.Optional.
inputParameters. endTimeFromThe beginning of the end time range for the query, measured in minutes from the current time.Optional.
inputParameters. endTimeToThe end of the end time range for the query, measured in minutes from the current time.Optional.
inputParameters. freeTextFree text search parameter.Optional.

Task configuration

This is the task configuration for a Query Processor task.

 {
"name": "query_processor",
"taskReferenceName": "query_processor_ref",
"inputParameters": {
"workflowNames": [
"workflow-1"
],
"statuses": [
"FAILED"
],
"correlationIds": [],
"queryType": "CONDUCTOR_API",
"startTimeFrom": 15,
"endTimeFrom": 15,
"startTimeTo": 0,
"endTimeTo": 0,
"freeText": "your_input_here"
},
"type": "QUERY_PROCESSOR",
}

Task output

The Query Processor task will return the following parameters.

ParameterDescription
resultA key value map containing the workflow query details.
totalHitsTotal number of hits or results matching the query criteria.
countThe number of workflows returned in the current response.
workflowsUrlURL linking to the queried workflow executions in the Conductor UI, with specific query parameters.
workflowsAn array containing detailed information about each workflow returned by the query.

Adding a Query Processor task in UI

To add a Query Processor task:

  1. In your workflow, select the (+) icon and add a Query Processor task.
  2. Select the Query type as Conductor Search API or Conductor Metrics (Prometheus).
  3. For Conductor Search API, set the following parameters:
    • Workflow name
    • Correlation ids
    • Statuses
    • Start time from - to (in mins)
    • End time from - to (in mins)
    • Free text search
  4. For Conductor Metrics (Prometheus), set the following parameters:
    • PromQL code
    • Start time from - to (in mins)
    • Step

Adding Query Processor task

Examples

Here are some examples for using the Query Processor task.

CONDUCTOR_API

In this example, the Query Processor task searches the Conductor Search API for completed http_workflow workflows.

{
"name": "search_query_task",
"taskReferenceName": "search_query_task_ref",
"inputParameters": {
"workflowNames": [
"http_workflow"
],
"statuses": [
"COMPLETED"
],
"correlationIds": [],
"queryType": "CONDUCTOR_API"
},
"type": "QUERY_PROCESSOR"
}

The query will return the details of the workflow executions, such as the execution time, inputs and outputs, workflow ID, status, and so on.

{
"result": {
"totalHits": 1,
"count": 1,
"workflowsUrl": "https://cluster-name.orkesconductor.com?rowsPerPage=200&workflowType=http_workflow&start=0",
"workflows": [
{
"updateTime": "2024-08-08T09:01:52.967Z",
"priority": 0,
"version": 1,
"failedReferenceTaskNames": "",
"output": "{http_11={response={headers={Strict-Transport-Security=[max-age=15724800; includeSubDomains], Connection=[keep-alive], Content-Length=[182], Date=[Thu, 08 Aug 2024 09:01:52 GMT], Content-Type=[application/json]}, reasonPhrase=200 OK, body={randomInt=1507, hostName=orkes-api-sampler-67dfc8cf58-chmzf, randomString=uxhurkppwwkxzyjstuko, queryParams={}, sleepFor=0 ms, apiRandomDelay=0 ms, statusCode=200}, statusCode=200}}, http_22={response={headers={Strict-Transport-Security=[max-age=15724800; includeSubDomains], Connection=[keep-alive], Content-Length=[181], Date=[Thu, 08 Aug 2024 09:01:52 GMT], Content-Type=[application/json]}, reasonPhrase=200 OK, body={randomInt=338, hostName=orkes-api-sampler-67dfc8cf58-chmzf, randomString=jpoggtvjpgxieqtobrqp, queryParams={}, sleepFor=0 ms, apiRandomDelay=0 ms, statusCode=200}, statusCode=200}}",
"executionTime": 664,
"outputSize": 2531,
"input": "{}",
"failedTaskNames": [],
"inputSize": 2,
"workflowType": "http_workflow",
"startTime": "2024-08-08T09:01:52.302Z",
"endTime": "2024-08-08T09:01:52.966Z",
"workflowId": "da852039-5564-11ef-963f-56aaccfcba93",
"status": "COMPLETED"
}
]
}
}

METRICS

In this example, the Query Processor task searches the Prometheus for the number of workflows started within the past minute. The query resolution is set at an interval of 10 seconds.

{
"name": "query_processor",
"taskReferenceName": "query_processor_ref",
"inputParameters": {
"metricsQuery": "workflow_start_request_seconds_count{workflowName=\"indexed_qna_slack\"}",
"metricsStart": "1",
"metricsEnd": "0",
"metricsStep": "10",
"queryType": "METRICS"
},
"type": "QUERY_PROCESSOR"
}

The query will return the following results. The values array contains the queried metrics at each given timestamp.

{
"result": {
"data": {
"result": [
{
"metric": {
"container": "conductor",
"cluster_name": "someCluster",
"endpoint": "default-app-port",
"instance": "00.00.0.000:0000",
"pod": "somePod",
"__name__": "workflow_start_request_seconds_count",
"service": "conductor-app",
"namespace": "someNameSpace",
"workflowName": "indexed_qna_slack",
"job": "conductor-app"
},
"values": [
[
1723110950,
"1390"
],
[
1723110960,
"1390"
],
[
1723110970,
"1390"
],
[
1723110980,
"1390"
],
[
1723110990,
"1390"
],
[
1723111000,
"1390"
],
[
1723111010,
"1390"
]
]
}
],
"resultType": "matrix"
},
"status": "success"
}
}

Using Query Processor task to monitor failure workflows