Kafka 生产者 API
测试 kakfa
生产者 Api 的 demo
pom.xml
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>compile</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
</dependency>
</dependencies>
样例代码
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
import java.util.concurrent.Future;
public class Demo1 {
private static final Logger logger = LoggerFactory.getLogger(Demo1.class);
Properties kafkaProps;
KafkaProducer<String, String> producer;
@Before
public void initProperties(){
kafkaProps = new Properties();
// 指定broker地址,格式: host:port.可以多个地址,用逗号拼接
kafkaProps.put("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
// 指定key的序列化方式
kafkaProps.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
// value的序列化方式
kafkaProps.put("value.serializer"
, "org.apache.kafka.common.serialization.StringSerializer");
// 创建producer
producer = new KafkaProducer<>(kafkaProps);
}
/**
* 同步发送消息,不获取future对象
*/
@Test
public void demo1(){
// 创建消息record,三个参数,topic,key,value
ProducerRecord<String, String> record =
new ProducerRecord<>("ProgramingLanguage", "java", "ok");
try {
// 该方法会返回一个Future对象,如果不关心结果,那可以注释掉
/*Future<RecordMetadata> send = */
producer.send(record);
} catch (Exception e){
// producer可能发生异常,比如SerializationException,BufferExhaustedException
// 或TimeoutException
}
}
/**
* 同步发送消息,获取future对象
*/
@Test
public void demo2(){
// 创建消息record,三个参数,topic,key,value
ProducerRecord<String, String> record =
new ProducerRecord<>("ProgramingLanguage", "scala", "not bad");
// producer一般有两类错误,一类是可重试错误,这类错误可以通过重发消息解决.
// 另一种错误,消息太大,这类错误不会重试
try{
// 获取future中的recordMetadata对象,里面包含了偏移量offset
RecordMetadata recordMetadata = producer.send(record).get();
logger.info("recordMetadata的offset = {}",recordMetadata.offset());
} catch (Exception e){
// 异常如果发生错误,比如broker返回不允许重发消息的异常或已经超过了重发次数会抛异常
e.printStackTrace();
}
}
/**
* 异步发送消息
*/
@Test
public void demo3(){
// 实现org.apache.kafka.clients.producer.Callback接口
class ProducerCallBack implements Callback{
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
// 如果onCompletion的异常不为空,说明服务端有异常
if(e!=null){
e.printStackTrace();
}
}
}
// 创建消息record,三个参数,topic,key,value
ProducerRecord<String, String> record =
new ProducerRecord<>("ProgramingLanguage", "python", "not bad");
// send重载方法,第二个参数传回调
producer.send(record,new ProducerCallBack());
// lambda表达式写法
// producer.send(record,
// (recordMetadata,e)->{ if(e!=null) e.printStackTrace();});
}
}
异常
生产者配置的参数
acks
acks 参数指定了 必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的 。
acks=0
, 生产者在成功写入悄息之前不会等待任何来自服务器的响应。消息丢了就丢了,可靠性差,吞吐量高
如果 acks=1
,只要集群的首领节点收到消息,会收到一个来自服务器的成功响应。同步消息延迟会低一些,异步会高一些,也有可能丢消息
- 如果
acks=all
,只有当所有参与复制的节点全部收到消息时,才会收到一个来自
服务器的成功响应。可靠性高,但是延迟也很高.
-
buffer.me mory
该参数用来设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息
-
compression .type
默认不会被压缩。可以设置为 snappy
、 gzip
或 lz4
,它指定了
消息发送前的压缩类型。 snappy
的 cpu 使用少,gzip
cpu 使用多,压缩效果好.
-
retries
producer 重发消息的次数,达到这个次数,producer 放弃重试并返回错误。默认情况下,producer 会在每次重试之间等待 100ms ,retry.backoff.mx
参数设置这个间隔。
-
batch.size
同一批次消息的数量,单位是字节数, 不会一直等到足够,配合下面那个参数使用.
-
linger.ms
producer 发送批次等待时间,到达这个时间就发送.
-
client.id
该参数可以是任意的字符串,服务器会用它来识别消息的来源。
-
max.in.flight.requests.per.connection
该参数指定了生产者在收到服务器晌应之前可以发送多少个消息。它的值越高,就会占用
越多的内存,不过也会提升吞吐量。 把它设为 1 可以保证消息是按照发送的顺序写入服务
器的,即使发生了重试。
-
timeout.ms,request.timeout.ms 和 metadata.fetch.timeout.ms
request.timeout.ms
为 producer 等待服务器响应的时间,metadata.fetch.timeout.ms
为 producer 获取 RecordMetadata 时等待服务器响应的时间。如果等待响应超时,那么生产者要么重试发送数据,要么返回一个错误(抛出异常或执行回调). timeout.ms
指定了 broker 等待同步副本返回消息确认的时间,与
asks
的配置相匹配一一如果在指定时间内没有收到同步副本的确认,那么 broker 就会返回
一个错误 。
-
max.block.ms
send()
方法或 partitionsFor()
方法获取元数据时 producer 的阻塞时间.超时会抛出异常.
-
max.request.size
producer 发送的请求大小.可以是单个消息大小也可以是单个批次消息大小.
receive.buffer.bytes和send.buffer.bytes
这两个参数分别指定了 TCP socket
接收和发送数据包的缓冲区大小。 如果它们被设为 - 1 ,
就使用操作系统的默认值。
参考:<<Kafka权威指南>>
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于