NATS uses cookies to ensure you get the best experience on our website. Continuing to use this site assumes compliance with our Privacy Policy.
Edit on GitHub

Receiving Messages

Receiving messages with NATS can be very library dependent. Some languages, like Go or Java, can provide synchronous and asynchronous APIs, while others may only support one type of subscription. In all cases, the process of subscribing involves having the client library tell the NATS server that an application is interested in a particular subject. Under the covers, the client library will assign a unique id to each subscription. This id is used when the server sends messages to a specific subscription. Each subscription gets a unique id, so if the same connection subscribes multiple times to the same subject, the server will send multiple copies of the same message, one per subscription. When an application is done with a subscription, it unsubscribes, which tells the server to stop sending messages.

Synchronous Subscriptions

Synchronous subscriptions require the application to poll for messages. This type of subscription is easy to set up and use, but requires the application to deal with looping if multiple messages are expected. For example, to subscribe to the subject updates and receive a single message you could do:

nc, err := nats.Connect("demo.nats.io")
if err != nil {
	log.Fatal(err)
}
defer nc.Close()

// Subscribe
sub, err := nc.SubscribeSync("updates")
if err != nil {
	log.Fatal(err)
}

// Wait for a message
msg, err := sub.NextMsg(10 * time.Second)
if err != nil {
	log.Fatal(err)
}

// Use the response
log.Printf("Reply: %s", msg.Data)

// Close the connection
nc.Close()
Connection nc = Nats.connect("nats://demo.nats.io:4222");

// Subscribe
Subscription sub = nc.subscribe("updates");

// Read a message
Message msg = sub.nextMessage(Duration.ZERO);

String str = new String(msg.getData(), StandardCharsets.UTF_8);
System.out.println(str);

// Close the connection
nc.close();
// node-nats subscriptions are always async.
# Asyncio NATS client currently does not have a sync subscribe API
# The Ruby NATS client subscriptions are all async.
// ts-nats subscriptions are always async.

Asynchronous Subscriptions

Asynchronous subscriptions use callbacks of some form to notify an application when a message arrives. These subscriptions are usually easier to work with, but do represent some form of internal work and resource usage by the library. Check your library's documentation for any resource usage associated with asynchronous subscriptions. The following example is the same as the previous one, only with asynchronous code:

nc, err := nats.Connect("demo.nats.io")
if err != nil {
	log.Fatal(err)
}
defer nc.Close()

// Use a WaitGroup to wait for a message to arrive
wg := sync.WaitGroup{}
wg.Add(1)

// Subscribe
if _, err := nc.Subscribe("updates", func(m *nats.Msg) {
	wg.Done()
}); err != nil {
	log.Fatal(err)
}

// Wait for a message to come in
wg.Wait()

// Close the connection
nc.Close()
Connection nc = Nats.connect("nats://demo.nats.io:4222");

// Use a latch to wait for a message to arrive
CountDownLatch latch = new CountDownLatch(1);

// Create a dispatcher and inline message handler
Dispatcher d = nc.createDispatcher((msg) -> {
    String str = new String(msg.getData(), StandardCharsets.UTF_8);
    System.out.println(str);
    latch.countDown();
});

// Subscribe
d.subscribe("updates");

// Wait for a message to come in
latch.await(); 

// Close the connection
nc.close();
nc.subscribe("updates", (msg) => {
    t.log(msg);
});
nc = NATS()

await nc.connect(servers=["nats://demo.nats.io:4222"])

future = asyncio.Future()

async def cb(msg):
  nonlocal future
  future.set_result(msg)

await nc.subscribe("updates", cb=cb)
await nc.publish("updates", b'All is Well')
await nc.flush()

# Wait for message to come in
msg = await asyncio.wait_for(future, 1)

require 'nats/client'

NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
  nc.subscribe("updates") do |msg|
    puts msg
    nc.close
  end

  nc.publish("updates", "All is Well")
end

nc.subscribe("updates", (err, msg) => {
    if(err) {
        console.log('error', err);
    } else {
        t.log(msg.data);
    }
});

Unsubscribing

The client libraries provide a means to unsubscribe a previous subscription request. This process requires an interaction with the server, so for an asynchronous subscription there may be a small window of time where a message comes through as the unsubscribe is processed by the library. Ignoring that slight edge case, the client library will clean up any outstanding messages and tell the server that the subscription is no longer used.

nc, err := nats.Connect("demo.nats.io")
if err != nil {
	log.Fatal(err)
}
defer nc.Close()

// Sync Subscription
sub, err := nc.SubscribeSync("updates")
if err != nil {
	log.Fatal(err)
}
if err := sub.Unsubscribe(); err != nil {
	log.Fatal(err)
}

// Async Subscription
sub, err = nc.Subscribe("updates", func(_ *nats.Msg) {})
if err != nil {
	log.Fatal(err)
}
if err := sub.Unsubscribe(); err != nil {
	log.Fatal(err)
}

