一、Kakfa的基础知识
1.1 什么是 Kafka
Kafka 是一个 分布式的消息队列系统,采用 发布/订阅 的工作模式,常用于 大数据实时处理 场景。
Kafka 就像一个“数据快递员”,负责把一份份数据包,从一个程序“送”到另一个程序。 它不分析数据、不存档数据,只负责“中转”——安全、快速、准时地送达。
分布式:可以部署在多台机器上一起工作,像一支“快递团队”,效率更高、抗压更强。
发布/订阅模式:就像一个人发消息,很多人都能同时收到(比如微信公众号的推送)。
1.2 Kafka 消息传递机制
Kafka 使用 “发布 / 订阅(Publish / Subscribe)”模式 实现高效的消息传递。其核心组成包括:
Topic(主题):消息的分类标签,生产者将消息发送到 Topic,消费者从 Topic 中订阅消息;
Producer(生产者):负责将消息发布到指定 Topic;
Consumer(消费者):通过订阅 Topic 来接收消息;
Broker(代理服务器):Kafka 服务器,负责存储消息和转发消息。

✅ 特点:
一条消息可以被多个订阅者同时接收
发布者不需要知道订阅者是谁
非常适合“广播场景”,比如:系统日志、活动消息推送等
1.3 kafka架构与工作原理

