Kafka

简介

介绍

什么是消息:

统计用户在网站的操作,点击任意一个🔗都会被采集到

统计消息,临时存储,一个通信的管道

为什么叫kafka 喜欢的作者

消息队列的中间件:消息队列的组件

场景

异步处理。下单之后邮件通知,注册短信等一系列操作。如果并行会非常耗时。

异步处理:

将比较耗时的操作放在其它系统里面,通过消息队列将需要处理的信息进行存储

例如发邮件,短信验证等

系统解耦:

订单系统-》消息队列-》库存系统

原来一个微服务是通过接口来调用另一个微服务,耦合严重。

把消息放在一个管道(消息队列里面去,放与取即可

流量削峰:

双十一,高吞吐

日志处理:

大数据领域常见,直接把用户操作通过服务器存在文件

用户–》web服务器–》消息队列–》Flink程序–》计算结果

生产者消费者模型

交互模型:先类比

HTTP请求 响应模型

数据库请求响应模型JDBC 基于MySQL的通信协议

在消息队列中变为生产者消费模型:如下

生产者*n–>📥=====消息队列=====📤<–消费者

1.点对点模式:

一个发送只有一个消费者,消费了就删除

发送者与接收者之间没有依赖性,互不影响

接收者成功接收之后需要应答,以便消息的删除

2.发布订阅模式:

源源不断放在消息队列,每个消息可以有很多个消费者(订阅者

有时间依赖性

需要订阅,提前订阅,保持在线

Kafka

围绕 发布与订阅/存储/处理 来讲解

对比rabbitmq的优点:消息延迟毫秒级,单机吞吐量十万级,可用性非常高(分布式而不是主从,消息理论上不会丢失

哪些公司在用:腾讯、微软、华为,等等

安装、搭建集群、环境搭建、一键启动脚本等…略

  • 依赖zookeeper
  • 每个节点时唯一传递 broker.id
  • 配置数据存储目录

分区和副本是什么?

kafka有三层形式。Kafka有多个topic,每个主题有多个分区,每个分区又有多条消息

每个分区可以分不到不同的机器上:这样一来,从服务端来说,分区可以实现高伸缩性,以及负载均衡动态调节能力。单个分区的时候,和顺序队列一样。

副本机制:每个分区可以有多个副本。这多个副本中,只有一个是leader,而其他的都是follower副本。仅有leader副本可以对外提供服务。目的就是冗余备份,且仅仅是冗余备份,所有的读写请求都是由leader副本进行处理的。follower副本仅有一个功能,那就是从leader副本拉取消息,尽量让自己跟leader副本的内容一致。

基础操作

topic:表需要表名,Kafka需要主题

#创建create 
bin/kafka-topic.sh --create --bootstrap-server nodel.itcast.cn:9002 --topic test

#查看当前
bin/kafka-topic.sh --list --bootstrap-server nodel.itcast.cn:9002

#生产/消费测试脚本来读取标准输入/消费
bin/kafka-producer.sh --broker-list nodel.itcast.cn:9002 --topic test

bin/kafka-consumer.sh --bootstrap-server nodel.itcast.cn:9002 --topic test --from beginning
#from deginning 从头拉取消息

开发时使用KafkaTool

  • 浏览节点,分区,副本
  • 创建/删除topic
  • 浏览zookeeper中的数据

基准测试

可以有测试工具测试生产者和消费者

JAVA-API程序

同步生产消息到Kafka

  • 导入pom列表
    • 代码库
    • 客户端
    • 工具类
    • slf4j
    • log
  • 配置文件log4j.properties 配置输出
  • 创建包和类KafkaProduserTest 创建消息并发送到集群之中
  • 创建用于连接Kafka的properties配置 指定服务器位置等
  • 构建生产者对象 <key,value>
  • 调用send发送消息
  • 调用future.get等待响应
  • 关闭生产者

消费者程序

offset(消息拉取到哪里了),key,value

  • Kafka的消费者properties配置:
    • 消费者组,将若干个消费者组织到一起,共同消费Kafka中topic的数据
    • 每一个消费者需要指定一个组名
    • 自动提交offset,提交时间间隔
  • 拉取key、value数据的deserializer
  • 创建消费者
  • 订阅topic
  • 循环,不断拉取数据

回调函数—异步方式生产消息

使用匿名内部类实现callback接口,该接口中表示Kafka服务器响应给客户端,会自动调用onCompletion方法

metadata:元数据

exception:如果为空,发送成功。如果不为空就失败,有失败信息。

Kafka结构

——Kafka集群——-============

broker-0 进程 || Zoo- ||

broker-1 进程 || Keeper || 保存一些元数据隐私政策

broker-2 进程 || 集群 || 使用Kafkatool能够看到

————————===========

  • 一个集群由多个broker组成 实现负载均衡和容错
  • broker是无状态的,通过zookeeper来维护
  • 每个broker都可以处理tb消息而不影响性能

zookeeper:

用于管理和协调broker,并且存储了Kafka的元数据(多少个topic、partition、consumer

ZK用来通知P和C有新的broker加入,或者通知又出现故障的B

什么是partition 区

B–>(TOPIC)–>P–>CG–>C1 C2 C3

副本可以确保某个服务器故障时,数据依然可用

Kafka数据不能做修改 旧———–>新

topic是逻辑结构,是写在每一个分区里面的

offset:

每个partition都有很多数据,关于拉取数据,消费者/组应该记住拉取到哪里了

  • offset记录着下一条要发送给consumer的消息序列号
  • 默认存储在zk 自动提交offset:enable.auto.commit
  • offset只在分区中才有意义,分区间没有意义
  • 每一个分区中消息是有顺序的,有一个id,这个就是offset

消费者组CG:

  • 每一个消费组有一个唯一的名字。配置groupid一样的消费组属于同一个组
  • 一个消费组中可以包含多个消费者,共同来消费topic中的数据
  • 一个topic中如果只有一个分区,那么只能被某个组中的一个消费者消费。
  • 有多少个分区,就可以被同一个人租内的多少个消费者消费 1v1

幂等性与事务

幂等性原理

若Kafka没有幂等性(ack响应失败,但其实已经保存了,出现retry就可能在partition中存入重复的消息

幂等性需要配置

props.put("enable.idempotence",true)

引入了produser id PID 和sequence number的概念

  • 每个produser在初始化的时候会分配一个唯一的PID,对用户是透明的
  • SN:从0开始递增的,发送到指定主题分区的消息对应一个SN。若seqn=0,<=pid对应的seq,说明以前已经存过了

避免生产重复的消息去保存

Kafka高级

生产者分区写入策略

轮询分区策略(默认、负载均衡,key为空,乱序)

随机分区策略(不用,乱序)

按key分区策略(注意出现数据倾斜,局部有序)

自定义分区策略(创建自定义分区器

乱序问题

按分区来看,若一段连续的数据,使用轮询算法,为了保证负载均衡,数据会被打散。所以,Kafka的有序,是每个分区的有序。若只用一个区,就失去了分布式的意义。

消费者组rebalance机制

再均衡机制,某些情况下,消费者消费的分区会产生变化(比如某个P崩溃了,一个消费者当前没有分区要削峰)导致消费者分配不均匀,去均衡消费者消费的分区

触发时机

  • 消费者数量发生变化
  • topic发生变化
  • partition数量

有不良影响,rebalance时,所有消费者都会停止,直到每个消费者都已经被分配所需要消费的分区为止

消费者分区分配机制

-Range范围分配策略(默认,是一个范围,保证每一个消费者消费的是均衡的,*Range的范围分配策略是针对每个Topic,不是全局,某个Topic的哪几个分区

n=分区数量/消费者数量

m=分区数量%消费者数量

前m个消费者,每一个消费n+1个

剩余消费者消费n

-轮询RoundRobin策略

所有topic的分区先排序,hashcode,然后来轮询

-粘性Stricky分配

1.尽可能均匀

2.发生rebalance,分区的分配经可能与上一次相同

3.没发生rebalance时,和轮询类似

副本的ACK机制

producer不断写数据给Kafka,数据是否写入成功,就会有一个对应ACKs参数配置。

1、ack为0 不管返回。性能最好。数据可能丢失

2、ack为1 等待leader确认接收(返回成功)之后,才会发送下一条数据,性能中等。一个分区只能有一个leader,保证拉取的数据一致。

3、ack=-1 or all 等到所有副本同步leader的数据成功才会发送下一条。性能最慢。能够保证数据不丢失。

*根据业务情况来选择ACKs。一部分数据丢失不会有影响,可以选择0/1.若数据一定不能丢失就配置-1/all

*分区中有 leader和follower 根据分区来。???为了确保消费者消费的数据是一致的。只能从leader去读取数据。follower只能备份不能操作数据。

*👆意义:follower可能会被选举为leader!!!针对分区。一个分区只会有一个leader。

high level API & low level API

高级API

不需要管理offset,也不需要管理分区,副本。由Kafka统一管理。rebalance。

无需关注底层逻辑,开发简单。

自动根据ZK中保存的offset来读取数据。

不能自己控制,无法做到细粒度的控制

低级API

可以自己存储offset。自己控制offset。

指定消费者拉取某个分区的消息👇👇👇可以做到细粒度的控制

手动消费分区数据

  • 某些时候需要指定分区。将指定分区保存到外部存储
  • 某个程序是高可用的。出现故障重启。从指定分区重新消费数据。

不使用subscribe订阅而是assign方法指定消费消息。

注意:Kafka的协调不在起作用。失效。没有rebalance。

监控工具 Kafka-eagle

/////略

Kafka原理

分区的leader和follower

leader和follower

针对分区而不是broker

  • leader负责处理读写操作,而follower只负责副本数据的同步
  • 如果leader出现故障,其它follower会被重新选举为leader
  • follower会像consumer一样拉取leader的数据

*ZK的follower可以读

*一个topic有多个分区,同样可以实现负载均衡

ar、isr、osr

实际环境中leader会出现故障,所以会选举出来新的leader。投票(redis。Kafka希望我们快速选举出来leader,所以把follower可以按照不同状态分为三类

*ZK选举要投票,创建临时节点(慢,废弃

  • AR已分配的副本 分区所有副本称为【AR】
  • ISR所有与leader副本保持一定程度同步的副本
  • OSR 表示不同步的

AR=ISR+OSR=leader+follower+follower+… 正常情况下,OSR为空

controller介绍与leader选举

controller

  • Kafka启动时会在所有broker中选择一个controller
  • controller是根据broker来的
  • 其它的broker会注册该节点的监视器
  • controller也是高可用的,某个controller崩溃,其他的broker会注册为controller

controller选举leader

通过RPC,过程调用,通知broker

controller读取当前分区的ISR 只要有一个副本replica幸存,就会被选为leader,否则leader=-1 快速选举

leader的负载均衡

如果某个broker crash之后会导致leader分别配不均:一个broker上存在一个topic下不同partition的leader

有指令代码

读写流程

找到leader在哪 从zookeeper/broker/topics/主题名/分区名/state

leader将消息写到本地log,顺序写

拉模式(√,推模式(rabbitMQ

Kafka的物理存储

以partition为单位

topic(partition0(segment(log index timeindex)) partition1 partition2)

消息不丢失(面试常问,待总结

broker

生产者

消费者

。。。。。。

数据积压

外部业务外部IO

内部异常:宁可积压也不愿意数据丢了,没有消费成功最好不要提交offset

所以企业要有监控系统

数据清理

定期清理(因为是临时的 日志删除/压缩

配置log.cleanup,×××

可以设置阈值等

时间、空间概念上的阈值


评论

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注

Translate »