1.什么是Kafka
2.什么是消息中间件
3.消息中间件的应用场景
4.Kafka优势和劣势
5.Kafka应用场景
6.kafka名词解释
代理(Broker)
主题(Topic)
分区(partition)
生产者(Producer)
消费群组(Consumer Group)
消费者(Consumer)
7.代理(Broker)
8.主题(Topic)
9.分区(Partition)
10.生产者(Producer)
10.1.生产者如何保证消息不丢失(可靠性)
10.2.生产者原理
11.消费者(Consumer)
11.1.
11.2.rebalance
11.3.rebalance触发时机
11.4.rebalance协议
11.5.rebalance流程
11.6.消费者核心配置
11.7.提交和偏移量
12.参考资料
Kafka是消息中间件的一种,它是一个分布式、可分区、可复制的高性能消息系统,它提供普通消息系统的功能队列、发布订阅
Kafka使用队列模型时,它将处理工作平均分配给消费组(Consumer Group)中的消费者成员
Kafka使用发布订阅模式时,它可以将消息广播给多个消费组,也允许消息被多个消费者订阅
消息中间件是一种异步通信协议,也可以说试一种通信软件,它可以为分布式应用、系统或者实体提供跨平台、异步、松耦合、
高可靠性并且安全的通信功能
1.应用程序解耦
1.比如用户注册送红包发送注册短信,传统做法是先用户注册然后送红包最后发送注册短信,
2.使用消息队列用户注册成功后发送1个消息立即返回, 同时发送用户注册和送注册红包
3.传统如果用户注册送红包发送注册短信分别需要用时50ms,
就需要150ms=(50ms 用户注册 + 50ms 送红包 + 50ms发送注册短信)
4.消息中间件用户注册成功后发送1个消息立即返回,同时发送用户注册和送注册红包,
就需要100ms=(50ms 用户注册 + 50ms(送红包 + 发送注册短信))
5.如果不需要发送短信,停用短信消费者即可,需要时启用短信消费者就好
6.如果不需要送红包,停用送红包消费者即可,需要的启用送红包消费者就好
7.从而提高吞吐量和应用解耦
2.异步处理
发送短信、发送邮件、注册送红包等
3.流量削峰
主要用于高并发业务场景,缓解短时间内高流量压垮应用比如秒杀和活动等,将用户请求放入消息队列中,如果消息队列成功最大数量,
直接抛弃用户请求或跳转到错误页面
优势:
1.可靠性: Kafka是分布式负载均衡、分区、副本复制实现故障转移来保存可靠性.Kafka分布式的,一个数据多个副本,
少数集群宕机,不会丢失数据,也不会导致不可用
2.可扩展性: 可以通过简单的增加服务器横向扩展,无需停机
3.持久性: Kafka消息持久化(零拷贝zero-copy)到磁盘上,副本机制实现数据沉余,保证数据不会丢失
4.高性能: Kafka对于发布订阅订阅都具有很高吞吐量
1.单机写入TPS约在百万条/秒,即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输
2.零拷贝技术(sendfile zero copy)
3.分布式存储
4.点对点消息传递: 分区(Partition)可以在多个节点上(例如: 1个节点1个主题3分区吞吐量是300MB, 分成3个节点分别
存放个主题3分区即300MB*3=900MB的吞吐量)
5.磁盘顺序读顺序写
6.批量读批量写
7.kafka不保存消息状态,消费者通过记录主题分区中的offset表示之前的消息都被消费,offset之后的都是未消费,
offset可以任意移动
缺点:
1.消息乱序: Kafka只能保证单个分区(Partition)内部的消息有序,如果单个主题(Topic)有多个分区,分区之间无法保证消息有序
2.重复消息: Kafka保证每条消息至少送达一次,虽然几率很小,但一条消息可能被送达多次.
3.复杂性: Kafka需要Zookeeper的支持,主题(Topic)一般需要人工创建,部署和维护比一般MQ成本更高
1. 日志收集处理
2. 消息系统
3. 网站行为日志跟踪
4. 流式处理: 对输入数据流进行转换、聚合或汇总然后输出到某个地方,当然也可以是另一个流
5. 事件溯源
代理(Broker): 是一个独立的Kafka服务器, 一个集群由多个Broker组成
Broker接收来自生产者的消息保存到指定主题(Topic)中的分区(Partition)上并设置消息偏移(Offset)然后保存在磁盘上.
Broker为消费者提供读取服务,对读取分区的请求作出响应,返回已提交到硬盘中的消息.
单个Broker可以轻松处理上千个分区和每秒百万级消息量
单个Broker在低延迟下也能处理支持每秒100K(上千条消息)条消息的传输
主题(Topic): 消息的类别称为Topic
一个Broker包含一个或者多个主题(Topic),Kafka根据topic对消息进行归类,
发布到Kafka集群的每条消息都需要指定一个topic
分区(partition):消息存储的队列
每个主题(Topic)包含一个或者多个分区(Partition),每个partition内部是有序的
生产者(Producer): 发送消息的客户端
负责发送消息到指定的代理(Broker)下的主题(Topic)中的分区(Partition)
消费群组(Consumer Group): 消息消费队列称为消费组
一个消费组(Consumer Group)可以包含一个或者多个消费者(Consumer)
注意消费组订阅某个主题,如果该主题的中的每个分区只能有一个消费者
比如消费者G订阅了主题T并且主题只有2个分区分别是A和B,所以主题T最多只能有2个消费者
消费者(Consumer): 读取消息的客户端
消费者(Consumer)负责从Broker中拉取消息并处理,设置已处理的消息偏移量
1.副本节点必须能与zookeeper保持会话(心跳机制)
2.副本能复制leader上的所有写操作,并且不能落后太多.如果滞后太多(数量滞后和时间滞后两个维度,replica.lag.time.max.ms和replica.lag.max.message可配置)leader会把该replica从ISR中移除
rerplica.lag.time.max.ms=10000
如果leader发现flower超过10秒没有向它发起fech请求,那么leader考虑这个flower是不是程序出了点问题或者资源紧张调度不过来,
它太慢了,不希望它拖慢后面的进度,就把它从ISR中移除.
rerplica.lag.max.messages=4000
相差4000条就移除,flower慢的时候,保证高可用性,同时满足这两个条件后又加入ISR中,在可用性与一致性做了动态平衡
实际使用时,根据应用特性选择,绝大多数情况下都会中和可靠性和性能选择第三种模
生产者如何保证消息不丢失:
1.最多一次(At most once): 消息可能会丢失但绝不重传
啥都不管,发送出去就当作成功,这种情况当然不能保证消息成功投递到broker
2.至少一次(At least once): 消息可以重传但绝不丢失
Master-Slave模型,只有当Master和所有Slave都接收到消息时,才算投递成功,这种模型提供了最高的投递可靠性,但是损伤了性能
3.精准一次(Exactly once): 每一条消息只被传递一次,这正是人们想要的
只要Master确认收到消息就算投递成功
生产者保证消息不丢失(可靠性)配置:
1.acks=0 如果设置为0,则 producer 不会等待任何来自服务器的响应.该消息会被立刻添加到 socket buffer 中并认为已经发送完成.
在这种情况下,服务器是否收到请求是没法保证的.也就是说,如果当中出现了问题,导致服务器没有收到消息,那么生产者就无从得知,
消息也就丢失了,因为生产者不需要等待服务器的响应,所以它可以以网络能够支持的最大速度发送消息,从而达到很高的吞吐量.
2.acks=1 如果设置为1,leader节点会将记录写入本地日志,并且在所有 follower 节点反馈之前就先确认成功.在这种情况下,
如果 leader 节点在接收记录之后,并且在 follower 节点复制数据完成之前产生错误,则这条记录会丢失.
如果acks=1,只要集群的首领(leader)节点收到消息,生产者就会收到一个来自服务器的成功响应.
如果消息无法到达首领(leader)节点(比如首领节点崩愤,新的首领还没有被选举出来),生产者会收到一个错误响应,
为了避免数据丢失,生产者会重发消息.不过,如果一个没有收到消息的节点成为新首领,消息还是会丢失.
这个时候的吞吐量取决于使用的是同步发送还是异步发送.
如果让发送客户端等待服务器的响应,显然会增加延迟.
如果客户端使用回调,延迟问题就可以得到缓解.
不过吞吐量还是会受发送中消息数量的限制(比如生产者在收到服务器响应之前可以发送多少个消息).
3.acks=all 如果设置为all,这就意味着 leader 节点会等待所有同步中的副本确认之后再确认这条记录是否发送完成.只
要至少有一个同步副本存在,记录就不会丢失.这种方式是对请求传递的最有效保证.acks=-1与acks=all是等效的.
生产者消息幂等保证:
1.为什么要引入幂等
生产者进行retry会产生重试时,会重复产生消息.有了幂等性之后,在进行retry重试时,只会生成一个消息.
2.幂等性实现:
为了实现Producer的幂等性,Kafka引入了Producer ID(即PID)和Sequence Number
PID: 每个新的Producer在初始化的时候会被分配一个唯一的PID,这个PID对用户是不可见的.
Sequence Numbler: 对于每个PID,该Producer发送数据的每个<Topic, Partition>都对应一个从0开始单调递增的Sequence Number.
Broker端在缓存中保存了这seq number,对于接收的每条消息,如果其序号比Broker缓存中序号大于1则接受它,否则将其丢弃.
这样就可以实现了消息重复提交了.但是,只能保证单个Producer对于同一个<Topic, Partition>的Exactly Once语义.
不能保证同一个Producer一个topic不同的partion幂等
3.在kafka 0.11.0.0之前,是无法保证Exactly once的.但从0.11.0.0开始 producer 引入了幂等性的概念,
保证消息只会被传递一次
生产者选择主题分区:
1.如果没有Key值则进行轮询发送.
2.如果有Key值,对Key值进行Hash计算得到hashCode,然后对分区数量取余.保证了同一个Key值的会被路由到同一个分区,
如果想队列的强顺序一致性,可以让所有的消息都设置为同一个Key.
同步生产者原理:
1.配置生产者
2.创建生产者实例 KafkaProducer
3.检测元数据是否可用(如果不可用或者没有就拉取元数据)
4.选择主题分区
5.创建客户端请求(编码消息成kafka二进制协议)
5.发送消息请求到kafka broker
6.接收kafka broker响应(如果没有直接返回)
7.如果消息发送出错重试(重新执行3-7步)
异步生产者原理:
1.配置生产者
2.创建生产者实例 KafkaProducer
3.检测元数据是否可用(如果不可用或者没有就拉取元数据)
4.选择主题分区
5.把消息添加到记录收集器收集,记录收集器收集以topic-partition区分
6.如果记录收集器有消息批次已满,就通知发送者线程读取最老的一批消息
7.发送线程通过从记录收集器中找出已经准备好的服务端节点
8.如果节点已经准备好,节点如果可以连接,同时初始化该连接,否则证明该节点(node)暂时不能发送数据,暂时移除该节点(node)
9.发送者线程通过从记录收集器获取按照节点整理好的每个分区的批记录
10.发送线程得到每个节点的批记录后,设置节点正在发送消息,为每个节点创建客户端请求,设置客户端请求回调,
并将请求发送到kafka broker.
11.客户端请求回调收到响应,设置节点已经发送消息请求完成,如果成功,设置了回调就调用回到,没有就直接返回
12.客户端请求回调收到响应,消息发送失败,就检测是否需要重试.
为了保证服务端的处理性能,客户端网络连接对象有一个限制条件:针对同一个服务端,如果上一个客户端请求还没有发送完成,
则不允许发送新的客户端请求.客户端网络连接对象用inFlightRequests变量在客户端缓存了还没有收到响应的客户端请求,
InFlightRequests类包含一个节点到双端队列的映射结构.在准备发送客户端请求时,请求将添加到指定节点对应的队列中;
在收到 响应后,才会将请求从队列中移除
request.timeout.ms: 该配置控制客户端等待请求响应的最长时间.如果在超时之前未收到响应,则客户端将在必要时重新发送请求,
如果重试仍然失败,则请求失败.
消费者2中模式
发布-订阅模式: 同一条消息会被多个消费组消费,每个消费组只有一个消费者消费,实现广播
队列模式: 只有一个消费组、多个消费者,一条消息只被消费组的一个消费者消费,实现单播
消费组5个状态:
1.Empty:消费组下没有任何活跃的消费者,可能为消费组刚创建的时刻或工作一段时间后所有消费者离开.
2.PreparingRebalance: 表明group正在准备进行group rebalance.此时group收到部分成员发送的JoinGroup请求,同时等待其他成员发送JoinGroup请求,知道所耦成员都成功加入组或超时.
3.AwaitingSyc: 表明所有成员都已经加入组并等待leader consumer发送分区分配方案.
4.Stable: 表明group开始正常消费,可以响应客户端发送的任何请求
5.Dead: 表明group已经彻底废弃,group内没有任何active成员且group的所有元数据都已被删除
什么是消费组再均衡(rebalance): 所有的消费者都要执行重新分配分区的动作
在再均衡期间,消费者无法读取消息,会造成整个群组一小段时间的不可用.
rebalance触发时机:
消费者通过被指派为群组协调器的broker(不同的群组有不同的协调器)发送心跳来维持他们和群组的从属关系以及他们对分区的所有权.
只要消费者以正常的时间间隔发送心跳,就被认为是活跃的,说明还在读取分区的消息.消费者会在轮询尝试获取新消息或提交偏移量时发送心跳,
如果消费者停止发送心跳的时间足够长,会话就会过期,群组协调器认为消费者宕机,会触发一次再均衡.
另一方面,我们在主动清理消费者时,消费者也会通知协调器它即将离开群组,也会触发一次再均衡.
可以分为以下三种情况:
1.组成员发生变更:新consumer加入,原有consumer主动离开或崩溃
2.组订阅topic数发生变更: 基于正则订阅,新主题创建,命中正则规则
3.组订阅topic的分区数发生变更: 如通过脚本增加订阅topic的分区数
rebalance分区分配策略:
1.range策略: 基于范围的思想,将单个topic的所有分区按照顺序排列,然后把这些分区划分成固定大小的分区段并依次分配给每个消费者.
2.round-robin策略: 把所有topic的所有分区顺序分开,轮训式地分配给每个消费者
3.sticky策略: 采用“有黏性”策略对所有消费者实例进行分配,以规避在极端情况下的数据倾斜并在两次rebalance间最大限度维持之前的分配方案.
rebalance再均衡后如何防止延迟提交的offset:
每次触发再均衡后,有一个标志再均衡代数的变量,会在每次触发再均衡后+1.主要用于保护consumer group,尤其是防止无效offset的提交.
比如上一代的consumer成员由于某些原因延迟提交了offset.但再均衡后该group产生了新一届的group成员,而这次延迟的offset
提交携带的是旧的generation信息,则这次提交会被拒绝
rebalance协议
rebalance本质是一组协议,由group和coordinator(协调者)共同完成,其中coordinator是每个组的一个协调者,负责对组的状态进行管理,
主要职责是再均衡时促成组内所有成员达成新的分区分配方案.再均衡协议包含以下协议请求:
1.JoinGroup请求: consumer请求加入组
2.SyncGroup请求: group leader把分配方案同步更新到组内所有成员
3.Heartbear请求: consumer定期向coordinator汇报心跳表名自己依然存活
4.LeaveGroup请求: consumer主动通知coordinator该consumer即将离组
5.DescribeGroup请求: 查看组的所有信息,包括成员信息、协议信息、分配方案以及订阅信息等.主要供管理员使用
在rebalance过程中,coordinator主要处理consumer发来的JoinGroup和SyncGroup请求.
当consumer主动离组时会发送LeaveGroup给coordinator。
在成功rebalance后,组内所有consumer定期向coordinator发送Heartbeat请求.
而consumer则根据Heartbeat请求的响应中是否包含REBALANCE_IN_PROGRESS来判断当前group是否开启新一轮rebalance
rebalance流程:
1.找到coordinator,确定协调者的算法如下:
1.计算 Math.abs(groupID.hashCode) % offsets.topic.num.partitions参数值(默认是 50) , 假设是 10.
2.寻找一consumer_offsets分区 10的 leader副本所在的 broker,该 broker即为这个group的 coordinator.
2.收集consumer,选举Leader并制定分配方案
组内所有consumer向coordinator发送JoinGroup请求,coordinator从中选择一个(通常是第一个)作为leader.并把所有
成员信息以及他们的订阅信息发给leader,由leader负责为整个group的所有成员制定分配方案
3.同步更新分配方案:
所有消费组发送SyncGroup请求给coordinator,但只有leader会将制定的分配方案封装进SyncGroup请求发送给coordinator.
coordinator从leader请求中把属于每个consumer的方案单独抽取出来,作为SyncGroup请求的response返还给各自的consumer.
消费者核心配置:
fetch.min.bytes
指定消费者从服务器获取记录的最小字节数.broker在收到消费者的数据请求时,如果可用的数据量小于配置指定的大小,
会等有足够的数据再一起返回给消费者,以此降低消费者和broker的工作负载.
fetch.max.wait.ms
指定broker在没有收到足够数据时的最大等待时间,默认500ms,如果没有足够的数据流入broker,即使消费者尝试获取数据,
broker也不会立即返回,而会等待离上次拉取数据时间间隔fetch.max.wait.ms才会返回给客户端.这个配置设置过大,
会导致数据消费延迟,但可以降低消费者和broker的工作负载
max.partition.fetch.bytes
指定服务器从每个分区里返回给消费者的最大字节数.默认为1MB.这个配置值必须比broker能够接受的最大消息的字节数
(max.message.size配置)大,否则消费者可能无法读取过大的消息,导致消费者一直刮起重试.另外还需要考虑消费者处理的时间,
如果单词poll数据太多,消费者处理可能无法及时进行下一个轮询来避免会话过期.
session.timeout.ms
该属性指定了消费者在被认为死亡之前可以与服务器断开连接的时间,默认为3s,如果消费者没有在指定时间内发送心跳给群组协调器,
会被认为死亡,群组协调器会触发再均衡,把它的分区分配给其他消费者.阈值关联的另一个配置是heartbeat.interval.ms,
用来指定poll()方法向协调器发送心跳的频率.因此两个属性一把你需要同步修改,如session.timeout.ms是3s,
则heartbeat.interval.ms应该是1s.将session.timeout.ms设置更小一些,可以更快地监测和恢复崩溃节点.但可能会导致非预期的再均衡.
auto.offset.reset
指定消费者在读取一个没有偏移量的分区或者偏移量无效(银消费者长时间失效,包含偏移量的记录已经过期或被删除)的情况下的处理动作,有两个值:
latest: 默认值,从最新记录开始读取
earliest: 从起始位置读取分区记录
enable.auto.commit
指定消费者是否自动提交偏移量,默认为true.为了尽量避免出现重复数据和数据丢失,可以设为false,有自己控制何时提交偏移量,
如果设为true,则可以通过配置auto.commit.intervall.ms来控制提交的频率
partition.assignment.strategy
分区分配策略,决定哪些分区由哪些消费者消费,有两种默认策略:
Range:把主题的若干个连续分区分配给消费者,如有分区p1,p2,p3分配给消费者c1,c2,则分配结果可能为c1->p1,c1->p2,c2->p3
RoundRobin:把主题逐个分配给消费者.如对于上例,分配结果为c1->p1,c2->p2,c1->p3.RoundRobin策略保证所有消费这分配相差0或1个数量的分区.
sticky:采用“有黏性”的策略对所有的consumer实例进行分配,可以规避极端情况下的数据倾斜并且在两次rebalance间
最大限度地维持原有的分配方案,相对上面两种方案,有效避免了无视历史分配方案的缺陷.
提交和偏移量:
消费者需要更新自己在分区消费的记录偏移量,这个操作叫做提交.通常,偏移量是下一条带消费的消息的位置.
消费者提交的偏移量作用在于当消费者发生崩溃或有新消费者加入群组引发分区再均衡时,当分区被分配到新的消费者时,
新的消费者可以根据分区记录的偏移量来继续消费消息.
这里有两种异常情况:
1.如果提交的偏移量小于客户端处理的最后一个消息的偏移量,则两个偏移量之间的消息会被重复处理。
2.如果提交的偏移量大于客户端处理的最后一个消息的偏移量,则两个偏移量之间的消息会被丢失。
提交策略
kafka提供了多种策略来提交偏移量:
1.自动提交
2.手动提交
1.自动提交
自动提交通过两个配置指定:
1.enable.auto.commit: 设为true时,消费者会在经过配置间隔后把从poll()方法收到的最大偏移量提交上去.
2.auto.commit.interval.ms: 控制消费者提交偏移量的间隔时间,默认为5s.
自动提交虽然便利,但存在风险:
如果在提交后,在配置间隔时间前,如提交后第3秒分区发生再均衡,则意味这3s内处理的消息会被重复处理.虽然可以通过调整更短
的提交间隔时间来减少这个风险,但仍有可能发生.虽然提交了最新的偏移量,但这不意味最后一批拉取的消息已被正常消费,如果在
消费过程消费者宕机,会导致部分消息丢失.
手动提交
可以通过设置auto.commit.offset=false.在每轮消费完调用poll()获取的消息后,手动调用commitSync()来提交最新偏移量.
如果在这个过程中,分区发生再均衡,也会有消息被重复消费的可能.手动提交后,会堵塞一直重试,知道提交成功
同步和异步组合提交
可以结合同步和异步提交,在正常轮询消费过程中采用异步提交,当出现异常或消费被中断时,再用同步提交来兜底
分区分配给消费者:
1.消费者(线程)数量多于分区数量,有部分消费者无法消费该主题下任何消息
2.消费者数量少于分区数量,有些线程会消费多个分区的数据
3.消费者数量等于分区数量,刚好一个线程消费一个分区的数据
通常根据时间来决定数据可以被保留多久
log.retention.ms
log.retention.minutes
log.retention.hours
通过保留的消息字节数来判断消息是否过期
log.retention.bytes
参数来限制单个消息的大小
message.max.bytes
消费者客户端设置的 message.max.bytes 必须与服务 器端设置的消息大小进行协调.
如果这个值比 message.max.bytes 小,那 么消费者就无法读 取比较大的消息,导致出现消费者被阻塞的情况.
在为集群里的 broker 配置 replica.fetch.max.bytes 参数时,也遵循同样的原则