李翔-大数据技术

Big data technology!

第9章 Kafka安装与部署

Kafka安装与部署


一、Kakfa的基础知识

1.1 什么是 Kafka

Kafka 是一个 分布式的消息队列系统,采用 发布/订阅 的工作模式,常用于 大数据实时处理 场景。

Kafka 就像一个“数据快递员”,负责把一份份数据包,从一个程序“送”到另一个程序。 它不分析数据、不存档数据,只负责“中转”——安全、快速、准时地送达。

  • 分布式:可以部署在多台机器上一起工作,像一支“快递团队”,效率更高、抗压更强。

  • 发布/订阅模式:就像一个人发消息,很多人都能同时收到(比如微信公众号的推送)。


1.2  Kafka 消息传递机制

Kafka 使用 “发布 / 订阅(Publish / Subscribe)”模式 实现高效的消息传递。其核心组成包括:

  • Topic(主题):消息的分类标签,生产者将消息发送到 Topic,消费者从 Topic 中订阅消息;

  • Producer(生产者):负责将消息发布到指定 Topic;

  • Consumer(消费者):通过订阅 Topic 来接收消息;

  • Broker(代理服务器):Kafka 服务器,负责存储消息和转发消息。

image-20250804121319100

✅ 特点:

  • 一条消息可以被多个订阅者同时接收

  • 发布者不需要知道订阅者是谁

  • 非常适合“广播场景”,比如:系统日志、活动消息推送等


1.3 kafka架构与工作原理

未标题-2

Kafka 是一个分布式的消息传递系统,可以理解为一个“数据快递中心”,负责把数据从“发送方程序”送到“接收方程序”。

Kafka 架构图(课堂讲解比喻)


Kafka 概念比喻说明
Producer(生产者)寄件人负责“把数据发出去”
Broker(代理服务器)快递仓库Broker 存放消息,就像仓库存放快递
Topic(主题)货架同类商品放同一货架,类似同类消息同一个 Topic
Partition(分区)货架上的格子同一个主题被拆成多个格子(分区)提高并行能力
Consumer(消费者)收件人最后把消息取走
Offset(偏移量)快递单号表示“消费到第几条消息”
ZooKeeper(Kafka 2.4 仍使用)调度管理员负责集群管理、选主、记录元数据



Kafka的相关概念:

Producer(生产者)

  • 定义:Kafka 中负责创建并发送消息到 Topic 的客户端程序。

  • 作用:将数据(如日志、订单、传感器信息)发送到 Kafka 指定的 Topic 。

  • 比喻:寄快递的人,决定把什么包裹寄到哪个仓库(Topic)。

Consumer(消费者)

  • 定义:负责从 Kafka 中读取消息的程序。

  • 作用:从一个或多个 Topic 中订阅、接收并处理数据

  • 比喻:收快递的人,从指定货架(Topic)取自己想要的数据包。

Consumer Group(消费者组)

  • 定义:多个消费者组成一个小组,共同处理一个 Topic 的数据。

  • 机制:Kafka 会把 Topic 的分区(Partition)分配给组内不同的消费者。

    • 每个分区只能被组内一个消费者读取;

    • 不同组之间互不影响。

  • 比喻:一个快递公司(组)里的多个快递员,每个人负责不同片区的快递。

Broker(代理)

  • 定义:Kafka 集群中的一个服务器节点。

  • 作用:存储消息、接收 Producer 的数据请求、响应 Consumer 的读取请求。

  • 示例:master、slave1、slave2 三台机器就组成一个 Kafka 集群。

  • 比喻:每个 Broker 就是一座快递仓库,存放和派发部分包裹。

Topic(主题)

  • 定义:Kafka 中消息的分类通道,类似于“文件夹”。

  • 作用:不同类型的数据放在不同的 Topic 中,方便管理。

  • 举例:在一个电商系统中,可以有以下几个 Topic:

    • “订单Topic”放订单数据;

    • “库存Topic”放库存数据;

    • “日志Topic”放系统日志。

Partition(分区)

  • 定义:Topic 的物理划分单元,每个分区存放一部分消息。

  • 作用:支持并行处理,提高系统吞吐量。

  • 比喻:货架上的格子,不同格子装不同的快递包裹。

