 RealtimeTrigger
Trigger a flow on message consumption in real-time from an AWS SQS queue, creating one execution per message.
To consume multiple messages within a given time frame and process them as a batch, use io.kestra.plugin.aws.sqs.Trigger instead.
type: "io.kestra.plugin.aws.sqs.RealtimeTrigger"
Examples
Consume a message from an SQS queue in real-time.
id: sqs
namespace: company.team
tasks:
- id: log
  type: io.kestra.plugin.core.log.Log
  message: "{{ trigger.data }}"
triggers:
- id: realtime_trigger
  type: io.kestra.plugin.aws.sqs.RealtimeTrigger
  accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
  secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
  region: "{{ secret('AWS_DEFAULT_REGION') }}"
  queueUrl: https://sqs.eu-central-1.amazonaws.com/000000000000/test-queue
Use the AWS SQS Realtime Trigger to push events into DynamoDB.
id: sqs_realtime_trigger
namespace: company.team
tasks:
  - id: insert_into_dynamoDB
    type: io.kestra.plugin.aws.dynamodb.PutItem
    accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
    secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}"
    region: eu-central-1
    tableName: orders
    item:
      order_id: "{{ trigger.data | jq('.order_id') | first }}"
      customer_name: "{{ trigger.data | jq('.customer_name') | first }}"
      customer_email: "{{ trigger.data | jq('.customer_email') | first }}"
      product_id: "{{ trigger.data | jq('.product_id') | first }}"
      price: "{{ trigger.data | jq('.price') | first }}"
      quantity: "{{ trigger.data | jq('.quantity') | first }}"
      total: "{{ trigger.data | jq('.total') | first }}"
triggers:
  - id: realtime_trigger
    type: io.kestra.plugin.aws.sqs.RealtimeTrigger
    accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
    secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}"
    region: eu-central-1
    queueUrl: https://sqs.eu-central-1.amazonaws.com/000000000000/orders
    serdeType: JSON
Properties
queueUrl *Required string
The SQS queue URL. The queue must already exist.
accessKeyId string
Access Key Id in order to connect to AWS.
If no credentials are defined, we will use the default credentials provider chain to fetch credentials.
clientRetryMaxAttempts integer | string
Default: 3
The maximum number of attempts used by the SQS client's retry strategy.
conditions Non-dynamic
Possible types: DateTimeBetween, DayWeek, DayWeekInMonth, ExecutionFlow, ExecutionLabels, ExecutionNamespace, ExecutionOutputs, ExecutionStatus, Expression, FlowCondition, FlowNamespaceCondition, HasRetryAttempt, MultipleCondition, Not, Or, PublicHoliday, TimeBetween, Weekend
List of conditions in order to limit the flow trigger.
endpointOverride string
The endpoint with which the SDK should communicate.
This property lets you point the SQS client at a different endpoint, such as an SQS-compatible service.
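As a rough illustration, a trigger that talks to a non-default endpoint might be configured as in the sketch below. The endpoint URL, account ID, and queue name are hypothetical placeholders, not values taken from this documentation.

```yaml
triggers:
  - id: realtime_trigger
    type: io.kestra.plugin.aws.sqs.RealtimeTrigger
    # Hypothetical local endpoint; replace it with the endpoint you actually use.
    endpointOverride: http://localhost:4566
    region: eu-central-1
    accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
    secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}"
    # Hypothetical queue URL served by the overridden endpoint.
    queueUrl: http://localhost:4566/000000000000/test-queue
```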
maxNumberOfMessage integer | string
Default: 5
The maximum number of messages returned from each request made to SQS.
Increasing this value can reduce the number of requests made to SQS. Amazon SQS never returns more messages than this value (fewer messages might be returned). Valid values: 1 to 10. Setting this value to 1 would increase your AWS cost and latency because it requires more API requests to SQS. Note that Realtime Triggers always create one execution per message, regardless of the value of this property.
region string
AWS region with which the SDK should communicate.
secretKeyId string
Secret Key Id in order to connect to AWS.
If no credentials are defined, we will use the default credentials provider chain to fetch credentials.
serdeType string
Default: STRING
Possible values: STRING, JSON
The serializer/deserializer to use.
sessionToken string
AWS session token, retrieved from an AWS token service, used for authenticating that this user has received temporary permissions to access a given resource.
If no credentials are defined, we will use the default credentials provider chain to fetch credentials.
stopAfter Non-dynamic array
Possible values: CREATED, RUNNING, PAUSED, RESTARTED, KILLING, SUCCESS, WARNING, FAILED, KILLED, CANCELLED, QUEUED, RETRYING, RETRIED, SKIPPED, BREAKPOINT
List of execution states after which a trigger should be stopped (a.k.a. disabled).
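For instance, to disable the trigger once an execution it created ends in failure, a configuration along these lines could be used. This is a minimal sketch; the choice of states is only an example.

```yaml
triggers:
  - id: realtime_trigger
    type: io.kestra.plugin.aws.sqs.RealtimeTrigger
    queueUrl: https://sqs.eu-central-1.amazonaws.com/000000000000/test-queue
    region: eu-central-1
    # Stop (disable) the trigger after an execution reaches one of these states.
    stopAfter:
      - FAILED
      - KILLED
```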
stsEndpointOverride string
The AWS STS endpoint with which the SDKClient should communicate.
stsRoleArn string
AWS STS Role.
The Amazon Resource Name (ARN) of the role to assume. If set the task will use the StsAssumeRoleCredentialsProvider. If no credentials are defined, we will use the default credentials provider chain to fetch credentials.
stsRoleExternalId string
AWS STS External Id.
A unique identifier that might be required when you assume a role in another account. This property is only used when an stsRoleArn is defined.
stsRoleSessionDuration string
Default: PT15M
Format: duration
AWS STS Session duration.
The duration of the role session (default: 15 minutes, i.e., PT15M). This property is only used when an stsRoleArn is defined.
stsRoleSessionName string
AWS STS Session name.
This property is only used when an stsRoleArn is defined.
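The STS properties above could be combined as in the following sketch, which assumes a role in another account. The role ARN, external ID, and session name are hypothetical placeholders.

```yaml
triggers:
  - id: realtime_trigger
    type: io.kestra.plugin.aws.sqs.RealtimeTrigger
    queueUrl: https://sqs.eu-central-1.amazonaws.com/000000000000/test-queue
    region: eu-central-1
    # Hypothetical role to assume; the remaining sts* properties only apply when this is set.
    stsRoleArn: arn:aws:iam::000000000000:role/example-sqs-reader
    stsRoleExternalId: "{{ secret('AWS_STS_EXTERNAL_ID') }}"
    stsRoleSessionName: kestra-sqs-session
    # ISO 8601 duration; PT15M is the documented default.
    stsRoleSessionDuration: PT30M
```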
waitTime string
Default: PT20S
Format: duration
The duration for which the SQS client waits for a message.
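The polling-related properties (maxNumberOfMessage, waitTime, and clientRetryMaxAttempts) might be tuned together as sketched here. The specific values are illustrative assumptions, not recommendations from this documentation.

```yaml
triggers:
  - id: realtime_trigger
    type: io.kestra.plugin.aws.sqs.RealtimeTrigger
    queueUrl: https://sqs.eu-central-1.amazonaws.com/000000000000/test-queue
    region: eu-central-1
    # Fetch up to 10 messages per request (valid range: 1 to 10).
    # Each message still produces its own execution.
    maxNumberOfMessage: 10
    # Wait up to 20 seconds for a message on each request (the documented default).
    waitTime: PT20S
    # Allow the SQS client up to 5 attempts instead of the default 3.
    clientRetryMaxAttempts: 5
```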
Outputs
data *Required string
The message data.
deduplicationId string
The message deduplication ID.
delaySeconds integer
The message delay in seconds.
groupId string
The message group ID.
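A task can read these outputs through the trigger variable. The sketch below relies on `{{ trigger.data }}` as used in the examples on this page and assumes the other outputs are exposed the same way; the `??` fallback is there because optional outputs may be absent for some messages.

```yaml
tasks:
  - id: log_message
    type: io.kestra.plugin.core.log.Log
    # trigger.data is used in the examples on this page; the other
    # expressions assume the remaining outputs follow the same pattern.
    message: |
      data: {{ trigger.data }}
      groupId: {{ trigger.groupId ?? 'n/a' }}
      deduplicationId: {{ trigger.deduplicationId ?? 'n/a' }}
```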
Definitions
io.kestra.core.models.triggers.TimeWindow
deadline string
Format: partial-time
SLA daily deadline
Use it only for DAILY_TIME_DEADLINE SLA.
endTime string
Format: partial-time
SLA daily end time
Use it only for DAILY_TIME_WINDOW SLA.
startTime string
Format: partial-time
SLA daily start time
Use it only for DAILY_TIME_WINDOW SLA.
type string
Default: DURATION_WINDOW
Possible values: DAILY_TIME_DEADLINE, DAILY_TIME_WINDOW, DURATION_WINDOW, SLIDING_WINDOW
The type of the SLA
The default SLA is a duration window (DURATION_WINDOW) with a window of 24 hours.
window string
Format: duration
The duration of the window
Use it only for DURATION_WINDOW or SLIDING_WINDOW SLA.
See ISO 8601 durations for more information on available duration values.
The start of the window is always based on midnight unless you set the windowAdvance parameter. For example, with a 10-minute (PT10M) window, the first window runs from 00:00 to 00:10, and a new window starts every 10 minutes.
windowAdvance string
Format: duration
The window advance duration
Use it only for DURATION_WINDOW SLA.
Allows you to specify the start time of the window.
For example, with a 6-hour window (window=PT6H), the check is done by default between 00:00 and 06:00, 06:00 and 12:00, 12:00 and 18:00, and 18:00 and 00:00.
To check the window between 03:00 and 09:00, 09:00 and 15:00, 15:00 and 21:00, and 21:00 and 03:00, shift the window by 3 hours by setting windowAdvance: PT3H.
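The shifted 6-hour window described above corresponds to a timeWindow fragment like the following sketch, which uses only the properties documented in this definition.

```yaml
timeWindow:
  type: DURATION_WINDOW
  # Six-hour windows...
  window: PT6H
  # ...shifted by three hours: 03:00-09:00, 09:00-15:00, 15:00-21:00, 21:00-03:00.
  windowAdvance: PT3H
```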
Condition for a specific flow of an execution.
flowId *Required string
The flow id.
namespace *Required string
The namespace of the flow.
type *Required object
Condition for a flow namespace.
namespace *Required string
The namespace of the flow or the prefix if prefix is true.
type *Required object
prefix boolean
Default: false
If we must look at the flow namespace by prefix (checked using startsWith). The prefix is case sensitive.
Condition for a specific flow. Note that this condition is deprecated, use `io.kestra.plugin.core.condition.ExecutionFlow` instead.
flowId *Required string
The flow id.
namespace *Required string
The namespace of the flow.
type *Required object
Condition to allow events between two specific times.
type *Required object
after string
Format: time
The time to test must be after this one.
Must be a valid ISO 8601 time with offset.
before string
Format: time
The time to test must be before this one.
Must be a valid ISO 8601 time with offset.
date string
Default: {{ trigger.date }}
The time to test.
Can be any variable or any valid ISO 8601 time. By default, it will use the trigger date.
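As an illustration, a time-of-day condition could be attached to a trigger roughly as follows. The fully qualified type name is an assumption based on the `io.kestra.plugin.core.condition` package mentioned on this page, and the times are placeholders.

```yaml
triggers:
  - id: realtime_trigger
    type: io.kestra.plugin.aws.sqs.RealtimeTrigger
    queueUrl: https://sqs.eu-central-1.amazonaws.com/000000000000/test-queue
    region: eu-central-1
    conditions:
      # Assumed type name; check the plugin list of your Kestra version.
      - type: io.kestra.plugin.core.condition.TimeBetween
        # ISO 8601 times with offset, as required by after/before.
        after: "08:00:00+02:00"
        before: "18:00:00+02:00"
```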
Condition that checks labels of an execution.
labels *Required array | object
List of labels to match in the execution.
type *Required object
Condition based on the outputs of an upstream execution.
expression *Required boolean | string
type *Required object
Condition to allow events on weekend.
type *Required object
date string
Default: {{ trigger.date }}
The date to test.
Can be any variable or any valid ISO 8601 datetime. By default, it will use the trigger date.
Condition to have at least one condition validated.
conditions *Required
Possible types: DateTimeBetween, DayWeek, DayWeekInMonth, ExecutionFlow, ExecutionLabels, ExecutionNamespace, ExecutionOutputs, ExecutionStatus, Expression, FlowCondition, FlowNamespaceCondition, HasRetryAttempt, MultipleCondition, Not, Or, PublicHoliday, TimeBetween, Weekend
Min items: 1
The list of conditions to validate.
If any condition is true, it will allow the event's execution.
type *Required object
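A nested condition list might look like the sketch below, which allows events on weekends or on public holidays. The fully qualified type names are assumptions based on the `io.kestra.plugin.core.condition` package mentioned on this page, and the country code is only an example.

```yaml
conditions:
  # Assumed type names; verify them against your Kestra version.
  - type: io.kestra.plugin.core.condition.Or
    conditions:
      - type: io.kestra.plugin.core.condition.Weekend
      - type: io.kestra.plugin.core.condition.PublicHoliday
        # ISO 3166-1 alpha-2 country code.
        country: FR
```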
Condition for an execution namespace.
namespace *Required string
String against which to match the execution namespace depending on the provided comparison.
type *Required object
comparison string
Possible values: EQUALS, PREFIX, SUFFIX
Comparison to use when checking if namespace matches. If not provided, it will use EQUALS by default.
prefix boolean | string
Default: false
Whether to look at the flow namespace by prefix. Shortcut for comparison: PREFIX.
Only used when comparison is not set.
Run a flow if the list of preconditions is met in a time window.
conditions *Required object
id *Required string
Validation regex: ^[a-zA-Z0-9][a-zA-Z0-9_-]*
Min length: 1
A unique id for the condition
type *Required object
resetOnSuccess boolean
Default: true
Whether to reset the evaluation results of SLA conditions after a first successful evaluation within the given time period.
By default, after a successful evaluation of the set of SLA conditions, the evaluation result is reset, so the same set of conditions needs to be successfully evaluated again in the same time period to trigger a new execution.
This means that to create multiple executions, the same set of conditions needs to be evaluated to true multiple times.
You can disable this by setting this property to false so that, within the same period, each time one of the conditions is satisfied again after a successful evaluation, it will trigger a new execution.
timeWindow TimeWindow
Default: {"type": "DURATION_WINDOW"}
Define the time period (or window) for evaluating preconditions.
You can set the type of SLA to one of the following values:
- DURATION_WINDOW: this is the default type. It uses a start time (windowAdvance) and end time (window) that move forward to the next interval whenever the evaluation time reaches the end time, based on the defined duration window. For example, with a 1-day window (the default option: window: PT1D), the SLA conditions are always evaluated during 24h starting at midnight (i.e. at time 00:00:00) each day. If you set windowAdvance: PT6H, the window will start at 6 AM each day. If you set windowAdvance: PT6H and you also override the window property to PT6H, the window will start at 6 AM and last for 6 hours; as a result, Kestra will check the SLA conditions during the following time periods: 06:00 to 12:00, 12:00 to 18:00, 18:00 to 00:00, and 00:00 to 06:00, and so on.
- SLIDING_WINDOW: this option also evaluates SLA conditions over a fixed time window, but it always goes backward from the current time. For example, a sliding window of 1 hour (window: PT1H) will evaluate executions for the past hour (so between now and one hour before now). It uses a default window of 1 day.
- DAILY_TIME_DEADLINE: this option declares that some SLA conditions should be met "before a specific time in a day". With the string property deadline, you can configure a daily cutoff for checking conditions. For example, deadline: "09:00:00" means that the defined SLA conditions should be met from midnight until 9 AM each day; otherwise, the flow will not be triggered.
- DAILY_TIME_WINDOW: this option declares that some SLA conditions should be met "within a given time range in a day". For example, a window from startTime: "06:00:00" to endTime: "09:00:00" evaluates executions within that interval each day. This option is particularly useful for declarative definition of freshness conditions when building data pipelines. For example, if you only need one successful execution within a given time range to guarantee that some data has been successfully refreshed in order for you to proceed with the next steps of your pipeline, this option can be more useful than a strict DAG-based approach. Usually, each failure in your flow would block the entire pipeline, whereas with this option, you can proceed with the next steps of the pipeline as soon as the data is successfully refreshed at least once within the given time range.
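The daily variants in the list above can be written as timeWindow fragments like these. This is a sketch using the property names from the TimeWindow definition and the example times quoted in the list.

```yaml
# Conditions must be met between midnight and 9 AM each day.
timeWindow:
  type: DAILY_TIME_DEADLINE
  deadline: "09:00:00"
---
# Conditions must be met between 6 AM and 9 AM each day.
timeWindow:
  type: DAILY_TIME_WINDOW
  startTime: "06:00:00"
  endTime: "09:00:00"
---
# Conditions are evaluated over the past hour, counted back from now.
timeWindow:
  type: SLIDING_WINDOW
  window: PT1H
```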
Condition to exclude other conditions.
conditions *Required
Possible types: DateTimeBetween, DayWeek, DayWeekInMonth, ExecutionFlow, ExecutionLabels, ExecutionNamespace, ExecutionOutputs, ExecutionStatus, Expression, FlowCondition, FlowNamespaceCondition, HasRetryAttempt, MultipleCondition, Not, Or, PublicHoliday, TimeBetween, Weekend
Min items: 1
The list of conditions to exclude.
If any condition is true, it will prevent the event's execution.
type *Required object
Condition to execute tasks on a specific day of the week relative to the current month (first, last, ...)
dayInMonth *Required string
Possible values: FIRST, LAST, SECOND, THIRD, FOURTH
Are you looking for the first or the last day in the month?
dayOfWeek *Required string
Possible values: MONDAY, TUESDAY, WEDNESDAY, THURSDAY, FRIDAY, SATURDAY, SUNDAY
The day of week.
type *Required object
date string
Default: {{ trigger.date }}
The date to test.
Can be any variable or any valid ISO 8601 datetime. By default, it will use the trigger date.
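For example, restricting events to the first Monday of each month could be expressed roughly as follows. The fully qualified type name is an assumption based on the `io.kestra.plugin.core.condition` package mentioned on this page.

```yaml
conditions:
  # Assumed type name; verify it against your Kestra version.
  - type: io.kestra.plugin.core.condition.DayWeekInMonth
    dayOfWeek: MONDAY
    dayInMonth: FIRST
    # date defaults to the trigger date, so it is omitted here.
```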
Condition based on variable expression.
expression *Required string
type *Required object
Condition to allow events on a particular day of the week.
dayOfWeek *Required string
Possible values: MONDAY, TUESDAY, WEDNESDAY, THURSDAY, FRIDAY, SATURDAY, SUNDAY
The day of week.
type *Required object
date string
Default: {{ trigger.date }}
The date to test.
Can be any variable or any valid ISO 8601 datetime. By default, it will use the trigger date.
Condition based on execution status.
type *Required object
in array
Possible values: CREATED, RUNNING, PAUSED, RESTARTED, KILLING, SUCCESS, WARNING, FAILED, KILLED, CANCELLED, QUEUED, RETRYING, RETRIED, SKIPPED, BREAKPOINT
List of states that are authorized.
notIn array
Possible values: CREATED, RUNNING, PAUSED, RESTARTED, KILLING, SUCCESS, WARNING, FAILED, KILLED, CANCELLED, QUEUED, RETRYING, RETRIED, SKIPPED, BREAKPOINT
List of states that aren't authorized.
Condition to allow events between two specific datetime values.
type *Required object
after string
Format: date-time
The date to test must be after this one.
Must be a valid ISO 8601 datetime with the zone identifier (use 'Z' for the default zone identifier).
before string
Format: date-time
The date to test must be before this one.
Must be a valid ISO 8601 datetime with the zone identifier (use 'Z' for the default zone identifier).
date string
Default: {{ trigger.date }}
The date to test.
Can be any variable or any valid ISO 8601 datetime. By default, it will use the trigger date.
Condition that matches if any taskRun has retry attempts.
type *Required object
in array
Possible values: CREATED, RUNNING, PAUSED, RESTARTED, KILLING, SUCCESS, WARNING, FAILED, KILLED, CANCELLED, QUEUED, RETRYING, RETRIED, SKIPPED, BREAKPOINT
List of states that are authorized.
notIn array
Possible values: CREATED, RUNNING, PAUSED, RESTARTED, KILLING, SUCCESS, WARNING, FAILED, KILLED, CANCELLED, QUEUED, RETRYING, RETRIED, SKIPPED, BREAKPOINT
List of states that aren't authorized.
Condition to allow events on public holidays.
type *Required object
country string
ISO 3166-1 alpha-2 country code. If not set, it uses the country code from the default locale.
It uses the Jollyday library for public holiday calendar that supports more than 70 countries.
date string
Default: {{ trigger.date }}
The date to test.
Can be any variable or any valid ISO 8601 datetime. By default, it will use the trigger date.
subDivision string
ISO 3166-2 country subdivision (e.g., provinces and states) code.
It uses the Jollyday library for public holiday calendar that supports more than 70 countries.
