NAME

EV::Kafka - High-performance asynchronous Kafka/Redpanda client using EV

SYNOPSIS

    use EV::Kafka;

    my $kafka = EV::Kafka->new(
        brokers    => '127.0.0.1:9092',
        acks       => -1,
        on_error   => sub { warn "kafka: @_" },
        on_message => sub {
            my ($topic, $partition, $offset, $key, $value, $headers) = @_;
            print "$topic:$partition @ $offset $key = $value\n";
        },
    );

    # Producer
    $kafka->connect(sub {
        $kafka->produce('my-topic', 'key', 'value', sub {
            my ($result, $err) = @_;
            say "produced at offset "
                . $result->{topics}[0]{partitions}[0]{base_offset};
        });
    });

    # Consumer (manual assignment)
    $kafka->assign([{ topic => 'my-topic', partition => 0, offset => 0 }]);
    my $poll = EV::timer 0, 0.1, sub { $kafka->poll };

    # Consumer group
    $kafka->subscribe('my-topic',
        group_id  => 'my-group',
        on_assign => sub { ... },
        on_revoke => sub { ... },
    );

    EV::run;

DESCRIPTION

EV::Kafka is a high-performance asynchronous Kafka client that implements
the Kafka binary protocol in XS with EV event loop integration. It targets
Redpanda and Apache Kafka (protocol version 0.11+).

Two-layer architecture:

* EV::Kafka::Conn (XS) -- single broker TCP connection with protocol
  encoding/decoding, correlation ID matching, pipelining, optional TLS and
  SASL/PLAIN authentication.

* EV::Kafka::Client (Perl) -- cluster management with metadata discovery,
  broker connection pooling, partition leader routing, producer with
  key-based partitioning, consumer with manual assignment or consumer
  groups.
Features:

* Binary protocol implemented in pure XS (no librdkafka dependency)
* Automatic request pipelining per broker connection
* Metadata-driven partition leader routing
* Producer: acks modes (-1/0/1), key-based partitioning (murmur2), headers,
  fire-and-forget (acks=0)
* Consumer: manual partition assignment, offset tracking, poll-based
  message delivery
* Consumer groups: JoinGroup/SyncGroup/Heartbeat, sticky partition
  assignment, offset commit/fetch, automatic rebalancing
* TLS (OpenSSL) and SASL/PLAIN authentication
* Automatic reconnection at the connection layer
* Bootstrap broker failover (tries all listed brokers)

ANYEVENT INTEGRATION

AnyEvent has EV as one of its backends, so EV::Kafka can be used seamlessly
in AnyEvent applications.

NO UTF-8 SUPPORT

This module handles all values as bytes. Encode your UTF-8 strings before
passing them:

    use Encode;
    $kafka->produce($topic, $key, encode_utf8($val), sub { ... });

CLUSTER CLIENT METHODS

new(%options)

    Create a new EV::Kafka client. Returns a blessed "EV::Kafka::Client"
    object.

        my $kafka = EV::Kafka->new(
            brokers  => '10.0.0.1:9092,10.0.0.2:9092',
            acks     => -1,
            on_error => sub { warn @_ },
        );

    Options:

    brokers => 'Str'
        Comma-separated list of bootstrap broker addresses (host:port).
        Default: "127.0.0.1:9092".

    client_id => 'Str' (default 'ev-kafka')
        Client identifier sent to brokers.

    tls => Bool
        Enable TLS encryption.

    tls_ca_file => 'Str'
        Path to CA certificate file for TLS verification.

    tls_skip_verify => Bool
        Skip TLS certificate verification.

    sasl => \%opts
        Enable SASL authentication. Supports the PLAIN mechanism:

            sasl => { mechanism => 'PLAIN', username => 'user', password => 'pass' }

    acks => Int (default -1)
        Producer acknowledgment mode. -1 = all in-sync replicas, 0 = no
        acknowledgment (fire-and-forget), 1 = leader only.

    linger_ms => Int (default 5)
        Time in milliseconds to accumulate records before flushing a batch.
        Lower values reduce latency; higher values improve throughput.
    batch_size => Int (default 16384)
        Maximum batch size in bytes before a batch is flushed immediately.

    compression => 'Str'
        Compression type for produce batches: 'lz4' (requires liblz4),
        'gzip' (requires zlib), or undef for none.

    idempotent => Bool (default 0)
        Enable idempotent producer. Calls "InitProducerId" on connect and
        sets producer_id/epoch/sequence in each RecordBatch for
        exactly-once delivery (broker-side deduplication).

    transactional_id => 'Str'
        Enable transactional producer. Implies idempotent. Required for
        "begin_transaction"/"commit_transaction"/"abort_transaction" and
        "send_offsets_to_transaction" (full EOS).

    partitioner => $cb->($topic, $key, $num_partitions)
        Custom partition selection function. Default: murmur2 hash of the
        key, or round-robin for null keys.

    on_error => $cb->($errstr)
        Error callback. Default: "die".

    on_connect => $cb->()
        Called once after the initial metadata fetch completes.

    on_message => $cb->($topic, $partition, $offset, $key, $value, $headers)
        Message delivery callback for consumer operations.

    fetch_max_wait_ms => Int (default 500)
        Maximum time the broker waits for "fetch_min_bytes" of data.

    fetch_max_bytes => Int (default 1048576)
        Maximum bytes per fetch response.

    fetch_min_bytes => Int (default 1)
        Minimum bytes before the broker responds to a fetch.

    metadata_refresh => Int (default 300)
        Metadata refresh interval in seconds (reserved, not yet wired).

    loop => EV::Loop
        EV loop to use. Default: "EV::default_loop".

connect($cb)

    Connect to the cluster. Connects to the first available bootstrap
    broker, fetches cluster metadata, then fires "$cb->($metadata)".

        $kafka->connect(sub {
            my $meta = shift;
            # $meta->{brokers}, $meta->{topics}
        });

produce($topic, $key, $value, [\%opts,] [$cb])

    Produce a message. Routes to the correct partition leader
    automatically.
        # with callback (acks=1 or acks=-1)
        $kafka->produce('topic', 'key', 'value', sub {
            my ($result, $err) = @_;
        });

        # with headers
        $kafka->produce('topic', 'key', 'value',
            { headers => { 'h1' => 'v1' } }, sub { ... });

        # fire-and-forget (acks=0)
        $kafka->produce('topic', 'key', 'value');

        # explicit partition
        $kafka->produce('topic', 'key', 'value', { partition => 3 }, sub { ... });

produce_many(\@messages, $cb)

    Produce multiple messages with a single completion callback. Each
    message is an arrayref "[$topic, $key, $value]" or a hashref "{topic,
    key, value}". $cb fires when all messages are acknowledged.

        $kafka->produce_many([
            ['my-topic', 'k1', 'v1'],
            ['my-topic', 'k2', 'v2'],
        ], sub {
            my $errors = shift;
            warn "some failed: @$errors" if $errors;
        });

flush([$cb])

    Flush all accumulated produce batches and wait for all in-flight
    requests to complete. $cb fires when all pending responses have been
    received.

assign(\@partitions)

    Manually assign partitions for consuming.

        $kafka->assign([
            { topic => 'my-topic', partition => 0, offset => 0 },
            { topic => 'my-topic', partition => 1, offset => 100 },
        ]);

seek($topic, $partition, $offset, [$cb])

    Seek a partition to a specific offset. Use -2 for earliest, -1 for
    latest. Updates the assignment in-place.

        $kafka->seek('my-topic', 0, -1, sub { print "at latest\n" });

offsets_for($topic, $cb)

    Get the earliest and latest offsets for all partitions of a topic.

        $kafka->offsets_for('my-topic', sub {
            my $offsets = shift;
            # { 0 => { earliest => 0, latest => 42 }, 1 => ... }
        });

lag($cb)

    Get consumer lag for all assigned partitions.

        $kafka->lag(sub {
            my $lag = shift;
            # { "topic:0" => { current => 10, latest => 42, lag => 32 } }
        });

error_name($code)

    Convert a Kafka numeric error code to its name.

        EV::Kafka::Client::error_name(3) # "UNKNOWN_TOPIC_OR_PARTITION"

poll([$cb])

    Fetch messages from assigned partitions. Calls "on_message" for each
    received record. $cb fires when all fetch responses have arrived.
        my $timer = EV::timer 0, 0.1, sub { $kafka->poll };

subscribe($topic, ..., %opts)

    Join a consumer group and subscribe to topics. The group protocol
    handles partition assignment automatically.

        $kafka->subscribe('topic-a', 'topic-b',
            group_id           => 'my-group',
            session_timeout    => 30000,       # ms
            rebalance_timeout  => 60000,       # ms
            heartbeat_interval => 3,           # seconds
            auto_commit        => 1,           # commit on unsubscribe (default)
            auto_offset_reset  => 'earliest',  # or 'latest'
            group_instance_id  => 'pod-abc',   # KIP-345 static membership
            on_assign => sub {
                my $partitions = shift;
                # [{topic, partition, offset}, ...]
            },
            on_revoke => sub {
                my $partitions = shift;
            },
        );

commit([$cb])

    Commit current consumer offsets to the group coordinator.

        $kafka->commit(sub {
            my $err = shift;
            warn "commit failed: $err" if $err;
        });

unsubscribe([$cb])

    Leave the consumer group (sends LeaveGroup for a fast rebalance) and
    stop the heartbeat and fetch loops. If "auto_commit" is enabled,
    commits offsets before leaving.

begin_transaction

    Start a transaction. Requires "transactional_id" in the constructor.

send_offsets_to_transaction($group_id, [$cb])

    Commit consumer offsets within the current transaction via
    "TxnOffsetCommit". This is the key step for exactly-once
    consume-process-produce pipelines.

        $kafka->send_offsets_to_transaction('my-group', sub {
            my ($result, $err) = @_;
        });

commit_transaction([$cb])

    Commit the current transaction. All produced messages and offset
    commits within the transaction become visible atomically.

abort_transaction([$cb])

    Abort the current transaction. All produced messages are discarded and
    offset commits are rolled back.

close([$cb])

    Graceful shutdown: stop timers and disconnect all broker connections.

        $kafka->close(sub { EV::break });

LOW-LEVEL CONNECTION METHODS

"EV::Kafka::Conn" provides direct access to a single broker connection.
Useful for custom protocols, debugging, or when cluster-level routing is
not needed.
    my $conn = EV::Kafka::Conn::_new('EV::Kafka::Conn', undef);
    $conn->on_error(sub { warn @_ });
    $conn->on_connect(sub { ... });
    $conn->connect('127.0.0.1', 9092, 5.0);

connect($host, $port, [$timeout])

    Connect to a broker. Timeout in seconds (0 = no timeout).

disconnect

    Disconnect from the broker.

connected

    Returns true if the connection is ready (ApiVersions handshake
    complete).

metadata(\@topics, $cb)

    Request cluster metadata. Pass "undef" for all topics.

        $conn->metadata(['my-topic'], sub {
            my ($result, $err) = @_;
            # $result->{brokers}, $result->{topics}
        });

produce($topic, $partition, $key, $value, [\%opts,] [$cb])

    Produce a message to a specific partition.

        $conn->produce('topic', 0, 'key', 'value', sub {
            my ($result, $err) = @_;
        });

    Options: "acks" (default 1), "headers" (hashref), "timestamp" (epoch
    ms, default now), "compression" ('none', 'lz4'; requires LZ4 at build
    time).

produce_batch($topic, $partition, \@records, [\%opts,] [$cb])

    Produce multiple records in a single RecordBatch. Each record is
    "{key, value, headers}". Options: "acks", "compression", "producer_id",
    "producer_epoch", "base_sequence".

        $conn->produce_batch('topic', 0, [
            { key => 'k1', value => 'v1' },
            { key => 'k2', value => 'v2' },
        ], sub { my ($result, $err) = @_ });

fetch($topic, $partition, $offset, $cb, [$max_bytes])

    Fetch messages from a partition starting at $offset.

        $conn->fetch('topic', 0, 0, sub {
            my ($result, $err) = @_;
            for my $rec (@{ $result->{topics}[0]{partitions}[0]{records} }) {
                printf "offset=%d key=%s value=%s\n",
                    $rec->{offset}, $rec->{key}, $rec->{value};
            }
        });

fetch_multi(\%topics, $cb, [$max_bytes])

    Multi-partition fetch in a single request. Groups multiple
    topic-partitions into one Fetch call to the broker.

        $conn->fetch_multi({
            'topic-a' => [{ partition => 0, offset => 10 },
                          { partition => 1, offset => 20 }],
            'topic-b' => [{ partition => 0, offset => 0 }],
        }, sub { my ($result, $err) = @_ });

    Used internally by poll() to batch fetches by broker leader.
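For continuous low-level consumption, the fetch callback can reschedule the
next fetch from the last delivered offset plus one. A minimal sketch,
assuming a connected $conn and the documented fetch result structure (the
topic name and the immediate re-fetch are illustrative; a real loop would
pause via an EV timer when a fetch returns no records, and break the $step
self-reference on shutdown):

    my $offset = 0;
    my $step; $step = sub {
        $conn->fetch('my-topic', 0, $offset, sub {
            my ($result, $err) = @_;
            return warn "fetch: $err" if defined $err;
            my $recs = $result->{topics}[0]{partitions}[0]{records} || [];
            for my $rec (@$recs) {
                print "$rec->{offset}: $rec->{value}\n";
                $offset = $rec->{offset} + 1;  # resume after the last record
            }
            $step->();  # chain the next fetch
        });
    };
    $step->();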
list_offsets($topic, $partition, $timestamp, $cb)

    Get offsets by timestamp. Use -2 for earliest, -1 for latest.

find_coordinator($key, $cb, [$key_type])

    Find the coordinator broker. $key_type: 0 = group (default), 1 =
    transaction.

join_group($group_id, $member_id, \@topics, $cb, [$session_timeout_ms, $rebalance_timeout_ms, $group_instance_id])

    Join a consumer group. Pass $group_instance_id for KIP-345 static
    membership.

sync_group($group_id, $generation_id, $member_id, \@assignments, $cb, [$group_instance_id])

    Synchronize group state after a join.

heartbeat($group_id, $generation_id, $member_id, $cb, [$group_instance_id])

    Send a heartbeat to the group coordinator.

offset_commit($group_id, $generation_id, $member_id, \@offsets, $cb)

    Commit consumer offsets.

offset_fetch($group_id, \@topics, $cb)

    Fetch committed offsets for a consumer group.

api_versions

    Returns a hashref mapping supported API keys to their maximum
    versions, or undef if not yet negotiated.

        my $vers = $conn->api_versions;
        # { 0 => 7, 1 => 11, 3 => 8, ... }

on_error([$cb])
on_connect([$cb])
on_disconnect([$cb])

    Set handler callbacks. Pass "undef" to clear.

client_id($id)

    Set the client identifier.

tls($enable, [$ca_file, $skip_verify])

    Configure TLS.

sasl($mechanism, [$username, $password])

    Configure SASL authentication.

auto_reconnect($enable, [$delay_ms])

    Enable automatic reconnection with a delay in milliseconds (default
    1000).

leave_group($group_id, $member_id, $cb)

    Send LeaveGroup to the coordinator for a fast partition rebalance.

create_topics(\@topics, $timeout_ms, $cb)

    Create topics. Each element: "{name, num_partitions,
    replication_factor}".

        $conn->create_topics(
            [{ name => 'new-topic', num_partitions => 3, replication_factor => 1 }],
            5000, sub { my ($res, $err) = @_ }
        );

delete_topics(\@topic_names, $timeout_ms, $cb)

    Delete topics by name.

init_producer_id($transactional_id, $txn_timeout_ms, $cb)

    Initialize a producer ID for idempotent/transactional produce. Pass
    "undef" for a non-transactional idempotent producer.
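The configuration setters above take effect before connect; a sketch
combining them for a secured connection (the host, CA path, and credentials
are placeholders):

    my $conn = EV::Kafka::Conn::_new('EV::Kafka::Conn', undef);
    $conn->client_id('admin-tool');
    $conn->tls(1, '/etc/ssl/ca.pem', 0);     # enable TLS, verify against CA
    $conn->sasl('PLAIN', 'user', 'secret');  # SASL/PLAIN credentials
    $conn->auto_reconnect(1, 2000);          # retry every 2000 ms
    $conn->on_connect(sub {
        $conn->metadata(undef, sub { my ($meta, $err) = @_; ... });
    });
    $conn->connect('broker.internal', 9093, 5.0);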
add_partitions_to_txn($txn_id, $producer_id, $epoch, \@topics, $cb)

    Register partitions with the transaction coordinator.

end_txn($txn_id, $producer_id, $epoch, $committed, $cb)

    Commit ($committed = 1) or abort ($committed = 0) a transaction.

txn_offset_commit($txn_id, $group_id, $producer_id, $epoch, $generation, $member_id, \@offsets, $cb)

    Commit consumer offsets within a transaction (API 28).

pending

    Number of requests awaiting a broker response.

state

    Connection state as an integer (0 = disconnected, 6 = ready).

UTILITY FUNCTIONS

EV::Kafka::_murmur2($key)

    Kafka-compatible murmur2 hash. Returns a non-negative 31-bit integer.

EV::Kafka::_crc32c($data)

    CRC32C checksum (Castagnoli). Used internally for RecordBatch
    integrity.

EV::Kafka::_error_name($code)

    Convert a Kafka error code to its string name.

RESULT STRUCTURES

Produce result:

    $result = {
        topics => [{
            topic      => 'name',
            partitions => [{
                partition   => 0,
                error_code  => 0,
                base_offset => 42,
            }],
        }],
    };

Fetch result:

    $result = {
        topics => [{
            topic      => 'name',
            partitions => [{
                partition      => 0,
                error_code     => 0,
                high_watermark => 100,
                records => [{
                    offset    => 42,
                    timestamp => 1712345678000,
                    key       => 'key',        # or undef
                    value     => 'value',      # or undef
                    headers   => { h => 'v' }, # if present
                }],
            }],
        }],
    };

Metadata result:

    $result = {
        controller_id => 0,
        brokers => [{ node_id => 0, host => '10.0.0.1', port => 9092 }],
        topics  => [{
            name       => 'topic',
            error_code => 0,
            partitions => [{
                partition  => 0,
                leader     => 0,
                error_code => 0,
            }],
        }],
    };

ERROR HANDLING

Errors are delivered through two channels:

Connection-level errors fire the "on_error" callback (or "croak" if none
is set). These include connection refused, DNS failure, TLS errors, SASL
auth failure, and protocol violations.

Request-level errors are delivered as the second argument to the request
callback: "$cb->($result, $error)". If $error is defined, $result may be
undef.
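Putting the two channels together, a caller typically checks the callback's
$error first and then the per-partition error_code inside the result; a
sketch using the documented produce result structure:

    $kafka->produce('events', 'k', 'v', sub {
        my ($result, $err) = @_;
        return warn "request failed: $err" if defined $err;  # request-level
        my $p = $result->{topics}[0]{partitions}[0];
        if ($p->{error_code}) {                              # partition-level
            warn "broker error: "
                . EV::Kafka::Client::error_name($p->{error_code});
        }
    });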
Within result structures, per-partition "error_code" fields use Kafka
numeric codes:

     0  No error
     1  OFFSET_OUT_OF_RANGE
     3  UNKNOWN_TOPIC_OR_PARTITION
     6  NOT_LEADER_OR_FOLLOWER
    15  COORDINATOR_NOT_AVAILABLE
    16  NOT_COORDINATOR
    25  UNKNOWN_MEMBER_ID
    27  REBALANCE_IN_PROGRESS
    36  TOPIC_ALREADY_EXISTS
    79  MEMBER_ID_REQUIRED

When a broker disconnects mid-flight, all pending callbacks receive
"(undef, "connection closed by broker")" or "(undef, "disconnected")".

ENVIRONMENT VARIABLES

These are used by tests and examples (not by the module itself):

    TEST_KAFKA_BROKER  broker address for tests (host:port)
    KAFKA_BROKER       broker address for examples
    KAFKA_HOST         broker hostname for low-level examples
    KAFKA_PORT         broker port for low-level examples
    KAFKA_TOPIC        topic name for examples
    KAFKA_GROUP_ID     consumer group for examples
    KAFKA_LIMIT        message limit for the consume example
    KAFKA_COUNT        message count for fire-and-forget
    BENCH_BROKER       broker for benchmarks
    BENCH_MESSAGES     message count for benchmarks
    BENCH_VALUE_SIZE   value size in bytes for benchmarks
    BENCH_TOPIC        topic name for benchmarks

QUICK START

Minimal producer + consumer lifecycle:

    use EV;
    use EV::Kafka;

    my $kafka = EV::Kafka->new(
        brokers    => '127.0.0.1:9092',
        acks       => 1,
        on_error   => sub { warn "kafka: @_\n" },
        on_message => sub {
            my ($topic, $part, $offset, $key, $value) = @_;
            print "got: $key=$value\n";
        },
    );

    $kafka->connect(sub {
        # produce
        $kafka->produce('test', 'k1', 'hello', sub {
            print "produced\n";
            # consume from the beginning
            $kafka->assign([{ topic => 'test', partition => 0, offset => 0 }]);
            $kafka->seek('test', 0, -2, sub {
                my $t = EV::timer 0, 0.1, sub { $kafka->poll };
                $kafka->{cfg}{_t} = $t;
            });
        });
    });

    EV::run;

COOKBOOK

Produce JSON with headers:

    use JSON::PP;
    my $json = JSON::PP->new->utf8;

    $kafka->produce('events', 'user-42',
        $json->encode({ action => 'click', page => '/home' }),
        { headers => { 'content-type' => 'application/json' } },
        sub { ... }
    );

Consume from the latest offset only:

    $kafka->subscribe('live-feed',
        group_id          => 'realtime',
        auto_offset_reset => 'latest',
        on_assign         => sub { print "ready\n" },
    );

Graceful shutdown:

    $SIG{INT} = sub {
        $kafka->commit(sub {
            $kafka->unsubscribe(sub {
                $kafka->close(sub { EV::break });
            });
        });
    };

At-least-once processing:

    $kafka->subscribe('jobs',
        group_id    => 'workers',
        auto_commit => 0,
    );

    # in on_message: process, then commit
    on_message => sub {
        process($_[4]);
        $kafka->commit if ++$count % 100 == 0;
    },

Batch produce:

    $kafka->produce_many([
        ['events', 'k1', 'v1'],
        ['events', 'k2', 'v2'],
        ['events', 'k3', 'v3'],
    ], sub {
        my $errs = shift;
        print $errs ? "some failed\n" : "all done\n";
    });

Exactly-once stream processing (EOS):

    my $kafka = EV::Kafka->new(
        brokers          => '...',
        transactional_id => 'my-eos-app',
        acks             => -1,
        on_message       => sub {
            my ($t, $p, $off, $key, $value) = @_;
            my $result = process($value);
            $kafka->produce('output-topic', $key, $result);
        },
    );

    # consume-process-produce loop:
    $kafka->begin_transaction;
    $kafka->poll(sub {
        $kafka->send_offsets_to_transaction('my-group', sub {
            $kafka->commit_transaction(sub {
                $kafka->begin_transaction; # next transaction
            });
        });
    });

Topic administration:

    my $conn = EV::Kafka::Conn::_new('EV::Kafka::Conn', undef);
    $conn->on_connect(sub {
        $conn->create_topics(
            [{ name => 'new-topic', num_partitions => 6, replication_factor => 3 }],
            10000, sub { ... }
        );
    });

BENCHMARKS

Measured on Linux with TCP loopback to Redpanda, 100-byte values, Perl
5.40.2, 50K messages ("bench/benchmark.pl"):

    Pipeline produce (acks=1)    68K msg/sec    7.4 MB/s
    Fire-and-forget (acks=0)    100K msg/sec   11.0 MB/s
    Fetch throughput             31K msg/sec    3.4 MB/s
    Sequential round-trip        19K msg/sec   54 us avg latency
    Metadata request             25K req/sec   41 us avg latency

Throughput by value size (pipelined, acks=1):

    10 bytes       61K msg/sec     0.9 MB/s
    100 bytes      68K msg/sec     7.4 MB/s
    1000 bytes     50K msg/sec    50.2 MB/s
    10000 bytes    18K msg/sec   178.5 MB/s

Pipeline produce throughput is limited by Perl callback overhead per
message. Fire-and-forget mode ("acks=0") skips the response cycle
entirely, reaching ~100K msg/sec. Sequential round-trip (one produce, wait
for the ack, repeat) measures raw broker latency at ~54 microseconds.

The fetch path is sequential (fetch, process, fetch again), which
introduces one round-trip per batch. With a larger "max_bytes" and dense
topics, fetch throughput increases proportionally.

Run "perl bench/benchmark.pl" for throughput results. Set "BENCH_BROKER",
"BENCH_MESSAGES", "BENCH_VALUE_SIZE", and "BENCH_TOPIC" to customize. Run
"perl bench/latency.pl" for a latency histogram with percentiles (min,
avg, median, p90, p95, p99, max).

KAFKA PROTOCOL

This module implements the Kafka binary protocol directly in XS. All
integers are big-endian. Requests use a 4-byte size prefix followed by a
header (API key, version, correlation ID, client ID) and a
version-specific body. Responses are matched to requests by correlation
ID. The broker guarantees FIFO ordering per connection, so the response
queue is a simple FIFO.

RecordBatch encoding (magic=2) is used for produce. CRC32C covers the
batch from the attributes field through the last record. Records use
ZigZag-encoded varints for lengths and deltas.

The connection handshake sends ApiVersions (v0) on connect to discover
supported protocol versions.
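As a concrete illustration of this framing (not part of the module's API),
an ApiVersions v0 request frame can be built with plain pack; api_key 18
and correlation ID 1 follow the Kafka protocol, and the client ID is an
int16-length-prefixed string:

    # header: int16 api_key, int16 api_version, int32 correlation_id,
    # then the client ID; ApiVersions v0 has an empty body
    my $client_id = 'ev-kafka';
    my $header = pack 'n n N n a*', 18, 0, 1, length($client_id), $client_id;
    # the 4-byte big-endian size prefix covers header + body
    my $frame  = pack('N', length $header) . $header;
    # length($frame) == 22 for the 8-byte client ID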
SASL authentication uses SaslHandshake (v1) + SaslAuthenticate (v2) with
the PLAIN mechanism.

The consumer group protocol uses sticky partition assignment, with a
MEMBER_ID_REQUIRED (error 79) retry per KIP-394.

Non-flexible API versions are used throughout (capped below the
flexible-version threshold for each API) to avoid the compact encoding
complexity.

LIMITATIONS

* LZ4 and gzip compression -- supported when built with liblz4 and zlib.
  snappy and zstd are not implemented.

* Transactions / EOS -- "begin_transaction",
  "send_offsets_to_transaction", "commit_transaction", and
  "abort_transaction" provide full exactly-once stream processing.
  "InitProducerId", "AddPartitionsToTxn", "TxnOffsetCommit", and "EndTxn"
  are all wired. Requires "transactional_id" in the constructor.

* No GSSAPI/OAUTHBEARER -- SASL/PLAIN and SCRAM-SHA-256/512 are
  supported. GSSAPI (Kerberos) and OAUTHBEARER are not implemented.

* Sticky partition assignment -- assignments are preserved across
  rebalances where possible. New partitions are distributed to the
  least-loaded member. Overloaded members shed excess partitions.

* Blocking DNS resolution -- "getaddrinfo" is called synchronously in
  "conn_start_connect". For fully non-blocking operation, use IP
  addresses instead of hostnames.

* No flexible API versions -- all API versions are capped below the
  flexible-version threshold to avoid compact string/array encoding. This
  limits interoperability with very new protocol features but works with
  all Kafka 0.11+ and Redpanda brokers.

* Limited produce retry -- transient errors (NOT_LEADER,
  COORDINATOR_NOT_AVAILABLE) trigger a metadata refresh and up to 3
  retries with backoff. Non-retriable errors are surfaced to the callback
  immediately.

AUTHOR

vividsnow

LICENSE

This library is free software; you can redistribute it and/or modify it
under the same terms as Perl itself.