Replication(副本)

  • 定义:每个分区可以有多个副本,分布在不同的 Broker 上。

  • 作用:保证数据高可用,防止某个 Broker 宕机导致数据丢失。

  • 结构:1 个 Leader + 若干个 Follower。

  • 比喻:同一份快递的备份,放在不同仓库里,主仓坏了备用仓顶上。

LeaderFollower

  • Leader:分区的主副本,也是唯一对外提供读写服务的角色。。

  • Follower:备份副本,从 Leader 同步数据;当 Leader 出问题时自动接管。

  • 作用:Leader 写读、Follower 复制。主挂备上,数据不丢。

  • 比喻:Leader 是“主仓管”,Follower 是“备用仓管”。

Offset(偏移量)

  • 定义:每条消息在分区中的唯一编号。

  • 作用:记录消费者读取到哪一条消息,用来追踪进度。

  • 比喻:快递单号 / Excel 行号,帮助“收件人”记住上次看到哪里。

ZooKeeper

  • 定义:Kafka 集群的协调和管理系统。

  • 作用

    • 管理 Broker 状态:记录 Broker 的上线、下线,并通知 Kafka;

    • 维护元数据(Topic / Partition 信息):保存 Topic 的结构、Partition 的分布、ISR 列表(同步副本列表)等集群元数据;

    • 负责选举:管理集群  Leader 选举。

  • 比喻:快递公司的“调度中心”,不搬快递,但决定谁负责、谁接班。


1.2.1 Kafka 在电商行业的应用

🎯 案例:电商系统实时订单分析

案例背景

一家电商平台希望实时统计每分钟的订单数量,以便在销量暴涨时及时扩容服务器。 系统由三个部分组成:


模块角色说明
订单系统Producer(生产者)当用户下单时,实时发送订单信息到 Kafka
Kafka 集群Broker / Topic / Partition负责接收、存储、分发这些订单消息
实时分析系统(Spark / Flink)Consumer(消费者)从 Kafka 读取订单数据,进行实时汇总与可视化展示



Kafka 架构在这个案例中的角色对应


Kafka 概念案例中的作用通俗比喻
Producer(生产者)电商下单系统,每次有新订单就发送数据寄快递的人
Topic(主题)“订单”主题,用来分类存放所有订单消息“订单货架”
Partition(分区)把订单数据分散存放到多个分区(比如A-0、A-1、A-2),提升并行处理效率货架的格子
Broker(代理)Kafka集群中的三台服务器,分别保存不同分区的数据仓库
Consumer(消费者)实时分析系统,从订单Topic中读取消息收快递的人
Consumer Group(消费者组)实时统计程序分成多个实例,每个实例处理不同分区的数据多个快递员分工取件
Offset(偏移量)每条订单消息的唯一编号,表示“读到哪了”快递单号 / 页码
Replication(副本)每个分区的副本分布在不同Broker上,防止某台机宕机快递备份仓库
Leader / FollowerLeader负责收发数据,Follower实时同步主仓 / 备用仓
ZooKeeper管理哪个Broker是Leader,协调系统运行



场景一:订单处理 —— Kafka 让多个系统同时知道下了订单

场景:在电商平台中,用户提交订单后,系统需要同时通知多个下游服务(如库存管理、配送系统、支付系统)来并行处理同一笔订单。

Kafka 如何帮忙:

  • 生产者(订单系统) 下单后,订单系统把订单信息发送到 Kafka 的 “订单主题(Topic)”

  • 消费者组(库存 + 配送)

    • 库存系统从订单 Topic 中读取消息 → 更新库存

    • 配送系统也从这个 Topic 中读取 → 安排发货

类比:

Kafka 就像是一个“订单广播站”📢, 订单系统把消息贴上去,库存和配送都能看到,并各自处理。



场景二:用户行为分析 —— Kafka 记录每个用户的点击动作

场景:

  • 用户在页面上点击、浏览、搜索,系统希望了解用户喜好,做推荐。

Kafka 如何工作:

1) 生产者(前端系统)

  • 把用户每一次点击、搜索、浏览,发送到 Kafka 的 “用户行为主题”

2) 消费者组(推荐 + 数据分析):

  • 推荐系统:实时更新推荐算法 → 展示更相关商品。

  • 分析系统:统计点击热度 → 生成报表。

