Kafka安装与部署
一、Kakfa的基础知识
1.1 什么是 Kafka
Kafka 是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。
消息队列的两种方式
点对点模式
点对点模式下包括三个角色
消息队列queue
发送者 (生产者)
接收者(消费者)
消息发送者生产消息发送到
queue中,然后消息接收者从queue中取出并且消费消息。消息被消费以后,queue中不再有存储,所以消息接收者不可能消费到已经被消费的消息。
「点对点模式特点:」
这种模式涉及到一个发送者(生产者),一个消息队列,和一个接收者(消费者)。
消息发送到队列中,然后被单个消费者接收和消费。一旦消息被消费,它就从队列中消失。
这种模式适用于确保消息只被一个消费者处理的场景。
发布/订阅模式
发布 / 订阅模式下包括三个角色:
主题 (
Topic)发布者 (
Publisher)订阅者 (
Subscriber)
发布者将消息发送到 Topic, 系统将这些消息传递给多个订阅者。
发布 / 订阅模式特点:
在发布-订阅模型中,消息发送者(发布者)将消息发布到一个主题(Topic)。
消息接收者(订阅者)可以选择订阅感兴趣的主题,并从该主题接收消息。
当发布者发布一条消息到主题时,所有订阅了该主题的订阅者都会接收到这条消息。
每个订阅者都可以独立地处理消息,发布者不需要关心具体的订阅者。
这种模型适用于一对多的通信模式,其中发布者可以将消息广播给所有订阅者。

1.2 kafka架构与工作原理

Kafka的相关概念:
Producer(生产者)
定义:负责创建消息并将其发布到 Kafka 的 Topic 中。
作用:生产者将数据发送到 Kafka 集群的 Broker中的Topic 。
Consumer(消费者)
定义:读取 Kafka 中消息的客户端。
作用:从指定的 Topic 中读取消息并处理它。
Consumer Group(消费者组)
定义:多个消费者可以组成一个组,Kafka 将 Topic 中的分区分配给组内的消费者,保证每条消息在组内只能被一个消费者消费。
作用:同一 Topic 可以被不同组独立消费,而组内的消费者分工处理分区的数据。
Broker(代理)
定义:Kafka 集群中的服务器节点。master、slave1、slave2 部署了 Kafka Broker 的三个节点。
作用:负责存储和管理 Topic 的一部分分区数据,并处理来自生产者和消费者的请求。
Topic(主题)
定义:Kafka 中的消息分类标签,类似于文件夹的功能,用于存储和管理一类消息。
作用:用户只需指定 Topic 名称即可生产或消费数据,而不必关心物理存储的细节。
举例:在一个电商系统中,可以有以下几个 Topic:
订单Topic:存储所有订单数据。用户行为Topic:存储用户的点击、浏览等行为数据。库存Topic:存储库存更新信息。
Partition(分区)
定义:Topic 被划分成的多个子集,每个 Partition 内的消息是有序的。
作用:分布式存储和并行处理。消费者数量一般小于或等于分区数量。
形式:分区在底层表现为服务器上的文件夹。
Replication(副本)
定义:每个 Partition 会有多个副本,用于高可用性和容错。
作用:当一个 Broker 挂掉时,其他副本可以提供服务。
Replication-Leader 和 Replication-Follower
定义:每个 Partition 的多个副本中,有一个 Leader 和多个 Follower。
作用:
Leader:负责与生产者和消费者的所有读写交互。
Follower:仅复制 Leader 的数据,当 Leader 故障时,Follower 可自动提升为 Leader。
Offset(偏移量)
定义:标识消息在 Partition 中的位置,从 0 开始递增。
作用:消费者根据 Offset 知道该从哪里开始读取,确保数据读取的精准性和一致性。
举例:每条消息在其分区中都有唯一的 Offset。Offset 就像是书籍的页码或 Excel 的行号:
ZooKeeper
定义:Kafka 使用 ZooKeeper 来存储元数据并协调集群操作。
作用:用于分区的 Leader 选举、消费者分组的管理,以及确保 Kafka 的高可用性。
举例:元数据包括关于 Broker、Topic、Partition、Leader/Follower 的状态信息
1.2.1 Kafka 在电商行业的应用
1. 订单管理:处理订单消息
场景:
用户在电商平台下单。
系统需要通知多个部门(比如库存和配送)进行处理。
Kafka 如何工作:
1) 生产者(订单系统)
下单后,订单系统将订单信息发送到 Kafka 的
订单Topic。比如订单内容是:
订单#1001,用户A购买了商品B。
2) 消费者组(库存系统和配送系统)
库存系统:从
订单Topic读取订单信息,减少库存。配送系统:从
订单Topic读取订单信息,生成配送单。
简单比喻:
Kafka 是一个“订单广播站”,订单系统把订单消息放到广播站,库存部门和配送部门从广播站拿到订单,分别完成自己的任务。
2. 用户行为分析:推荐商品
场景:
用户在网站浏览商品、点击页面、搜索关键字。
系统需要分析用户的行为,推荐个性化商品。
Kafka 如何工作:
1) 生产者(前端系统)
用户的每个行为(比如浏览商品C,搜索“手机”)都会被发送到 Kafka 的
用户行为Topic。消息内容可能是:
用户A点击了商品C或用户B搜索了“手机”。
2) 消费者组(推荐系统和数据分析系统)
推荐系统:实时读取用户行为,更新推荐算法,展示相关商品。
数据分析系统:分析用户浏览和点击数据,统计热门商品。
简单比喻:
Kafka 是一个“用户行为记录本”,记录下每个用户的操作。推荐系统根据记录实时推荐,数据分析系统统计这些记录,用于改进服务。
3. 实时数据分析:发现销售趋势
场景:
需要实时分析商品销售数据,监控热门商品和销售趋势。
Kafka 如何工作:
1) 生产者(订单系统和用户行为系统)
将订单数据和用户行为数据发送到 Kafka 的
订单Topic和用户行为Topic。
2) 消费者(数据分析系统)
数据分析系统从多个 Topic 读取数据,进行实时分析,比如:
哪些商品销售火爆?
哪些页面点击量最高?
简单比喻:
Kafka 是“销售数据管道”,将订单和行为数据汇总到分析系统,实时生成趋势报告。
总结
Kafka 在电商行业中,就像是一个“消息分发中心”,无论是订单处理、用户行为分析,它都负责高效、可靠地传递消息给相关系统,让各部门能够实时协同工作。
通俗比喻:Kafka 是消息的快递员,确保每条信息快速、安全地送到需要的人手中!

