natsin Processor

The natsin processor is used to subscribe to messages from NATS.

There are two types of natsin processors:

  • natsin_pull - used for JetStream to pull messages from a Stream via a Consumer.
  • natsin_push - used for both Core and Jetstream to have messages pushed.

natsin_pull Settings

SettingDescription
service.type=natsin_pullset the processor type
schedulehow often to check for new messages
conn.namethe name of the nats connection to use
create.streamat startup, create (or update) the stream if it doesn’t exist
create.consumerat startup, create (or update) the consumer if it doesn’t exist
js.prefixset the js prefix
log.headerslist of headers from a message to log. use * to log them all
message.filterboolean statement to filter messages based on header values - see below for example

Example message.filter

message.filter = "content-source = 'sales'"     \
                 " AND content-type = 'order'"  \
                 " AND Field01 = 'invoice'"     \
                 " AND Field02 != 'NA'"         \
                 " AND Field03 != 'N'"          \
                 " AND Field04 in ('x', 'y', 'z')"

natsin_push Settings

natsin_push can be used for JetStream as well as Core for request/reply messaging.

SettingDescription
service.type=natsin_pushset the processor type
schedulehow often to check for new messages
conn.namethe name of the nats connection to use
create.streamat startup, create (or update) the stream if it doesn’t exist
create.consumerat startup, create (or update) the consumer if it doesn’t exist
js.prefixset the js prefix
log.headerslist of headers from a message to log. use * to log them all
message.filterboolean statement to filter messages based on header values - see below for example

Create Streams and Consumers at Dynamically Runtime

You can have TEDI automatically create the Streams and Consumers for you.

Stream SettingDescription
js.stream.namethe name of the stream
js.stream.descriptionshort description
js.stream.subjectsfilter subjects
js.stream.retentionretention policy
js.stream.max_consumerslimit consumers
js.stream.max_msgslimit messages
js.stream.max_byteslimit Stream size
js.stream.discard_policyset the discard policy
js.stream.max_ageage of the messages
js.stream.max_msgs_per_subjectlimit messages by subject
js.stream.max_msg_sizelimit message size
js.stream.storagestorage type
js.stream.num_replicasreplica count
js.stream.no_ackdo not require a message acknowledgements
js.stream.duplicate_windowdupe checking
js.stream.placement.clustercluster name to place stream
js.stream.placement.tagsset cluster placement tags

Consumer SettingDescription
js.consumer.stream_namename of the Stream
js.consumer.durable_namename of the Consumer
js.consumer.descriptionshort description
js.consumer.deliver_policyset the deliver policy
js.consumer.opt_start_seqset the starting sequence number
js.consumer.opt_start_time_layoutstart-time date format
js.consumer.opt_start_time_valuestart date/time
js.consumer.ack_policyset the ack policy
js.consumer.ack_waitmax time to wait for acks
js.consumer.max_delivermax deliver attempts
js.consumer.replay_policyset the replay policy
js.consumer.filter_subjectset the filter subjects
js.consumer.sample_freqtelemetry sampling frequency
js.consumer.max_waitingdefines the max inflight pull requests
js.consumer.max_ack_pendingmax outstanding acks
js.consumer.headers_onlywill instruct the consumer to only deliver headers and no payloads
js.consumer.max_batchsets the max messages per fetch
js.consumer.max_expiressets the maximum pull consumer request expiration that a Fetch() can request (using the Fetch’s timeout value)
js.consumer.max_bytessets the maximum pull consumer request bytes that a Fetch() can receive