简单比喻:

  • Kafka 就像一个“用户行为记录本”📒, 谁点击了什么,它都记下来。推荐系统和分析系统随时翻记录,做决策。



场景三:销售趋势分析 —— Kafka 做实时数据的“传送带

场景:

  • 运营团队希望实时看到哪些商品热卖、哪类产品受欢迎。

Kafka 如何工作:

1) 生产者(订单系统和用户行为系统)

  • 将订单数据和用户行为数据发送到 Kafka 的 订单Topic用户行为Topic

2) 消费者(数据分析系统)

  • 数据分析系统从多个 Topic 读取数据,进行实时分析,比如:

    • 哪些商品销售火爆?

    • 哪些页面点击量最高?

简单比喻:

  • Kafka 是“销售数据管道”,将订单和行为数据汇总到分析系统,实时生成趋势报告。


总结

Kafka 就像是消息世界里的“顺丰快递员”

谁发消息,它就迅速、安全地送到目的地—— 让库存知道、让推荐系统知道、让分析平台知道!


img


1.2.2 Kafka 在电商订单场景中的数据流分析


一、场景背景

电商平台中,用户下单后,订单数据需要被多个系统使用,例如:

  • 库存系统要减少库存

  • 配送系统要生成配送单

Kafka 就是这个过程中负责消息传递的关键工具。


二、完整数据流步骤


步骤 1:生产者发送订单消息

  • 用户下单 → 订单系统作为 生产者(Producer)

  • 将订单数据发送到 Kafka 的 “订单 Topic”

示例订单:

订单#1001:用户A购买商品B订单#1002:用户C购买商品D

Kafka 会把消息发送到该 Topic 的不同 分区(Partition) 中:

  • 订单#1001 存入 Partition 0(由 Broker1 管理)

  • 订单#1002 存入 Partition 1(由 Broker2 管理)


步骤 2:分区复制与存储(数据高可用)

Kafka 为每个分区设置主副本机制:

  • Partition 0(Leader 在 Broker1)会复制一份到 Broker2(Follower)

  • Partition 1(Leader 在 Broker2)复制到 Broker1

✅ 这样即使某个 Broker 出故障,另一台机器也有完整数据!


步骤 3:多个消费者组独立消费订单消息

Kafka 允许多个系统同时消费同一个 Topic 中的消息,彼此互不影响。

  • 消费者组 1:库存系统

    • 从所有分区读取订单消息,处理库存扣减

  • 消费者组 2:配送系统

    • 从相同分区读取相同订单,生成配送单

每个系统都能独立完成自己的工作,互不冲突。


步骤 4:偏移量(Offset)管理

Kafka 会为每个消费者组维护一个 Offset(消费进度):

  • 库存系统 Offset 1:已处理订单#1001、#1002

  • 配送系统 Offset 1:同样处理完订单#1001、#1002

✅ 即使系统重启或故障,Kafka 会从上次的位置继续消费,保证不丢数据、不重复消费。


步骤 5:ZooKeeper 的作用

ZooKeeper 是 Kafka 的“后台大脑”,它负责:

  • 管理哪个分区属于哪个 Broker

  • 记录哪个消费者处理哪个分区

  • 监控 Kafka 集群运行状态

  • 存储 Kafka 的部分配置信息

ZooKeeper 确保 Kafka 集群稳定运行 + 自动分配 + 不重复消费


三、总结流程图

订单系统(Producer)   ↓Kafka 订单Topic(分区存储 + 复制)   ↓多个消费者组:   ├── 库存系统(消费 → 扣库存)   └── 配送系统(消费 → 生成配送单)Kafka 记录每个系统的消费进度(Offset) → ZooKeeper 统一协调管理

四、最终效果


系统功能
库存系统正确减少对应商品库存
配送系统自动创建对应订单配送单
Kafka + ZooKeeper保证消息可靠传输、不丢失



五、记忆口诀

一个 Topic,多个消费者组,互不影响 一个订单,多个系统处理,各司其职 Kafka 管传递,ZooKeeper 保秩序!



Kafka 和 Flume 的简单区别


