NAME
    EV::Nats - High-performance asynchronous NATS client using EV

SYNOPSIS
        use EV::Nats;

        my $nats = EV::Nats->new(
            host       => '127.0.0.1',
            port       => 4222,
            on_error   => sub { warn "nats error: @_" },
            on_connect => sub { warn "connected to NATS" },
        );

        # Subscribe
        my $sid = $nats->subscribe('foo.>', sub {
            my ($subject, $payload, $reply) = @_;
            print "[$subject] $payload\n";
        });

        # Subscribe with queue group
        $nats->subscribe('worker.>', sub {
            my ($subject, $payload, $reply) = @_;
        }, 'workers');

        # Publish
        $nats->publish('foo.bar', 'hello world');

        # Request/reply
        $nats->request('service.echo', 'ping', sub {
            my ($response, $err) = @_;
            die $err if $err;
            print "reply: $response\n";
        }, 5000);  # 5s timeout

        # Unsubscribe
        $nats->unsubscribe($sid);

        EV::run;

DESCRIPTION
    EV::Nats is a high-performance asynchronous NATS client that implements
    the NATS client protocol in pure XS with EV event loop integration. No
    external C NATS library is required.

    Features:

    *   Full NATS client protocol (PUB, SUB, UNSUB, MSG, HMSG)

    *   Request/reply with automatic inbox management

    *   Queue group subscriptions for load balancing

    *   Wildcard subjects ("*" and ">")

    *   Headers support (HPUB/HMSG)

    *   Automatic PING/PONG keep-alive

    *   Automatic reconnection with subscription and queue group restore

    *   Fire-and-forget publish (no callback overhead)

    *   Token, user/pass authentication

    *   TCP keepalive and connect timeout

    *   Write coalescing via ev_prepare (batches writes per event loop
        iteration)

    *   O(1) subscription lookup via hash table

    *   Graceful drain (unsubscribe all, flush, then disconnect)

    *   Server pool with cluster URL failover from INFO connect_urls

    *   Optional TLS via OpenSSL (auto-detected at build time)

    *   Reconnect jitter to prevent thundering herd

    *   Per-connection stats counters (msgs/bytes in/out)

    *   JetStream API (EV::Nats::JetStream)

    *   Key-Value store (EV::Nats::KV)

    *   Object store with chunking (EV::Nats::ObjectStore)

    *   NKey/JWT authentication (Ed25519 via OpenSSL)

    *   Slow consumer detection with configurable
        threshold

    *   Publish batching API ("batch")

    *   Lame duck mode (leaf node graceful shutdown) notification

    Note: DNS resolution via "getaddrinfo" is blocking. Use numeric IP
    addresses for latency-sensitive applications.

METHODS
  new(%options)
    Create a new EV::Nats instance. Connects automatically if "host" is
    given.

        my $nats = EV::Nats->new(
            host     => '127.0.0.1',
            port     => 4222,
            on_error => sub { die @_ },
        );

    Options:

    host => 'Str'
    port => 'Int' (default 4222)
        Server hostname and port. If "host" is provided, connection starts
        immediately.

    on_error => $cb->($errstr)
        Error callback. Default: "croak".

    on_connect => $cb->()
        Called when connection is fully established (after CONNECT/PONG
        handshake).

    on_disconnect => $cb->()
        Called on disconnect.

    user => 'Str'
    pass => 'Str'
        Username/password authentication. Values are JSON-escaped in the
        CONNECT command.

    token => 'Str'
        Token authentication.

    name => 'Str'
        Client name sent in CONNECT.

    verbose => $bool (default 0)
        Request +OK acknowledgments from server.

    pedantic => $bool (default 0)
        Enable strict subject checking.

    echo => $bool (default 1)
        Receive messages published by this client.

    no_responders => $bool (default 0)
        Enable no-responders notification for requests.

    reconnect => $bool (default 0)
        Enable automatic reconnection.

    reconnect_delay => $ms (default 2000)
        Delay between reconnect attempts.

    max_reconnect_attempts => $num (default 60)
        Maximum reconnect attempts. 0 = unlimited.

    connect_timeout => $ms
        Connection timeout. 0 = no timeout.

    ping_interval => $ms (default 120000)
        Interval for client-initiated PING. 0 = disabled.

    max_pings_outstanding => $num (default 2)
        Max unanswered PINGs before declaring stale connection.

    priority => $num (-2 to +2)
        EV watcher priority.

    keepalive => $seconds
        TCP keepalive interval.

    path => 'Str'
        Unix socket path. Mutually exclusive with "host".

    loop => EV::Loop
        EV loop to use. Default: "EV::default_loop".

  connect($host, [$port])
    Connect to NATS server. Port defaults to 4222.
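
    Because DNS resolution via "getaddrinfo" is blocking, one option is to
    resolve the hostname once at startup, before the event loop runs, and
    pass the numeric address to "connect". The following is a minimal
    sketch using the core Socket module. The helper name "resolve_numeric"
    is hypothetical and not part of EV::Nats. The lookup itself still
    blocks; the sketch only moves it out of the latency-sensitive path.

```perl
use strict;
use warnings;
use Socket qw(getaddrinfo getnameinfo SOCK_STREAM NI_NUMERICHOST NIx_NOSERV);

# Hypothetical helper (not part of EV::Nats): resolve $host once, up front,
# and return the first resulting address in numeric form.
sub resolve_numeric {
    my ($host) = @_;

    # Blocking lookup; intended to run before EV::run is entered.
    my ($err, @res) = getaddrinfo($host, undef, { socktype => SOCK_STREAM });
    die "cannot resolve $host: $err\n" if $err;

    # Convert the packed socket address back to a numeric host string.
    my ($nerr, $ip) = getnameinfo($res[0]{addr}, NI_NUMERICHOST, NIx_NOSERV);
    die "cannot format address for $host: $nerr\n" if $nerr;

    return $ip;
}

my $ip = resolve_numeric('127.0.0.1');
print "resolved: $ip\n";

# The numeric address can then be handed to the client, for example:
#   $nats->connect($ip, 4222);
```

    A numeric input such as '127.0.0.1' is returned unchanged, so the same
    code path can be used whether the configuration holds a name or an
    address.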
  connect_unix($path)
    Connect via Unix domain socket.

  disconnect
    Graceful disconnect.

  is_connected
    Returns true if connected.

  publish($subject, [$payload], [$reply_to])
    Publish a message. Alias: "pub".

        $nats->publish('foo', 'hello');
        $nats->publish('foo', 'hello', 'reply.subject');

  hpublish($subject, $headers, [$payload], [$reply_to])
    Publish with headers. Alias: "hpub".

        $nats->hpublish('foo', "NATS/1.0\r\nX-Key: val\r\n\r\n", 'body');

  subscribe($subject, $cb, [$queue_group])
    Subscribe to a subject. Returns subscription ID. Alias: "sub".

        my $sid = $nats->subscribe('foo.*', sub {
            my ($subject, $payload, $reply, $headers) = @_;
        });

    Queue groups are preserved across reconnects.

    Callback receives:

        $subject - actual subject the message was published to
        $payload - message body
        $reply   - reply-to subject (undef if none)
        $headers - raw headers string (only for HMSG)

  subscribe_max($subject, $cb, $max_msgs, [$queue_group])
    Subscribe and auto-unsubscribe after $max_msgs messages in one call.

  unsubscribe($sid, [$max_msgs])
    Unsubscribe. With $max_msgs, auto-unsubscribes after receiving that
    many messages. Auto-unsub state is restored on reconnect. Alias:
    "unsub".

  request($subject, $payload, $cb, [$timeout_ms])
    Request/reply. Uses automatic inbox subscription. Alias: "req".

        $nats->request('service', 'data', sub {
            my ($response, $err) = @_;
            die $err if $err;
            print "got: $response\n";
        }, 5000);

    Callback receives "($response, $error)". Error is set on timeout
    ("request timeout") or no responders ("no responders").

  drain([$cb])
    Graceful shutdown: sends UNSUB for all subscriptions, flushes pending
    writes with a PING fence, fires $cb when the server confirms with PONG,
    then disconnects. No new messages will be received after drain is
    initiated.

        $nats->drain(sub {
            print "drained, safe to exit\n";
        });

  ping
    Send PING to server.

  flush
    Send PING as a write fence; the subsequent PONG guarantees all prior
    messages were processed by the server.
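
    The "hpublish" example above passes a hand-written header block, and
    subscription callbacks receive headers as a raw string. The sketch
    below shows one way to build and split such a block in plain Perl. The
    helpers "build_headers" and "parse_headers" are hypothetical and not
    part of EV::Nats, and the parser assumes the raw string has the same
    shape as the block shown for "hpublish" (a "NATS/1.0" line followed by
    "Name: value" lines).

```perl
use strict;
use warnings;

# Hypothetical helper (not part of EV::Nats): turn a flat list of
# name/value pairs into a header block like the one in the hpublish example.
sub build_headers {
    my @pairs = @_;
    my $block = "NATS/1.0\r\n";
    while (my ($name, $value) = splice(@pairs, 0, 2)) {
        $block .= "$name: $value\r\n";
    }
    return $block . "\r\n";    # an empty line terminates the block
}

# Hypothetical helper: split a raw header string into its first line and a
# hash of name => [values]. Assumes the layout produced by build_headers.
sub parse_headers {
    my ($raw) = @_;
    my ($status, @lines) = split /\r\n/, $raw;
    my %headers;
    for my $line (@lines) {
        next unless $line =~ /^([^:]+):\s*(.*)$/;
        push @{ $headers{$1} }, $2;    # repeated names keep every value
    }
    return ($status, \%headers);
}

my $block = build_headers('X-Key' => 'val', 'X-Trace' => 'abc');
my ($status, $headers) = parse_headers($block);
print "$status: X-Key=$headers->{'X-Key'}[0]\n";

# The block could then be published, for example:
#   $nats->hpublish('foo', $block, 'body');
```

    Values are stored as array references so that a header name repeated in
    the block does not overwrite an earlier value.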
  server_info
    Returns raw INFO JSON string from server.

  max_payload([$limit])
    Get/set max payload size.

  waiting_count
    Number of writes queued locally (during connect/reconnect).

  skip_waiting
    Cancel all waiting writes.

  reconnect($enable, [$delay_ms], [$max_attempts])
    Configure reconnection.

  reconnect_enabled
    Returns true if reconnect is enabled.

  connect_timeout([$ms])
    Get/set connect timeout.

  ping_interval([$ms])
    Get/set PING interval.

  max_pings_outstanding([$num])
    Get/set max outstanding PINGs.

  priority([$num])
    Get/set EV watcher priority.

  keepalive([$seconds])
    Get/set TCP keepalive.

  batch($coderef)
    Batch multiple publishes into a single write. Suppresses per-publish
    write scheduling; all buffered data is flushed after the coderef
    returns.

        $nats->batch(sub {
            $nats->publish("foo.$_", "msg-$_") for 1..1000;
        });

  slow_consumer($bytes_threshold, [$cb])
    Enable slow consumer detection. When the write buffer exceeds
    $bytes_threshold bytes, $cb is called with the current buffer size.

        $nats->slow_consumer(1024*1024, sub {
            my ($pending_bytes) = @_;
            warn "slow consumer: ${pending_bytes}B pending\n";
        });

  on_lame_duck([$cb])
    Get/set callback for lame duck mode. Fired when the server signals it's
    shutting down (leaf node / rolling restart). Use this to migrate to
    another server.

  nkey_seed($seed)
    Set NKey seed for Ed25519 authentication (requires OpenSSL at build
    time). The seed is a base32-encoded NATS NKey. The server nonce from
    INFO is automatically signed during CONNECT.

        $nats->nkey_seed('SUAM...');

    Or via constructor: "nkey_seed => 'SUAM...'".

  jwt($token)
    Set user JWT for authentication. Combined with "nkey_seed" for NATS
    decentralized auth.

  tls($enable, [$ca_file], [$skip_verify])
    Configure TLS (requires OpenSSL at build time).

        $nats->tls(1);                     # system CA
        $nats->tls(1, '/path/to/ca.pem');  # custom CA
        $nats->tls(1, undef, 1);           # skip verification

    Or via constructor: "tls => 1, tls_ca_file => $path".
  stats
    Returns a hash of connection statistics:

        my %s = $nats->stats;
        # msgs_in, msgs_out, bytes_in, bytes_out

  reset_stats
    Reset all stats counters to zero.

  on_error([$cb])
  on_connect([$cb])
  on_disconnect([$cb])
    Get/set handler callbacks.

BENCHMARKS
    Measured on Linux with TCP loopback, Perl 5.40, nats-server 2.12,
    100-byte payloads ("bench/benchmark.pl"):

                                    100K msgs    200K msgs
        PUB fire-and-forget         4.7M         5.0M msgs/sec
        PUB + SUB (loopback)        1.8M         1.6M msgs/sec
        PUB + SUB (8B payload)      2.2M         1.9M msgs/sec
        REQ/REP (pipelined, 128)    334K msgs/sec

    Connected-path publish appends directly to the write buffer with no
    per-message allocation. Write coalescing via "ev_prepare" batches all
    publishes per event-loop iteration into a single write() syscall.

    Run "perl bench/benchmark.pl" for full results. Set "BENCH_MESSAGES",
    "BENCH_PAYLOAD", "BENCH_HOST", "BENCH_PORT" to customize.

NATS PROTOCOL
    This module implements the NATS client protocol directly in XS. The
    protocol is text-based with CRLF-delimited control lines and binary
    payloads.

    Connection flow: server sends INFO, client sends CONNECT + PING, server
    responds with PONG to confirm. All subscriptions (including queue
    groups and auto-unsub state) are automatically restored on reconnect.

    Request/reply uses a single wildcard inbox subscription ("_INBOX..*")
    for all requests, with unique suffixes per request.

CAVEATS
    *   DNS resolution via "getaddrinfo" is blocking. Use numeric IP
        addresses for latency-sensitive applications.

    *   TLS requires OpenSSL headers at build time (auto-detected).

    *   NKey auth requires OpenSSL with Ed25519 support (1.1.1+).

    *   The module handles all data as bytes. Encode UTF-8 strings before
        passing them.

ENVIRONMENT
    TEST_NATS_HOST, TEST_NATS_PORT
        Set these to run the test suite against a NATS server (default:
        127.0.0.1:4222).

SEE ALSO
    EV, NATS protocol, nats-server

AUTHOR
    vividsnow

LICENSE
    This library is free software; you can redistribute it and/or modify it
    under the same terms as Perl itself.