Kafka 是一个分布式的消息传递系统,可以理解为一个“数据快递中心”,负责把数据从“发送方程序”送到“接收方程序”。
Kafka 架构图(课堂讲解比喻)
| Kafka 概念 | 比喻 | 说明 |
|---|---|---|
| Producer(生产者) | 寄件人 | 负责“把数据发出去” |
| Broker(代理服务器) | 快递仓库 | Broker 存放消息,就像仓库存放快递 |
| Topic(主题) | 货架 | 同类商品放同一货架,类似同类消息同一个 Topic |
| Partition(分区) | 货架上的格子 | 同一个主题被拆成多个格子(分区)提高并行能力 |
| Consumer(消费者) | 收件人 | 最后把消息取走 |
| Offset(偏移量) | 快递单号 | 表示“消费到第几条消息” |
| ZooKeeper(Kafka 2.4 仍使用) | 调度管理员 | 负责集群管理、选主、记录元数据 |
Kafka的相关概念:
Producer(生产者)
定义:Kafka 中负责创建并发送消息到 Topic 的客户端程序。
作用:将数据(如日志、订单、传感器信息)发送到 Kafka 指定的 Topic 。
比喻:寄快递的人,决定把什么包裹寄到哪个仓库(Topic)。
Consumer(消费者)
定义:负责从 Kafka 中读取消息的程序。
作用:从一个或多个 Topic 中订阅、接收并处理数据
比喻:收快递的人,从指定货架(Topic)取自己想要的数据包。
Consumer Group(消费者组)
定义:多个消费者组成一个小组,共同处理一个 Topic 的数据。
机制:Kafka 会把 Topic 的分区(Partition)分配给组内不同的消费者。
每个分区只能被组内一个消费者读取;
不同组之间互不影响。
比喻:一个快递公司(组)里的多个快递员,每个人负责不同片区的快递。
Broker(代理)
定义:Kafka 集群中的一个服务器节点。
作用:存储消息、接收 Producer 的数据请求、响应 Consumer 的读取请求。
示例:master、slave1、slave2 三台机器就组成一个 Kafka 集群。
比喻:每个 Broker 就是一座快递仓库,存放和派发部分包裹。
Topic(主题)
定义:Kafka 中消息的分类通道,类似于“文件夹”。
作用:不同类型的数据放在不同的 Topic 中,方便管理。
举例:在一个电商系统中,可以有以下几个 Topic:
“订单Topic”放订单数据;
“库存Topic”放库存数据;
“日志Topic”放系统日志。
Partition(分区)
定义:Topic 的物理划分单元,每个分区存放一部分消息。
作用:支持并行处理,提高系统吞吐量。
比喻:货架上的格子,不同格子装不同的快递包裹。
Replication(副本)
定义:每个分区可以有多个副本,分布在不同的 Broker 上。
作用:保证数据高可用,防止某个 Broker 宕机导致数据丢失。
结构:1 个 Leader + 若干个 Follower。
比喻:同一份快递的备份,放在不同仓库里,主仓坏了备用仓顶上。
Leader 和 Follower
Leader:分区的主副本,也是唯一对外提供读写服务的角色。。
Follower:备份副本,从 Leader 同步数据;当 Leader 出问题时自动接管。
作用:Leader 写读、Follower 复制。主挂备上,数据不丢。
比喻:Leader 是“主仓管”,Follower 是“备用仓管”。
Offset(偏移量)
定义:每条消息在分区中的唯一编号。
作用:记录消费者读取到哪一条消息,用来追踪进度。
比喻:快递单号 / Excel 行号,帮助“收件人”记住上次看到哪里。
ZooKeeper
定义:Kafka 集群的协调和管理系统。
作用:
管理 Broker 状态:记录 Broker 的上线、下线,并通知 Kafka;
维护元数据(Topic / Partition 信息):保存 Topic 的结构、Partition 的分布、ISR 列表(同步副本列表)等集群元数据;
负责选举:管理集群 Leader 选举。
比喻:快递公司的“调度中心”,不搬快递,但决定谁负责、谁接班。
1.2.1 Kafka 在电商行业的应用
🎯 案例:电商系统实时订单分析
案例背景
一家电商平台希望实时统计每分钟的订单数量,以便在销量暴涨时及时扩容服务器。 系统由三个部分组成:
| 模块 | 角色 | 说明 |
|---|---|---|
| 订单系统 | Producer(生产者) | 当用户下单时,实时发送订单信息到 Kafka |
| Kafka 集群 | Broker / Topic / Partition | 负责接收、存储、分发这些订单消息 |
| 实时分析系统(Spark / Flink) | Consumer(消费者) | 从 Kafka 读取订单数据,进行实时汇总与可视化展示 |
Kafka 架构在这个案例中的角色对应
| Kafka 概念 | 案例中的作用 | 通俗比喻 |
|---|---|---|
| Producer(生产者) | 电商下单系统,每次有新订单就发送数据 | 寄快递的人 |
| Topic(主题) | “订单”主题,用来分类存放所有订单消息 | “订单货架” |
| Partition(分区) | 把订单数据分散存放到多个分区(比如A-0、A-1、A-2),提升并行处理效率 | 货架的格子 |
| Broker(代理) | Kafka集群中的三台服务器,分别保存不同分区的数据 | 仓库 |
| Consumer(消费者) | 实时分析系统,从订单Topic中读取消息 | 收快递的人 |
| Consumer Group(消费者组) | 实时统计程序分成多个实例,每个实例处理不同分区的数据 | 多个快递员分工取件 |
| Offset(偏移量) | 每条订单消息的唯一编号,表示“读到哪了” | 快递单号 / 页码 |
| Replication(副本) | 每个分区的副本分布在不同Broker上,防止某台机宕机 | 快递备份仓库 |
| Leader / Follower | Leader负责收发数据,Follower实时同步 | 主仓 / 备用仓 |
| ZooKeeper | 管理哪个Broker是Leader,协调系统运行 |
场景一:订单处理 —— Kafka 让多个系统同时知道下了订单
场景:在电商平台中,用户提交订单后,系统需要同时通知多个下游服务(如库存管理、配送系统、支付系统)来并行处理同一笔订单。
Kafka 如何帮忙:
生产者(订单系统): 下单后,订单系统把订单信息发送到 Kafka 的 “订单主题(Topic)”。
消费者组(库存 + 配送):
库存系统从订单 Topic 中读取消息 → 更新库存
配送系统也从这个 Topic 中读取 → 安排发货
类比:
Kafka 就像是一个“订单广播站”📢, 订单系统把消息贴上去,库存和配送都能看到,并各自处理。
场景二:用户行为分析 —— Kafka 记录每个用户的点击动作
场景:
用户在页面上点击、浏览、搜索,系统希望了解用户喜好,做推荐。
Kafka 如何工作:
1) 生产者(前端系统)
把用户每一次点击、搜索、浏览,发送到 Kafka 的 “用户行为主题”
2) 消费者组(推荐 + 数据分析):
推荐系统:实时更新推荐算法 → 展示更相关商品。
分析系统:统计点击热度 → 生成报表。
简单比喻:
Kafka 就像一个“用户行为记录本”📒, 谁点击了什么,它都记下来。推荐系统和分析系统随时翻记录,做决策。
场景三:销售趋势分析 —— Kafka 做实时数据的“传送带
场景:
运营团队希望实时看到哪些商品热卖、哪类产品受欢迎。
Kafka 如何工作:
1) 生产者(订单系统和用户行为系统)
将订单数据和用户行为数据发送到 Kafka 的
订单Topic和用户行为Topic。
2) 消费者(数据分析系统)
数据分析系统从多个 Topic 读取数据,进行实时分析,比如:
哪些商品销售火爆?
哪些页面点击量最高?
简单比喻:
Kafka 是“销售数据管道”,将订单和行为数据汇总到分析系统,实时生成趋势报告。
总结
Kafka 就像是消息世界里的“顺丰快递员”:
谁发消息,它就迅速、安全地送到目的地—— 让库存知道、让推荐系统知道、让分析平台知道!