特点KafkaFlume
用途数据传输 + 数据存储专注于数据采集和传输
数据持久化支持,消息存储在硬盘,保证数据可靠性不支持,只负责传输数据
实时处理支持,能与实时分析工具结合(如 Spark/Flink)不支持,只做数据传输
扩展性支持高扩展性,适合大规模数据分发限于单一任务,扩展能力较弱
场景需要高吞吐量、实时处理和持久化的数据场景适合简单的数据采集(如日志传输)



1.3 Kafka 的应用场景

1)应用解耦:系统之间不再“死绑”

💬 问题:

传统架构下,系统 A 调用系统 B,如果 B 挂了,A 就崩了。

例如:订单系统直接调用库存系统检查库存,库存系统故障就导致下单失败。

✅ Kafka 解决方式:

  • 订单系统把订单消息 发到 Kafka(异步发布)

  • 库存系统从 Kafka 订阅并处理消息

🟢 效果:

  • 即使库存系统短暂宕机,消息仍保留,稍后恢复处理

  • 系统之间解耦,提高了稳定性与容错能力

✅ 图示说明:Kafka 像一个“中转站”,两边不再强依赖

image-20230518164456301


2) 异步处理:各服务并发执行更高效

💬 问题:

传统方式下,各服务串行处理,效率低:

下单 → 支付 → 发货 → 通知

每一步都要等待上一步完成。

✅ Kafka 解决方式:

  • 订单系统只负责“下单 + 发送消息”到 Kafka

  • 支付系统、物流系统、通知系统 同时从 Kafka 获取消息并并发处理

🟢 效果:

  • 处理速度更快:各服务可并发运行

  • 互不影响:某个服务慢不会拖慢整体流程

  • 更灵活:可轻松增加消费者数量来扩容

✅ 图示说明:Kafka 像一台“广播机”,多个系统听广播、各自行动


image-20230521165644357


image-20230521171118303


3)限流削峰:抵御高并发

💬 场景:

在秒杀、抢购等活动中,成千上万用户同时下单,数据库瞬间崩溃。

✅ Kafka 解决方式:

  • 把所有请求发送到 Kafka 队列中

  • 后端系统按每秒固定速率处理(比如 100 条/秒)

  • 超出的请求直接拒绝或排队等待

🟢 效果:

  • 系统保持平稳运行,不被挤爆

  • 提高用户体验,避免“秒崩”

✅ 图示说明:Kafka 像“缓冲水池”,只让水缓慢流入系统

image-20230518170043638


4)分布式通信与协调:不同系统通过 Kafka 协作

💬 场景:

订单处理涉及多个系统协同:支付、库存、通知等。

✅ Kafka 解决方式:

  • 订单服务把信息发送到 Kafka

  • 其他系统作为订阅者,按需消费对应信息

🟢 效果:

  • 实现系统间的松耦合通信

  • 支持多系统协同异步处理

  • 业务流程更加灵活与高可用

✅ 图示说明:Kafka 像一个“总线”,各系统像接线板,插上就能收消息

image-20241116143306582


二、Kafka的安装与使用

环境要求:

  • 已安装zookeeper集群 (保存元数据)

  • 已安装Hadoop集群


2.1 Kafka单机版部署【略过】


2.2 Kafka 集群部署【重点】

Kafka 集群与 Broker 简介

1. 什么是 Kafka 集群?

  • Kafka 集群 = 多个 Broker 节点 + ZooKeeper组成。

  • Broker 就是 Kafka 的“消息服务器”,负责接收消息、存储消息、再把消息发给消费者。

  • 每个 Broker 都能接收、存储和转发消息,多个 Broker 协同工作,让 Kafka 更强大。

2. 为什么需要多个 Broker?


目的好处说明
高可用一个 Broker 坏了,其他的还能继续工作,不影响整体运行。
高并发需要处理更多数据?加几个 Broker 就行了,不用重做系统。
高可扩展性多个 Broker 分担任务,消息处理更快、不容易卡顿。


3. Kafka 集群怎么搭建?

  • 在多台机器上分别部署并启动 Kafka 实例,每个即为一个 Broker;

  • Kafka 会自动把这些 Broker 连成一个集群

  • Kafka 会自动协调这些 Broker,实现消息分区、副本同步、Leader 选举等功能。

