NATS uses cookies to ensure you get the best experience on our website. Continuing to use this site assumes compliance with our Privacy Policy.
Edit on GitHub

Sending Messages

NATS sends and receives messages composed of a target subject, an optional reply subject and an array of bytes. Some libraries may provide helpers to convert other data formats to and from bytes, but the NATS server will treat all messages as opaque byte arrays. All of the NATS clients are designed to make sending a message simple. For example, to send the string “All is Well” to the “updates” subject as a UTF-8 string of bytes you would do:

nc, err := nats.Connect("demo.nats.io")
if err != nil {
	log.Fatal(err)
}
defer nc.Close()

if err := nc.Publish("updates", []byte("All is Well")); err != nil {
	log.Fatal(err)
}
// Make sure the message goes through before we close
nc.Flush()
Connection nc = Nats.connect("nats://demo.nats.io:4222");

nc.publish("updates", "All is Well".getBytes(StandardCharsets.UTF_8));

// Make sure the message goes through before we close
nc.flush(Duration.ZERO);
nc.close();
let nc = NATS.connect({url: "nats://demo.nats.io:4222", preserveBuffers: true});
let buf = Buffer.allocUnsafe(12);
buf.fill("All is well");
nc.publish('updates', buf);
nc = NATS()

await nc.connect(servers=["nats://demo.nats.io:4222"])

await nc.publish("updates", b'All is Well')

require 'nats/client'

NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
  nc.publish("updates", "All is Well")
end
let nc = await connect({
    url: "nats://demo.nats.io:4222",
    payload: Payload.BINARY
});

let buf = Buffer.allocUnsafe(12);
buf.fill("All is Well");
nc.publish('updates', buf);

Reply-To

The optional reply-to field when publishing a message can be used on the receiving side to respond. The reply-to subject is often called an inbox, and some libraries may provide a method for generating unique inbox subjects. For example to send a request to the subject time, with no content for the messages, you might:

nc, err := nats.Connect("demo.nats.io")
if err != nil {
	log.Fatal(err)
}
defer nc.Close()

// Create a unique subject name
uniqueReplyTo := nats.NewInbox()

// Listen for a single response
sub, err := nc.SubscribeSync(uniqueReplyTo)
if err != nil {
	log.Fatal(err)
}

// Send the request
if err := nc.PublishRequest("time", uniqueReplyTo, nil); err != nil {
	log.Fatal(err)
}

// Read the reply
msg, err := sub.NextMsg(time.Second)
if err != nil {
	log.Fatal(err)
}

// Use the response
log.Printf("Reply: %s", msg.Data)

// Close the connection
nc.Close()
Connection nc = Nats.connect("nats://demo.nats.io:4222");

// Create a unique subject name
String uniqueReplyTo = NUID.nextGlobal();

// Listen for a single response
Subscription sub = nc.subscribe(uniqueReplyTo);
sub.unsubscribe(1);

// Send the request
nc.publish("time", uniqueReplyTo, null);

// Read the reply
Message msg = sub.nextMessage(Duration.ofSeconds(1));

// Use the response
System.out.println(new String(msg.getData(), StandardCharsets.UTF_8));

// Close the connection
nc.close();
// set up a subscription to process the request
nc.subscribe('time', (msg, reply) => {
    if(reply) {
        nc.publish(reply, new Date().toLocaleTimeString());
    }
});

// create a subscription subject that the responding send replies to
let inbox = NATS.createInbox();
nc.subscribe(inbox, {max: 1}, (msg) => {
    t.log('the time is', msg);
    nc.close();
});

nc.publish('time', "", inbox);
nc = NATS()

future = asyncio.Future()

async def sub(msg):
  nonlocal future
  future.set_result(msg)

await nc.connect(servers=["nats://demo.nats.io:4222"])
await nc.subscribe("time", cb=sub)

unique_reply_to = new_inbox()
await nc.publish_request("time", unique_reply_to, b'')

# Use the response
msg = await asyncio.wait_for(future, 1)
print("Reply:", msg)