1.2.2 电商订单场景 Kafka 数据流分析
场景背景
用户在电商平台下单后,订单数据通过 Kafka 系统发送给多个消费者(如库存系统和配送系统),确保订单相关任务能够独立完成。
数据流分析
1. 生产者发送消息
Producer A(订单系统)生成订单消息:
订单#1001:用户A购买了商品B。
订单#1002:用户C购买了商品D。
消息被发送到 Topic A 的分区:
订单#1001 存储在 Partition 0(Leader 分区,位于 Broker1)。
订单#1002 存储在 Partition 1(Leader 分区,位于 Broker2)。
2. 分区存储与复制
消息存储:
Partition 0 的 Leader 存储在 Broker1,保存订单#1001。
Partition 1 的 Leader 存储在 Broker2,保存订单#1002。
消息复制:
Partition 0 的数据从 Broker1 复制到 Broker2(Follower 分区)。
Partition 1 的数据从 Broker2 复制到 Broker1(Follower 分区)。
3. 消费者读取消息
多个消费者组:
消费者组 1(库存系统): 负责更新库存。
消费者组 2(配送系统): 负责生成配送单。
独立消费:
消费者组 1(库存系统):
从 Partition 0 读取 订单#1001,并减少库存。
从 Partition 1 读取 订单#1002,并减少库存。
消费者组 2(配送系统):
从 Partition 0 读取 订单#1001,并生成配送单。
从 Partition 1 读取 订单#1002,并生成配送单。
4. 消息偏移量管理
Offset 跟踪:
每个消费者组独立维护 Offset:
消费者组 1(库存系统): 读取 Offset 1(已处理订单#1001 和订单#1002)。
消费者组 2(配送系统): 读取 Offset 1(已处理订单#1001 和订单#1002)。
故障恢复:
消费者组的 Offset 记录在 Kafka 中,发生故障时可从上次的消费位置继续处理。
5. ZooKeeper 协调
管理 Kafka 的元数据:如记录分区的 Leader 和 Follower 信息;Topic 的分区数量、每个分区对应的 Broker 等信息。
ZooKeeper 帮助消费者组进行分区的分配,确保每个分区被一个消费者处理,防止重复消费。
监控 Kafka 集群的健康状态。
存储了 Kafka 的部分配置信息
最终结果
库存系统(消费者组 1): 减少商品库存。
配送系统(消费者组 2): 生成配送单。
消息独立消费完成: 确保订单消息被多个系统独立处理,实现高效可靠的任务分发。
总结
从 Producer 到 Partition: 生产者将订单消息发送到指定分区。
从 Partition 到多个消费者组: 不同消费者组独立消费相同的消息,完成各自的任务。
ZooKeeper 协调: 确保 Kafka 的分区与消费者组正常运行,实现高效分发。
Kafka 和 Flume 的简单区别
| 特点 | Kafka | Flume |
|---|---|---|
| 用途 | 数据传输 + 数据存储 | 专注于数据采集和传输 |
| 数据持久化 | 支持,消息存储在硬盘,保证数据可靠性 | 不支持,只负责传输数据 |
| 实时处理 | 支持,能与实时分析工具结合(如 Spark/Flink) | 不支持,只做数据传输 |
| 扩展性 | 支持高扩展性,适合大规模数据分发 | 限于单一任务,扩展能力较弱 |
| 场景 | 需要高吞吐量、实时处理和持久化的数据场景 | 适合简单的数据采集(如日志传输) |
1.3 Kafka 的应用场景
1)应用解耦
多应用间通过消息队列对同一消息进行处理,避免调用接口失败导致整个过程失败;
传统方式:订单管理系统直接调用库存管理系统接口检查库存,接口失败会导致订单处理失败。
引入消息队列:
订单管理系统将订单信息发布到消息队列。
库存管理系统订阅队列,读取订单消息,检查库存。
优势:即使库存系统故障,订单消息仍被保留,库存系统恢复后可继续处理,保证流程完整性,提高可靠性和稳定性。

2) 异步处理
多应用对消息队列中同一消息进行处理,应用间并发处理消息,相比串行处理,减少处理时间;