1.2.2 Kafka 在电商订单场景中的数据流分析
一、场景背景
电商平台中,用户下单后,订单数据需要被多个系统使用,例如:
库存系统要减少库存
配送系统要生成配送单
Kafka 就是这个过程中负责消息传递的关键工具。
二、完整数据流步骤
步骤 1:生产者发送订单消息
用户下单 → 订单系统作为 生产者(Producer)
将订单数据发送到 Kafka 的 “订单 Topic”
示例订单:
订单#1001:用户A购买商品B订单#1002:用户C购买商品D
Kafka 会把消息发送到该 Topic 的不同 分区(Partition) 中:
订单#1001 存入 Partition 0(由 Broker1 管理)
订单#1002 存入 Partition 1(由 Broker2 管理)
步骤 2:分区复制与存储(数据高可用)
Kafka 为每个分区设置主副本机制:
Partition 0(Leader 在 Broker1)会复制一份到 Broker2(Follower)
Partition 1(Leader 在 Broker2)复制到 Broker1
✅ 这样即使某个 Broker 出故障,另一台机器也有完整数据!
步骤 3:多个消费者组独立消费订单消息
Kafka 允许多个系统同时消费同一个 Topic 中的消息,彼此互不影响。
消费者组 1:库存系统
从所有分区读取订单消息,处理库存扣减
消费者组 2:配送系统
从相同分区读取相同订单,生成配送单
每个系统都能独立完成自己的工作,互不冲突。
步骤 4:偏移量(Offset)管理
Kafka 会为每个消费者组维护一个 Offset(消费进度):
库存系统 Offset 1:已处理订单#1001、#1002
配送系统 Offset 1:同样处理完订单#1001、#1002
✅ 即使系统重启或故障,Kafka 会从上次的位置继续消费,保证不丢数据、不重复消费。
步骤 5:ZooKeeper 的作用
ZooKeeper 是 Kafka 的“后台大脑”,它负责:
管理哪个分区属于哪个 Broker
记录哪个消费者处理哪个分区
监控 Kafka 集群运行状态
存储 Kafka 的部分配置信息
ZooKeeper 确保 Kafka 集群稳定运行 + 自动分配 + 不重复消费
三、总结流程图
订单系统(Producer) ↓Kafka 订单Topic(分区存储 + 复制) ↓多个消费者组: ├── 库存系统(消费 → 扣库存) └── 配送系统(消费 → 生成配送单)Kafka 记录每个系统的消费进度(Offset) → ZooKeeper 统一协调管理
四、最终效果
| 系统 | 功能 |
|---|---|
| 库存系统 | 正确减少对应商品库存 |
| 配送系统 | 自动创建对应订单配送单 |
| Kafka + ZooKeeper | 保证消息可靠传输、不丢失 |
五、记忆口诀
一个 Topic,多个消费者组,互不影响; 一个订单,多个系统处理,各司其职; Kafka 管传递,ZooKeeper 保秩序!
Kafka 和 Flume 的简单区别
| 特点 | Kafka | Flume |
|---|---|---|
| 用途 | 数据传输 + 数据存储 | 专注于数据采集和传输 |
| 数据持久化 | 支持,消息存储在硬盘,保证数据可靠性 | 不支持,只负责传输数据 |
| 实时处理 | 支持,能与实时分析工具结合(如 Spark/Flink) | 不支持,只做数据传输 |
| 扩展性 | 支持高扩展性,适合大规模数据分发 | 限于单一任务,扩展能力较弱 |
| 场景 | 需要高吞吐量、实时处理和持久化的数据场景 | 适合简单的数据采集(如日志传输) |
1.3 Kafka 的应用场景
1)应用解耦:系统之间不再“死绑”
💬 问题:
传统架构下,系统 A 调用系统 B,如果 B 挂了,A 就崩了。
例如:订单系统直接调用库存系统检查库存,库存系统故障就导致下单失败。
✅ Kafka 解决方式:
订单系统把订单消息 发到 Kafka(异步发布)
库存系统从 Kafka 订阅并处理消息
🟢 效果:
即使库存系统短暂宕机,消息仍保留,稍后恢复处理
系统之间解耦,提高了稳定性与容错能力
✅ 图示说明:Kafka 像一个“中转站”,两边不再强依赖

