JetStream Push Consumers with the NATS.io Java Library
Scott Fauerbach — June 3, 2021
JetStream Push Consumers with the NATS.io Java Library
The last entry in this series talked about the consumer options that are available when subscribing to messages. This entry will demonstrate the basics of a push subscription.
Push
A push subscription is where the server is in control and sends messages to the client. It can be made durable or ephemeral based on your use case. Here are the API method calls used for creating a push subscription:
JetStreamSubscription subscribe(String subject) throws IOException, JetStreamApiException;
JetStreamSubscription subscribe(String subject, PushSubscribeOptions options) throws IOException, JetStreamApiException;
JetStreamSubscription subscribe(String subject, String queue, PushSubscribeOptions options) throws IOException, JetStreamApiException;
JetStreamSubscription subscribe(String subject, Dispatcher dispatcher, MessageHandler handler, boolean autoAck) throws IOException, JetStreamApiException;
JetStreamSubscription subscribe(String subject, Dispatcher dispatcher, MessageHandler handler, boolean autoAck, PushSubscribeOptions options) throws IOException, JetStreamApiException;
JetStreamSubscription subscribe(String subject, String queue, Dispatcher dispatcher, MessageHandler handler, boolean autoAck, PushSubscribeOptions options) throws IOException, JetStreamApiException;
- subject - every subscription needs a subject
- options - configure PushSubscribeOptions or use the default configuration
- queue - multiple consumers using the same queue name will each get a unique portion of the messages in the stream.
- dispatcher - necessary if you want to handle messages asynchronously
- handler - the asynchronous handler
- autoAck - for asynchronous handling, the message can be acknowledged for you before your own handler is called.
PushSubscribeOptions
The PushSubscribeOptions
allows you to identify the stream name and
is a helper for the most common push ConsumerConfiguration
options, durable name and the deliver subject.
Setting those in the PushSubscribeOptions
builder will
create a ConsumerConfiguration
with those values unless you have provided your own ConsumerConfiguration
,
in which case the values set in the PushSubscribeOptions
builder will take precedence.
Builder
// set the stream name
public Builder stream(String stream)
// set the deliver subject
public Builder deliverSubject(String deliverSubject)
// set the durable name
public Builder durable(String durable)
// set the configuration object
public Builder configuration(ConsumerConfiguration configuration)
Synchronous
You can handle a push subscription message synchronously…
Connection nc = Nats.connect("nats://demo.nats.io")
JetStream js = nc.jetStream();
...
JetStreamSubscription sub = js.subscribe("my-subject");
nc.flush(Duration.ofSeconds(1)); // flush outgoing communication with/to the server
while (keepGoing)
// get the next message waiting a maximum of 1 second for it to arrive
handleMessage(sub.nextMessage(Duration.ofSeconds(1)));
}
...
void handleMessage(Message msg) {
if (msg == null) {
// the server had no message for us.
// Maybe sleep here or do some housekeeping
}
else {
if (msg.isJetStream()) {
// do something with the message
// don't forget to ack based on your consumer AckPolicy configuration
// or async auto ack setting
msg.ack();
}
else if (msg.isStatusMessage()) {
// status messages include heartbeat and flow control depending on
// your consumer configuration
System.out.println("Status " + msg.getStatus());
}
}
}
Asynchronous
Or asynchronously in the thread that the dispatcher will be run in…
MessageHandler handler = (Message msg) -> {
// see handleMessage in above example
handleMessage(msg)
};
// create a dispatcher without a default handler.
Dispatcher dispatcher = nc.createDispatcher();
// create a subscription
// dispatcher is the object that routes messages asynchronously
// handler is the function that processes the message
JetStreamSubscription sub = js.subscribe("my-subject", dispatcher, handler, false);
nc.flush(Duration.ofSeconds(1)); // flush outgoing communication with/to the server
// do other stuff and make sure you keep the program running since the handler is running in a separate thread
About the Author
Scott Fauerbach is a member of the engineering team at Synadia Communications .
Back to Blog