4. 示例:三节点 Kafka 集群

  • 三台服务器(如 masterslave1slave2)分别部署 Kafka Broker

  • 启动后,自动组成 Kafka 集群;

  • 消息会按 Partition 分布到不同 Broker;

  • Kafka 还会在其他 Broker 上创建副本,确保数据不丢失、服务不断。


2.2.1 集群规划


ip主机名kafka角色备注
192.168.36.100masterbroker1第一个 Broker
192.168.36.101slave1broker2第二个 Broker
192.168.36.102slave2broker3第三个 Broker


2.2.1 集群规划

ip主机名kafka角色备注
192.168.36.100masterbroker1第一个 Broker
192.168.36.101slave1broker2第二个 Broker
192.168.36.102slave2broker3第三个 Broker


2.2.2 准备工作

1.确保三台主机已安装 JDK 和 Hadoop 环境。

2. 安装和启动 ZooKeeper

  • Kafka 依赖 ZooKeeper 管理元数据和协调服务。

  • 元数据包括 Broker 信息、Topic 配置、分区和副本分配、Leader 选举状态 等。


2.2.3 集群配置

2.2.3.1 下载

下载地址:https://archive.apache.org/dist/kafka/

下载kafka_2.12-2.4.1.tgz 放入master主机的/opt/software

2.12是Scala的版本号;2.4.1是Kafka的版本号


2.2.3.2 安装kafka
cd /opt/software
# 解压
tar -zvxf kafka_2.12-2.4.1.tgz -C /opt/apps

cd /opt/apps
# 改名
mv kafka_2.12-2.4.1  kafka


2.2.3.3 配置环境变量

修改master,slave1,slave2的环境变量

# 分别在master,slave1,slave2节点上编辑环境变量
双击编辑 /etc/profile

# 在文件末尾追加如下内容:
# Kafka 安装路径
export KAFKA_HOME=/opt/apps/kafka
export PATH=$PATH:$KAFKA_HOME/bin

# 三台主机上依次运行以下命令,使环境变量生效
source /etc/profile
# 查看环境变量
echo $PATH


2.2.3.4 配置(server.properties)
# 在`master`中进行配置
cd /opt/apps/kafka/config
# 修改服务端配置
vi server.properties
# 修改以下五个关键配置参数

# 设置 broker.id 编号,集群模式下该 ID 必须唯一,且不可更改。
broker.id=1

# 配置 Kafka 服务器监听的网络地址和端口。
# 格式:协议://地址:端口
# PLAINTEXT:表示明文传输协议。
# 作用:Kafka 服务器监听客户端(生产者和消费者)连接请求,通过明文协议传输和接收消息。
listeners=PLAINTEXT://master:9092

# 指定 Kafka 存储消息日志数据(包括主题内容和索引)的目录,用于持久化消息数据。
log.dirs=/opt/apps/kafka/data

# 配置 ZooKeeper 集群连接信息及 Kafka 元数据存储路径。
# 作用:每个 broker 使用 ZooKeeper 保存元数据信息,如分区 Leader 信息。
# 路径 /kafka 用于存储 Kafka 的元数据。
zookeeper.connect=master:2181,slave1:2181,slave2:2181/kafka

# 允许删除 Kafka 中的主题。
# 默认为 false,设置为 true 以支持删除主题。
delete.topic.enable=true


2.2.3.5 创建日志目录:

作用:存储消息日志数据(包括主题内容和索引),用于持久化消息数据。

# 在master节点上操作
cd /opt/apps/kafka
mkdir data


2.2.3.6 分发

分发已经配置好的kafka目录到slave1slave2节点的/opt/apps目录下。

# 在master上操作
scp -r /opt/apps/kafka/  slave1:/opt/apps/
scp -r /opt/apps/kafka/  slave2:/opt/apps/


2.2.3.7 修改slave1/slave2配置参数

修改slave1节点上server.properties中的idhost:

# 进入slave1
cd /opt/apps/kafka/config
vi server.properties

# 修改以下2处内容:
# 设置broker.id编号
broker.id=2

# Kafka Broker 通过slave1(192.168.36.101)的9092 端口监听客户端(生产者或消费者)的连接请求。
# 使用明文传输协议(无加密或认证) PLAINTEXT 进行通信。
# 即:客户端(生产者或消费者) -> 通过 slave1:9092 -> 连接到 Kafka Broker,发送或接收消息
listeners=PLAINTEXT://slave1:9092