2) 异步处理:各服务并发执行更高效
💬 问题:
传统方式下,各服务串行处理,效率低:
下单 → 支付 → 发货 → 通知
每一步都要等待上一步完成。
✅ Kafka 解决方式:
订单系统只负责“下单 + 发送消息”到 Kafka
支付系统、物流系统、通知系统 同时从 Kafka 获取消息并并发处理
🟢 效果:
处理速度更快:各服务可并发运行
互不影响:某个服务慢不会拖慢整体流程
更灵活:可轻松增加消费者数量来扩容
✅ 图示说明:Kafka 像一台“广播机”,多个系统听广播、各自行动


3)限流削峰:抵御高并发
💬 场景:
在秒杀、抢购等活动中,成千上万用户同时下单,数据库瞬间崩溃。
✅ Kafka 解决方式:
把所有请求发送到 Kafka 队列中
后端系统按每秒固定速率处理(比如 100 条/秒)
超出的请求直接拒绝或排队等待
🟢 效果:
系统保持平稳运行,不被挤爆
提高用户体验,避免“秒崩”
✅ 图示说明:Kafka 像“缓冲水池”,只让水缓慢流入系统

4)分布式通信与协调:不同系统通过 Kafka 协作
💬 场景:
订单处理涉及多个系统协同:支付、库存、通知等。
✅ Kafka 解决方式:
订单服务把信息发送到 Kafka
其他系统作为订阅者,按需消费对应信息
🟢 效果:
实现系统间的松耦合通信
支持多系统协同异步处理
业务流程更加灵活与高可用
✅ 图示说明:Kafka 像一个“总线”,各系统像接线板,插上就能收消息

