Reader
Read messages from Pulsar topic(s) without a subscription.
type: "io.kestra.plugin.pulsar.Reader"
Examples
id: pulsar_reader
namespace: company.team
tasks:
  - id: reader
    type: io.kestra.plugin.pulsar.Reader
    uri: pulsar://localhost:26650
    topic: test_kestra
    deserializer: JSON
Properties
deserializer *Required string
Default: STRING
Possible values: STRING, JSON, BYTES
Deserializer used for the value.
topic *Required object
Pulsar topic(s) to consume messages from.
Can be a string or a list of strings to consume from multiple topics.
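As a sketch of the list form described above, the flow below reads from two topics in one task. The flow id and the topic names `orders` and `payments` are placeholders chosen for illustration, not values from this page.

```yaml
id: pulsar_reader_multi_topic
namespace: company.team

tasks:
  - id: reader
    type: io.kestra.plugin.pulsar.Reader
    uri: pulsar://localhost:6650
    # A list of strings instead of a single string; topic names are illustrative.
    topic:
      - orders
      - payments
    deserializer: JSON
```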
uri *Required string
Connection URLs.
You need to specify a Pulsar protocol URL.
- Example of localhost: pulsar://localhost:6650
- If you have multiple brokers: pulsar://localhost:6650,localhost:6651,localhost:6652
- If you use TLS authentication: pulsar+ssl://pulsar.us-west.example.com:6651
authenticationToken string
Authentication token.
Authentication token that can be required by some providers such as Clever Cloud.
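A minimal sketch of passing a token, assuming the token is stored as a Kestra secret and that `authenticationToken` accepts expressions. The secret name `PULSAR_TOKEN`, the flow id, and the broker host are hypothetical.

```yaml
id: pulsar_reader_token
namespace: company.team

tasks:
  - id: reader
    type: io.kestra.plugin.pulsar.Reader
    # Hypothetical broker address; replace with the URL given by your provider.
    uri: pulsar+ssl://pulsar.example.com:6651
    topic: test_kestra
    deserializer: JSON
    # Hypothetical secret name; avoids hard-coding the token in the flow.
    authenticationToken: "{{ secret('PULSAR_TOKEN') }}"
```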
maxDuration string
Format: duration
The maximum duration to wait for new records.
It's not a hard limit and is evaluated every second.
maxRecords integer | string
The maximum number of records to fetch before stopping.
It's not a hard limit and is evaluated every second.
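The two limits can be set together, as in the sketch below. The values `PT30S` and `100` are arbitrary examples. Since both limits are evaluated every second rather than enforced strictly, the task may run slightly longer or return slightly more records than configured.

```yaml
id: pulsar_reader_bounded
namespace: company.team

tasks:
  - id: reader
    type: io.kestra.plugin.pulsar.Reader
    uri: pulsar://localhost:6650
    topic: test_kestra
    deserializer: JSON
    # ISO 8601 duration: wait up to roughly 30 seconds for new records.
    maxDuration: PT30S
    # Fetch roughly 100 records before stopping; not a hard limit.
    maxRecords: 100
```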
messageId string
Position the reader on a particular message.
The first message read will be the one immediately after the specified message.
If neither since nor messageId is provided, reading starts at the beginning of the topic.
pollDuration string
Default: PT2S
Format: duration
Duration to wait for records to be polled.
If no records are available, this is the maximum time to wait for a new record.
schemaString string
JSON string of the topic's schema.
Required when connecting to topics that have a defined schema and strict schema checking.
schemaType string
Default: NONE
Possible values: NONE, AVRO, JSON
The schema type of the topic.
Can be one of NONE, AVRO or JSON. NONE means no schema is enforced.
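The sketch below shows how `schemaType` and `schemaString` might be combined for a topic with an Avro schema. The `Order` record and its fields are invented for illustration, and keeping `deserializer: JSON` alongside an Avro schema is an assumption rather than something this page confirms.

```yaml
id: pulsar_reader_avro
namespace: company.team

tasks:
  - id: reader
    type: io.kestra.plugin.pulsar.Reader
    uri: pulsar://localhost:6650
    topic: test_kestra
    # Assumption: JSON deserialization is used for the decoded records.
    deserializer: JSON
    schemaType: AVRO
    # Hypothetical schema; it must match the schema registered on the topic.
    schemaString: |
      {
        "type": "record",
        "name": "Order",
        "fields": [
          {"name": "id", "type": "string"},
          {"name": "amount", "type": "double"}
        ]
      }
```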
since string
Format: duration
The initial reader position can be set to a specific point in time by providing a rollback duration.
The broker then finds the latest message that was published before that duration. For example, setting since to 5 minutes (PT5M) tells the broker to find the message published 5 minutes in the past and to set the initial position to that messageId.
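As a sketch of the rollback positioning described above, the flow below starts reading from about five minutes in the past. The flow id is a placeholder; `PT5M` is the value used in the description.

```yaml
id: pulsar_reader_since
namespace: company.team

tasks:
  - id: reader
    type: io.kestra.plugin.pulsar.Reader
    uri: pulsar://localhost:6650
    topic: test_kestra
    deserializer: JSON
    # ISO 8601 duration: position the reader at the message published 5 minutes ago.
    since: PT5M
```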
tlsOptions Non-dynamic AbstractPulsarConnection-TlsOptions
TLS authentication options.
You need to use "pulsar+ssl://" in the uri to enable TLS support.
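A TLS configuration could look like the sketch below. The three values are literal placeholders standing in for base64-encoded PEM content. Literal values are used because the property is marked Non-dynamic, so whether expressions are rendered inside it is not something this page confirms.

```yaml
id: pulsar_reader_tls
namespace: company.team

tasks:
  - id: reader
    type: io.kestra.plugin.pulsar.Reader
    # TLS requires the pulsar+ssl:// scheme.
    uri: pulsar+ssl://pulsar.us-west.example.com:6651
    topic: test_kestra
    deserializer: JSON
    tlsOptions:
      # Placeholders: replace each with the base64-encoded content of the PEM file.
      ca: "BASE64_ENCODED_CA_PEM"
      cert: "BASE64_ENCODED_CLIENT_CERT_PEM"
      key: "BASE64_ENCODED_CLIENT_KEY_PEM"
```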
Outputs
messagesCount integer
Number of messages consumed.
uri string
Format: uri
URI of a Kestra internal storage file containing the consumed messages.
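The outputs can be referenced from a later task through expressions. The sketch below logs both outputs; the second task and its id `log_result` are additions for illustration and use the core Log task.

```yaml
id: pulsar_reader_outputs
namespace: company.team

tasks:
  - id: reader
    type: io.kestra.plugin.pulsar.Reader
    uri: pulsar://localhost:6650
    topic: test_kestra
    deserializer: JSON

  # Illustrative follow-up task that reads the outputs of the reader task.
  - id: log_result
    type: io.kestra.plugin.core.log.Log
    message: "Read {{ outputs.reader.messagesCount }} messages into {{ outputs.reader.uri }}"
```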
Definitions
io.kestra.plugin.pulsar.AbstractPulsarConnection-TlsOptions
ca string
The CA certificate.
Must be a base64-encoded PEM file.
cert string
The client certificate.
Must be a base64-encoded PEM file.
key string
The client key.
Must be a base64-encoded PEM file.