Kafka 2.3 Producer (0.9以后版本适用)

2022-10-17,,,,

kafka0.9版本以后用java重新编写了producer,废除了原来scala编写的版本。

这里直接使用最新2.3版本,0.9以后的版本都适用。

注意引用的包为:org.apache.kafka.clients.producer

import java.util.properties;
import org.apache.kafka.clients.producer.kafkaproducer;
import org.apache.kafka.clients.producer.producerrecord;

public class producerdemo {

    public static void main(string[] args) {

        properties properties = new properties();
        properties.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.stringserializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.stringserializer");
        kafkaproducer<string, string> kafkaproducer = new kafkaproducer<string, string>(properties);
        kafkaproducer.send(new producerrecord<>("topic", "value"));
        kafkaproducer.close();

    }
    
}

0.11.0以后增加了事务,事务producer的示例代码如下,需要适用于0.11.0以后的版本:

import org.apache.kafka.clients.producer.kafkaproducer;
import org.apache.kafka.clients.producer.producer;
import org.apache.kafka.clients.producer.producerrecord;
import org.apache.kafka.common.kafkaexception;
import org.apache.kafka.common.errors.authorizationexception;
import org.apache.kafka.common.errors.outofordersequenceexception;
import org.apache.kafka.common.errors.producerfencedexception;
import org.apache.kafka.common.serialization.stringserializer;

import java.util.properties;

public class transactionsproducerdemo {

    public static void main(string[] args) {

        properties props = new properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("transactional.id", "my-transactional-id");
        producer<string, string> producer = new kafkaproducer<>(props, new stringserializer(), new stringserializer());

        producer.inittransactions();

        try {
            producer.begintransaction();
            for (int i = 0; i < 100; i++)
                producer.send(new producerrecord<>("my-topic", integer.tostring(i), integer.tostring(i)));
            producer.committransaction();
        } catch (producerfencedexception | outofordersequenceexception | authorizationexception e) {
            // we can't recover from these exceptions, so our only option is to close the producer and exit.
            producer.close();
        } catch (kafkaexception e) {
            // for all other exceptions, just abort the transaction and try again.
            producer.aborttransaction();
        }
        producer.close();

    }
    
}

更多实时计算,kafka等相关技术博文,欢迎关注实时流式计算

《Kafka 2.3 Producer (0.9以后版本适用).doc》

下载本文的Word格式文档,以方便收藏与打印。