// Close the connection
nc.Close()
Connection nc = Nats.connect("nats://demo.nats.io:4222");
Dispatcher d = nc.createDispatcher((msg) -> {
    String str = new String(msg.getData(), StandardCharsets.UTF_8);
    System.out.println(str);
});

// Sync Subscription
Subscription sub = nc.subscribe("updates");
sub.unsubscribe();

// Async Subscription
d.subscribe("updates");
d.unsubscribe("updates");

// Close the connection
nc.close();
// set up a subscription to process a request
// node-nats hands the handler the payload and the reply subject as separate
// arguments, so the reply subject is read from `reply` (not `msg.reply`).
let sub = nc.subscribe(NATS.createInbox(), (msg, reply) => {
    if (reply) {
        nc.publish(reply, new Date().toLocaleTimeString());
    }
});

// without arguments the subscription will cancel when the server receives it
// you can also specify how many messages are expected by the subscription
nc.unsubscribe(sub);
nc = NATS()

await nc.connect(servers=["nats://demo.nats.io:4222"])

future = asyncio.Future()

async def cb(msg):
  nonlocal future
  future.set_result(msg)

sid = await nc.subscribe("updates", cb=cb)
await nc.publish("updates", b'All is Well')

# Remove interest in subject
await nc.unsubscribe(sid)

# Won't be received...
await nc.publish("updates", b'...')

require 'nats/client'
require 'fiber'

NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
  Fiber.new do
    f = Fiber.current

    sid = nc.subscribe("time") do |msg, reply|
      f.resume Time.now
    end

    nc.publish("time", 'What is the time?', NATS.create_inbox)

    # Use the response
    msg = Fiber.yield
    puts "Reply: #{msg}"

    nc.unsubscribe(sid)

    # Won't be received
    nc.publish("time", 'What is the time?', NATS.create_inbox)

  end.resume
end

// set up a subscription to process a request
let sub = await nc.subscribe(createInbox(), (err, msg) => {
    if (msg.reply) {
        nc.publish(msg.reply, new Date().toLocaleTimeString());
    } else {
        t.log('got a request for the time, but no reply subject was set.');
    }
});

// without arguments the subscription will cancel when the server receives it
// you can also specify how many messages are expected by the subscription
sub.unsubscribe();

Unsubscribing After a Specified Number of Messages

NATS provides a special form of unsubscribe that is configured with a message count and takes effect when that many messages are sent to a subscriber. This mechanism is very useful if only a single message is expected. There are a few important things to know about auto unsubscribe. First, the message count you provide is the total message count for a subscriber. So if you unsubscribe with a count of 1, the server will stop sending messages to that subscription after the subscription has received one message. If the subscriber has already received one or more messages, the unsubscribe will be immediate. This action based on history can be confusing if you try to auto unsubscribe on a long running subscription, but is logical for a new one.

Auto unsubscribe is based on the total messages sent to a subscriber, not just the new ones.

Second, auto unsubscribe can also result in some tricky edge cases if a server cluster is used. The client will tell the server of the unsubscribe count when the application requests it. But if the client disconnects before the count is reached, it may have to tell another server of the remaining count. This dance between previous server notifications and new notifications on reconnect can result in unplanned behavior.

Finally, most of the client libraries also track the max message count after an auto unsubscribe request, which means that the client will stop allowing messages to flow even if the server has miscounted due to reconnects or some other failure in the client library.

The following example shows unsubscribe after a single message:

nc, err := nats.Connect("demo.nats.io")
if err != nil {
	log.Fatal(err)
}
defer nc.Close()

// Sync Subscription
sub, err := nc.SubscribeSync("updates")
if err != nil {
	log.Fatal(err)
}
if err := sub.AutoUnsubscribe(1); err != nil {
	log.Fatal(err)
}

// Async Subscription
sub, err = nc.Subscribe("updates", func(_ *nats.Msg) {})
if err != nil {
	log.Fatal(err)
}
if err := sub.AutoUnsubscribe(1); err != nil {
	log.Fatal(err)
}

// Close the connection
nc.Close()
Connection nc = Nats.connect("nats://demo.nats.io:4222");
Dispatcher d = nc.createDispatcher((msg) -> {
    String str = new String(msg.getData(), StandardCharsets.UTF_8);
    System.out.println(str);
});

// Sync Subscription
Subscription sub = nc.subscribe("updates");
sub.unsubscribe(1);

// Async Subscription
d.subscribe("updates");
d.unsubscribe("updates", 1);

// Close the connection
nc.close();
// `max` specifies the number of messages that the server will forward.
// The server will auto-cancel.
let opts = {max: 10};
let sub = nc.subscribe(NATS.createInbox(), opts, (msg) => {
    t.log(msg);
});

// another way after 10 messages
// node-nats handlers receive the payload as their first argument (there is
// no leading error argument), so log the payload directly.
let sub2 = nc.subscribe(NATS.createInbox(), (msg) => {
    t.log(msg);
});
// if the subscription already received 10 messages, the handler
// won't get any more messages
nc.unsubscribe(sub2, 10);
nc = NATS()

await nc.connect(servers=["nats://demo.nats.io:4222"])

async def cb(msg):
  print(msg)

