Apache Kafka一直占据着消息队列技术方向重要地位。2025年3月18日,Kafka迎来了4.0版本的更新,这次更新带来了许多令人瞩目的新特性,对开发者和运维人员来说意义重大。下面咱们就详细聊聊这些新变化。

一、告别ZooKeeper,开启KRaft

在Kafka 4.0版本中,最大的亮点之一就是默认运行在KRaft模式下,这意味着它不再依赖Apache ZooKeeper。以前,ZooKeeper的存在虽然为Kafka提供了协调服务,但也增加了部署和管理的复杂性。现在,Kafka 4.0去掉了这一依赖,使得整个部署过程变得更加简单直接。运维人员不用再花费大量精力去维护ZooKeeper集群,降低了运营成本的同时,还提高了系统的可扩展性,管理任务也大大简化了。

二、新消费者组协议

Kafka 4.0引入了新的消费者组协议(KIP – 848),这个更新主要是为了优化重新平衡性能。在大规模部署场景下,消费者组的重新平衡操作经常会导致停机时间延长和延迟增加。而新协议将相关逻辑转移到了代理端,很好地解决了这些问题,提升了消费者组的可靠性和响应速度。简单来说,就是在处理大量消息时,系统能够更加稳定、高效地运行。

三、支持队列功能

Kafka 4.0新增的队列功能(KIP – 932),支持了传统队列语义。以前,Kafka在消息处理模式上有一定的局限性,现在通过这个功能,允许多个消费者协同处理同一个分区的消息。这就使得Kafka可以应用在更多场景中,尤其是那些需要点对点消息模式的场景,它变成了一个更通用的消息平台。

四、Java版本要求升级

随着Kafka 4.0的发布,对Java版本的要求也有所改变。Kafka客户端和Kafka Streams现在需要Java 11的支持,而Kafka代理、Connect和相关工具则需要Java 17。在项目升级到Kafka 4.0时,开发者需要注意确保Java环境符合要求,以免出现兼容性问题。

五、API更新

为了让平台更加简洁,同时鼓励开发者采用新功能,Kafka 4.0删除了至少12个月前被废弃的API。这一操作虽然可能会对部分依赖旧API的项目造成一定影响,但从长远来看,有助于Kafka生态系统的健康发展,让开发者能够更快地接触和使用到新的、更强大的功能。

六、代码示例

使用Kafka 4.0的KRaft模式创建主题

下面这段代码展示了如何在KRaft模式下创建Kafka主题:

import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.CreateTopicsResult; import org.apache.kafka.clients.admin.NewTopic; import java.util.Collections; import java.util.Properties; public class KafkaTopicCreator { public static void main(String[] args) { // 配置Kafka连接属性 Properties props = new Properties(); // 设置Kafka服务器地址 props.put("bootstrap.servers", "localhost:9092"); // 设置消息确认机制 props.put("acks", "all"); // 创建AdminClient实例,用于管理Kafka集群 AdminClient adminClient = AdminClient.create(props); // 创建一个新的主题,指定主题名为my-topic,分区数为1,副本因子为1 NewTopic newTopic = new NewTopic("my-topic", 1, (short) 1); // 创建主题并获取创建结果 CreateTopicsResult result = adminClient.createTopics(Collections.singleton(newTopic)); try { // 输出主题创建结果 System.out.println("Topic created: " + result.all().get()); } catch (Exception e) { e.printStackTrace(); } } } 

使用新消费者组协议

接下来的代码示例展示了如何使用新消费者组协议:

import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.Collections; import java.util.Properties; public class KafkaConsumerExample { public static void main(String[] args) { // 配置消费者属性 Properties props = new Properties(); // 设置Kafka服务器地址 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // 设置消费者组ID props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group"); // 设置键的反序列化类 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 设置值的反序列化类 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 创建Kafka消费者实例 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // 订阅主题my-topic consumer.subscribe(Collections.singleton("my-topic")); while (true) { // 拉取消息,设置拉取超时时间为100毫秒 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { // 输出消息内容 System.out.println(record.value()); } // 同步提交消费位移,确保消息被正确处理 consumer.commitSync(); } } } 

队列功能示例

下面的代码展示了如何在Kafka中实现类似队列的行为:

import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.Collections; import java.util.Properties; public class KafkaQueueExample { public static void main(String[] args) { // 配置消费者属性 Properties props = new Properties(); // 设置Kafka服务器地址 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // 设置消费者组ID props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-queue-group"); // 设置键的反序列化类 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 设置值的反序列化类 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 创建Kafka消费者实例 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // 订阅主题my-queue-topic consumer.subscribe(Collections.singleton("my-queue-topic")); while (true) { // 拉取消息,设置拉取超时时间为100毫秒 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { // 输出消息内容 System.out.println(record.value()); // 处理消息后,手动确认以避免重复消费 consumer.commitSync(Collections.singleton(record)); } } } } 

通过这些示例,希望能帮助大家更好地理解和使用Kafka 4.0的新特性。如果你在实际应用中遇到问题,欢迎在评论区留言讨论。