kafka
Spring for Apache Kafka
一、Request🐟
- Apache Kafka Clients 3.3.x
- Spring Framework 6.0.x
- Minimum Java version: 17
二、入门demo🐟
.Maven依赖
.Springboot配置文件
.主题配置
.消费端
.生产端
Tip
在broker允许自动创建主题情况下,可NewTopic中配置主题名称、分区数量、分区副本数;
可以使用kafka提供的shell查看信息消费:
./kafka-console-consumer.sh --bootstrap-server 192.168.0.161:9092 --topic shafish-dev --from-beginning
三、主题配置🐟
Starting with version 2.7
broker端允许自动创建主题情况下
四、消息发送🐟
Abstract
Spring提供了
KafkaTemplate
封装类,可以方便得发送消息给主题。提供了KafkaTemplate的子类
RoutingKafkaTemplate
,可以根据不同的主题选择不同Producer进行发送
1.KafkaTemplate🐟
KafkaTemplate中提供消息发送的方法
Quote
调用
sendDefault
方法需要提前设置好默认的主题(spring:kafka:template:default-topic: shafish-dev
);如果配置主题时设置了
TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG
为CREATE_TIME
或者LOG_APPEND_TIME
,则会配置对应时间戳到消息中;3.0后消息发送返回的是异步回调
CompletableFuture
,当发送成功或者失败时会执行配置的回调方法;如果需要统一处理回调,可以实现
ProducerListener
方法,在其onSuccess
与onError
中编写回调逻辑(spring提供了一个默认实现LoggingProducerListener
);
.KafkaTemplate基本配置
可以在template中传入不同
ProducerFactory
进行不同使用场景的配置
.异步发送消息
.同步阻塞发送消息
.同步阻塞发送消息
2.RoutingKafkaTemplate🐟
RoutingKafkaTemplate不支持transactions
, execute
, flush
, or metrics
等操作,具体等后续深入理解。
.修改ProducerFactory
.消息生产者
.消息消费者
3.DefaultKafkaProducerFactory🐟
ProducerFactory用于创建kafka的producer实例,具体可以看KafkaProducerConfig
相关配置。在不使用事务的情况下,kafka默认只会创建一个单例的producer实例给客户端使用,但如果其中某个客户端调用了flush()
方法,会造成使用该producer实例的其他客户端也阻塞。
如果配置ProducerFactory的producerPerThread
为true,则会为每个线程都分配一个新的producer实例,避免flush阻塞问题。
4.ReplyingKafkaTemplate🐟
发送请求并需要获取回复的场景下使用
.创建收发主题
./kafka-topics.sh --create --bootstrap-server 192.168.0.162:9092 --replication-factor 2 --partitions 10 --topic shafish-dev-requests
./kafka-topics.sh --create --bootstrap-server 192.168.0.162:9092 --replication-factor 2 --partitions 10 --topic shafish-dev-replies
.producer配置
.消息生产者
@RestController
@RequestMapping("api/v1/message")
public class MessageController {
@Autowire
private ReplyingKafkaTemplate replyingKafkaTemplate;
@PostMapping("/publishReply")
public String publishReply(@RequestBody MessageQuery query) throws ExecutionException, InterruptedException, TimeoutException {
ProducerRecord<String, String> record = new ProducerRecord<>("shafish-dev-requests", query.message()); //KafkaProperties.DEVREQUEST
// record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, query.message().getBytes()));
RequestReplyFuture<String, String, String> requestReplyFuture = replyingKafkaTemplate.sendAndReceive(record);
SendResult<String, String> sendResult = requestReplyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
System.out.println("send:"+sendResult.getRecordMetadata());
ConsumerRecord<String, String> stringStringConsumerRecord = requestReplyFuture.get(10, TimeUnit.SECONDS);
System.out.println("return value:"+stringStringConsumerRecord.value());
return stringStringConsumerRecord.value();
}
}
.消息消费
@Component
public class ConsumerListener {
//KafkaProperties.DEVREQUEST
@KafkaListener(topics = "shafish-dev-requests", groupId = KafkaProperties.consumerGroupId)
@SendTo
public String requestListener(ConsumerRecord<String, String> record) {
Optional<String> message = Optional.ofNullable(record.value());
if (message.isPresent()) {
System.out.println(message.get());
// Message<String> build = MessageBuilder.withPayload(record.value().toUpperCase())
// //.setHeader(KafkaHeaders.TOPIC, replyTo)
// .setHeader(KafkaHeaders.KEY, 42)
// //.setHeader(KafkaHeaders.CORRELATION_ID, correlation)
// .build();
return "yes";
}
return null;
}
}
五、消费接收🐟
Abstract
可以通过配置
MessageListenerContainer
监听器或者使用@KafkaListener
注解的方式接收kafka消息。
1.message listener🐟
八种消息监听
当监听消息时,需要提供一个监听者去接收消息,提供了八种监听接口:
方法中的consumer不是线程安全的
2.message listener containers🐟
两种实现类
提供了两个listenerContainer实现类:
KafkaMessageListenerContainer
和ConcurrentMessageListenerContainer
KafkaMessageListenerContainer
在单线程中接收订阅主题的所有消息;ConcurrentMessageListenerContainer
提供一个或多个KafkaMessageListenerContainer
实例去消费消息
KafkaMessageListenerContainer
最后更新: 2024年6月21日 22:29:28