sid = await nc.subscribe("updates", cb=cb)
await nc.auto_unsubscribe(sid, 1)
await nc.publish("updates", b'All is Well')

# Won't be received...
await nc.publish("updates", b'...')

require 'nats/client'
require 'fiber'

NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
  Fiber.new do
    f = Fiber.current

    nc.subscribe("time", max: 1) do |msg, reply|
      f.resume Time.now
    end

    nc.publish("time", 'What is the time?', NATS.create_inbox)

    # Use the response
    msg = Fiber.yield
    puts "Reply: #{msg}"

    # Won't be received
    nc.publish("time", 'What is the time?', NATS.create_inbox)

  end.resume
end

// `max` specifies the number of messages that the server will forward.
// The server will auto-cancel.
let opts = {max: 10};
let sub = await nc.subscribe(createInbox(), (err, msg) => {
    t.log(msg.data);
}, opts);

// another way after 10 messages
let sub2 = await nc.subscribe(createInbox(), (err, msg) => {
    t.log(msg.data);
});
// if the subscription already received 10 messages, the handler
// won't get any more messages
sub2.unsubscribe(10);

Replying to a Message

Incoming messages have an optional reply-to field. If that field is set, it will contain a subject to which a reply is expected. In the publishing examples we sent a request for the current time. The following code will listen for that request and respond with the time.

nc, err := nats.Connect("demo.nats.io")
if err != nil {
	log.Fatal(err)
}
defer nc.Close()

// Subscribe
sub, err := nc.SubscribeSync("time")
if err != nil {
	log.Fatal(err)
}

// Read a message
msg, err := sub.NextMsg(10 * time.Second)
if err != nil {
	log.Fatal(err)
}

// Get the time
timeAsBytes := []byte(time.Now().String())

// Send the time
nc.Publish(msg.Reply, timeAsBytes)

// Flush and close the connection
nc.Flush()
nc.Close()
Connection nc = Nats.connect("nats://demo.nats.io:4222");

// Subscribe
Subscription sub = nc.subscribe("time");

// Read a message
Message msg = sub.nextMessage(Duration.ZERO);

// Get the time
Calendar cal = Calendar.getInstance();
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
byte[] timeAsBytes = sdf.format(cal.getTime()).getBytes(StandardCharsets.UTF_8);

// Send the time
nc.publish(msg.getReplyTo(), timeAsBytes);

// Flush and close the connection
nc.flush(Duration.ZERO);
nc.close();
// set up a subscription to process a request
// The reply subject arrives as the handler's second argument; only answer
// when the requester actually supplied one.
nc.subscribe('time', (msg, reply) => {
    if (reply) {
        nc.publish(reply, new Date().toLocaleTimeString());
    }
});
nc = NATS()

await nc.connect(servers=["nats://demo.nats.io:4222"])

future = asyncio.Future()

async def cb(msg):
  nonlocal future
  future.set_result(msg)

await nc.subscribe("time", cb=cb)

await nc.publish_request("time", new_inbox(), b'What is the time?')
await nc.flush()

# Read the message
msg = await asyncio.wait_for(future, 1)

# Send the time
time_as_bytes = "{}".format(datetime.now()).encode()
await nc.publish(msg.reply, time_as_bytes)

require 'nats/client'
require 'fiber'

NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
  Fiber.new do
    f = Fiber.current

    nc.subscribe("time") do |msg, reply|
      f.resume Time.now
    end

    nc.publish("time", 'What is the time?', NATS.create_inbox)

    # Use the response
    msg = Fiber.yield
    puts "Reply: #{msg}"

  end.resume
end

// set up a subscription to process a request
await nc.subscribe('time', (err, msg) => {
    if (msg.reply) {
        nc.publish(msg.reply, new Date().toLocaleTimeString());
    } else {
        t.log('got a request for the time, but no reply subject was set.');
    }
});

Wildcards

There is no special code to subscribe with a wildcard subject. The main technique that may come into play is to use the subject provided with the incoming message to determine what to do with the message.

For example, you can subscribe using * and then act based on the actual subject.

nc, err := nats.Connect("demo.nats.io")
if err != nil {
	log.Fatal(err)
}
defer nc.Close()

// Use a WaitGroup to wait for 2 messages to arrive
wg := sync.WaitGroup{}
wg.Add(2)

// Subscribe
if _, err := nc.Subscribe("time.*.east", func(m *nats.Msg) {
	log.Printf("%s: %s", m.Subject, m.Data)
	wg.Done()
}); err != nil {
	log.Fatal(err)
}

// Wait for the 2 messages to come in
wg.Wait()

// Close the connection
nc.Close()
Connection nc = Nats.connect("nats://demo.nats.io:4222");

// Use a latch to wait for 2 messages to arrive
CountDownLatch latch = new CountDownLatch(2);

// Create a dispatcher and inline message handler
Dispatcher d = nc.createDispatcher((msg) -> {
    String subject = msg.getSubject();
    String str = new String(msg.getData(), StandardCharsets.UTF_8);
    System.out.println(subject + ": " + str);
    latch.countDown();
});