require 'nats/client'
require 'fiber'

NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
  Fiber.new do
    f = Fiber.current

    nc.subscribe("time") do |msg, reply|
      f.resume msg
    end

    nc.publish("time", 'example', NATS.create_inbox)

    # Use the response
    msg = Fiber.yield
    puts "Reply: #{msg}"

  end.resume
end

// set up a subscription to process the request
await nc.subscribe('time', (err, msg) => {
    if (err) {
        // this example is running inside of a promise
        reject();
        return;
    }
    if (msg.reply) {
        nc.publish(msg.reply, new Date().toLocaleTimeString());
    }
});

// create a subscription subject that the responding send replies to
let inbox = createInbox();
await nc.subscribe(inbox, (err, msg) => {
    t.log('the time is', msg.data);
    // this example is running inside of a promise
    nc.close();
    resolve();
}, {max: 1});

nc.publish('time', "", inbox);

Request-Reply

The pattern of sending a message and receiving a response is encapsulated in most client libraries into a request method. Under the covers this method will publish a message with a unique reply-to subject and wait for the response before returning. In the older versions of some libraries a completely new reply-to subject is created each time. In newer versions, a subject hierarchy is used so that a single subscriber in the client library listens for a wildcard, and requests are sent with a unique child subject of a single subject.

The primary difference between the request method and publishing with a reply-to is that the library is only going to accept one response, and in most libraries the request will be treated as a synchronous action. The library may provide a way to set the timeout. For example, updating the previous publish example we may request time with a one second timeout:

nc, err := nats.Connect("demo.nats.io")
if err != nil {
	log.Fatal(err)
}
defer nc.Close()

// Send the request
msg, err := nc.Request("time", nil, time.Second)
if err != nil {
	log.Fatal(err)
}

// Use the response
log.Printf("Reply: %s", msg.Data)

// Close the connection
nc.Close()
Connection nc = Nats.connect("nats://demo.nats.io:4222");

// Send the request
Message msg = nc.request("time", null, Duration.ofSeconds(1));

// Use the response
System.out.println(new String(msg.getData(), StandardCharsets.UTF_8));

// Close the connection
nc.close();
nc.requestOne('time', (msg) => {
    t.log('the time is', msg);
    nc.close();
});
nc = NATS()

async def sub(msg):
  await nc.publish(msg.reply, b'response')

await nc.connect(servers=["nats://demo.nats.io:4222"])
await nc.subscribe("time", cb=sub)

# Send the request
try:
  msg = await nc.request("time", b'', timeout=1)
  # Use the response
  print("Reply:", msg)
except asyncio.TimeoutError:
  print("Timed out waiting for response")

require 'nats/client'
require 'fiber'

NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
  nc.subscribe("time") do |msg, reply|
    nc.publish(reply, "response")
  end

  Fiber.new do
    # Use the response
    msg = nc.request("time", "")
    puts "Reply: #{msg}"
  end.resume
end

let msg = await nc.request('time', 1000);
t.log('the time is', msg.data);
nc.close();

Ultimately you can build your own request-reply using publish-subscribe if you need a different semantic or timing.

Publishing, Caches and Flush

For performance reasons, most if not all, of the client libraries will cache outgoing data. This may be as simple as a byte buffer that stores up a few messages before being pushed to the network. It is the libraries job to make sure messages flow in a high performance manner. But there may be times when an application needs to know that a message has “hit the wire.” In this case, applications can use a flush call to tell the library to move data through the system.

nc, err := nats.Connect("demo.nats.io")
if err != nil {
	log.Fatal(err)
}
defer nc.Close()

if err := nc.Publish("updates", []byte("All is Well")); err != nil {
	log.Fatal(err)
}
// Sends a PING and wait for a PONG from the server, up to the given timeout.
// This gives guarantee that the server has processed above message.
if err := nc.FlushTimeout(time.Second); err != nil {
	log.Fatal(err)
}
Connection nc = Nats.connect("nats://demo.nats.io:4222");