电商系统包括以下应用程序:订单服务、支付服务、物流服务和通知服务。
传统处理方式
按顺序依次调用各服务。
每个服务需等待前一服务完成,导致处理延迟。
引入消息队列后
订单服务将订单消息放入队列后立即返回响应。
支付、物流、通知服务并发从队列获取消息,独立处理。
改进效果
并发处理:各服务无需等待,大幅缩短处理时间。
解耦服务:服务故障或延迟不影响整体系统。
可扩展性:通过调整队列容量和消费者数量应对负载变化。
结果:系统响应更快,吞吐量更高,可靠性和伸缩性更强。
3)限流削峰
在秒杀或抢购活动中,为应对高并发压力,可使用消息队列进行限流削峰:
问题:数千用户同时抢购可能导致系统崩溃。
解决:通过消息队列暂存请求,限制每秒处理固定数量(如100个),超出的请求直接拒绝并提示用户稍后再试。
效果:平稳处理高峰流量,避免系统宕机。

4)消息通信和协调
消息队列用于分布式系统中的消息通信和协调:
应用场景:订单处理。
订单服务将订单信息发布到消息队列。
支付服务订阅消息,处理支付。
库存服务订阅消息,更新库存。
效果:服务异步通信,提升性能和可靠性。

二、Kafka的安装与使用
安装要求:
已安装
Hadoop集群已安装
zookeeper集群 (保存元数据)
2.1 Kafka 单机安装与配置步骤
略过
2.2 Kafka 集群安装与配置步骤
Kafka 集群与 Broker 简介
1. 什么是 Kafka 集群?
Kafka 集群由多个 Broker 实例组成,每个 Broker 是 Kafka 的独立节点。
集群的设计目的是提高系统的可靠性、容错性以及处理高并发消息的能力。
2. 为什么需要多个 Broker?
提升可靠性:即使一个 Broker 故障,其他 Broker 仍能继续工作,保证服务不中断。
实现伸缩性:通过增加 Broker,可以处理更多的消息,满足业务增长需求。
实现负载均衡:多个 Broker 分担消息存储和处理压力,提升整体性能。
3. 如何构建 Kafka 集群?
在多台机器上启动多个 Broker 实例:每个 Broker 实例运行在一台服务器上,分散负载。
Broker 自动组成集群:Kafka 会将多个 Broker 自动连接成一个集群,处理消息的存储和分发。
4. 示例:三节点 Kafka 集群
启动三个 Broker 实例,每个实例运行在一台服务器上(如
master,slave1,slave2)。Kafka 将自动管理分区和副本,增强系统的容错性和并发处理能力。
2.2.1 集群规划
| ip | 主机名 | kafka角色 | 备注 |
|---|---|---|---|
| 192.168.36.100 | master | broker1 | 启动broker角色 |
| 192.168.36.101 | slave1 | broker2 | 启动broker角色 |
| 192.168.36.102 | slave2 | broke3r | 启动broker角色 |
2.2.2 准备工作
1. 安装 JDK 和 Hadoop
任务:确保 JDK 和 Hadoop 环境正常安装并配置。
2. 安装和启动 ZooKeeper
任务:配置并启动 ZooKeeper,为 Kafka 提供元数据管理和协调服务。
内容:元数据,包括 Broker 信息、Topic 配置、分区和副本分配、Leader 选举状态 等。
注意:Kafka 2.8+ 的新特性
ZooKeeper 不再必要:Kafka 2.8 及以上版本引入 KRaft(Kafka Raft),替代 ZooKeeper 管理集群元数据。
推荐:如果使用 Kafka 2.8 或更新版本,可跳过 ZooKeeper 的安装,直接配置 Kafka 的 KRaft 模式。
2.2.3 集群配置
2.2.3.1 下载
下载地址:https://archive.apache.org/dist/kafka/
下载kafka_2.12-2.4.1.tgz 放入master主机的/opt/software中
2.12是Scala的版本号;2.4.1是Kafka的版本号
2.2.3.2 解压
cd /opt/software
# 解压
tar -zvxf kafka_2.12-2.4.1.tgz -C /opt/apps
cd /opt/apps
# 改名
mv kafka_2.12-2.4.1 kafka2.2.3.3 配置环境变量
1.修改master,slave1,slave2的环境变量
# 在master,slave1,slave2节点上编辑环境变量
vi /etc/profile
# 在文件末尾追加如下内容:
# Kafka
export KAFKA_HOME=/opt/apps/kafka
export PATH=$PATH:$KAFKA_HOME/bin
# 三台主机上依次运行以下命令,使环境变量生效
source /etc/profile
# 查看环境变量
echo $PATH2.创建Kafka存储消息日志数据(包括主题内容和索引)的目录,用于持久化消息数据。
# 在master节点上操作
cd /opt/apps/kafka
mkdir data3.修改的三个配置文件
在 Kafka 的 config 目录下,有以下 3 个主要配置文件与主题、消息生产和消费相关:
server.properties作用:配置 Kafka 服务端(Broker)参数。
内容:包括 Broker ID、日志存储路径(
log.dirs)、分区和副本设置,以及 Zookeeper 连接信息等服务端相关设置。producer.properties作用:配置 Kafka 消息生产者参数。
内容:包括 Kafka 集群地址(
bootstrap.servers)、序列化方式(key.serializer和value.serializer)、ACK 级别(acks)等生产相关配置。consumer.properties作用:配置 Kafka 消费者参数。
内容:包括 Kafka 集群地址(
bootstrap.servers)、消费者组 ID(group.id)、消息自动提交偏移量(enable.auto.commit)等消费相关配置。
在master中进行配置
2.2.3.4 配置(server.properties)
cd /opt/apps/kafka/config
# 修改服务端配置
vi server.properties# 修改以下四个关键配置参数
# 设置 broker.id 编号,集群模式下该 ID 必须唯一,且不可更改。
# 注意:每个 Kafka Server 加入集群时需要设置一个唯一的 ID(从 0 开始逐个递增)。如果设置重复会报错。
broker.id=1
# 配置 Kafka 服务器监听的网络地址和端口。
# 格式:协议://地址:端口
# PLAINTEXT:表示明文传输协议。
# 作用:Kafka 服务器监听客户端(生产者和消费者)连接请求,通过明文协议传输和接收消息。
listeners=PLAINTEXT://192.168.36.100:9092
# 指定 Kafka 存储消息日志数据(包括主题内容和索引)的目录,用于持久化消息数据。
log.dirs=/opt/apps/kafka/data
# 配置 ZooKeeper 集群连接信息及 Kafka 元数据存储路径。
# 作用:每个 broker 使用 ZooKeeper 保存元数据信息,如分区 Leader 信息。
# 路径 /kafka 用于存储 Kafka 的元数据。
zookeeper.connect=master:2181,slave1:2181,slave2:2181/kafka
# 允许删除 Kafka 中的主题。
# 默认为 false,设置为 true 以支持删除主题。
delete.topic.enable=true2.2.3.5 配置生产者(producer.properties)
cd /opt/apps/kafka/config/
# 修改生产者端配置
vi producer.properties# 指定 Kafka 客户端(生产者或消费者)连接 Kafka 集群的地址列表,用于建立初始连接。
# 格式为 "主机名:端口号",多个地址用逗号分隔
bootstrap.servers=master:9092,slave1:9092,slave2:9092
# 指定消息键的序列化方式,将键序列化为字符串
key.serializer=org.apache.kafka.common.serialization.StringSerializer
# 指定消息值的序列化方式,将值序列化为字符串
value.serializer=org.apache.kafka.common.serialization.StringSerializer解释:
配置 Kafka Broker集群地址列表,格式为 "主机名:端口号",多个地址用逗号分隔。 示例:
master:9092,slave1:9092,slave2:9092。Kafka 客户端随机选择一个地址作为连接点,若连接失败会自动尝试其他地址,实现负载均衡和故障转移。
此参数仅用于初始连接,一旦连接成功,客户端会获知整个集群的拓扑信息。
序列化? 序列化是把数据(如字符串、整数)转换成字节流,方便网络传输或存储。
反序列化? 反序列化是把收到的字节流还原成可读数据(如字符串、整数)。
Kafka 中:消费者从 Kafka 接收字节流后将其还原成原始格式。
作用:反序列化器将字节流解包成原始数据,供消费者使用。
Kafka 中:生产者将消息键和值转成字节流后发送。
使用:序列化器将消息打包成字节流,供生产者发送。。
2.2.3.6 配置消费者(consumer.properties)
cd /opt/apps/kafka/config
# 修改消费者端配置
vi consumer.properties# 指定 Kafka 客户端(生产者或消费者)连接 Kafka 集群的地址列表,用于建立初始连接。
# 格式为 "主机名:端口号",多个地址用逗号分隔
bootstrap.servers=master:9092,slave1:9092,slave2:9092
# 指定消息键的反序列化方式,将消息键反序列化为字符串
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 指定消息值的反序列化方式,将消息值反序列化为字符串
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer2.2.3.7 分发
分发已经配置好的kafka目录到slave1和slave2节点的/opt/apps目录下。
# 在master上操作
scp -r /opt/apps/kafka/ slave1:/opt/apps/
scp -r /opt/apps/kafka/ slave2:/opt/apps/修改slave1节点上server.properties中的id和host:
# 进入slave1
cd /opt/apps/kafka/config
vi server.properties
# 修改以下2处内容:
# 设置broker.id编号
broker.id=2
# Kafka Broker 通过192.168.36.101的9092 端口监听客户端(生产者或消费者)的连接请求。
# 使用明文传输协议(无加密或认证) PLAINTEXT 进行通信。
# 主机地址为 slave1。
# 即:客户端(生产者或消费者) -> 通过 slave1:9092 -> 连接到 Kafka Broker,发送或接收消息。
listeners=PLAINTEXT://slave1:9092修改slave2节点上server.properties中的id和host:
# 进入slave2
cd /opt/apps/kafka/config
vim server.properties
# 修改以下2处内容:
# 设置broker.id编号
broker.id=3
# Kafka Broker 通过 9092 端口监听客户端(生产者或消费者)的连接请求。
# 使用明文传输协议 PLAINTEXT 进行通信。
# 主机地址为 slave1。
# 即:客户端(生产者或消费者) -> 通过 slave1:9092 -> 连接到 Kafka Broker,发送或接收消息。
listeners=PLAINTEXT://slave2:9092kafka的集群配置完成。
2.3 Kafka集群启停
2.3.1 启动 ZooKeeper
启动 ZooKeeper的两种方式
1)集群独立安装的 ZooKeeper
# 启动 ZooKeeper(建议在集群中使用独立安装的 ZooKeeper)
zkServer.sh start2.3.2 启动 Kafka 集群
所有三台机器都需启动 Kafka Broker
后台运行(守护进程模式,带日志输出在终端)
kafka-server-start.sh /opt/apps/kafka/config/server.properties &
特点:Kafka 以后台方式运行,但日志会打印到终端窗口。
限制:如果关闭终端窗口,Kafka 服务也会停止。
适用场景:临时运行,不建议用于生产环境。
2.3.3 运行命令查看集群信息
# 通过连接master:9092,查看Kafka集群中Broker的API版本信息及集群元数据(如ClusterId和Broker列表)。
kafka-broker-api-versions.sh --bootstrap-server master:90922.3.4 关闭 Kafka 和 ZooKeeper** 集群
1)关闭 Kafka
# 挂起运行时
# 即 Kafka 是以前台挂起模式运行(没有 `&` 或 `-daemon`)
ctrl+c
# 后台运行时
# 即 Kafka 是以后台模式运行(通过 `&` 或 `-daemon` 启动)
kafka-server-stop.sh2)关闭 ZooKeeper
# 挂起运行时
# 即 ZooKeeper 是以前台挂起模式运行(没有 `&` 或 `-daemon`)
ctrl+c
# 后台运行时
# 即 ZooKeeper 是以后台模式运行(通过 `&` 或 `-daemon` 启动)
zookeeper-server-stop.sh2.3.5 实操:kafka集群启停操作
1.依次在三个节点分别启动zookeeper集群。
[root@master kafka]$ zkServer.sh start
[root@slave1 kafka]$ zkServer.sh start
[root@slave2 kafka]$ zkServer.sh start2.依次在三个节点启动kafka的broker
# 建议第一次启动使用挂起命令,便于查找错误
# 加载./config/server.properties配置文件
cd /opt/apps/kafka/
[root@master kafka]# kafka-server-start.sh ./config/server.properties &
[root@slave1 kafka]# kafka-server-start.sh ./config/server.properties &
[root@slave2 kafka]# kafka-server-start.sh ./config/server.properties &
# kafka能正常启动后,建议以后按以下的方法后台启动进程
kafka-server-start.sh -daemon /opt/apps/kafka/config/server.properties