// Subscribe
d.subscribe("time.*.east");

// Wait for messages to come in
latch.await();

// Close the connection
nc.close();
nc.subscribe('time.us.*', (msg, reply, subject) => {
    // converting timezones correctly in node requires a library
    // this doesn't take into account *many* things.
    let time = "";
    switch (subject) {
        case 'time.us.east':
            time = new Date().toLocaleTimeString("en-us", {timeZone: "America/New_York"});
            break;
        case 'time.us.central':
            time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Chicago"});
            break;
        case 'time.us.mountain':
            time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Denver"});
            break;
        case 'time.us.west':
            time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Los_Angeles"});
            break;
        default:
            time = "I don't know what you are talking about Willis";
    }
    t.log(subject, time);
});
nc = NATS()

await nc.connect(servers=["nats://demo.nats.io:4222"])

# Use queue to wait for 2 messages to arrive
queue = asyncio.Queue()
async def cb(msg):
  await queue.put_nowait(msg)

await nc.subscribe("time.*.east", cb=cb)

# Send 2 messages and wait for them to come in
await nc.publish("time.A.east", b'A')
await nc.publish("time.B.east", b'B')

msg_A = await queue.get()
msg_B = await queue.get()

print("Msg A:", msg_A)
print("Msg B:", msg_B)

require 'nats/client'
require 'fiber'

NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
  Fiber.new do
    f = Fiber.current

    nc.subscribe("time.*.east") do |msg, reply|
      f.resume Time.now
    end

    nc.publish("time.A.east", "A")
    nc.publish("time.B.east", "B")

    # Use the response
    msg_A = Fiber.yield
    puts "Msg A: #{msg_A}"

    msg_B = Fiber.yield
    puts "Msg B: #{msg_B}"

  end.resume
end

await nc.subscribe('time.us.*', (err, msg) => {
    // converting timezones correctly in node requires a library
    // this doesn't take into account *many* things.
    let time = "";
    switch (msg.subject) {
        case 'time.us.east':
            time = new Date().toLocaleTimeString("en-us", {timeZone: "America/New_York"});
            break;
        case 'time.us.central':
            time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Chicago"});
            break;
        case 'time.us.mountain':
            time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Denver"});
            break;
        case 'time.us.west':
            time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Los_Angeles"});
            break;
        default:
            time = "I don't know what you are talking about Willis";
    }
    console.log(msg.subject, time);
});

or do something similar with >:

nc, err := nats.Connect("demo.nats.io")
if err != nil {
	log.Fatal(err)
}
defer nc.Close()

// Use a WaitGroup to wait for 4 messages to arrive
wg := sync.WaitGroup{}
wg.Add(4)

// Subscribe
if _, err := nc.Subscribe("time.>", func(m *nats.Msg) {
	log.Printf("%s: %s", m.Subject, m.Data)
	wg.Done()
}); err != nil {
	log.Fatal(err)
}

// Wait for the 4 messages to come in
wg.Wait()

// Close the connection
nc.Close()
Connection nc = Nats.connect("nats://demo.nats.io:4222");

// Use a latch to wait for 4 messages to arrive
CountDownLatch latch = new CountDownLatch(4);

// Create a dispatcher and inline message handler
Dispatcher d = nc.createDispatcher((msg) -> {
    String subject = msg.getSubject();
    String str = new String(msg.getData(), StandardCharsets.UTF_8);
    System.out.println(subject + ": " + str);
    latch.countDown();
});

// Subscribe
d.subscribe("time.>");

// Wait for messages to come in
latch.await();

// Close the connection
nc.close();
nc.subscribe('time.>', (msg, reply, subject) => {
    // converting timezones correctly in node requires a library
    // this doesn't take into account *many* things.
    let time = "";
    switch (subject) {
        case 'time.us.east':
            time = new Date().toLocaleTimeString("en-us", {timeZone: "America/New_York"});
            break;
        case 'time.us.central':
            time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Chicago"});
            break;
        case 'time.us.mountain':
            time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Denver"});
            break;
        case 'time.us.west':
            time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Los_Angeles"});
            break;
        default:
            time = "I don't know what you are talking about Willis";
    }
    t.log(subject, time);
});
nc = NATS()

await nc.connect(servers=["nats://demo.nats.io:4222"])

# Use queue to wait for 4 messages to arrive
queue = asyncio.Queue()
async def cb(msg):
  await queue.put(msg)

await nc.subscribe("time.>", cb=cb)

# Send 4 messages and wait for them to come in
await nc.publish("time.A.east", b'A')
await nc.publish("time.B.east", b'B')
await nc.publish("time.C.west", b'C')
await nc.publish("time.D.west", b'D')

for i in range(0, 4):
  msg = await queue.get()
  print("Msg:", msg)

await nc.close()

require 'nats/client'
require 'fiber'

NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
  Fiber.new do
    f = Fiber.current

    nc.subscribe("time.>") do |msg, reply|
      f.resume Time.now.to_f
    end

    nc.publish("time.A.east", "A")
    nc.publish("time.B.east", "B")
    nc.publish("time.C.west", "C")
    nc.publish("time.D.west", "D")

    # Use the response
    4.times do 
      msg = Fiber.yield
      puts "Msg: #{msg}"
    end
  end.resume
end

await nc.subscribe('time.>', (err, msg) => {
    // converting timezones correctly in node requires a library
    // this doesn't take into account *many* things.
    let time = "";
    switch (msg.subject) {
        case 'time.us.east':
            time = new Date().toLocaleTimeString("en-us", {timeZone: "America/New_York"});
            break;
        case 'time.us.central':
            time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Chicago"});
            break;
        case 'time.us.mountain':
            time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Denver"});
            break;
        case 'time.us.west':
            time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Los_Angeles"});
            break;
        default:
            time = "I don't know what you are talking about Willis";
    }
    t.log(msg.subject, time);
});

The following example can be used to test these two subscribers. The * subscriber should receive at most 2 messages, while the > subscriber receives 4. More importantly the time.*.east subscriber won’t receive on time.us.east.atlanta because that won’t match.

nc, err := nats.Connect("demo.nats.io")
if err != nil {
	log.Fatal(err)
}
defer nc.Close()

zoneID, err := time.LoadLocation("America/New_York")
if err != nil {
	log.Fatal(err)
}
now := time.Now()
zoneDateTime := now.In(zoneID)
formatted := zoneDateTime.String()

nc.Publish("time.us.east", []byte(formatted))
nc.Publish("time.us.east.atlanta", []byte(formatted))

zoneID, err = time.LoadLocation("Europe/Warsaw")
if err != nil {
	log.Fatal(err)
}
zoneDateTime = now.In(zoneID)
formatted = zoneDateTime.String()

nc.Publish("time.eu.east", []byte(formatted))
nc.Publish("time.eu.east.warsaw", []byte(formatted))

// Close the connection
nc.Close()
Connection nc = Nats.connect("nats://demo.nats.io:4222");
ZoneId zoneId = ZoneId.of("America/New_York");
ZonedDateTime zonedDateTime = ZonedDateTime.ofInstant(Instant.now(), zoneId);
String formatted = zonedDateTime.format(DateTimeFormatter.ISO_ZONED_DATE_TIME);

nc.publish("time.us.east", formatted.getBytes(StandardCharsets.UTF_8));
nc.publish("time.us.east.atlanta", formatted.getBytes(StandardCharsets.UTF_8));

zoneId = ZoneId.of("Europe/Warsaw");
zonedDateTime = ZonedDateTime.ofInstant(Instant.now(), zoneId);
formatted = zonedDateTime.format(DateTimeFormatter.ISO_ZONED_DATE_TIME);
nc.publish("time.eu.east", formatted.getBytes(StandardCharsets.UTF_8));
nc.publish("time.eu.east.warsaw", formatted.getBytes(StandardCharsets.UTF_8));

nc.flush(Duration.ZERO);
nc.close();
nc.publish('time.us.east');
nc.publish('time.us.central');
nc.publish('time.us.mountain');
nc.publish('time.us.west');
nc = NATS()

await nc.connect(servers=["nats://demo.nats.io:4222"])

await nc.publish("time.us.east", b'...')
await nc.publish("time.us.east.atlanta", b'...')

await nc.publish("time.eu.east", b'...')
await nc.publish("time.eu.east.warsaw", b'...')

await nc.close()

NATS.start do |nc|
   nc.publish("time.us.east", '...')
   nc.publish("time.us.east.atlanta", '...')

   nc.publish("time.eu.east", '...')
   nc.publish("time.eu.east.warsaw", '...')

   nc.drain
end
nc.publish('time.us.east');
nc.publish('time.us.central');
nc.publish('time.us.mountain');
nc.publish('time.us.west');

Queues

Using queues, from a subscription standpoint, is super easy. The application simply includes a queue name with the subscription.

digraph g { rankdir=LR publisher [shape=box, style="rounded", label="PUB updates"]; subject [shape=circle, label="gnatsd"]; sub1 [shape=box, style="rounded", label="SUB updates workers"]; sub2 [shape=box, style="rounded", label="SUB updates workers"]; sub3 [shape=box, style="rounded", label="SUB updates workers"]; publisher -> subject [label="msgs 1,2,3"]; subject -> sub1 [label="msg 2"]; subject -> sub2 [label="msg 1"]; subject -> sub3 [label="msg 3"]; }

For example, to subscribe to the queue workers with the subject updates:

nc, err := nats.Connect("demo.nats.io")
if err != nil {
	log.Fatal(err)
}
defer nc.Close()

// Use a WaitGroup to wait for 10 messages to arrive
wg := sync.WaitGroup{}
wg.Add(10)

// Create a queue subscription on "updates" with queue name "workers".
// The name matches the queue used by the other language examples so that
// all of them join the same group when run together.
if _, err := nc.QueueSubscribe("updates", "workers", func(m *nats.Msg) {
	wg.Done()
}); err != nil {
	log.Fatal(err)
}