修改slave2节点上server.properties中的idhost:

# 进入slave2
cd /opt/apps/kafka/config
vim server.properties

# 修改以下2处内容:
# 设置broker.id编号
broker.id=3

# Kafka Broker 通过slave2(192.168.36.1020的9092 端口监听客户端(生产者或消费者)的连接请求。
# 使用明文传输协议(无加密或认证) PLAINTEXT 进行通信。
# 即:客户端(生产者或消费者) -> 通过 slave2:9092 -> 连接到 Kafka Broker,发送或接收消息
listeners=PLAINTEXT://slave2:9092

kafka的集群配置完成。


2.3 Kafka集群启停

2.3.1 启动 ZooKeeper

# 在master/slave1/slave2启动 ZooKeeper
zkServer.sh start

# 在master/slave1/slave2检查 ZooKeeper状态
zkServer.sh status


2.3.2 启动 Kafka 集群

# # 在master/slave1/slave2启动 ZooKeeper
kafka-server-start.sh -daemon /opt/apps/kafka/config/server.properties
  • 特点

    • Kafka 在后台运行。

    • 日志不会输出到终端。

    • Kafka 默认会将日志保存到配置文件中指定的日志文件(通常在 /opt/apps/kafka/logs/server.log)。

  • 优势:适合生产环境,运行稳定且管理方便。

  • 适用场景:建议生产环境优先使用。


2.3.3 运行命令查看集群信息

# 查看 Kafka 集群中每个 Broker 在集群中的基本信息。
kafka-broker-api-versions.sh --bootstrap-server master:9092


2.3.4 集群校验

  • 依次测试每台节点的进程是否启动(三台机器开新窗口)

====master
[hadoop@master kafka]# jps
3109 Jps
2713 QuorumPeerMain
2780 Kafka

=====slave1
[hadoop@slave1 kafka]# jps
2643 Kafka
2967 Jps
2603 QuorumPeerMain

=====slave2
[hadoop@slave2 kafka]# jps
2690 Kafka
2658 QuorumPeerMain
3014 Jps


三、kafka实例操作

kafka集群启动

# 1.在master/slave1/slave2启动 ZooKeeper(建议在集群中使用独立安装的 ZooKeeper)
zkServer.sh start

# 2.在master/slave1/slave2启动 Kafka 集群
kafka-server-start.sh -daemon /opt/apps/kafka/config/server.properties


3.1 创建主题

任务:在 Kafka 集群中创建一个名为 test 的新主题

目标:主题 test,1 个分区,3 个副本。


kafka-topics.sh --create \
--bootstrap-server master:9092,slave1:9092,slave2:9092 \
--topic test --partitions 1 --replication-factor 3

参数解释

  • kafka-topics.sh:Kafka 主题管理脚本。

  • --create:表示创建主题。

  • --bootstrap-server:指定 Kafka 集群的 Broker 地址列表(多个地址用逗号分隔)。

  • --topic:指定主题名称(此处为 test)。

  • --partitions:指定分区数(此处为 1)。

  • --replication-factor:指定副本数(此处为 3,表示每个分区有 3 个副本)。

注意事项

  • 主题名称必须唯一,不能与现有主题重复。

  • 分区(Partition) 是生产/消费的基本单位,对应 Kafka 服务端的一个目录(例如:主题名称-分区编号),目录中存放数据文件和索引文件。



3.2 查看所有主题列表

kafka-topics.sh --list \
--bootstrap-server master:9092,slave1:9092,slave2:9092


3.3 查看所有topic的详细信息

kafka-topics.sh --describe \
--bootstrap-server master:9092,slave1:9092,slave2:9092

##运行结果
Topic: test     PartitionCount: 1       ReplicationFactor: 3    Configs: segment.bytes=1073741824
       Topic: test     Partition: 0    Leader: 1       Replicas: 1,3,2 Isr: 1,3,2


3.4 查看指定topic的详细信息

kafka-topics.sh --describe \
--topic test \
--bootstrap-server master:9092,slave1:9092,slave2:9092

