NAME
    Net::Kafka::Producer::Avro - Apache Kafka message producer based on
    librdkafka, Avro serialization and Confluent Schema Registry validation.

SYNOPSIS
      use Net::Kafka::Producer::Avro;
      use Confluent::SchemaRegistry;
      use AnyEvent;
      use JSON;
  
  
      my $producer = Net::Kafka::Producer::Avro->new(
        'bootstrap.servers' => 'localhost:9092',
        'schema-registry'  => Confluent::SchemaRegistry->new(), # defaults to http://localhost:8081
        'compression.codec' => 'gzip',  # optional, one of: 'none' (def.), 'gzip', 'snappy', 'lz4', 'zstd'
        'log_level' => 0, # suppress librdkafka internal logging
        'error_cb' => sub {
          my ($self, $err, $msg) = @_;
          die "Connection error:\n\t- err: " . $err . "\n\t- msg: " . $msg . "\n";
        }
      );
  
      # creates the header object (if you need to add headers to the message)
      my $headers = Net::Kafka::Headers->new();
      $headers->add('my-header-1', 'foo');
      $headers->add('my-header-2', 'bar');
  
      my $condvar = AnyEvent->condvar;
  
      my $promise = $producer->produce(
        topic          => 'mytopic',
        partition      => 0,
        key            => 1000,
        key_schema     => to_json(
                            {
                              name => 'id',
                              type => 'long'
                            }
                          ),
        payload        => {
                            id  => 1210120,
                            f1  => 'text message'
                          },
        payload_schema => to_json(
                            {
                              type => 'record',
                              name => 'myrecord',
                              fields => [
                                {
                                  name => 'id',
                                  type => 'long'
                                },
                                {
                                  name => 'f1',
                                  type => 'string'
                                }
                              ]
                            }
                          ),
        headers        => $headers
      );
  
      die "Error requesting message production: " . $producer->get_error() . "\n"
        unless $promise;
  
      $promise->then(
        sub {
          my $delivery_report = shift;
          $condvar->send; # signal the condvar so that recv() below returns
          print "Message delivered with offset " . $delivery_report->{offset};
        },
        sub {
          my $error = shift;
          $condvar->send; # signal the condvar so that recv() below returns
          die "Unable to produce message: " . $error->{error} . ", code: " . $error->{code};
        }
      );
  
      $condvar->recv; # wait for the promise resolution
  
      print "Message produced", "\n";

DESCRIPTION
    The main goal of "Net::Kafka::Producer::Avro" is to provide an
    object-oriented API for producing Avro-serialized messages according to
    the *Confluent Schema Registry*.

    "Net::Kafka::Producer::Avro" inherits from and extends the
    Net::Kafka::Producer module.

INSTALL
    Installation of "Net::Kafka::Producer::Avro" follows the canonical
    procedure:

      perl Makefile.PL
      make
      make test
      make install

  TESTING TROUBLESHOOTING
    Tests focus on verifying Avro-formatted messages and their interactions
    with Confluent Schema Registry, and are intended to extend the
    "Net::Kafka::Producer" test suite.

    Local Apache Kafka and Schema Registry services are expected to be
    listening on "localhost:9092" and "http://localhost:8081" respectively.

    You can set different endpoints by exporting the following environment
    variables:

    "KAFKA_HOST"
    "KAFKA_PORT"
    "CONFLUENT_SCHEMA_REGISTY_URL"

    For example:

      export KAFKA_HOST=my-kafka-host.my-domain.org
      export KAFKA_PORT=9092
      export CONFLUENT_SCHEMA_REGISTY_URL=http://my-schema-registry-host.my-domain.org
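
    As a minimal sketch of how such variables could be combined with the
    default endpoints listed above, the snippet below falls back to
    "localhost:9092" and "http://localhost:8081" when a variable is unset.
    The resolve_endpoints() helper and the "schema_registry_url" key are
    hypothetical names used only for illustration; the test suite may read
    the variables differently.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of the module): resolve the service
# endpoints from a hash of environment variables, falling back to the
# documented local defaults when a variable is not defined.
sub resolve_endpoints {
    my (%env) = @_;
    my $host     = $env{KAFKA_HOST} // 'localhost';
    my $port     = $env{KAFKA_PORT} // 9092;
    my $registry = $env{CONFLUENT_SCHEMA_REGISTY_URL} // 'http://localhost:8081';
    return (
        'bootstrap.servers' => "$host:$port",
        schema_registry_url => $registry,   # illustrative key name
    );
}

my %endpoints = resolve_endpoints(%ENV);
print "Kafka broker:    $endpoints{'bootstrap.servers'}\n";
print "Schema Registry: $endpoints{schema_registry_url}\n";
```

    The 'bootstrap.servers' value has the same "host:port" form used in the
    SYNOPSIS constructor call.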

USAGE
  CONSTRUCTOR
   "new"
    Creates a message producer.

    The new() method expects the same set of arguments as the
    Net::Kafka::Producer parent constructor.

    In addition, it takes the following mandatory argument:

    "SchemaRegistry => $schema_registry" (mandatory)
       A Confluent::SchemaRegistry instance.

  METHODS
    The following methods are defined for the "Net::Kafka::Producer::Avro"
    class:

   "schema_registry"()
    Returns the Confluent::SchemaRegistry instance supplied to the
    constructor.

   "get_error"()
    Returns a string containing the last error message.

   produce( %named_params )
    Sends Avro-formatted key/message pairs.

    As with "Net::Kafka::Producer", returns a promise if the message was
    successfully sent. As the SYNOPSIS shows, a false return value means
    the request failed, and get_error() reports the reason.

    To handle the Avro format, the "Net::Kafka::Producer" produce() method
    has been extended with two more arguments, "key_schema" and
    "payload_schema":

      $producer->produce(
            topic             => $topic,             # scalar 
            partition         => $partition,         # scalar
            key_schema        => $key_schema,        # (optional) scalar representing a JSON string of the Avro schema to use for the key
            key               => $key,               # (optional) scalar | hashref
            payload_schema    => $payload_schema,    # (optional) scalar representing a JSON string of the Avro schema to use for the payload
            payload           => $payload,           # scalar | hashref
            timestamp         => $timestamp,         # (optional) scalar representing milliseconds since epoch
            headers           => $headers,           # (optional) Net::Kafka::Headers object
            # ...other params accepted by Net::Kafka::Producer's produce() method
      );

    Both the $key_schema and $payload_schema parameters are optional. When
    supplied, each must be a JSON string representing the Avro schema to use
    for validating and serializing the key or the payload.

    These schemas will be validated against the $schema_registry supplied to
    the "new" method and, if compliant, will be added to the registry under
    the "$topic+'key'" or "$topic+'value'" Schema Registry subjects.

    If a schema isn't provided, the latest version registered in Schema
    Registry for the corresponding (topic + key/value) subject will be used.
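
    The named parameters described above can be prepared as in the following
    sketch, which needs no running services. It uses the core JSON::PP
    module rather than the JSON module of the SYNOPSIS, and Time::HiRes for
    the millisecond timestamp; both are illustration choices, not
    requirements of the module. The produce() call itself is left commented
    out because it needs a configured $producer.

```perl
use strict;
use warnings;
use JSON::PP;                 # core module; the SYNOPSIS uses JSON's to_json instead
use Time::HiRes qw(time);     # sub-second time() for a millisecond timestamp

# canonical(1) sorts hash keys, so the generated JSON text is stable across runs
my $json = JSON::PP->new->canonical(1);

# Avro schemas are passed to produce() as JSON strings, not as Perl structures
my $key_schema     = $json->encode({ name => 'id', type => 'long' });
my $payload_schema = $json->encode({
    type   => 'record',
    name   => 'myrecord',
    fields => [
        { name => 'id', type => 'long'   },
        { name => 'f1', type => 'string' },
    ],
});

my %args = (
    topic          => 'mytopic',
    partition      => 0,
    key            => 1000,
    key_schema     => $key_schema,
    payload        => { id => 1210120, f1 => 'text message' },
    payload_schema => $payload_schema,
    timestamp      => int(time() * 1000),   # milliseconds since epoch
);

# With a producer built as in the SYNOPSIS:
# my $promise = $producer->produce(%args);

print "$_\n" for $key_schema, $payload_schema;
```

    Omitting "key_schema" or "payload_schema" from %args would leave the
    choice of schema to the registry, as described above.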

AUTHOR
    Alvaro Livraghi, <alvarol@cpan.org>

CONTRIBUTE
    <https://github.com/alivraghi/Net-Kafka-Producer-Avro>

BUGS
    Please use the GitHub project link above to report problems or to
    contact the authors.

COPYRIGHT AND LICENSE
    Copyright 2026 by Alvaro Livraghi

    This program is free software; you can redistribute it and/or modify it
    under the same terms as Perl itself.