// Wait for messages to come in
wg.Wait()

// Close the connection
nc.Close()
Connection nc = Nats.connect("nats://demo.nats.io:4222");

// Use a latch to wait for 10 messages to arrive
CountDownLatch latch = new CountDownLatch(10);

// Create a dispatcher and inline message handler; each handled message
// prints its payload and counts the latch down by one
Dispatcher d = nc.createDispatcher((msg) -> {
    String str = new String(msg.getData(), StandardCharsets.UTF_8);
    System.out.println(str);
    latch.countDown();
});

// Subscribe to the subject "updates" with the queue name "workers"
d.subscribe("updates", "workers");

// Wait until all 10 messages counted by the latch have come in
latch.await(); 

// Close the connection
nc.close();
nc.subscribe('updates', {queue: "workers"}, (msg) => {
    t.log('worker got message', msg);
});
nc = NATS()

await nc.connect(servers=["nats://demo.nats.io:4222"])

future = asyncio.Future()

async def cb(msg):
  nonlocal future
  future.set_result(msg)

await nc.subscribe("updates", queue="workers", cb=cb)
await nc.publish("updates", b'All is Well')

msg = await asyncio.wait_for(future, 1)
print("Msg", msg)

require 'nats/client'
require 'fiber'

NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
  Fiber.new do
    f = Fiber.current

    # Join the "workers" queue on the "updates" subject; the queue name
    # matches the one used by the other language examples.
    nc.subscribe("updates", queue: "workers") do |msg, reply|
      f.resume Time.now
    end

    nc.publish("updates", "A")

    # Use the response
    msg = Fiber.yield
    puts "Msg: #{msg}"
  end.resume
end

await nc.subscribe('updates', (err, msg) => {
    t.log('worker got message', msg.data);
}, {queue: "workers"});

If you run this example with the publish examples that send to updates, you will see that one of the instances gets a message while the others you run won’t. But the instance that receives the message will change.

Draining Connections and Subscriptions

A new feature in the NATS client libraries is the ability to drain connections or subscriptions. Closing a connection or unsubscribing from a subscription is generally considered an immediate request, which means the library will halt messages in any pending queue or cache for subscribers. When you drain a subscription or connection, it will process any cached/pending messages before closing.

For a connection the process is essentially:

  1. Drain subscriptions
  2. Stop new messages from being published
  3. Flush any remaining messages
  4. Close
wg := sync.WaitGroup{}
wg.Add(1)

errCh := make(chan error, 1)

// To simulate a timeout, you would set the DrainTimeout()
// to a value less than the time spent in the message callback,
// so say: nats.DrainTimeout(10*time.Millisecond).

nc, err := nats.Connect("demo.nats.io",
	nats.DrainTimeout(10*time.Second),
	nats.ErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, err error) {
		errCh <- err
	}),
	nats.ClosedHandler(func(_ *nats.Conn) {
		wg.Done()
	}))
if err != nil {
	log.Fatal(err)
}

// Subscribe, but add some delay while processing.
if _, err := nc.Subscribe("foo", func(_ *nats.Msg) {
	time.Sleep(200 * time.Millisecond)
}); err != nil {
	log.Fatal(err)
}

// Publish a message
if err := nc.Publish("foo", []byte("hello")); err != nil {
	log.Fatal(err)
}

// Drain the connection, which will close it when done.
if err := nc.Drain(); err != nil {
	log.Fatal(err)
}

// Wait for the connection to be closed.
wg.Wait()

// Check if there was an error
select {
case e := <-errCh:
	log.Fatal(e)
default:
}

Connection nc = Nats.connect("nats://demo.nats.io:4222");

// Use a latch to wait for a message to arrive
CountDownLatch latch = new CountDownLatch(1);

// Create a dispatcher and inline message handler
Dispatcher d = nc.createDispatcher((msg) -> {
    String str = new String(msg.getData(), StandardCharsets.UTF_8);
    System.out.println(str);
    latch.countDown();
});

// Subscribe
d.subscribe("updates");

// Wait for a message to come in
latch.await();

// Drain the connection, which will close it
CompletableFuture<Boolean> drained = nc.drain(Duration.ofSeconds(10));

// Wait for the drain to complete
drained.get();
import asyncio
from nats.aio.client import Client as NATS

async def example(loop):
    """Answer ten "help" requests on one connection, then drain it.

    Connects to a local NATS server, subscribes a handler to "help",
    issues ten concurrent requests against that subject, waits for every
    response and finally drains the connection.

    :param loop: event loop handed to ``nc.connect``.
    """
    nc = NATS()

    await nc.connect("nats://127.0.0.1:4222", loop=loop)

    async def handler(msg):
        print("[Received] ", msg)
        await nc.publish(msg.reply, b'I can help')

        # Can check whether client is in draining state
        if nc.is_draining:
            print("Connection is draining")

    await nc.subscribe("help", "workers", cb=handler)
    await nc.flush()

    # Create the request coroutines up front so gather() can run them
    # concurrently instead of one after another.
    requests = []
    for i in range(0, 10):
        request = nc.request("help", b'help!', timeout=1)
        requests.append(request)

    # Wait for all the responses
    responses = await asyncio.gather(*requests)

    # Gracefully close the connection.
    await nc.drain()

    print("Received {} responses".format(len(responses)))
