要订阅kafka的,可以通过注解@kafkalistener来监听,kafka消费完一条消息会提交offset。
Kafka的offset机制是基于topic的,通过将一个topic的数据分成多个offset集群来管理多个话题的数据副本。具体来说,每个Kafka实例都会维护一个offset集群,其中包含多个offset记录,每个offset记录包含一个指向前一个数据记录的指针,这个指针指向的数据记录就是该offset的下一个副本。
当生产者向Kafka发送数据时,Kafka会将该数据发送到一个或多个offset集群中,根据每个offset集群中的offset记录,Kafka会将数据重新分发到不同的消费者,以确保每个消费者都可以获得正确的数据副本。
具体来说,当生产者向Kafka发送数据时,Kafka会将数据发送到一个或多个offset集群中,然后根据每个offset集群中的offset记录,Kafka会将数据重新分发到不同的消费者。例如,假设有一个topic包含3个数据记录,生产者向Kafka发送了第一个数据记录,Kafka会将第一个数据记录发送到一个offset集群中,并将该offset集群中的所有数据记录都复制到该消费者,然后消费者再从该offset集群中获取到第二个数据记录和第三个数据记录。
offset机制的维护是通过将数据分成多个offset集群来实现的,每个offset集群包含多个offset记录,每个offset记录包含一个指向前一个数据记录的指针,这个指针指向的数据记录就是该offset的下一个副本,从而确保每个消费者都可以获得正确的数据副本。
如果你在0.9版本以上,可以用最新的Consumer client 客户端,有consumer.seekToEnd() / consumer.position() 可以用于得到当前最新的offset: ${log.dirs}/replication-offset-checkpoint
kafka消费有三种模式,如下所述:
at most onece模式
基本思想是保证每一条消息commit成功之后,再进行消费处理;
设置自动提交为false,接收到消息之后,首先commit,然后再进行消费
at least onece模式
基本思想是保证每一条消息处理成功之后,再进行commit;
设置自动提交为false;消息处理成功之后,手动进行commit;
采用这种模式时,最好保证消费操作的“幂等性”,防止重复消费;
exactly onece模式
核心思想是将offset作为唯一id与消息同时处理,并且保证处理的原子性;
设置自动提交为false;消息处理成功之后再提交;
比如对于关系型数据库来说,可以将id设置为消息处理结果的唯一索引,再次处理时,如果发现该索引已经存在,那么就不处理;
无法设置延时消费,只能立即消费立即支付
生产者生成主键,利用主键不能重复的特性
生产者在发kafka时往redis写标志,消费消息时判断标志是否存在,存在就删除标志,处理业务。
Kafka rebalance会影响到consumer consumer group,它会重新分配consumer的partition分配情况,使得同一个consumer group内的consumer持有partition不能发生重复和遗漏的情况。
在rebalance过程中,Kafka会暂停所有partitions的分配,停止消费。但是Kafka会自动处理rebalance,尽快地为消费者重新分配分区,使得消费者能够恢复消费活动,并确保消费数据的一致性。需要注意的是,由于rebalance会暂停消费,所以处理rebalance的效率需要尽量高,以尽快恢复消费服务,避免影响业务应用。
可以刷新系统或者退出重新登录打开的
Kafka消费慢的解决方案可以从以下几个方面入手
1. 增加消费者线程数
2. 优化消费者组的设置,例如增加分区数
3. 调整消息发布的速率
4. 改进Kafka集群的部署方式,例如增加副本数
5. 优化Kafka集群的配置,例如修改消息大小和消息压缩等。
设置消费者properties的两个参数
consumer.group.id
properties.setProperty("auto.offset.reset", "earliest”) // latest
注意:只要不更改group.id,每次重新消费kafka,都是从上次消费结束的地方继续开始,不论"auto.offset.reset”属性设置的是什么