二、Kafka的安装与使用
环境要求:
已安装
zookeeper集群 (保存元数据)已安装
Hadoop集群
2.1 Kafka单机版部署【略过】
2.2 Kafka 集群部署【重点】
Kafka 集群与 Broker 简介
1. 什么是 Kafka 集群?
Kafka 集群 = 多个 Broker 节点 + ZooKeeper组成。
Broker 就是 Kafka 的“消息服务器”,负责接收消息、存储消息、再把消息发给消费者。
每个 Broker 都能接收、存储和转发消息,多个 Broker 协同工作,让 Kafka 更强大。
2. 为什么需要多个 Broker?
| 目的 | 好处说明 |
|---|---|
| 高可用 | 一个 Broker 坏了,其他的还能继续工作,不影响整体运行。 |
| 高并发 | 需要处理更多数据?加几个 Broker 就行了,不用重做系统。 |
| 高可扩展性 | 多个 Broker 分担任务,消息处理更快、不容易卡顿。 |
3. Kafka 集群怎么搭建?
在多台机器上分别部署并启动 Kafka 实例,每个即为一个 Broker;
Kafka 会自动把这些 Broker 连成一个集群;
Kafka 会自动协调这些 Broker,实现消息分区、副本同步、Leader 选举等功能。
4. 示例:三节点 Kafka 集群
三台服务器(如
master、slave1、slave2)分别部署 Kafka Broker;启动后,自动组成 Kafka 集群;
消息会按 Partition 分布到不同 Broker;
Kafka 还会在其他 Broker 上创建副本,确保数据不丢失、服务不断。
2.2.1 集群规划
| ip | 主机名 | kafka角色 | 备注 |
|---|---|---|---|
| 192.168.36.100 | master | broker1 | 第一个 Broker |
| 192.168.36.101 | slave1 | broker2 | 第二个 Broker |
| 192.168.36.102 | slave2 | broker3 | 第三个 Broker |
ip 主机名 kafka角色 备注 192.168.36.100 master broker1 第一个 Broker 192.168.36.101 slave1 broker2 第二个 Broker 192.168.36.102 slave2 broker3 第三个 Broker
2.2.2 准备工作
1.确保三台主机已安装 JDK 和 Hadoop 环境。
2. 安装和启动 ZooKeeper
Kafka 依赖 ZooKeeper 管理元数据和协调服务。
元数据包括 Broker 信息、Topic 配置、分区和副本分配、Leader 选举状态 等。
2.2.3 集群配置
2.2.3.1 下载
下载地址:https://archive.apache.org/dist/kafka/
下载kafka_2.12-2.4.1.tgz 放入master主机的/opt/software中
2.12是Scala的版本号;2.4.1是Kafka的版本号
2.2.3.2 安装kafka
cd /opt/software
# 解压
tar -zvxf kafka_2.12-2.4.1.tgz -C /opt/apps
cd /opt/apps
# 改名
mv kafka_2.12-2.4.1 kafka2.2.3.3 配置环境变量
修改master,slave1,slave2的环境变量
# 分别在master,slave1,slave2节点上编辑环境变量
双击编辑 /etc/profile
# 在文件末尾追加如下内容:
# Kafka 安装路径
export KAFKA_HOME=/opt/apps/kafka
export PATH=$PATH:$KAFKA_HOME/bin
# 三台主机上依次运行以下命令,使环境变量生效
source /etc/profile
# 查看环境变量
echo $PATH2.2.3.4 配置(server.properties)
# 在`master`中进行配置
cd /opt/apps/kafka/config
# 修改服务端配置
vi server.properties# 修改以下五个关键配置参数
# 设置 broker.id 编号,集群模式下该 ID 必须唯一,且不可更改。
broker.id=1
# 配置 Kafka 服务器监听的网络地址和端口。
# 格式:协议://地址:端口
# PLAINTEXT:表示明文传输协议。
# 作用:Kafka 服务器监听客户端(生产者和消费者)连接请求,通过明文协议传输和接收消息。
listeners=PLAINTEXT://master:9092
# 指定 Kafka 存储消息日志数据(包括主题内容和索引)的目录,用于持久化消息数据。
log.dirs=/opt/apps/kafka/data
# 配置 ZooKeeper 集群连接信息及 Kafka 元数据存储路径。
# 作用:每个 broker 使用 ZooKeeper 保存元数据信息,如分区 Leader 信息。
# 路径 /kafka 用于存储 Kafka 的元数据。
zookeeper.connect=master:2181,slave1:2181,slave2:2181/kafka
# 允许删除 Kafka 中的主题。
# 默认为 false,设置为 true 以支持删除主题。
delete.topic.enable=true2.2.3.5 创建日志目录:
作用:存储消息日志数据(包括主题内容和索引),用于持久化消息数据。
# 在master节点上操作
cd /opt/apps/kafka
mkdir data2.2.3.6 分发
分发已经配置好的kafka目录到slave1和slave2节点的/opt/apps目录下。
# 在master上操作
scp -r /opt/apps/kafka/ slave1:/opt/apps/
scp -r /opt/apps/kafka/ slave2:/opt/apps/2.2.3.7 修改slave1/slave2配置参数
修改slave1节点上server.properties中的id和host:
# 进入slave1
cd /opt/apps/kafka/config
vi server.properties
# 修改以下2处内容:
# 设置broker.id编号
broker.id=2
# Kafka Broker 通过slave1(192.168.36.101)的9092 端口监听客户端(生产者或消费者)的连接请求。
# 使用明文传输协议(无加密或认证) PLAINTEXT 进行通信。
# 即:客户端(生产者或消费者) -> 通过 slave1:9092 -> 连接到 Kafka Broker,发送或接收消息
listeners=PLAINTEXT://slave1:9092修改slave2节点上server.properties中的id和host:
# 进入slave2
cd /opt/apps/kafka/config
vim server.properties
# 修改以下2处内容:
# 设置broker.id编号
broker.id=3
# Kafka Broker 通过slave2(192.168.36.1020的9092 端口监听客户端(生产者或消费者)的连接请求。
# 使用明文传输协议(无加密或认证) PLAINTEXT 进行通信。
# 即:客户端(生产者或消费者) -> 通过 slave2:9092 -> 连接到 Kafka Broker,发送或接收消息
listeners=PLAINTEXT://slave2:9092kafka的集群配置完成。
2.3 Kafka集群启停
2.3.1 启动 ZooKeeper
# 在master/slave1/slave2启动 ZooKeeper
zkServer.sh start
# 在master/slave1/slave2检查 ZooKeeper状态
zkServer.sh status2.3.2 启动 Kafka 集群
# # 在master/slave1/slave2启动 ZooKeeper
kafka-server-start.sh -daemon /opt/apps/kafka/config/server.properties特点:
Kafka 在后台运行。
日志不会输出到终端。
Kafka 默认会将日志保存到配置文件中指定的日志文件(通常在
/opt/apps/kafka/logs/server.log)。优势:适合生产环境,运行稳定且管理方便。
适用场景:建议生产环境优先使用。
2.3.3 运行命令查看集群信息
# 查看 Kafka 集群中每个 Broker 在集群中的基本信息。
kafka-broker-api-versions.sh --bootstrap-server master:90922.3.4 集群校验
依次测试每台节点的进程是否启动(三台机器开新窗口)
====master
[hadoop@master kafka]# jps
3109 Jps
2713 QuorumPeerMain
2780 Kafka
=====slave1
[hadoop@slave1 kafka]# jps
2643 Kafka
2967 Jps
2603 QuorumPeerMain
=====slave2
[hadoop@slave2 kafka]# jps
2690 Kafka
2658 QuorumPeerMain
3014 Jps三、kafka实例操作
kafka集群启动
# 1.在master/slave1/slave2启动 ZooKeeper(建议在集群中使用独立安装的 ZooKeeper)
zkServer.sh start
# 2.在master/slave1/slave2启动 Kafka 集群
kafka-server-start.sh -daemon /opt/apps/kafka/config/server.properties3.1 创建主题
任务:在 Kafka 集群中创建一个名为 test 的新主题
目标:主题 test,1 个分区,3 个副本。
kafka-topics.sh --create \
--bootstrap-server master:9092,slave1:9092,slave2:9092 \
--topic test --partitions 1 --replication-factor 3参数解释
kafka-topics.sh:Kafka 主题管理脚本。--create:表示创建主题。--bootstrap-server:指定 Kafka 集群的 Broker 地址列表(多个地址用逗号分隔)。--topic:指定主题名称(此处为test)。--partitions:指定分区数(此处为 1)。--replication-factor:指定副本数(此处为 3,表示每个分区有 3 个副本)。
注意事项
主题名称必须唯一,不能与现有主题重复。
分区(Partition) 是生产/消费的基本单位,对应 Kafka 服务端的一个目录(例如:
主题名称-分区编号),目录中存放数据文件和索引文件。
3.2 查看所有主题列表
kafka-topics.sh --list \
--bootstrap-server master:9092,slave1:9092,slave2:90923.3 查看所有topic的详细信息
kafka-topics.sh --describe \
--bootstrap-server master:9092,slave1:9092,slave2:9092
##运行结果
Topic: test PartitionCount: 1 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: test Partition: 0 Leader: 1 Replicas: 1,3,2 Isr: 1,3,23.4 查看指定topic的详细信息
kafka-topics.sh --describe \
--topic test \
--bootstrap-server master:9092,slave1:9092,slave2:9092
# 查看结果如下:
Topic: test PartitionCount: 1 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: test Partition: 0 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2
# 解释
Topic: test:这是主题的名称,即 "test"。
PartitionCount: 1:这个主题有一个分区。
ReplicationFactor: 3:这个主题的复制因子是 3,意味着每个分区都会有 3 个副本。
Configs: segment.bytes=1073741824:表示 Kafka 分区的日志存储会被切分成多个文件,每个文件最大为 1 GB。
Partition: 0:这是分区的编号,这个主题只有一个分区,所以编号是 0。
Leader: 1 表示分区的当前领导者(Leader)的 ID,这个分区的领导者是 1 号服务器。
Replicas: 1,3,2 表示分区 0 有三个副本,分别存储在 Broker 1、3 和 2 上。
Isr: 1,3,2 此处说明分区 0 的所有副本(1、3、2)都处于同步状态。3.5 启动控制台消费者
# 启动一个 Kafka 控制台消费者,从 master:9092 连接 Kafka 集群,订阅主题 test 并从头开始消费消息。
kafka-console-consumer.sh \
--topic test \
--from-beginning \
--bootstrap-server master:9092,slave1:9092,slave2:9092
# 解释
`kafka-console-consumer.sh`:Kafka 命令行工具,用于消费指定主题的消息。
`--bootstrap-server master:9092,slave1:9092,slave2:9092 `:指定 Kafka Broker 地址作为消费者的连接点。
`--topic test`:指定目标主题 `test`。
`--from-beginning`:从主题的起始位置消费所有历史消息。 3.6 启动控制台生产者
# 通过slave1节点启动一个控制台生产者,建立与Kafka集群的连接,并将用户在控制台输入的消息发送到主题 "test"。
kafka-console-producer.sh \
--topic test \
--broker-list master:9092,slave1:9092,slave2:9092
# 解释
`kafka-console-producer.sh`:Kafka 命令行工具,用于向主题发送消息。
`--broker-list master:9092,slave1:9092,slave2:9092`:指定 Kafka 集群中 broker 的地址和端口,用逗号分隔。
`--topic test`:指定发送消息的目标主题 `test`。
# 注意:若Kafka版本 ≥ 2.5.0:可使用 --bootstrap‑server <host:port,…>,推荐使用该方式以符合新标准。3.7 生产、消费数据
# 在slave1上(生产者控制台)输入(生产)数据
>hello kafka
# 在master上(消费者控制台)就可以自动接收(消费)数据
hello kafka
# 表示消费端成功消费了生产者生产的消息!注意:Apache Kafka接收到的每一条消息都存储在日志中,默认情况下,它保持消息168小时,即7天。
3.8 删除主题
kafka-topics.sh --delete \
--topic test \
--bootstrap-server master:9092,slave1:9092,slave2:9092检查主题是否删除成功
kafka-topics.sh --list \
--bootstrap-server master:9092,slave1:9092,slave2:9092如果列表中没有 test,说明删除成功 ✅
四、实验
实验任务
配置并启动 ZooKeeper 和 Kafka 集群。
创建、查看、描述、删除主题。
使用生产者发送消息,消费者读取消息。
验证消费者组的分区分配和负载均衡。
学习多种消费方式并记录日志。
删除主题并清理实验环境。
实验目标
掌握 Kafka 的启动与主题管理。
理解消息生产与消费流程。
熟悉消费者组机制及负载均衡。
掌握不同消费方式和环境清理操作。
实验结束后,学员可熟练使用 Kafka 的基础功能。
一、启动环境
1. 启动 ZooKeeper
# 依次在三台主机上操作
zkServer.sh start
# 查看 ZooKeeper 的状态
zkServer.sh status2. 启动 Kafka
# 依次在三台主机上操作
kafka-server-start.sh -daemon \
/opt/apps/kafka/config/server.properties二、实验操作
3. 查看已存在的主题
# 查看已存在的主题
kafka-topics.sh --list \
--bootstrap-server master:9092,slave1:9092,slave2:90924. 创建主题
创建一个主题 user_logs,用于模拟用户日志处理:
kafka-topics.sh --create \
--bootstrap-server master:9092,slave1:9092,slave2:9092 \
--topic user_logs \
--partitions 3 \
--replication-factor 25. 查看主题的详细信息
# 查看所有主题的明细
kafka-topics.sh --describe --bootstrap-server master:9092
# 查看指定主题的明细
kafka-topics.sh --describe --topic user_logs \
--bootstrap-server master:9092,slave1:9092,slave2:90926. 消费与生产实验
6.1 单一消费者的基本操作
# 在 master 节点启动消费者,消费 user_logs 主题的消息
kafka-console-consumer.sh \
--bootstrap-server master:9092,slave1:9092,slave2:9092 \
--topic user_logs6.2 启动生产者,发送消息
# 在 slave1 节点启动生产者,向 user_logs 主题发送模拟用户日志数据
kafka-console-producer.sh \
--broker-list master:9092,slave1:9092,slave2:9092 \
--topic user_logs7. 消费者组实验
7.1 启动多个消费者,加入同一组user_logs_group
首先按下ctrl + c停掉刚才的master/slave1上开启的生产者和消费者
# 在 master 节点启动第一个消费者,加入组 user_logs_group
kafka-console-consumer.sh \
--bootstrap-server master:9092,slave1:9092,slave2:9092 \
--topic user_logs \
--group user_logs_group
# 在 slave1 节点启动第二个消费者,加入组 user_logs_group
kafka-console-consumer.sh \
--bootstrap-server master:9092,slave1:9092,slave2:9092 \
--topic user_logs \
--group user_logs_group7.2 验证分区分配
1)在slave2打开生产者,发送消息:
kafka-console-producer.sh \
--broker-list master:9092,slave1:9092,slave2:9092 \
--topic user_logs
# 尝试发送1-20的数据2)观察master/slave1两个消费者终端的消息分布,验证分区分配。
结论:在同一个消费组的两个消费者不能消费重复数据,共同消费生产者发来的数据
也可以使用下面的方法查看消费组的每个分区被分配到的消费者
# 【重新给master开窗运行下面的命令,此时不要关闭两个消费者】
# 查看消费者组状态:输出会显示 user_logs_group 消费组的每个分区被分配到的消费者
# 结果:分区0,1在master节点的broker上,分区2在slave1节点的broker上,
kafka-consumer-groups.sh \
--bootstrap-server master:9092 \
--describe --group user_logs_groupGROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG HOST CLIENT-ID
user_logs_group user_logs 0 13 13 0 /192.168.36.100 consumer-user_logs_group-1
user_logs_group user_logs 1 13 13 0 /192.168.36.100 consumer-user_logs_group-1
user_logs_group user_logs 2 17 17 0 /192.168.36.101 consumer-user_logs_group-17.3 验证分区重新分配
1)停止master上的消费者(Ctrl+C)。
2)观察剩余slave1消费者是否接管分区数据。
8. 消费数据的不同方式
首先按下ctrl + c停掉刚才的master/slave1上开启的生产者和消费者
8.1 从Topic主题最早的消息(历史数据)开始消费
# 在 master 节点启动消费者
kafka-console-consumer.sh \
--topic user_logs \
--bootstrap-server master:9092,slave1:9092,slave2:9092 \
--from-beginning8.2 从最新的消息开始消费
# 重新在 slave1 节点启动消费者
kafka-console-consumer.sh \
--topic user_logs \
--bootstrap-server master:9092,slave1:9092,slave2:90928.3 从最新的消息开始消费
# 重新在 slave2 节点启动生产者,再次生产数据,看消费者的变化
kafka-console-consumer.sh \
--topic user_logs \
--bootstrap-server master:9092,slave1:9092,slave2:90929. 删除主题(实验结束后)
9.1 删除主题
kafka-topics.sh --delete \
--bootstrap-server master:9092,slave1:9092,slave2:9092 \
--topic user_logs9.2 确认主题删除
kafka-topics.sh --list \
--bootstrap-server master:9092,slave1:9092,slave2:9092