# 查看结果如下:
Topic: test     PartitionCount: 1       ReplicationFactor: 3    Configs: segment.bytes=1073741824
       Topic: test     Partition: 0    Leader: 1       Replicas: 1,3,2 Isr: 1,3,2

# 解释
Topic: test:这是主题的名称,即 "test"
PartitionCount: 1:这个主题有一个分区。
ReplicationFactor: 3:这个主题的复制因子是 3,意味着每个分区都会有 3 个副本。
Configs: segment.bytes=1073741824:表示 Kafka 分区的日志存储会被切分成多个文件,每个文件最大为 1 GB。
Partition: 0:这是分区的编号,这个主题只有一个分区,所以编号是 0
Leader: 1  表示分区的当前领导者(Leader)的 ID,这个分区的领导者是 1 号服务器。
Replicas: 1,3,2 表示分区 0 有三个副本,分别存储在 Broker 1、3 和 2 上。
Isr: 1,3,2 此处说明分区 0 的所有副本(1、3、2)都处于同步状态。


3.5 启动控制台消费者

# 启动一个 Kafka 控制台消费者,从 master:9092 连接 Kafka 集群,订阅主题 test 并从头开始消费消息。
kafka-console-consumer.sh \
--topic test \
--from-beginning \
--bootstrap-server master:9092,slave1:9092,slave2:9092

# 解释
`kafka-console-consumer.sh`:Kafka 命令行工具,用于消费指定主题的消息。  
`--bootstrap-server master:9092,slave1:9092,slave2:9092 `:指定 Kafka Broker 地址作为消费者的连接点。  
`--topic test`:指定目标主题 `test`。  
`--from-beginning`:从主题的起始位置消费所有历史消息。  



3.6 启动控制台生产者

# 通过slave1节点启动一个控制台生产者,建立与Kafka集群的连接,并将用户在控制台输入的消息发送到主题 "test"。
kafka-console-producer.sh \
--topic test \
--broker-list master:9092,slave1:9092,slave2:9092

# 解释
`kafka-console-producer.sh`:Kafka 命令行工具,用于向主题发送消息。  
`--broker-list master:9092,slave1:9092,slave2:9092`:指定 Kafka 集群中 broker 的地址和端口,用逗号分隔。  
`--topic test`:指定发送消息的目标主题 `test`

# 注意:若Kafka版本 ≥ 2.5.0:可使用 --bootstrap‑server <host:port,…>,推荐使用该方式以符合新标准。


3.7 生产、消费数据

# 在slave1上(生产者控制台)输入(生产)数据
>hello kafka

# 在master上(消费者控制台)就可以自动接收(消费)数据
hello kafka

# 表示消费端成功消费了生产者生产的消息!


注意:Apache Kafka接收到的每一条消息都存储在日志中,默认情况下,它保持消息168小时,即7天。


3.8 删除主题

kafka-topics.sh --delete \
--topic test \
--bootstrap-server master:9092,slave1:9092,slave2:9092

检查主题是否删除成功

kafka-topics.sh --list \
--bootstrap-server master:9092,slave1:9092,slave2:9092

如果列表中没有 test,说明删除成功 ✅



四、实验

实验任务

  1. 配置并启动 ZooKeeper 和 Kafka 集群。  

  2. 创建、查看、描述、删除主题。  

  3. 使用生产者发送消息,消费者读取消息。  

  4. 验证消费者组的分区分配和负载均衡。  

  5. 学习多种消费方式并记录日志。  

  6. 删除主题并清理实验环境。


实验目标

  1. 掌握 Kafka 的启动与主题管理。  

  2. 理解消息生产与消费流程。  

  3. 熟悉消费者组机制及负载均衡。  

  4. 掌握不同消费方式和环境清理操作。  

实验结束后,学员可熟练使用 Kafka 的基础功能。

一、启动环境

1. 启动 ZooKeeper

# 依次在三台主机上操作
zkServer.sh start

# 查看 ZooKeeper 的状态
zkServer.sh status

2. 启动 Kafka

# 依次在三台主机上操作
kafka-server-start.sh -daemon \
/opt/apps/kafka/config/server.properties


二、实验操作


3. 查看已存在的主题

#  查看已存在的主题
kafka-topics.sh --list \
--bootstrap-server master:9092,slave1:9092,slave2:9092

4. 创建主题

