JetStream Consumers with the NATS.io Java Library
Scott Fauerbach — May 21, 2021
The previous entries in this series showed us how to create JetStream streams and publish messages. This entry will start building up to subscribing by describing the configuration for consumers.
Push vs Pull Subscription
Consumers, once created, can subscribe in either push or pull mode. The server sends messages to push consumers, while pull consumers have to ask for messages.
This is mentioned here first because consumer configuration options may apply differently depending on the type of subscription.
Durable vs Ephemeral
A consumer can be durable or ephemeral.
A durable consumer is intended to be long-lived and the server will keep track of where the consumer is in the stream. So if the consumer stops then restarts, it will automatically restart where it left off and the configuration used to initialize the consumer will be remembered. A durable consumer is required when making a pull subscription and is optional when making a push subscription.
An ephemeral consumer is tracked by the server only while it is active and showing interest in the stream. Ephemeral consumers are only allowed in push subscriptions.
For durable consumers, some options cannot be changed once the consumer is created. For ephemeral consumers, this doesn’t really apply since they are created new every time.
The ConsumerConfiguration object has a builder to simplify setting options. Here is the builder skeleton showing all the builder methods.
ConsumerConfiguration c = ConsumerConfiguration.builder()
    .durable(string)
    .deliverPolicy(enum)
    .startSequence(long)
    .startTime(ZonedDateTime)
    .deliverSubject(string)
    .ackPolicy(enum)
    .ackWait(duration)
    .maxAckPending(long)
    .replayPolicy(enum)
    .maxDeliver(long)
    .filterSubject(string)
    .rateLimit(long)
    .sampleFrequency(string)
    .idleHeartbeat(duration)
    .flowControl(boolean)
    .build();
As you can see, there are several options. In general, the defaults will do just fine. The Durable name is probably the most common option used, so in the Java client we gave several ways to set it when making a subscription.
In the Java client repo, there is a JetStream Example Directory that covers many of these options.
By default, a consumer is ephemeral. To make the consumer durable, set the name.
In the Java client, there are two ways to set the durable name. You can set it in the Consumer Configuration, or you can leverage the helper method in the PushSubscribeOptions or PullSubscribeOptions builders, which will either add it to the Consumer Configuration you supply or create a default one with the durable. The value in the Consumer Configuration object takes precedence over the value supplied in either of the Subscribe options builders.
Several of the examples demonstrate use of durable. NatsJsPushSub is a good place to start.
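As a rough sketch of the two ways just described, the snippet below only builds option objects and does not connect to a server. It assumes the jnats library is on the classpath and that ConsumerConfiguration lives in the io.nats.client.api package, which may differ in your client version. The durable name "my-durable" and the class name are made up for illustration.

```java
import io.nats.client.PullSubscribeOptions;
import io.nats.client.PushSubscribeOptions;
import io.nats.client.api.ConsumerConfiguration;

// Illustrative only: builds subscribe options, never talks to a server.
class DurableNameSketch {
    public static void main(String[] args) {
        // Way 1: set the durable name on the consumer configuration,
        // then hand that configuration to the subscribe options builder.
        ConsumerConfiguration cc = ConsumerConfiguration.builder()
                .durable("my-durable") // hypothetical name
                .build();
        PushSubscribeOptions viaConfig = PushSubscribeOptions.builder()
                .configuration(cc)
                .build();

        // Way 2: use the durable(...) helper on the options builders;
        // a default consumer configuration carrying the name is created.
        PushSubscribeOptions pushViaHelper = PushSubscribeOptions.builder()
                .durable("my-durable")
                .build();
        PullSubscribeOptions pullViaHelper = PullSubscribeOptions.builder()
                .durable("my-durable")
                .build();

        System.out.println(viaConfig.getConsumerConfiguration().getDurable());
        System.out.println(pushViaHelper.getConsumerConfiguration().getDurable());
        System.out.println(pullViaHelper.getConsumerConfiguration().getDurable());
    }
}
```

The resulting options object is what you would pass along when subscribing, as the NatsJsPushSub example in the repo does.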
Deliver Policy / Start Sequence / Start Time
When a consumer is first created, it can specify where in the stream it wants to start receiving messages.
This is the DeliverPolicy, and its options are as follows:
| Policy | Description |
|---|---|
| All | All is the default policy. The consumer will start receiving from the earliest available message. |
| Last | The consumer will start receiving with the last message added to the stream, that is, the very last message in the stream when the server realizes the consumer is ready. |
| New | The consumer will only receive messages that were created after the consumer was created. |
| ByStartSequence | The consumer is required to specify the Start Sequence. |
| ByStartTime | The consumer is required to specify the Start Time. |
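To make the starting positions concrete, here is a toy model in plain Java. It is not how the server is implemented, every name in it is hypothetical, and it assumes that ByStartSequence and ByStartTime begin at the first stored message at or after the requested sequence or time.

```java
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;

// Toy model of where each deliver policy starts in a stream (hypothetical names).
class DeliverPolicySketch {
    enum Policy { ALL, LAST, NEW, BY_START_SEQUENCE, BY_START_TIME }

    // A message already stored in the stream, in ascending sequence order.
    static final class Stored {
        final long seq;
        final Instant time;
        Stored(long seq, Instant time) { this.seq = seq; this.time = time; }
    }

    // Returns the sequence of the first already-stored message the consumer
    // would receive, or empty if it would only see messages published later.
    static OptionalLong firstDelivered(List<Stored> stream, Policy policy,
                                       long startSeq, Instant startTime) {
        if (stream.isEmpty()) return OptionalLong.empty();
        switch (policy) {
            case ALL:
                return OptionalLong.of(stream.get(0).seq);
            case LAST:
                return OptionalLong.of(stream.get(stream.size() - 1).seq);
            case BY_START_SEQUENCE:
                return stream.stream().mapToLong(m -> m.seq)
                        .filter(s -> s >= startSeq).findFirst();
            case BY_START_TIME:
                return stream.stream().filter(m -> !m.time.isBefore(startTime))
                        .mapToLong(m -> m.seq).findFirst();
            default: // NEW: nothing already stored is delivered
                return OptionalLong.empty();
        }
    }

    public static void main(String[] args) {
        Instant t0 = Instant.parse("2021-05-21T00:00:00Z");
        List<Stored> stream = Arrays.asList(
                new Stored(1, t0),
                new Stored(2, t0.plusSeconds(10)),
                new Stored(3, t0.plusSeconds(20)));
        for (Policy p : Policy.values()) {
            System.out.println(p + " -> " + firstDelivered(stream, p, 2, t0.plusSeconds(15)));
        }
    }
}
```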
The Deliver Subject is the subject to which observed messages are delivered, and it is not allowed for pull subscriptions. For durable consumers, an internal deliver subject will be created if one is not supplied.
See the NatsJsPushSubDeliverSubject example.
Explicit is the default Ack Policy and means that each individual message must be acknowledged. For pull consumers, Explicit is the only allowed option. For push consumers, you can also choose None, which means you do not have to ack any messages, or All, which means that whenever you choose to ack a message, all the previous messages received are automatically acknowledged.
You must ack within the Ack Wait window or it will be as if the ack was not received.
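The difference between the three ack policies is easiest to see by tracking which delivered messages are still waiting for an ack. The following is a simplified bookkeeping model in plain Java with hypothetical names; it ignores Ack Wait and redelivery and is not the server's implementation.

```java
import java.util.TreeSet;

// Toy bookkeeping of un-acked messages under each ack policy (hypothetical names).
class AckPolicySketch {
    enum Policy { NONE, ALL, EXPLICIT }

    private final Policy policy;
    // Stream sequences that were delivered and still need an ack.
    private final TreeSet<Long> pending = new TreeSet<>();

    AckPolicySketch(Policy policy) { this.policy = policy; }

    // With NONE nothing is ever tracked as pending.
    void deliver(long seq) {
        if (policy != Policy.NONE) pending.add(seq);
    }

    void ack(long seq) {
        if (policy == Policy.ALL) {
            // Acking one message also acknowledges everything delivered before it.
            pending.headSet(seq, true).clear();
        } else if (policy == Policy.EXPLICIT) {
            // Only the acked message itself is cleared.
            pending.remove(seq);
        }
    }

    int pendingCount() { return pending.size(); }

    public static void main(String[] args) {
        for (Policy p : Policy.values()) {
            AckPolicySketch tracker = new AckPolicySketch(p);
            tracker.deliver(1);
            tracker.deliver(2);
            tracker.deliver(3);
            tracker.ack(2);
            System.out.println(p + " pending after ack(2): " + tracker.pendingCount());
        }
    }
}
```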
Ack Wait is the time in nanoseconds that the server will wait for an ack for any individual message. If an ack is not received in time, the message will be redelivered.
Max Ack Pending
If you have received this number of messages without acking them, the server will stop sending messages. If the server gets 1 or more acks, it will resume sending that number of messages. If acks don’t occur in the Ack Wait period, then the server will resume starting at the messages that have not been acked.
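The stop-and-resume behavior can be pictured as a simple counter. This is a minimal sketch with hypothetical names, assuming the server sends only while the number of un-acked messages is below the limit; it leaves out the Ack Wait redelivery part.

```java
// Toy model of the Max Ack Pending gate (hypothetical names).
class MaxAckPendingSketch {
    private final int maxAckPending;
    private int pending; // messages sent but not yet acked

    MaxAckPendingSketch(int maxAckPending) { this.maxAckPending = maxAckPending; }

    // Returns true if a message may be sent now, false if the limit is reached.
    boolean trySend() {
        if (pending >= maxAckPending) return false;
        pending++;
        return true;
    }

    // Each ack frees one slot, so sending can resume.
    void ack() {
        if (pending > 0) pending--;
    }

    public static void main(String[] args) {
        MaxAckPendingSketch gate = new MaxAckPendingSketch(2);
        System.out.println(gate.trySend());
        System.out.println(gate.trySend());
        System.out.println(gate.trySend()); // limit reached at this point
        gate.ack();
        System.out.println(gate.trySend()); // one slot was freed by the ack
    }
}
```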
The replay policy applies when the deliver policy is All, ByStartSequence or ByStartTime, since those deliver policies begin reading the stream at a position other than the end.
If the policy is Original, the messages in the stream will be pushed to the client at the same rate they were originally received, simulating the original timing of messages.
If the policy is Instant (the default), the messages will be pushed to the client as fast as possible while adhering to the Ack Policy, Max Ack Pending and the client’s ability to consume those messages.
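One way to think about the two replay policies is as delivery offsets relative to the start of the replay. The sketch below uses hypothetical names and plain millisecond timestamps; it only illustrates the timing idea and ignores acks and client speed.

```java
// Toy model of replay timing (hypothetical names).
class ReplayPolicySketch {
    enum Policy { INSTANT, ORIGINAL }

    // Given the times the messages were originally received, returns how long
    // after the start of the replay each message would be delivered.
    static long[] deliveryOffsetsMillis(long[] receivedAtMillis, Policy policy) {
        long[] offsets = new long[receivedAtMillis.length];
        for (int i = 0; i < offsets.length; i++) {
            // ORIGINAL keeps the original gaps; INSTANT has no artificial delay.
            offsets[i] = policy == Policy.ORIGINAL
                    ? receivedAtMillis[i] - receivedAtMillis[0]
                    : 0L;
        }
        return offsets;
    }

    public static void main(String[] args) {
        long[] receivedAt = {1000L, 1500L, 4000L};
        System.out.println(java.util.Arrays.toString(
                deliveryOffsetsMillis(receivedAt, Policy.ORIGINAL)));
        System.out.println(java.util.Arrays.toString(
                deliveryOffsetsMillis(receivedAt, Policy.INSTANT)));
    }
}
```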
Max Deliver is the maximum number of times a specific message will be delivered. It applies to any message that is re-sent due to the ack policy.
When consuming from a stream with a wildcard subject, the Filter Subject allows you to select a subset of the full wildcard subject to receive messages from.
See the NatsJsPushSubFilterSubject example.
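For readers new to NATS wildcards, the matcher below sketches how a filter subject selects a subset of subjects: `*` matches exactly one token and `>` matches one or more trailing tokens. It is a simplified stand-in with hypothetical names, does no subject validation, and is not the server's matching code.

```java
// Simplified NATS-style subject matching (hypothetical names).
class SubjectFilterSketch {
    static boolean matches(String filter, String subject) {
        String[] f = filter.split("\\.");
        String[] s = subject.split("\\.");
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals(">")) {
                // '>' must be the last filter token and needs at least one token to match.
                return i == f.length - 1 && s.length > i;
            }
            if (i >= s.length) return false;          // subject ran out of tokens
            if (!f[i].equals("*") && !f[i].equals(s[i])) return false;
        }
        return f.length == s.length;                  // no leftover subject tokens
    }

    public static void main(String[] args) {
        // Subjects here are made up for illustration.
        System.out.println(matches("orders.*", "orders.new"));
        System.out.println(matches("orders.*", "orders.new.eu"));
        System.out.println(matches("orders.>", "orders.new.eu"));
    }
}
```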
Rate Limit is used to throttle the delivery of messages to the consumer, in bits per second.
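Because the unit is bits rather than bytes, it is easy to be off by a factor of eight. The helper below is plain arithmetic with hypothetical names; it counts payload bytes only and ignores any protocol overhead, so treat the result as a rough estimate.

```java
// Back-of-the-envelope rate limit arithmetic (hypothetical names).
class RateLimitSketch {
    // Converts a budget expressed in bytes per second into bits per second.
    static long bytesPerSecondToBits(long bytesPerSecond) {
        return Math.multiplyExact(bytesPerSecond, 8L);
    }

    // Estimates how long a batch of equally sized messages takes at a given limit.
    static double secondsToDeliver(long messageCount, long bytesPerMessage,
                                   long rateLimitBitsPerSecond) {
        if (rateLimitBitsPerSecond <= 0) {
            throw new IllegalArgumentException("rate limit must be positive");
        }
        double totalBits = (double) messageCount * bytesPerMessage * 8.0;
        return totalBits / rateLimitBitsPerSecond;
    }

    public static void main(String[] args) {
        long limit = bytesPerSecondToBits(1_000_000L); // 1 MB/s expressed in bits
        System.out.println(limit);
        System.out.println(secondsToDeliver(1000, 1000, limit));
    }
}
```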
Sample Frequency sets the percentage of acknowledgements that should be sampled for observability, 0-100. This value is a string; for example, both 30 and 30% are valid values.
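If you accept this value from your own configuration, you may want to check it before passing it along. The following is a hypothetical validation helper in plain Java that accepts both forms; it is our own sketch and not how the client or server parses the value.

```java
// Hypothetical helper that validates a sample frequency string.
class SampleFrequencySketch {
    // Accepts values such as "30" or "30%" and returns the percentage as an int.
    static int parsePercent(String value) {
        String v = value.trim();
        if (v.endsWith("%")) {
            v = v.substring(0, v.length() - 1).trim();
        }
        int percent = Integer.parseInt(v); // throws NumberFormatException if not a whole number
        if (percent < 0 || percent > 100) {
            throw new IllegalArgumentException("sample frequency must be 0-100: " + value);
        }
        return percent;
    }

    public static void main(String[] args) {
        System.out.println(parsePercent("30"));
        System.out.println(parsePercent("30%"));
    }
}
```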
If the idle heartbeat period is set, the server will send a status message to the client when the period has elapsed and there have been no new messages to deliver.
This lets the client know that the server is still there and that there simply are no messages to receive.
The Java client’s Status object has a helper method, isHeartbeat(), that can help you identify this specific status message.
See the NatsJsPushSubHeartbeat example.
Flow control is another way for the consumer to manage back pressure. Instead of relying on the rate limit, it relies on the pending limits of max messages and/or max bytes.
If the server sends that number of messages or bytes without receiving an ack, it will send a status message letting you know it has reached this limit.
Once flow control is tripped, the server will not start sending messages again until the client tells the server, even if all messages have been acknowledged.
The Java client’s Status object has a helper method, isFlowControl(), that can help you identify this specific status message.
See the NatsJsPushSubFlowControl example.
About the Author
Scott Fauerbach is a member of the engineering team at Synadia Communications.