Spring for Apache Kafka

I. Requirements🐟

  • Apache Kafka Clients 3.3.x
  • Spring Framework 6.0.x
  • Minimum Java version: 17

II. Getting-started demo🐟

1⃣.Maven dependencies
pom.xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
2⃣.Spring Boot configuration file
application.yml
spring:
  kafka:
    bootstrap-servers: 192.168.0.161:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: latest
3⃣.Topic configuration
TopicConfig.java
@Configuration
public class TopicConfig {
    @Bean
    public KafkaAdmin.NewTopics topicsCreate() {
        return new KafkaAdmin.NewTopics(
//                TopicBuilder.name("defaultBoth")
//                        .build(),
//                TopicBuilder.name("defaultPart")
//                        .replicas(1)
//                        .build(),
                TopicBuilder.name("shafish-dev")
                        .partitions(10)
                        .replicas(2)
                        .build());
    }
}
4⃣.Consumer
ConsumerListener.java
@Component
public class ConsumerListener {
    @KafkaListener(topics = {KafkaProperties.devTopic}, groupId = KafkaProperties.consumerGroupId)
    void listener(ConsumerRecord<String, String> record) {
        Optional<String> message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            System.out.println(message.get());
        }
    }
}
5⃣.Producer

MessageQuery.java
public record MessageQuery(String message) {
}
MessageController.java
@RestController
@RequestMapping("api/v1/message")
public class MessageController {

    private KafkaTemplate<String, String> kafkaTemplate;

    public MessageController(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @PostMapping("/publish")
    public void publish(@RequestBody MessageQuery query) {
        kafkaTemplate.send(KafkaProperties.devTopic, query.message());
    }
}

Tip

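With a KafkaAdmin bean in the application context (Spring Boot auto-configures one), the topics declared through NewTopic / NewTopics beans are added to the broker automatically at startup; the topic name, partition count, and replica count can all be set there.

You can watch the messages with the console consumer script shipped with Kafka: ./kafka-console-consumer.sh --bootstrap-server 192.168.0.161:9092 --topic shafish-dev --from-beginning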

III. Topic configuration🐟

Starting with version 2.7

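Several topics can be declared in a single KafkaAdmin.NewTopics bean; they are added to the broker automatically at startup: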
MyTopicConfig.java
@Bean
public KafkaAdmin.NewTopics topics456() {
    return new NewTopics(
            TopicBuilder.name("defaultBoth") // topic name
                .build(),
            TopicBuilder.name("defaultPart")
                .replicas(1) // replica count
                .build(),
            TopicBuilder.name("defaultRepl")
                .partitions(3) // partition count
                .config(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "CreateTime") // message timestamp type: "CreateTime" or "LogAppendTime"
                .build());
}

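IV. Sending messages🐟

Abstract

Spring provides the KafkaTemplate wrapper class, which makes it easy to send messages to a topic.

It also provides RoutingKafkaTemplate, a subclass of KafkaTemplate that selects a different Producer depending on the destination topic.

1.KafkaTemplate🐟

KafkaTemplate provides the following methods for sending messages: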
KafkaTemplate.java
CompletableFuture<SendResult<K, V>> sendDefault(V data);

CompletableFuture<SendResult<K, V>> sendDefault(K key, V data);

CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);

CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, V data);

CompletableFuture<SendResult<K, V>> send(String topic, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);

CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);

CompletableFuture<SendResult<K, V>> send(Message<?> message);

Map<MetricName, ? extends Metric> metrics();

List<PartitionInfo> partitionsFor(String topic);

<T> T execute(ProducerCallback<K, V, T> callback);

// Flush the producer.

void flush();

interface ProducerCallback<K, V, T> {

    T doInKafka(Producer<K, V> producer);

}

Quote

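Calling the sendDefault methods requires a default topic to be configured beforehand (spring.kafka.template.default-topic: shafish-dev);

If the topic is configured with TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG set to CREATE_TIME or LOG_APPEND_TIME, the corresponding timestamp is stored in the message and is available on the ConsumerRecord;

Since 3.0, the send methods return a CompletableFuture, so a callback registered on it runs when the send succeeds or fails;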

CompletableFuture<SendResult<Integer, String>> future = template.send("myTopic", "something");
future.whenComplete((result, ex) -> {
    ...
});
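The success and failure branches of such a callback can be tried without a broker. The following is a minimal sketch that uses only the JDK: `SendCallbackSketch`, `describeOutcome`, and the pre-completed futures are hypothetical stand-ins for the future returned by `send`, not part of Spring for Apache Kafka.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

final class SendCallbackSketch {

    // Attaches a whenComplete callback and returns what the callback recorded.
    // ex is null when the future completed normally.
    static String describeOutcome(CompletableFuture<String> sendFuture) {
        AtomicReference<String> outcome = new AtomicReference<>("pending");
        sendFuture.whenComplete((result, ex) -> {
            if (ex == null) {
                outcome.set("success: " + result);
            } else {
                outcome.set("failure: " + ex.getMessage());
            }
        });
        // For an already-completed future the callback has run by now;
        // otherwise the value is still "pending".
        return outcome.get();
    }

    public static void main(String[] args) {
        System.out.println(describeOutcome(
                CompletableFuture.completedFuture("offset 0")));
        System.out.println(describeOutcome(
                CompletableFuture.failedFuture(new IllegalStateException("broker down"))));
        System.out.println(describeOutcome(new CompletableFuture<>()));
    }
}
```

The third call shows why the result should be consumed inside the callback: when the future is not yet complete, the calling thread moves on before the callback has run.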
To handle callbacks in one place, implement the ProducerListener interface and put the logic in its onSuccess and onError methods (Spring provides a default implementation, LoggingProducerListener).

1⃣.Basic KafkaTemplate configuration

Different ProducerFactory instances can be passed to the template to configure it for different use cases.

KafkaProducerConfig.java
@Configuration
public class KafkaProducerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String boostrapServers;

    @Bean
    public Map<String, Object> producerConfig() {
        HashMap<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, boostrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}
// Sends String messages
@Bean
public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) {
    return new KafkaTemplate<>(pf);
}

// Sends byte[] messages (overrides the value serializer for this template)
@Bean
public KafkaTemplate<String, byte[]> bytesTemplate(ProducerFactory<String, byte[]> pf) {
    return new KafkaTemplate<>(pf,
            Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
}

2⃣.Sending messages asynchronously

public record MessageQuery(String message) {
}
@RestController
@RequestMapping("api/v1/message")
public class MessageController {

    private KafkaTemplate<String, String> kafkaTemplate;

    public MessageController(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @PostMapping("/publish")
    public void publish(@RequestBody MessageQuery query) {
        kafkaTemplate.send(KafkaProperties.devTopic, query.message());
    }

    @PostMapping("/publishAsync")
    public void publishAsync(@RequestBody MessageQuery query) {
        CompletableFuture<SendResult<String, String>> send = kafkaTemplate.send(KafkaProperties.devTopic, query.message());
        send.whenComplete((result, ex) -> {
            if (ex == null) {
                // handleSuccess(data);
                System.out.println("success");
            }
            else {
                // handleFailure(data, record, ex);
                ex.printStackTrace();
            }
        });
    }

}

3⃣.Sending messages synchronously (blocking)

public record MessageQuery(String message) {
}
@RestController
@RequestMapping("api/v1/message")
public class MessageController {

    private KafkaTemplate<String, String> kafkaTemplate;

    public MessageController(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @PostMapping("/publish")
    public void publish(@RequestBody MessageQuery query) {
        kafkaTemplate.send(KafkaProperties.devTopic, query.message());
    }

    @PostMapping("/publishSync")
    public void publishSync(@RequestBody MessageQuery query) {
        try {
            kafkaTemplate.send(KafkaProperties.devTopic, query.message()).get(10, TimeUnit.SECONDS);
            // handleSuccess(data);
            System.out.println("success");
        }
        catch (ExecutionException e) {
            // handleFailure(data, record, e.getCause());
            e.printStackTrace();
        }
        catch (TimeoutException | InterruptedException e) {
            // handleFailure(data, record, e);
            e.printStackTrace();
        }

    }

}    
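The three catch branches of a blocking send can be exercised without a broker. This is a minimal JDK-only sketch: `BlockingSendSketch` and `waitForSend` are hypothetical names, and the futures stand in for the one returned by `kafkaTemplate.send(...)`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BlockingSendSketch {

    // Blocks on the future with a timeout and reports which branch was taken.
    static String waitForSend(CompletableFuture<String> sendFuture, long timeoutMillis) {
        try {
            return "success: " + sendFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            // The send itself failed; the original exception is the cause.
            return "failed: " + e.getCause().getMessage();
        } catch (TimeoutException e) {
            // No result arrived within the timeout.
            return "timed out";
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(waitForSend(CompletableFuture.completedFuture("ok"), 100));
        System.out.println(waitForSend(
                CompletableFuture.failedFuture(new IllegalStateException("broker down")), 100));
        System.out.println(waitForSend(new CompletableFuture<>(), 100)); // never completes
    }
}
```

Restoring the interrupt flag in the `InterruptedException` branch is a common convention; whether the controller above should do the same depends on how the application handles shutdown.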

4⃣.Consuming messages
ConsumerListener.java
@Component
public class ConsumerListener {
    @KafkaListener(topics = {KafkaProperties.devTopic}, groupId = KafkaProperties.consumerGroupId)
    void listener(ConsumerRecord<String, String> record) {
        Optional<String> message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            System.out.println(message.get());
        }
    }
}

2.RoutingKafkaTemplate🐟

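RoutingKafkaTemplate does not support transactions, execute, flush, or metrics operations, because the topic is not known for those operations; the details are left for a later, deeper look.

1⃣.Modifying the ProducerFactory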
KafkaProducerConfig.java
@Configuration
public class KafkaProducerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String boostrapServers;

    @Autowired
    private KafkaProperties kafkaProperties;

    public Map<String, Object> producerConfig() {
        HashMap<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, boostrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    @Bean
    public ProducerFactory<Object, Object> pf() {
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }

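    /**
    * pf uses producerConfig, in which both key and value are serialized as Strings;
    * to send other payload types to particular topics, a different value serializer is needed, such as ByteArraySerializer here
    */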
    @Bean
    public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context,
                                                ProducerFactory<Object, Object> pf) {

        // Clone the PF with a different Serializer, register with Spring for shutdown
        Map<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(configs);
        context.registerBean("bytesPF", DefaultKafkaProducerFactory.class, () -> bytesPF);

        Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
        map.put(Pattern.compile("shafish-byte-dev"), bytesPF); // KafkaProperties.devByteTopic
        map.put(Pattern.compile(".+"), pf); // Default PF with StringSerializer
        return new RoutingKafkaTemplate(map);
    }
}
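The map handed to the template is a LinkedHashMap because the order of the patterns matters. The sketch below is a JDK-only illustration that assumes the template picks the first pattern that fully matches the topic, in map iteration order; `RoutingOrderSketch`, `route`, and the string labels are hypothetical and only stand in for the producer factories.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

final class RoutingOrderSketch {

    // Returns the label of the first pattern that fully matches the topic,
    // walking the map in insertion order.
    static String route(Map<Pattern, String> routes, String topic) {
        for (Map.Entry<Pattern, String> entry : routes.entrySet()) {
            if (entry.getKey().matcher(topic).matches()) {
                return entry.getValue();
            }
        }
        throw new IllegalStateException("No producer factory found for topic: " + topic);
    }

    // Same order as the configuration above: specific pattern first, catch-all last.
    static Map<Pattern, String> specificFirst() {
        Map<Pattern, String> routes = new LinkedHashMap<>();
        routes.put(Pattern.compile("shafish-byte-dev"), "bytesPF");
        routes.put(Pattern.compile(".+"), "pf");
        return routes;
    }

    // Reversed order: the catch-all shadows the specific pattern.
    static Map<Pattern, String> catchAllFirst() {
        Map<Pattern, String> routes = new LinkedHashMap<>();
        routes.put(Pattern.compile(".+"), "pf");
        routes.put(Pattern.compile("shafish-byte-dev"), "bytesPF");
        return routes;
    }

    public static void main(String[] args) {
        System.out.println(route(specificFirst(), "shafish-byte-dev")); // bytesPF
        System.out.println(route(specificFirst(), "shafish-dev"));      // pf
        System.out.println(route(catchAllFirst(), "shafish-byte-dev")); // pf
    }
}
```

Under that assumption, putting the catch-all `.+` pattern first would send byte payloads to the String-serializing factory, so more specific patterns belong at the beginning of the map.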
2⃣.Message producer
MessageController.java
@RestController
@RequestMapping("api/v1/message")
public class MessageController {

    private RoutingKafkaTemplate routingKafkaTemplate;

    public MessageController(RoutingKafkaTemplate routingKafkaTemplate) {
        this.routingKafkaTemplate = routingKafkaTemplate;
    }

    @PostMapping("/publishByte")
    public void publishByte(@RequestBody MessageQuery query) {
        // The second argument can now be a byte array
        routingKafkaTemplate.send(KafkaProperties.devByteTopic, query.message().getBytes());
        // handleSuccess(data);
        System.out.println("success");

    }

}
3⃣.Message consumer
ConsumerListener.java
@Component
public class ConsumerListener {

    @KafkaListener(topics = {"shafish-byte-dev"}, groupId = KafkaProperties.consumerGroupId)
    void byteListener(ConsumerRecord<String, String> record) {
        Optional<String> message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            System.out.println(message.get());
        }
    }

}

3.DefaultKafkaProducerFactory🐟

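A ProducerFactory creates the Kafka producer instances; see the KafkaProducerConfig classes above for its configuration. When transactions are not used, DefaultKafkaProducerFactory by default creates a single producer that is shared by all clients, but if one of those clients calls flush(), the other clients using the same producer can be blocked as well.

If the ProducerFactory's producerPerThread property is set to true, each thread gets its own producer instance, which avoids the flush blocking problem.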
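The difference between a shared producer and a producer per thread can be pictured with a ThreadLocal. This is only a conceptual, JDK-only sketch: `ProducerPerThreadSketch` and `FakeProducer` are hypothetical, and nothing here is the actual DefaultKafkaProducerFactory implementation.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class ProducerPerThreadSketch {

    // Hypothetical stand-in for a Kafka producer; it only carries an id.
    static final class FakeProducer {
        private static final AtomicInteger COUNTER = new AtomicInteger();
        final int id = COUNTER.incrementAndGet();
    }

    private static final FakeProducer SHARED = new FakeProducer();
    private static final ThreadLocal<FakeProducer> PER_THREAD =
            ThreadLocal.withInitial(FakeProducer::new);

    // Returns the shared instance, or the one bound to the calling thread.
    static FakeProducer producer(boolean perThread) {
        return perThread ? PER_THREAD.get() : SHARED;
    }

    // Looks up the producer id from a freshly started thread.
    static int idOnNewThread(boolean perThread) {
        int[] holder = new int[1];
        Thread worker = new Thread(() -> holder[0] = producer(perThread).id);
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return holder[0];
    }

    public static void main(String[] args) {
        // Shared: both threads see the same instance, so a flush affects both.
        System.out.println(producer(false).id == idOnNewThread(false)); // true
        // Per thread: each thread has its own instance.
        System.out.println(producer(true).id == idOnNewThread(true));   // false
    }
}
```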

4.ReplyingKafkaTemplate🐟

Use it when a request is sent and a reply is expected.

1⃣.Creating the request and reply topics

./kafka-topics.sh --create --bootstrap-server 192.168.0.162:9092 --replication-factor 2 --partitions 10 --topic shafish-dev-requests

./kafka-topics.sh --create --bootstrap-server 192.168.0.162:9092 --replication-factor 2 --partitions 10 --topic shafish-dev-replies

2⃣.Producer configuration
KafkaProducerConfig.java
@Configuration
public class KafkaProducerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String boostrapServers;

    public Map<String, Object> producerConfig() {
        HashMap<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, boostrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        DefaultKafkaProducerFactory<String, String> objectObjectDefaultKafkaProducerFactory = new DefaultKafkaProducerFactory<>(producerConfig());
        //objectObjectDefaultKafkaProducerFactory.setProducerPerThread(true);
        return objectObjectDefaultKafkaProducerFactory;
    }

    /**
    * Listener container for the reply topic (supports concurrent consumers)
    */
    @Bean
    public ConcurrentMessageListenerContainer<String, String> repliesContainer(ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
        ConcurrentMessageListenerContainer<String, String> container = containerFactory.createContainer("shafish-dev-replies"); // KafkaProperties.DEVREPLIES
        container.getContainerProperties().setGroupId("repliesGroup");
        container.setAutoStartup(false);
        return container;
    }

    @Bean
    public ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate(ProducerFactory<String, String> producerFactory, ConcurrentMessageListenerContainer<String, String> repliesContainer) {
        return new ReplyingKafkaTemplate<>(producerFactory, repliesContainer);
    }
}
3⃣.Message producer
@RestController
@RequestMapping("api/v1/message")
public class MessageController {

    @Autowired
    private ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate;

    @PostMapping("/publishReply")
    public String publishReply(@RequestBody MessageQuery query) throws ExecutionException, InterruptedException, TimeoutException {
        ProducerRecord<String, String> record = new ProducerRecord<>("shafish-dev-requests", query.message()); //KafkaProperties.DEVREQUEST
        // record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, query.message().getBytes()));
        RequestReplyFuture<String, String, String> requestReplyFuture = replyingKafkaTemplate.sendAndReceive(record);
        SendResult<String, String> sendResult = requestReplyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
        System.out.println("send:"+sendResult.getRecordMetadata());

        ConsumerRecord<String, String> stringStringConsumerRecord = requestReplyFuture.get(10, TimeUnit.SECONDS);
        System.out.println("return value:"+stringStringConsumerRecord.value());
        return stringStringConsumerRecord.value();
    }
}
4⃣.Message consumer
@Component
public class ConsumerListener {
    //KafkaProperties.DEVREQUEST
    @KafkaListener(topics = "shafish-dev-requests", groupId = KafkaProperties.consumerGroupId)
    @SendTo
    public String requestListener(ConsumerRecord<String, String> record) {
        Optional<String> message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            System.out.println(message.get());
//            Message<String> build = MessageBuilder.withPayload(record.value().toUpperCase())
//                    //.setHeader(KafkaHeaders.TOPIC, replyTo)
//                    .setHeader(KafkaHeaders.KEY, 42)
//                    //.setHeader(KafkaHeaders.CORRELATION_ID, correlation)
//                    .build();
            return "yes";
        }
        return null;
    }

}
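The commented-out headers above (KafkaHeaders.REPLY_TOPIC, KafkaHeaders.CORRELATION_ID) hint at how a reply finds its request. The sketch below is a JDK-only illustration of that correlation idea, assuming each request carries a unique id that the replying side echoes back; `RequestReplySketch`, `register`, and `onReply` are hypothetical names, not the ReplyingKafkaTemplate API.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

final class RequestReplySketch {

    // Futures of requests that are still waiting for a reply, keyed by correlation id.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Requesting side: remember the future under a fresh correlation id.
    String register(CompletableFuture<String> replyFuture) {
        String correlationId = UUID.randomUUID().toString();
        pending.put(correlationId, replyFuture);
        return correlationId;
    }

    // Reply-listener side: complete the future whose correlation id matches.
    // Returns false when no request is waiting for that id.
    boolean onReply(String correlationId, String value) {
        CompletableFuture<String> future = pending.remove(correlationId);
        return future != null && future.complete(value);
    }

    public static void main(String[] args) {
        RequestReplySketch template = new RequestReplySketch();
        CompletableFuture<String> reply = new CompletableFuture<>();
        String id = template.register(reply);

        // The replying side echoes the correlation id together with its answer.
        template.onReply(id, "yes");

        System.out.println(reply.join());                           // yes
        System.out.println(template.onReply("unknown-id", "late")); // false
    }
}
```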

V. Receiving messages🐟

Abstract

Messages can be received either by configuring a MessageListenerContainer with a message listener or by using the @KafkaListener annotation.

1.message listener🐟

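Eight message listener interfaces

To receive messages you supply a listener; eight listener interfaces are provided:

// Processes a single record; offsets are committed automatically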
public interface MessageListener<K, V> { 

    void onMessage(ConsumerRecord<K, V> data);

}

// Processes a single record; offsets are committed manually through the Acknowledgment
public interface AcknowledgingMessageListener<K, V> { 

    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);

}

// Processes a single record with automatic commits, with access to the Consumer
public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { 

    void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);

}

// Processes a single record with manual commits, with access to the Consumer
public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { 

    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);

}

public interface BatchMessageListener<K, V> { 

    void onMessage(List<ConsumerRecord<K, V>> data);

}

public interface BatchAcknowledgingMessageListener<K, V> { 

    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);

}

public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { 

    void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);

}

public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { 

    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);

}

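The Consumer object passed to these methods is not thread-safe; only invoke its methods on the thread that calls the listener.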

2.message listener containers🐟

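Two implementations

Two MessageListenerContainer implementations are provided: KafkaMessageListenerContainer and ConcurrentMessageListenerContainer.

KafkaMessageListenerContainer receives all messages from the subscribed topics on a single thread; ConcurrentMessageListenerContainer delegates to one or more KafkaMessageListenerContainer instances to consume messages.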

KafkaMessageListenerContainer

public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
            ContainerProperties containerProperties)
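// Pass explicit topic partitions (with an optional initial offset) to tell the container which partitions to use. A negative offset is by default relative to the current last offset in the partition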
public ContainerProperties(TopicPartitionOffset... topicPartitions)

public ContainerProperties(String... topics)

public ContainerProperties(Pattern topicPattern)


Created: 2023-02-28 00:30:17
Last updated: 2024-06-21 22:29:28