NATS.start(drain_timeout: 1) do |nc|
  NATS.subscribe('foo', queue: "workers") do |msg, reply, sub|
    nc.publish(reply, "ACK:#{msg}")
  end

  NATS.subscribe('bar', queue: "workers") do |msg, reply, sub|
    nc.publish(reply, "ACK:#{msg}")
  end

  NATS.subscribe('quux', queue: "workers") do |msg, reply, sub|
    nc.publish(reply, "ACK:#{msg}")
  end

  EM.add_timer(2) do
    next if NATS.draining?

    # Drain gracefully closes the connection.
    NATS.drain do
      puts "Done draining. Connection is closed."
    end
  end
end
let sub = await nc.subscribe('updates', (err, msg) => {
    t.log('worker got message', msg.data);
}, {queue: "workers"});

// Await the flush so its promise is not left floating before the drain
// starts.
await nc.flush();

// Drain the whole connection.
await nc.drain();
// client must close when the connection drain resolves
nc.close();

Drain provides clients that use queue subscriptions with a way to bring down applications without losing any messages. A client can bring up a new queue member, drain and shut down the old queue member, all without losing messages sent to the old client. Without drain, there is the possibility of lost messages due to queue timing.

The mechanics of drain for a subscription are simpler:

  1. Unsubscribe at the server
  2. Process known messages
  3. Clean up

The API for drain can generally be used instead of unsubscribe:


	nc, err := nats.Connect("demo.nats.io")
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	done := sync.WaitGroup{}
	done.Add(1)

	count := 0
	errCh := make(chan error, 1)

	msgAfterDrain := "not this one"

	// This callback will process each message slowly
	sub, err := nc.Subscribe("updates", func(m *nats.Msg) {
		if string(m.Data) == msgAfterDrain {
			errCh <- fmt.Errorf("Should not have received this message")
			return
		}
		time.Sleep(100 * time.Millisecond)
		count++
		if count == 2 {
			done.Done()
		}
	})
	// Stop here if the subscription could not be created, rather than
	// calling Drain on an unusable subscription below.
	if err != nil {
		log.Fatal(err)
	}

	// Send 2 messages
	for i := 0; i < 2; i++ {
		nc.Publish("updates", []byte("hello"))
	}

	// Call Drain on the subscription. It unsubscribes but
	// waits for all pending messages to be processed.
	if err := sub.Drain(); err != nil {
		log.Fatal(err)
	}

	// Send one more message, this message should not be received
	nc.Publish("updates", []byte(msgAfterDrain))

	// Wait for the subscription to have processed the 2 messages.
	done.Wait()

	// Now check that the 3rd message was not received
	select {
	case e := <-errCh:
		log.Fatal(e)
	case <-time.After(200 * time.Millisecond):
		// OK!
	}

Connection nc = Nats.connect("nats://demo.nats.io:4222");

// Released by the handler as soon as the first message has been seen
CountDownLatch firstMessage = new CountDownLatch(1);

// The dispatcher's handler prints each payload as UTF-8 text
Dispatcher dispatcher = nc.createDispatcher((msg) -> {
    System.out.println(new String(msg.getData(), StandardCharsets.UTF_8));
    firstMessage.countDown();
});

// Register interest in the subject
dispatcher.subscribe("updates");

// Block until the handler has run once
firstMessage.await();

// Start draining the dispatcher; messages that have arrived will be processed
CompletableFuture<Boolean> drainResult = dispatcher.drain(Duration.ofSeconds(10));

// Block until the drain has finished
drainResult.get();

// Close the connection
nc.close();
// Drain subscription is not supported.
import asyncio
from nats.aio.client import Client as NATS

async def example(loop):
    """Subscribe a "workers" queue member to "help", then drain that subscription.

    The handler answers every request on its reply subject. `loop` is
    passed through to the client's connect call.
    """
    client = NATS()
    await client.connect("nats://127.0.0.1:4222", loop=loop)

    async def on_help_request(msg):
        print("[Received] ", msg)
        await client.publish(msg.reply, b'I can help')

        # The client exposes whether it is currently draining
        draining = client.is_draining
        if draining:
            print("Connection is draining")

    subscription_id = await client.subscribe("help", "workers", cb=on_help_request)
    await client.flush()

    # Passing the subscription id gracefully unsubscribes that subscription
    await client.drain(subscription_id)

# There is currently no API to drain a single subscription; the whole connection can be drained via NATS.drain instead.
// Join the "workers" queue group on the "updates" subject.
let sub = await nc.subscribe('updates', (err, msg) => {
    t.log('worker got message', msg.data);
}, {queue: "workers"});

// Drain only this subscription instead of unsubscribing; the connection
// itself stays open.
await sub.drain();

Because draining can involve messages flowing to the server (for a flush) as well as asynchronous message processing, the timeout for drain should generally be higher than the timeout for a simple message request/reply or similar.