创建一个主题 user_logs,用于模拟用户日志处理:

kafka-topics.sh --create \
--bootstrap-server master:9092,slave1:9092,slave2:9092 \
--topic user_logs \
--partitions 3 \
--replication-factor 2

5. 查看主题的详细信息

# 查看所有主题的明细
kafka-topics.sh --describe --bootstrap-server master:9092

# 查看指定主题的明细
kafka-topics.sh --describe --topic user_logs \
--bootstrap-server master:9092,slave1:9092,slave2:9092


6. 消费与生产实验

6.1 单一消费者的基本操作

# 在 master 节点启动消费者,消费 user_logs 主题的消息
kafka-console-consumer.sh \
--bootstrap-server master:9092,slave1:9092,slave2:9092 \
--topic user_logs

6.2 启动生产者,发送消息

# 在 slave1 节点启动生产者,向 user_logs 主题发送模拟用户日志数据
kafka-console-producer.sh \
--broker-list master:9092,slave1:9092,slave2:9092 \
--topic user_logs


7. 消费者组实验

7.1 启动多个消费者,加入同一组user_logs_group

首先按下ctrl + c停掉刚才的master/slave1上开启的生产者和消费者

# 在 master 节点启动第一个消费者,加入组 user_logs_group
kafka-console-consumer.sh \
--bootstrap-server master:9092,slave1:9092,slave2:9092 \
--topic user_logs \
--group user_logs_group

# 在 slave1 节点启动第二个消费者,加入组 user_logs_group
kafka-console-consumer.sh \
--bootstrap-server master:9092,slave1:9092,slave2:9092 \
--topic user_logs \
--group user_logs_group

7.2 验证分区分配

1)在slave2打开生产者,发送消息:

kafka-console-producer.sh \
--broker-list master:9092,slave1:9092,slave2:9092 \
--topic user_logs

# 尝试发送1-20的数据

2)观察master/slave1两个消费者终端的消息分布,验证分区分配。

结论:在同一个消费组的两个消费者不能消费重复数据,共同消费生产者发来的数据

也可以使用下面的方法查看消费组的每个分区被分配到的消费者

# 【重新给master开窗运行下面的命令,此时不要关闭两个消费者】
# 查看消费者组状态:输出会显示 user_logs_group 消费组的每个分区被分配到的消费者
# 结果:分区0,1在master节点的broker上,分区2在slave1节点的broker上,
kafka-consumer-groups.sh \
--bootstrap-server master:9092 \
--describe --group user_logs_group
GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG      HOST            CLIENT-ID
user_logs_group user_logs       0          13              13              0        /192.168.36.100 consumer-user_logs_group-1
user_logs_group user_logs       1          13              13              0        /192.168.36.100 consumer-user_logs_group-1
user_logs_group user_logs       2          17              17              0        /192.168.36.101 consumer-user_logs_group-1


7.3 验证分区重新分配

1)停止master上的消费者(Ctrl+C)。

2)观察剩余slave1消费者是否接管分区数据。



8. 消费数据的不同方式

首先按下ctrl + c停掉刚才的master/slave1上开启的生产者和消费者

8.1 从Topic主题最早的消息(历史数据)开始消费

# 在 master 节点启动消费者
kafka-console-consumer.sh \
--topic user_logs \
--bootstrap-server master:9092,slave1:9092,slave2:9092 \
--from-beginning

8.2 从最新的消息开始消费

# 重新在 slave1 节点启动消费者
kafka-console-consumer.sh \
--topic user_logs \
--bootstrap-server master:9092,slave1:9092,slave2:9092

8.3 从最新的消息开始消费

# 重新在 slave2 节点启动生产者,再次生产数据,看消费者的变化 
kafka-console-consumer.sh \
--topic user_logs \
--bootstrap-server master:9092,slave1:9092,slave2:9092


9. 删除主题(实验结束后)

9.1 删除主题

kafka-topics.sh --delete \
--bootstrap-server master:9092,slave1:9092,slave2:9092 \
--topic user_logs

9.2 确认主题删除

kafka-topics.sh --list \
--bootstrap-server master:9092,slave1:9092,slave2:9092





发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

Powered By Z-BlogPHP 1.7.3

版权:李翔
备案/许可证编号为:新ICP备2024006115号-1