2.3.6 集群校验
依次测试每台节点的进程是否启动(三台机器开新窗口)
====master
[hadoop@master kafka]# jps
3109 Jps
2713 QuorumPeerMain
2780 Kafka
=====slave1
[hadoop@slave1 kafka]# jps
2643 Kafka
2967 Jps
2603 QuorumPeerMain
=====slave2
[hadoop@slave2 kafka]# jps
2690 Kafka
2658 QuorumPeerMain
3014 Jps错误坑点:
Kafka 创建 Topic 报错:
replication factor: 3 larger than available brokers: 1
原因
Topic 副本数超过 Broker 数量:
设置
replication.factor=3,但只有 1 个 Broker 启动。集群 ID 不匹配:
Kafka 启动时,
meta.properties文件中的cluster.id不一致。
解决方案
方法 1:清理日志目录
找到
server.properties中的log.dirs路径:log.dirs=/opt/apps/kafka/data删除日志目录中的所有文件:
rm -rf /opt/apps/kafka/data/*重启 Kafka。
方法 2:修改 meta.properties
找到
meta.properties文件所在路径:/opt/apps/kafka/data/meta.properties修改
cluster.id为与其他 Broker 一致。
总结
测试环境:清空日志目录(方法 1)。
生产环境:修改
cluster.id保留数据(方法 2)。
关闭kafka集群
# 方法一:脚本
kafka-server-stop.sh
# 方法二:强制
kill -9 pid三、kafka实例操作
kafka集群启动成功后,我们就可以对kafka集群进行操作了。
3.1 创建主题
任务:在 Kafka 集群中创建一个名为 test 的新主题
目标:主题 test,1 个分区,3 个副本。
第一种方法:推荐方法(适用于 Kafka 2.2 及以上版本)
kafka-topics.sh --create \
--bootstrap-server master:9092,slave1:9092,slave2:9092 \
--topic test --partitions 1 --replication-factor 3参数解释
kafka-topics.sh:Kafka 主题管理脚本。--create:表示创建主题。--bootstrap-server:指定 Kafka 集群的 Broker 地址列表(多个地址用逗号分隔)。--topic:指定主题名称(此处为test)。--partitions:指定分区数(此处为 1)。--replication-factor:指定副本数(此处为 3,表示每个分区有 3 个副本)。
注意事项
主题名称必须唯一,不能与现有主题重复。
分区(Partition) 是生产/消费的基本单位,对应 Kafka 服务端的一个目录(例如:
主题名称-分区编号),目录中存放数据文件和索引文件。
简化命令
kafka-topics.sh --create \
--bootstrap-server master:9092 \
--topic test --partitions 1 --replication-factor 3区别
多节点连接:
--bootstrap-server master:9092,slave1:9092,slave2:9092指定多个 Kafka Broker,客户端可以连接任意一个 Broker,增强容错性和可用性。
单节点连接:
--bootstrap-server master:9092只指定一个 Kafka Broker。如果该 Broker 不可用,客户端将无法连接到集群。
创建主题时的警告信息
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
解释
原因:由于 Kafka 内部的指标(Metrics)名称限制,含有点(
.)或下划线(_)的主题在某些情况下可能会被视为相同,从而导致冲突。影响:如果同时使用点和下划线作为主题名称的一部分,可能会造成监控和指标系统中的数据错误或冲突。
建议:
主题名称中只使用点(
.)或下划线(_),不要两者混用。统一命名规则,保持一致性,避免冲突。
3.2 查看所有主题列表
kafka-topics.sh --list --bootstrap-server master:9092,slave1:9092,slave2:9092
# 简化命令,只指定一个节点作为引导服务器,用于获取集群中的主题列表。
kafka-topics.sh --list --bootstrap-server master:90923.3 查看所有topic的详细信息
kafka-topics.sh --describe \
--bootstrap-server master:9092,slave1:9092,slave2:9092
##运行结果
Topic: test PartitionCount: 1 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: test Partition: 0 Leader: 1 Replicas: 1,3,2 Isr: 1,3,23.4 查看指定topic的详细信息
kafka-topics.sh --describe \
--topic test \
--bootstrap-server master:9092,slave1:9092,slave2:9092
# 查看结果如下:
Topic: test PartitionCount: 1 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: test Partition: 0 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2
# 解释
Topic: test:这是主题的名称,即 "test"。
PartitionCount: 1:这个主题有一个分区。
ReplicationFactor: 3:这个主题的复制因子是 3,意味着每个分区都会有 3 个副本。
Configs: segment.bytes=1073741824:表示 Kafka 分区的日志存储会被切分成多个文件,每个文件最大为 1 GB。
Partition: 0:这是分区的编号,这个主题只有一个分区,所以编号是 0。
Leader: 1 表示分区的当前领导者(Leader)的 ID,这个分区的领导者是 1 号服务器。
Replicas: 1,3,2 表示分区 0 有三个副本,分别存储在 Broker 1、3 和 2 上。
Isr: 1,3,2 此处说明分区 0 的所有副本(1、3、2)都处于同步状态。3.5 启动控制台生产者
# 通过slave1节点启动一个控制台生产者,建立与Kafka集群的连接,并将用户在控制台输入的消息发送到主题 "test"。
kafka-console-producer.sh --topic test --broker-list master:9092,slave1:9092,slave2:9092
# 解释
- `kafka-console-producer.sh`:Kafka 命令行工具,用于向主题发送消息。
- `--broker-list master:9092,slave1:9092,slave2:9092`:指定 Kafka 集群中 broker 的地址和端口,用逗号分隔。
- `--topic test`:指定发送消息的目标主题 `test`。3.6 启动控制台消费者
# 启动一个 Kafka 控制台消费者,从 master:9092 连接 Kafka 集群,订阅主题 test 并从头开始消费消息。
kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server master:9092,slave1:9092,slave2:9092
# 解释
- `kafka-console-consumer.sh`:Kafka 命令行工具,用于消费指定主题的消息。
- `--broker-list master:9092,slave1:9092,slave2:9092`:指定 Kafka 集群中 broker 的地址和端口,用逗号分隔。
- `--topic test`:指定目标主题 `test`。
- `--from-beginning`:从主题的起始位置消费所有历史消息。 3.7 生产、消费数据
# 在slave1上(生产者控制台)输入(生产)数据
>hello kafka
# 在master上(消费者控制台)接收(消费)数据
hello kafka
# 表示消费端成功消费了生产者生产的消息!注意:Apache Kafka接收到的每一条消息都存储在日志中,默认情况下,它保持消息168小时,即7天。
3.8 删除主题
在 Kafka 配置中,server.properties配置文件中
log.retention.hours=168 :表示日志文件即消息被写入到 Kafka 主题的某个特定分区后开始计时,该消息将至少被保留在服务器上 168 小时(即 7 天)
log.retention.hours=-1:消息永久保存
1.删除主题 test:
kafka-topics.sh --delete \
--topic test \
--bootstrap-server master:9092,slave1:9092,slave2:90924. 验证删除状态
使用以下命令查看主题列表,确认主题是否仍然存在:
kafka-topics.sh --list \
--bootstrap-server master:9092,slave1:9092,slave2:9092四、实验
实验任务
配置并启动 ZooKeeper 和 Kafka 集群。
创建、查看、描述、删除主题。
使用生产者发送消息,消费者读取消息。
验证消费者组的分区分配和负载均衡。
学习多种消费方式并记录日志。
删除主题并清理实验环境。
实验目标
掌握 Kafka 的启动与主题管理。
理解消息生产与消费流程。
熟悉消费者组机制及负载均衡。
掌握不同消费方式和环境清理操作。
实验结束后,学员可熟练使用 Kafka 的基础功能。
一、启动环境
1. 启动 ZooKeeper
# 依次在三台主机上操作
zkServer.sh start
# 查看 ZooKeeper 的状态
zkServer.sh status2. 启动 Kafka
# 依次在三台主机上操作
cd /opt/apps/kafka
./bin/kafka-server-start.sh ./config/server.properties &二、实验操作
3. 查看已存在的主题
cd /opt/apps/kafka
./bin/kafka-topics.sh --list --bootstrap-server master:9092,slave1:9092,slave2:9092 4. 创建主题
创建一个主题 user_logs,用于模拟用户日志处理:
./bin/kafka-topics.sh --create \
--bootstrap-server master:9092,slave1:9092,slave2:9092 \
--topic user_logs \
--partitions 3 \
--replication-factor 25. 查看主题的详细信息
# 查看所有主题的明细
./bin/kafka-topics.sh --describe --bootstrap-server master:9092,slave1:9092,slave2:9092
# 查看指定主题的明细
./bin/kafka-topics.sh --describe --topic user_logs \
--bootstrap-server master:90926. 消费与生产实验
6.1 启动生产者,发送消息
# 在 slave1 节点启动生产者,发送模拟用户日志数据
./bin/kafka-console-producer.sh \
--broker-list master:9092,slave1:9092,slave2:9092 \
--topic user_logs6.2 单一消费者的基本操作
# 在 master 节点启动消费者,消费 user_logs 主题的消息
./bin/kafka-console-consumer.sh \
--bootstrap-server master:9092,slave1:9092,slave2:9092 \
--topic user_logs7. 消费者组实验
7.1 启动多个消费者,加入同一组
# 在 master 节点启动第一个消费者,加入组 user_logs_group
./bin/kafka-console-consumer.sh \
--bootstrap-server master:9092,slave1:9092,slave2:9092 \
--topic user_logs \
--group user_logs_group
# 在 slave1 节点启动第二个消费者,加入组 user_logs_group
./bin/kafka-console-consumer.sh \
--bootstrap-server master:9092,slave1:9092,slave2:9092 \
--topic user_logs \
--group user_logs_group7.2 验证分区分配
打开生产者,发送消息:
./bin/kafka-console-producer.sh \
--broker-list master:9092,slave1:9092,slave2:9092 \
--topic user_logs观察两个消费者终端的消息分布,验证分区分配。
7.3 验证分区重新分配
停止其中一个消费者(
Ctrl+C)。观察剩余消费者是否接管分区数据。
7.4 查看消费者组状态
./bin/kafka-consumer-groups.sh \
--bootstrap-server master:9092,slave1:9092,slave2:9092 \
--group user_logs_group --describe8. 消费数据的不同方式
8.1 从最早的消息开始消费
./bin/kafka-console-consumer.sh \
--topic user_logs \
--bootstrap-server master:9092,slave1:9092,slave2:9092 --from-beginning8.2 从最新的消息开始消费
./bin/kafka-console-consumer.sh \
--topic user_logs \
--bootstrap-server master:9092,slave1:9092,slave2:90928.3 在后台运行消费者
nohup ./bin/kafka-console-consumer.sh \
--topic user_logs \
--bootstrap-server master:9092,slave1:9092,slave2:9092 > /var/log/kafka/consumer.log 2>&1 &9. 删除主题(实验结束后)
9.1 删除主题
./bin/kafka-topics.sh --delete \
--bootstrap-server master:9092,slave1:9092,slave2:9092 \
--topic user_logs9.2 确认主题删除
./bin/kafka-topics.sh --list \
--bootstrap-server master:9092,slave1:9092,slave2:9092【知识点】
五、补充知识
5.1 Topic和Partition的关系
Topic(主题):
Topic是Kafka中消息的逻辑容器或类别。
消息被发布到特定的Topic中,并且消费者订阅Topic以接收其中的消息。
Topic的名称用于标识和区分不同类型的消息数据,通常与业务或应用程序相关。
Partition(分区):
Partition是Topic的一个物理分片或子集。
每个Topic可以分为多个Partition,每个Partition都是独立的存储单元。
Partition解决了单台物理机存储和处理大量数据的限制,允许数据在多个分区中分散存储和处理。
关于Topic和Partition的关系:
一个Topic可以有多个Partition,这些Partition可以分布在Kafka集群中的不同Broker上。这允许Topic的数据分布在多台机器上,以实现横向扩展和高吞吐量。
当消息被生产者发布到Topic时,Kafka会根据一定的分区策略将消息写入特定的Partition。这可以是手动指定的分区,也可以是自动轮询分区或根据消息的Key进行分区。
消费者订阅Topic时,它们实际上订阅的是Topic的所有Partition。每个消费者会负责消费一个或多个Partition中的消息。
Partition的数量通常根据数据负载和性能需求进行规划。增加Partition数量可以提高并行性和负载均衡,但也需要更多的资源来管理。
5.2 Partition到底是什么?
【对于服务端而言】
Partition是我们进行生产/消费的真正实体,那么这个实体在服务端看起来是什么样子呢?

Partition在服务端是一个目录,其名称由主题名称和分区编号组成。
那我们进这个文件夹内部看一下,里面存放了些什么东西的?

Partition包含以下内容:
.log文件:这是用于进行读写的物理数据文件。所有消息的实际数据都存储在这些日志文件中,按顺序追加。
.index文件:稀疏索引文件,通过消息的offset进行构建的稀疏索引。
.timeindex文件:稀疏索引文件,通过消息的写入/创建时间进行构建的稀疏索引。
checkpoint 检查点文件:在Kafka中用于记录分区的消费者状态信息,以实现恢复和持久化。
Partition在服务端就是一个TopicPartition,本质是一个文件夹,里面放着一些数据文件和对应的索引文件。
【对于生产者而言】
生产者在生产数据时会将数据写入特定TopicPartition的数据文件内。
生产者可以将消息发送到指定分区、自动轮询分区或根据指定的Key来确定发送到哪个分区。
【对于消费者而言,分区是什么】
消费者需要知道要消费的具体分区才能获取数据。
消费者可以手动指定分区或自动分配分区来进行消费。
【对于整个Kafka集群而言,分区的意义是什么】
在生产和消费Kafka中的Topic数据时,当数据量逐渐增加导致单台物理机的磁盘容量不足时,可以考虑横向扩容,即通过增加机器的方式来分散数据。这是分区设计的核心概念,通过将Topic的数据分片成多个Partition,并将这些Partition分布到集群中的不同Broker上,实现了分布式存储,