Receiving Structured Data

In the publishing examples, we showed how to send JSON through NATS, but you can receive encoded data as well. Each client library may provide tools to help with this encoding. The core traffic to the NATS server will always be byte arrays.

// Connect, then wrap the connection with the JSON encoder
nc, err := nats.Connect("demo.nats.io")
if err != nil {
	log.Fatal(err)
}
defer nc.Close()

ec, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
if err != nil {
	log.Fatal(err)
}
defer ec.Close()

// Shape of the objects expected on "updates"
type stock struct {
	Symbol string
	Price  int
}

// Released by the handler after the first message has been handled
var received sync.WaitGroup
received.Add(1)

// The handler takes a *stock directly instead of a raw message
onStock := func(s *stock) {
	log.Printf("Stock: %s - Price: %v", s.Symbol, s.Price)
	received.Done()
}

// Subscribe
_, err = ec.Subscribe("updates", onStock)
if err != nil {
	log.Fatal(err)
}

// Block until one message has come in
received.Wait()

// Close the connection
ec.Close()
/**
 * Plain data holder for a stock quote. SubscribeJSON fills it in by
 * deserializing a JSON message payload with Gson.
 */
class StockForJsonSub {
    /** Ticker symbol of the stock. */
    public String symbol;
    /** Quoted price of the stock. */
    public float price;

    /**
     * @return the quote rendered as "<symbol> is at <price>"
     */
    @Override
    public String toString() {
        return symbol + " is at " + price;
    }
}

/**
 * Subscribes to "updates", decodes each payload from JSON into a
 * StockForJsonSub, prints it, and exits after 10 messages.
 */
public class SubscribeJSON {
    public static void main(String[] args) {
        // One Gson instance is reused for every message instead of
        // building a new one inside the handler.
        Gson gson = new Gson();

        // try-with-resources closes the connection even when subscribing
        // or waiting fails part-way through.
        try (Connection nc = Nats.connect("nats://demo.nats.io:4222")) {

            // Use a latch to wait for 10 messages to arrive
            CountDownLatch latch = new CountDownLatch(10);

            // Create a dispatcher and inline message handler
            Dispatcher d = nc.createDispatcher((msg) -> {
                // The payload is raw bytes; decode as UTF-8 before parsing
                String json = new String(msg.getData(), StandardCharsets.UTF_8);
                StockForJsonSub stk = gson.fromJson(json, StockForJsonSub.class);

                // Use the object
                System.out.println(stk);

                latch.countDown();
            });

            // Subscribe
            d.subscribe("updates");

            // Wait for all 10 messages to come in
            latch.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
// Connection settings: the json flag is enabled so the handler below can
// read fields straight off the received message.
const connectionOptions = {
    url: "nats://demo.nats.io:4222",
    json: true
};
let nc = NATS.connect(connectionOptions);

// Log only the updates whose ticker is TSLA; ignore everything else.
nc.subscribe('updates', (msg) => {
    if (!msg || msg.ticker !== 'TSLA') {
        return;
    }
    t.log('got message:', msg);
});

import asyncio
import json
from nats.aio.client import Client as NATS
from nats.aio.errors import ErrTimeout

async def run(loop):
    """Subscribe to "updates", publish one JSON message to it, then close.

    Each received payload is decoded from bytes, parsed as JSON and printed.
    `loop` is passed through to the client's connect call.
    """
    nc = NATS()

    await nc.connect(servers=["nats://127.0.0.1:4222"], loop=loop)

    async def message_handler(msg):
        # msg.data is bytes: decode to text, then parse the JSON document
        data = json.loads(msg.data.decode())
        print(data)

    sid = await nc.subscribe("updates", cb=message_handler)
    await nc.flush()

    await nc.auto_unsubscribe(sid, 2)
    await nc.publish("updates", json.dumps({"symbol": "GOOG", "price": 1200 }).encode())
    # Give the handler time to run. asyncio.sleep no longer accepts a
    # `loop` argument (removed in Python 3.10); it uses the running loop.
    await asyncio.sleep(1)
    await nc.close()
require 'nats/client'
require 'json'

# Connect to the local server and print every message received on
# "updates" after parsing it as JSON.
NATS.start(servers: ["nats://127.0.0.1:4222"]) do |client|
  client.subscribe("updates") do |raw|
    # The block argument is handed straight to the JSON parser and the
    # result is printed, e.g. {"symbol"=>"GOOG", "price"=>12}
    p JSON.parse(raw)
  end
end
// Connect with the JSON payload type selected.
let nc = await connect({
    url: "nats://demo.nats.io:4222",
    payload: Payload.JSON
});

// Await the subscribe call so a failure to subscribe is not silently
// dropped as an unhandled promise.
await nc.subscribe('updates', (err, msg) => {
    // The callback's first argument carries an error; do not touch msg
    // when one is reported.
    if (err) {
        t.log('error receiving message:', err);
        return;
    }
    t.log('got message:', msg.data ? msg.data : "no payload");
});