nc.publish("updates", "All is Well".getBytes(StandardCharsets.UTF_8));
nc.flush(Duration.ofSeconds(1)); // Flush the message queue

nc.close();
let nc = NATS.connect({url: "nats://demo.nats.io:4222"});
let start = Date.now();
nc.flush(() => {
    t.log('round trip completed in', Date.now() - start, 'ms');
});

nc.publish('foo');
// function in flush is optional
nc.flush();
nc = NATS()

await nc.connect(servers=["nats://demo.nats.io:4222"])

await nc.publish("updates", b'All is Well')

# Sends a PING and wait for a PONG from the server, up to the given timeout.
# This gives guarantee that the server has processed above message.
await nc.flush(timeout=1)

require 'nats/client'
require 'fiber'

NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
  nc.subscribe("updates") do |msg|
    puts msg
  end

  nc.publish("updates", "All is Well")

  nc.flush do
    # Sends a PING and wait for a PONG from the server, up to the given timeout.
    # This gives guarantee that the server has processed above message at this point.
  end
end

let nc = await connect({
    url: "nats://demo.nats.io:4222"
});

// you can use flush to trigger a function in your
// application once the round-trip to the server finishes
let start = Date.now();
nc.flush(() => {
    t.log('round trip completed in', Date.now() - start, 'ms');
});

nc.publish('foo');

// another way, simply wait for the promise to resolve
await nc.flush();

nc.close();

Flush and Ping/Pong

Many of the client libraries use the PING/PONG interaction built into the NATS protocol to insure that flush pushed all of the cached messages to the server. When an application calls flush, in this case, the library will put a PING on the outgoing queue of messages, and wait for the server to send PONG before saying that the flush was successful.

Sending Structured Data

Some client libraries provide helpers to send structured data while others depend on the application to perform any encoding and decoding and just take byte arrays for sending. The following example shows how to send JSON but this could easily be altered to send a protocol buffer, YAML or some other format. JSON is a text format so we also have to encode the string in most languages to bytes. We are using UTF-8, the JSON standard encoding.

Take a simple stock ticker that sends the symbol and price of each stock:

nc, err := nats.Connect("demo.nats.io")
if err != nil {
	log.Fatal(err)
}
ec, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
if err != nil {
	nc.Close()
	log.Fatal(err)
}
defer ec.Close()

// Define the object
type stock struct {
	Symbol string
	Price  int
}

// Publish the message
if err := ec.Publish("updates", &stock{Symbol: "GOOG", Price: 1200}); err != nil {
	log.Fatal(err)
}
// Make sure the message goes through before we close
ec.Flush()
class StockForJsonPub {
    public String symbol;
    public float price;
}

public class PublishJSON {
    public static void main(String[] args) {
        try {
            Connection nc = Nats.connect("nats://demo.nats.io:4222");

            // Create the data object
            StockForJsonPub stk = new StockForJsonPub();
            stk.symbol="GOOG";
            stk.price=1200;

            // use Gson to encode the object to JSON
            GsonBuilder builder = new GsonBuilder();
            Gson gson = builder.create();
            String json = gson.toJson(stk);

            // Publish the message
            nc.publish("updates", json.getBytes(StandardCharsets.UTF_8));

            // Make sure the message goes through before we close
            nc.flush(Duration.ZERO);
            nc.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
let nc = NATS.connect({url: "nats://demo.nats.io:4222", json: true});
nc.publish('updates', {ticker: 'GOOG', price: 1200});
nc = NATS()

await nc.connect(servers=["nats://demo.nats.io:4222"])

await nc.publish("updates", json.dumps({"symbol": "GOOG", "price": 1200 }).encode())

require 'nats/client'
require 'json'

NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
  nc.publish("updates", {"symbol": "GOOG", "price": 1200}.to_json)
end
let nc = await connect({
    url: "nats://demo.nats.io:4222",
    payload: Payload.JSON
});

nc.publish('updates', {ticker: 'GOOG', price: 1200});