李翔-大数据技术

Big data technology!

第10章 Kafka的部署与使用

Kafka安装与部署


一、Kakfa的基础知识

1.1 什么是 Kafka

Kafka 是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。


消息队列的两种方式

点对点模式

点对点模式下包括三个角色

  • 消息队列queue

  • 发送者 (生产者)

  • 接收者(消费者)

    输入图片说明

    消息发送者生产消息发送到 queue 中,然后消息接收者从 queue 中取出并且消费消息。消息被消费以后, queue 中不再有存储,所以消息接收者不可能消费到已经被消费的消息。

「点对点模式特点:」

  • 这种模式涉及到一个发送者(生产者),一个消息队列,和一个接收者(消费者)。

  • 消息发送到队列中,然后被单个消费者接收和消费。一旦消息被消费,它就从队列中消失。

  • 这种模式适用于确保消息只被一个消费者处理的场景。


发布/订阅模式

发布 / 订阅模式下包括三个角色:

  • 主题        (Topic

  • 发布者      (Publisher)

  • 订阅者      (Subscriber)

发布者将消息发送到 Topic, 系统将这些消息传递给多个订阅者。

发布 / 订阅模式特点:

  • 在发布-订阅模型中,消息发送者(发布者)将消息发布到一个主题(Topic)。

  • 消息接收者(订阅者)可以选择订阅感兴趣的主题,并从该主题接收消息。

  • 当发布者发布一条消息到主题时,所有订阅了该主题的订阅者都会接收到这条消息。

  • 每个订阅者都可以独立地处理消息,发布者不需要关心具体的订阅者。

  • 这种模型适用于一对多的通信模式,其中发布者可以将消息广播给所有订阅者。

输入图片说明


1.2 kafka架构与工作原理

img


Kafka的相关概念:

Producer(生产者)

  • 定义:负责创建消息并将其发布到 Kafka 的 Topic 中。

  • 作用:生产者将数据发送到 Kafka 集群的 Broker中的Topic 。

Consumer(消费者)

  • 定义:读取 Kafka 中消息的客户端。

  • 作用:从指定的 Topic 中读取消息并处理它。

Consumer Group(消费者组)

  • 定义:多个消费者可以组成一个组,Kafka 将 Topic 中的分区分配给组内的消费者,保证每条消息在组内只能被一个消费者消费。

  • 作用:同一 Topic 可以被不同组独立消费,而组内的消费者分工处理分区的数据。

Broker(代理)

  • 定义:Kafka 集群中的服务器节点。master、slave1、slave2 部署了 Kafka Broker 的三个节点。

  • 作用:负责存储和管理 Topic 的一部分分区数据,并处理来自生产者和消费者的请求。

Topic(主题)

  • 定义:Kafka 中的消息分类标签,类似于文件夹的功能,用于存储和管理一类消息。

  • 作用:用户只需指定 Topic 名称即可生产或消费数据,而不必关心物理存储的细节。

  • 举例:在一个电商系统中,可以有以下几个 Topic:

    • 订单Topic:存储所有订单数据。

    • 用户行为Topic:存储用户的点击、浏览等行为数据。

    • 库存Topic:存储库存更新信息。

Partition(分区)

  • 定义:Topic 被划分成的多个子集,每个 Partition 内的消息是有序的。

  • 作用:分布式存储和并行处理。消费者数量一般小于或等于分区数量。

  • 形式:分区在底层表现为服务器上的文件夹。

Replication(副本)

  • 定义:每个 Partition 会有多个副本,用于高可用性和容错。

  • 作用:当一个 Broker 挂掉时,其他副本可以提供服务。

Replication-LeaderReplication-Follower

  • 定义:每个 Partition 的多个副本中,有一个 Leader 和多个 Follower。

  • 作用:

    • Leader:负责与生产者和消费者的所有读写交互。

    • Follower:仅复制 Leader 的数据,当 Leader 故障时,Follower 可自动提升为 Leader。

Offset(偏移量)

  • 定义:标识消息在 Partition 中的位置,从 0 开始递增。

  • 作用:消费者根据 Offset 知道该从哪里开始读取,确保数据读取的精准性和一致性。

  • 举例:每条消息在其分区中都有唯一的 Offset。Offset 就像是书籍的页码或 Excel 的行号

ZooKeeper

  • 定义:Kafka 使用 ZooKeeper 来存储元数据并协调集群操作。

  • 作用:用于分区的 Leader 选举、消费者分组的管理,以及确保 Kafka 的高可用性。

  • 举例:元数据包括关于 Broker、Topic、Partition、Leader/Follower 的状态信息


1.2.1 Kafka 在电商行业的应用

1. 订单管理:处理订单消息

场景:

  • 用户在电商平台下单。

  • 系统需要通知多个部门(比如库存和配送)进行处理。

Kafka 如何工作:

1) 生产者(订单系统)

  • 下单后,订单系统将订单信息发送到 Kafka 的 订单Topic

  • 比如订单内容是:订单#1001,用户A购买了商品B

2) 消费者组(库存系统和配送系统)

  • 库存系统:从 订单Topic 读取订单信息,减少库存。

  • 配送系统:从 订单Topic 读取订单信息,生成配送单。

简单比喻:

  • Kafka 是一个“订单广播站”,订单系统把订单消息放到广播站,库存部门和配送部门从广播站拿到订单,分别完成自己的任务。



2. 用户行为分析:推荐商品

场景:

  • 用户在网站浏览商品、点击页面、搜索关键字。

  • 系统需要分析用户的行为,推荐个性化商品。

Kafka 如何工作:

1) 生产者(前端系统)

  • 用户的每个行为(比如浏览商品C,搜索“手机”)都会被发送到 Kafka 的 用户行为Topic

  • 消息内容可能是:用户A点击了商品C用户B搜索了“手机”

2) 消费者组(推荐系统和数据分析系统)

  • 推荐系统:实时读取用户行为,更新推荐算法,展示相关商品。

  • 数据分析系统:分析用户浏览和点击数据,统计热门商品。

简单比喻:

  • Kafka 是一个“用户行为记录本”,记录下每个用户的操作。推荐系统根据记录实时推荐,数据分析系统统计这些记录,用于改进服务。



3. 实时数据分析:发现销售趋势

场景:

  • 需要实时分析商品销售数据,监控热门商品和销售趋势。

Kafka 如何工作:

1) 生产者(订单系统和用户行为系统)

  • 将订单数据和用户行为数据发送到 Kafka 的 订单Topic用户行为Topic

2) 消费者(数据分析系统)

  • 数据分析系统从多个 Topic 读取数据,进行实时分析,比如:

    • 哪些商品销售火爆?

    • 哪些页面点击量最高?

简单比喻:

  • Kafka 是“销售数据管道”,将订单和行为数据汇总到分析系统,实时生成趋势报告。


总结

Kafka 在电商行业中,就像是一个“消息分发中心”,无论是订单处理、用户行为分析,它都负责高效、可靠地传递消息给相关系统,让各部门能够实时协同工作。

通俗比喻:Kafka 是消息的快递员,确保每条信息快速、安全地送到需要的人手中!




img


1.2.2 电商订单场景 Kafka 数据流分析

场景背景

用户在电商平台下单后,订单数据通过 Kafka 系统发送给多个消费者(如库存系统和配送系统),确保订单相关任务能够独立完成。


数据流分析

1. 生产者发送消息

Producer A(订单系统)生成订单消息:

  • 订单#1001:用户A购买了商品B

  • 订单#1002:用户C购买了商品D

消息被发送到 Topic A 的分区:

  • 订单#1001 存储在 Partition 0(Leader 分区,位于 Broker1)。

  • 订单#1002 存储在 Partition 1(Leader 分区,位于 Broker2)。


2. 分区存储与复制

消息存储:

  • Partition 0 的 Leader 存储在 Broker1,保存订单#1001。

  • Partition 1 的 Leader 存储在 Broker2,保存订单#1002。

消息复制:

  • Partition 0 的数据从 Broker1 复制到 Broker2(Follower 分区)。

  • Partition 1 的数据从 Broker2 复制到 Broker1(Follower 分区)。


3. 消费者读取消息

多个消费者组:

  • 消费者组 1(库存系统): 负责更新库存。

  • 消费者组 2(配送系统): 负责生成配送单。

独立消费:

  • 消费者组 1(库存系统):

    • Partition 0 读取 订单#1001,并减少库存。

    • Partition 1 读取 订单#1002,并减少库存。

  • 消费者组 2(配送系统):

    • Partition 0 读取 订单#1001,并生成配送单。

    • Partition 1 读取 订单#1002,并生成配送单。


4. 消息偏移量管理

Offset 跟踪:

  • 每个消费者组独立维护 Offset:

    • 消费者组 1(库存系统): 读取 Offset 1(已处理订单#1001 和订单#1002)。

    • 消费者组 2(配送系统): 读取 Offset 1(已处理订单#1001 和订单#1002)。

故障恢复:

  • 消费者组的 Offset 记录在 Kafka 中,发生故障时可从上次的消费位置继续处理。


5. ZooKeeper 协调

  • 管理 Kafka 的元数据:如记录分区的 Leader 和 Follower 信息;Topic 的分区数量、每个分区对应的 Broker 等信息。

  • ZooKeeper 帮助消费者组进行分区的分配,确保每个分区被一个消费者处理,防止重复消费。

  • 监控 Kafka 集群的健康状态。

  • 存储了 Kafka 的部分配置信息


最终结果

  1. 库存系统(消费者组 1): 减少商品库存。

  2. 配送系统(消费者组 2): 生成配送单。

  3. 消息独立消费完成: 确保订单消息被多个系统独立处理,实现高效可靠的任务分发。


总结

  1. 从 Producer 到 Partition: 生产者将订单消息发送到指定分区。

  2. 从 Partition 到多个消费者组: 不同消费者组独立消费相同的消息,完成各自的任务。

  3. ZooKeeper 协调: 确保 Kafka 的分区与消费者组正常运行,实现高效分发。



Kafka 和 Flume 的简单区别


特点KafkaFlume
用途数据传输 + 数据存储专注于数据采集和传输
数据持久化支持,消息存储在硬盘,保证数据可靠性不支持,只负责传输数据
实时处理支持,能与实时分析工具结合(如 Spark/Flink)不支持,只做数据传输
扩展性支持高扩展性,适合大规模数据分发限于单一任务,扩展能力较弱
场景需要高吞吐量、实时处理和持久化的数据场景适合简单的数据采集(如日志传输)



1.3 Kafka 的应用场景

1)应用解耦

多应用间通过消息队列对同一消息进行处理,避免调用接口失败导致整个过程失败;

传统方式:订单管理系统直接调用库存管理系统接口检查库存,接口失败会导致订单处理失败。

引入消息队列

  • 订单管理系统将订单信息发布到消息队列。

  • 库存管理系统订阅队列,读取订单消息,检查库存。

优势:即使库存系统故障,订单消息仍被保留,库存系统恢复后可继续处理,保证流程完整性,提高可靠性和稳定性。

image-20230518164456301


2) 异步处理

多应用对消息队列中同一消息进行处理,应用间并发处理消息,相比串行处理,减少处理时间;

image-20230521165644357


image-20230521171118303


电商系统包括以下应用程序:订单服务、支付服务、物流服务和通知服务。

传统处理方式

  • 按顺序依次调用各服务。

  • 每个服务需等待前一服务完成,导致处理延迟。

引入消息队列后

  • 订单服务将订单消息放入队列后立即返回响应。

  • 支付、物流、通知服务并发从队列获取消息,独立处理。

改进效果

  • 并发处理:各服务无需等待,大幅缩短处理时间。

  • 解耦服务:服务故障或延迟不影响整体系统。

  • 可扩展性:通过调整队列容量和消费者数量应对负载变化。

结果:系统响应更快,吞吐量更高,可靠性和伸缩性更强。


3)限流削峰

在秒杀或抢购活动中,为应对高并发压力,可使用消息队列进行限流削峰

  • 问题:数千用户同时抢购可能导致系统崩溃。

  • 解决:通过消息队列暂存请求,限制每秒处理固定数量(如100个),超出的请求直接拒绝并提示用户稍后再试。

  • 效果:平稳处理高峰流量,避免系统宕机。

image-20230518170043638


4)消息通信和协调

消息队列用于分布式系统中的消息通信和协调:

  • 应用场景:订单处理。

    • 订单服务将订单信息发布到消息队列。

    • 支付服务订阅消息,处理支付。

    • 库存服务订阅消息,更新库存。

效果:服务异步通信,提升性能和可靠性。

image-20241116143306582


二、Kafka的安装与使用


安装要求:

  • 已安装Hadoop集群

  • 已安装zookeeper集群 (保存元数据)

2.1 Kafka 单机安装与配置步骤

略过

2.2 Kafka 集群安装与配置步骤

Kafka 集群与 Broker 简介

1. 什么是 Kafka 集群?

  • Kafka 集群由多个 Broker 实例组成,每个 Broker 是 Kafka 的独立节点。  

  • 集群的设计目的是提高系统的可靠性、容错性以及处理高并发消息的能力。

2. 为什么需要多个 Broker?

  • 提升可靠性:即使一个 Broker 故障,其他 Broker 仍能继续工作,保证服务不中断。  

  • 实现伸缩性:通过增加 Broker,可以处理更多的消息,满足业务增长需求。  

  • 实现负载均衡:多个 Broker 分担消息存储和处理压力,提升整体性能。

3. 如何构建 Kafka 集群?

  • 在多台机器上启动多个 Broker 实例:每个 Broker 实例运行在一台服务器上,分散负载。  

  • Broker 自动组成集群:Kafka 会将多个 Broker 自动连接成一个集群,处理消息的存储和分发。  

4. 示例:三节点 Kafka 集群

  • 启动三个 Broker 实例,每个实例运行在一台服务器上(如 master,slave1,slave2)。  

  • Kafka 将自动管理分区和副本,增强系统的容错性和并发处理能力。  


2.2.1 集群规划


ip主机名kafka角色备注
192.168.36.100masterbroker1启动broker角色
192.168.36.101slave1broker2启动broker角色
192.168.36.102slave2broke3r启动broker角色



2.2.2 准备工作

1. 安装 JDK 和 Hadoop

  • 任务:确保 JDK 和 Hadoop 环境正常安装并配置。

2. 安装和启动 ZooKeeper

  • 任务:配置并启动 ZooKeeper,为 Kafka 提供元数据管理和协调服务。

  • 内容:元数据,包括 Broker 信息、Topic 配置、分区和副本分配、Leader 选举状态 等。

注意:Kafka 2.8+ 的新特性

  • ZooKeeper 不再必要:Kafka 2.8 及以上版本引入 KRaft(Kafka Raft),替代 ZooKeeper 管理集群元数据。

  • 推荐:如果使用 Kafka 2.8 或更新版本,可跳过 ZooKeeper 的安装,直接配置 Kafka 的 KRaft 模式。


2.2.3 集群配置

2.2.3.1 下载

下载地址:https://archive.apache.org/dist/kafka/

下载kafka_2.12-2.4.1.tgz 放入master主机的/opt/software

2.12是Scala的版本号;2.4.1是Kafka的版本号


2.2.3.2 解压
cd /opt/software
# 解压
tar -zvxf kafka_2.12-2.4.1.tgz -C /opt/apps

cd /opt/apps
# 改名
mv kafka_2.12-2.4.1  kafka


2.2.3.3 配置环境变量

1.修改master,slave1,slave2的环境变量

# 在master,slave1,slave2节点上编辑环境变量
vi /etc/profile

# 在文件末尾追加如下内容:
# Kafka
export KAFKA_HOME=/opt/apps/kafka
export PATH=$PATH:$KAFKA_HOME/bin

# 三台主机上依次运行以下命令,使环境变量生效
source /etc/profile
# 查看环境变量
echo $PATH


2.创建Kafka存储消息日志数据(包括主题内容和索引)的目录,用于持久化消息数据。

# 在master节点上操作
cd /opt/apps/kafka
mkdir data


3.修改的三个配置文件

在 Kafka 的 config 目录下,有以下 3 个主要配置文件与主题、消息生产和消费相关:

  1. server.properties

    • 作用:配置 Kafka 服务端(Broker)参数。  

    • 内容:包括 Broker ID、日志存储路径(log.dirs)、分区和副本设置,以及 Zookeeper 连接信息等服务端相关设置。  

  2. producer.properties

    • 作用:配置 Kafka 消息生产者参数。  

    • 内容:包括 Kafka 集群地址(bootstrap.servers)、序列化方式(key.serializervalue.serializer)、ACK 级别(acks)等生产相关配置。  

  3. consumer.properties

    • 作用:配置 Kafka 消费者参数。  

    • 内容:包括 Kafka 集群地址(bootstrap.servers)、消费者组 ID(group.id)、消息自动提交偏移量(enable.auto.commit)等消费相关配置。  



master中进行配置

2.2.3.4 配置(server.properties)
cd /opt/apps/kafka/config
# 修改服务端配置
vi server.properties
# 修改以下四个关键配置参数

# 设置 broker.id 编号,集群模式下该 ID 必须唯一,且不可更改。
# 注意:每个 Kafka Server 加入集群时需要设置一个唯一的 ID(从 0 开始逐个递增)。如果设置重复会报错。
broker.id=1

# 配置 Kafka 服务器监听的网络地址和端口。
# 格式:协议://地址:端口
# PLAINTEXT:表示明文传输协议。
# 作用:Kafka 服务器监听客户端(生产者和消费者)连接请求,通过明文协议传输和接收消息。
listeners=PLAINTEXT://192.168.36.100:9092

# 指定 Kafka 存储消息日志数据(包括主题内容和索引)的目录,用于持久化消息数据。
log.dirs=/opt/apps/kafka/data

# 配置 ZooKeeper 集群连接信息及 Kafka 元数据存储路径。
# 作用:每个 broker 使用 ZooKeeper 保存元数据信息,如分区 Leader 信息。
# 路径 /kafka 用于存储 Kafka 的元数据。
zookeeper.connect=master:2181,slave1:2181,slave2:2181/kafka

# 允许删除 Kafka 中的主题。
# 默认为 false,设置为 true 以支持删除主题。
delete.topic.enable=true


2.2.3.5 配置生产者(producer.properties)
cd /opt/apps/kafka/config/
# 修改生产者端配置
vi producer.properties
# 指定 Kafka 客户端(生产者或消费者)连接 Kafka 集群的地址列表,用于建立初始连接。
# 格式为 "主机名:端口号",多个地址用逗号分隔
bootstrap.servers=master:9092,slave1:9092,slave2:9092

# 指定消息键的序列化方式,将键序列化为字符串
key.serializer=org.apache.kafka.common.serialization.StringSerializer

# 指定消息值的序列化方式,将值序列化为字符串
value.serializer=org.apache.kafka.common.serialization.StringSerializer

解释:

  1. 配置 Kafka Broker集群地址列表,格式为 "主机名:端口号",多个地址用逗号分隔。 示例:master:9092,slave1:9092,slave2:9092

  2. Kafka 客户端随机选择一个地址作为连接点,若连接失败会自动尝试其他地址,实现负载均衡和故障转移。

  3. 此参数仅用于初始连接,一旦连接成功,客户端会获知整个集群的拓扑信息。

  4. 序列化? 序列化是把数据(如字符串、整数)转换成字节流,方便网络传输或存储。

    反序列化? 反序列化是把收到的字节流还原成可读数据(如字符串、整数)。

    • Kafka 中:消费者从 Kafka 接收字节流后将其还原成原始格式。

    • 作用:反序列化器将字节流解包成原始数据,供消费者使用。

    • Kafka 中:生产者将消息键和值转成字节流后发送。

    • 使用:序列化器将消息打包成字节流,供生产者发送。。


2.2.3.6 配置消费者(consumer.properties)
cd /opt/apps/kafka/config
# 修改消费者端配置
vi consumer.properties
# 指定 Kafka 客户端(生产者或消费者)连接 Kafka 集群的地址列表,用于建立初始连接。
# 格式为 "主机名:端口号",多个地址用逗号分隔
bootstrap.servers=master:9092,slave1:9092,slave2:9092

# 指定消息键的反序列化方式,将消息键反序列化为字符串
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer

# 指定消息值的反序列化方式,将消息值反序列化为字符串
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer


2.2.3.7 分发

分发已经配置好的kafka目录到slave1slave2节点的/opt/apps目录下。

# 在master上操作
scp -r /opt/apps/kafka/  slave1:/opt/apps/
scp -r /opt/apps/kafka/  slave2:/opt/apps/

修改slave1节点上server.properties中的idhost:

# 进入slave1
cd /opt/apps/kafka/config
vi server.properties

# 修改以下2处内容:
# 设置broker.id编号
broker.id=2

# Kafka Broker 通过192.168.36.101的9092 端口监听客户端(生产者或消费者)的连接请求。
# 使用明文传输协议(无加密或认证) PLAINTEXT 进行通信。
# 主机地址为 slave1。
# 即:客户端(生产者或消费者) -> 通过 slave1:9092 -> 连接到 Kafka Broker,发送或接收消息。
listeners=PLAINTEXT://slave1:9092

修改slave2节点上server.properties中的idhost:

# 进入slave2
cd /opt/apps/kafka/config
vim server.properties

# 修改以下2处内容:
# 设置broker.id编号
broker.id=3

# Kafka Broker 通过 9092 端口监听客户端(生产者或消费者)的连接请求。
# 使用明文传输协议 PLAINTEXT 进行通信。
# 主机地址为 slave1。
# 即:客户端(生产者或消费者) -> 通过 slave1:9092 -> 连接到 Kafka Broker,发送或接收消息。
listeners=PLAINTEXT://slave2:9092

kafka的集群配置完成。


2.3 Kafka集群启停

2.3.1 启动 ZooKeeper

启动 ZooKeeper的两种方式

1)集群独立安装的 ZooKeeper
# 启动 ZooKeeper(建议在集群中使用独立安装的 ZooKeeper)
zkServer.sh start



2.3.2 启动 Kafka 集群

所有三台机器都需启动 Kafka Broker


后台运行(守护进程模式,带日志输出在终端)

kafka-server-start.sh /opt/apps/kafka/config/server.properties &
  • 特点:Kafka 以后台方式运行,但日志会打印到终端窗口。

  • 限制:如果关闭终端窗口,Kafka 服务也会停止。

  • 适用场景:临时运行,不建议用于生产环境。



2.3.3 运行命令查看集群信息

# 通过连接master:9092,查看Kafka集群中Broker的API版本信息及集群元数据(如ClusterId和Broker列表)。
kafka-broker-api-versions.sh --bootstrap-server master:9092


2.3.4 关闭 Kafka 和 ZooKeeper** 集群

1)关闭 Kafka

# 挂起运行时
# 即 Kafka 是以前台挂起模式运行(没有 `&` 或 `-daemon`)
ctrl+c

# 后台运行时
# 即 Kafka 是以后台模式运行(通过 `&` 或 `-daemon` 启动)
kafka-server-stop.sh



2)关闭 ZooKeeper

# 挂起运行时
# 即 ZooKeeper 是以前台挂起模式运行(没有 `&` 或 `-daemon`)
ctrl+c

# 后台运行时
# 即 ZooKeeper 是以后台模式运行(通过 `&` 或 `-daemon` 启动)
zookeeper-server-stop.sh



2.3.5 实操:kafka集群启停操作

1.依次在三个节点分别启动zookeeper集群。

[root@master kafka]$ zkServer.sh start
[root@slave1 kafka]$ zkServer.sh start
[root@slave2 kafka]$ zkServer.sh start

2.依次在三个节点启动kafka的broker

# 建议第一次启动使用挂起命令,便于查找错误 
# 加载./config/server.properties配置文件
cd /opt/apps/kafka/
[root@master kafka]# kafka-server-start.sh ./config/server.properties &
[root@slave1 kafka]# kafka-server-start.sh ./config/server.properties &
[root@slave2 kafka]# kafka-server-start.sh ./config/server.properties &

# kafka能正常启动后,建议以后按以下的方法后台启动进程
kafka-server-start.sh -daemon /opt/apps/kafka/config/server.properties

image-20230628175818600

image-20230628175748555

image-20230628175722210


2.3.6 集群校验

  • 依次测试每台节点的进程是否启动(三台机器开新窗口)

====master
[hadoop@master kafka]# jps
3109 Jps
2713 QuorumPeerMain
2780 Kafka

=====slave1
[hadoop@slave1 kafka]# jps
2643 Kafka
2967 Jps
2603 QuorumPeerMain

=====slave2
[hadoop@slave2 kafka]# jps
2690 Kafka
2658 QuorumPeerMain
3014 Jps


错误坑点:

Kafka 创建 Topic 报错:

replication factor: 3 larger than available brokers: 1

原因

  1. Topic 副本数超过 Broker 数量:

    • 设置 replication.factor=3,但只有 1 个 Broker 启动。

  2. 集群 ID 不匹配:

    • Kafka 启动时,meta.properties 文件中的 cluster.id 不一致。


解决方案

方法 1:清理日志目录

  1. 找到 server.properties 中的 log.dirs 路径:

    log.dirs=/opt/apps/kafka/data
  2. 删除日志目录中的所有文件:

    rm -rf /opt/apps/kafka/data/*
  3. 重启 Kafka。

方法 2:修改 meta.properties

  1. 找到 meta.properties 文件所在路径:

    /opt/apps/kafka/data/meta.properties
  2. 修改 cluster.id 为与其他 Broker 一致。


总结

  • 测试环境:清空日志目录(方法 1)。  

  • 生产环境:修改 cluster.id 保留数据(方法 2)。


  • 关闭kafka集群

# 方法一:脚本
kafka-server-stop.sh

# 方法二:强制
kill -9 pid



三、kafka实例操作

kafka集群启动成功后,我们就可以对kafka集群进行操作了。

3.1 创建主题

任务:在 Kafka 集群中创建一个名为 test 的新主题

目标:主题 test,1 个分区,3 个副本。


第一种方法:推荐方法(适用于 Kafka 2.2 及以上版本)

kafka-topics.sh --create \
--bootstrap-server master:9092,slave1:9092,slave2:9092 \
--topic test --partitions 1 --replication-factor 3

参数解释

  • kafka-topics.sh:Kafka 主题管理脚本。

  • --create:表示创建主题。

  • --bootstrap-server:指定 Kafka 集群的 Broker 地址列表(多个地址用逗号分隔)。

  • --topic:指定主题名称(此处为 test)。

  • --partitions:指定分区数(此处为 1)。

  • --replication-factor:指定副本数(此处为 3,表示每个分区有 3 个副本)。

注意事项

  • 主题名称必须唯一,不能与现有主题重复。

  • 分区(Partition) 是生产/消费的基本单位,对应 Kafka 服务端的一个目录(例如:主题名称-分区编号),目录中存放数据文件和索引文件。


简化命令

kafka-topics.sh --create \
--bootstrap-server master:9092 \
--topic test --partitions 1 --replication-factor 3

区别

  1. 多节点连接

    • --bootstrap-server master:9092,slave1:9092,slave2:9092

    • 指定多个 Kafka Broker,客户端可以连接任意一个 Broker,增强容错性和可用性。

  2. 单节点连接

    • --bootstrap-server master:9092

    • 只指定一个 Kafka Broker。如果该 Broker 不可用,客户端将无法连接到集群。


创建主题时的警告信息

WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.

解释

  • 原因:由于 Kafka 内部的指标(Metrics)名称限制,含有点(.)或下划线(_)的主题在某些情况下可能会被视为相同,从而导致冲突。

  • 影响:如果同时使用点和下划线作为主题名称的一部分,可能会造成监控和指标系统中的数据错误或冲突。

  • 建议

    • 主题名称中只使用点(.)或下划线(_),不要两者混用。

    • 统一命名规则,保持一致性,避免冲突。


3.2 查看所有主题列表

kafka-topics.sh --list --bootstrap-server master:9092,slave1:9092,slave2:9092


# 简化命令,只指定一个节点作为引导服务器,用于获取集群中的主题列表。
kafka-topics.sh --list --bootstrap-server master:9092


3.3 查看所有topic的详细信息

kafka-topics.sh --describe \
--bootstrap-server master:9092,slave1:9092,slave2:9092

##运行结果
Topic: test     PartitionCount: 1       ReplicationFactor: 3    Configs: segment.bytes=1073741824
       Topic: test     Partition: 0    Leader: 1       Replicas: 1,3,2 Isr: 1,3,2


3.4 查看指定topic的详细信息

kafka-topics.sh --describe \
--topic test \
--bootstrap-server master:9092,slave1:9092,slave2:9092

# 查看结果如下:
Topic: test     PartitionCount: 1       ReplicationFactor: 3    Configs: segment.bytes=1073741824
       Topic: test     Partition: 0    Leader: 1       Replicas: 1,3,2 Isr: 1,3,2

# 解释
Topic: test:这是主题的名称,即 "test"
PartitionCount: 1:这个主题有一个分区。
ReplicationFactor: 3:这个主题的复制因子是 3,意味着每个分区都会有 3 个副本。
Configs: segment.bytes=1073741824:表示 Kafka 分区的日志存储会被切分成多个文件,每个文件最大为 1 GB。
Partition: 0:这是分区的编号,这个主题只有一个分区,所以编号是 0
Leader: 1  表示分区的当前领导者(Leader)的 ID,这个分区的领导者是 1 号服务器。
Replicas: 1,3,2 表示分区 0 有三个副本,分别存储在 Broker 1、3 和 2 上。
Isr: 1,3,2 此处说明分区 0 的所有副本(1、3、2)都处于同步状态。


3.5 启动控制台生产者

# 通过slave1节点启动一个控制台生产者,建立与Kafka集群的连接,并将用户在控制台输入的消息发送到主题 "test"。
kafka-console-producer.sh --topic test --broker-list master:9092,slave1:9092,slave2:9092

# 解释
- `kafka-console-producer.sh`:Kafka 命令行工具,用于向主题发送消息。  
- `--broker-list master:9092,slave1:9092,slave2:9092`:指定 Kafka 集群中 broker 的地址和端口,用逗号分隔。   
- `--topic test`:指定发送消息的目标主题 `test`


3.6 启动控制台消费者

# 启动一个 Kafka 控制台消费者,从 master:9092 连接 Kafka 集群,订阅主题 test 并从头开始消费消息。
kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server master:9092,slave1:9092,slave2:9092

# 解释
- `kafka-console-consumer.sh`:Kafka 命令行工具,用于消费指定主题的消息。  
- `--broker-list master:9092,slave1:9092,slave2:9092`:指定 Kafka 集群中 broker 的地址和端口,用逗号分隔。    
- `--topic test`:指定目标主题 `test`。  
- `--from-beginning`:从主题的起始位置消费所有历史消息。  


3.7 生产、消费数据

# 在slave1上(生产者控制台)输入(生产)数据
>hello kafka

# 在master上(消费者控制台)接收(消费)数据
hello kafka

# 表示消费端成功消费了生产者生产的消息!


注意:Apache Kafka接收到的每一条消息都存储在日志中,默认情况下,它保持消息168小时,即7天。


3.8 删除主题

在 Kafka 配置中,server.properties配置文件中

log.retention.hours=168 :表示日志文件即消息被写入到 Kafka 主题的某个特定分区后开始计时,该消息将至少被保留在服务器上 168 小时(即 7 天)

log.retention.hours=-1:消息永久保存


1.删除主题 test

kafka-topics.sh --delete \
--topic test \
--bootstrap-server master:9092,slave1:9092,slave2:9092

4. 验证删除状态

使用以下命令查看主题列表,确认主题是否仍然存在:

kafka-topics.sh --list \
--bootstrap-server master:9092,slave1:9092,slave2:9092



四、实验

实验任务

  1. 配置并启动 ZooKeeper 和 Kafka 集群。  

  2. 创建、查看、描述、删除主题。  

  3. 使用生产者发送消息,消费者读取消息。  

  4. 验证消费者组的分区分配和负载均衡。  

  5. 学习多种消费方式并记录日志。  

  6. 删除主题并清理实验环境。


实验目标

  1. 掌握 Kafka 的启动与主题管理。  

  2. 理解消息生产与消费流程。  

  3. 熟悉消费者组机制及负载均衡。  

  4. 掌握不同消费方式和环境清理操作。  

实验结束后,学员可熟练使用 Kafka 的基础功能。

一、启动环境

1. 启动 ZooKeeper

# 依次在三台主机上操作
zkServer.sh start

# 查看 ZooKeeper 的状态
zkServer.sh status

2. 启动 Kafka

# 依次在三台主机上操作
cd /opt/apps/kafka

./bin/kafka-server-start.sh ./config/server.properties &


二、实验操作


3. 查看已存在的主题

cd /opt/apps/kafka

./bin/kafka-topics.sh --list --bootstrap-server master:9092,slave1:9092,slave2:9092

4. 创建主题

创建一个主题 user_logs,用于模拟用户日志处理:

./bin/kafka-topics.sh --create \
--bootstrap-server master:9092,slave1:9092,slave2:9092  \
--topic user_logs \
--partitions 3 \
--replication-factor 2

5. 查看主题的详细信息

# 查看所有主题的明细
./bin/kafka-topics.sh --describe --bootstrap-server master:9092,slave1:9092,slave2:9092

# 查看指定主题的明细
./bin/kafka-topics.sh --describe --topic user_logs \
--bootstrap-server master:9092

6. 消费与生产实验

6.1 启动生产者,发送消息

# 在 slave1 节点启动生产者,发送模拟用户日志数据
./bin/kafka-console-producer.sh \
--broker-list master:9092,slave1:9092,slave2:9092  \
--topic user_logs

6.2 单一消费者的基本操作

# 在 master 节点启动消费者,消费 user_logs 主题的消息
./bin/kafka-console-consumer.sh \
--bootstrap-server master:9092,slave1:9092,slave2:9092  \
--topic user_logs

7. 消费者组实验

7.1 启动多个消费者,加入同一组

# 在 master 节点启动第一个消费者,加入组 user_logs_group
./bin/kafka-console-consumer.sh \
--bootstrap-server master:9092,slave1:9092,slave2:9092 \
--topic user_logs \
--group user_logs_group

# 在 slave1 节点启动第二个消费者,加入组 user_logs_group
./bin/kafka-console-consumer.sh \
--bootstrap-server master:9092,slave1:9092,slave2:9092 \
--topic user_logs \
--group user_logs_group

7.2 验证分区分配

  1. 打开生产者,发送消息:

    ./bin/kafka-console-producer.sh \
    --broker-list master:9092,slave1:9092,slave2:9092 \
    --topic user_logs
  2. 观察两个消费者终端的消息分布,验证分区分配。

7.3 验证分区重新分配

  1. 停止其中一个消费者(Ctrl+C)。

  2. 观察剩余消费者是否接管分区数据。

7.4 查看消费者组状态

./bin/kafka-consumer-groups.sh \
--bootstrap-server master:9092,slave1:9092,slave2:9092 \
--group user_logs_group --describe

8. 消费数据的不同方式

8.1 从最早的消息开始消费

./bin/kafka-console-consumer.sh \
--topic user_logs \
--bootstrap-server master:9092,slave1:9092,slave2:9092 --from-beginning

8.2 从最新的消息开始消费

./bin/kafka-console-consumer.sh \
--topic user_logs \
--bootstrap-server master:9092,slave1:9092,slave2:9092

8.3 在后台运行消费者

nohup ./bin/kafka-console-consumer.sh \
--topic user_logs \
--bootstrap-server master:9092,slave1:9092,slave2:9092 > /var/log/kafka/consumer.log 2>&1 &

9. 删除主题(实验结束后)

9.1 删除主题

./bin/kafka-topics.sh --delete \
--bootstrap-server master:9092,slave1:9092,slave2:9092 \
--topic user_logs

9.2 确认主题删除

./bin/kafka-topics.sh --list \
--bootstrap-server master:9092,slave1:9092,slave2:9092


【知识点】

五、补充知识


5.1 Topic和Partition的关系

  1. Topic(主题)

    • Topic是Kafka中消息的逻辑容器或类别。

    • 消息被发布到特定的Topic中,并且消费者订阅Topic以接收其中的消息。

    • Topic的名称用于标识和区分不同类型的消息数据,通常与业务或应用程序相关。

  2. Partition(分区)

    • Partition是Topic的一个物理分片或子集。

    • 每个Topic可以分为多个Partition,每个Partition都是独立的存储单元。

    • Partition解决了单台物理机存储和处理大量数据的限制,允许数据在多个分区中分散存储和处理。

关于Topic和Partition的关系:

  • 一个Topic可以有多个Partition,这些Partition可以分布在Kafka集群中的不同Broker上。这允许Topic的数据分布在多台机器上,以实现横向扩展和高吞吐量。

  • 当消息被生产者发布到Topic时,Kafka会根据一定的分区策略将消息写入特定的Partition。这可以是手动指定的分区,也可以是自动轮询分区或根据消息的Key进行分区。

  • 消费者订阅Topic时,它们实际上订阅的是Topic的所有Partition。每个消费者会负责消费一个或多个Partition中的消息。

  • Partition的数量通常根据数据负载和性能需求进行规划。增加Partition数量可以提高并行性和负载均衡,但也需要更多的资源来管理。


5.2 Partition到底是什么?

【对于服务端而言】

Partition是我们进行生产/消费的真正实体,那么这个实体在服务端看起来是什么样子呢?

img

Partition在服务端是一个目录,其名称由主题名称和分区编号组成。

那我们进这个文件夹内部看一下,里面存放了些什么东西的?

img

Partition包含以下内容:

  • .log文件:这是用于进行读写的物理数据文件。所有消息的实际数据都存储在这些日志文件中,按顺序追加。

  • .index文件:稀疏索引文件,通过消息的offset进行构建的稀疏索引。

  • .timeindex文件:稀疏索引文件,通过消息的写入/创建时间进行构建的稀疏索引。

  • checkpoint 检查点文件:在Kafka中用于记录分区的消费者状态信息,以实现恢复和持久化。

Partition在服务端就是一个TopicPartition,本质是一个文件夹,里面放着一些数据文件和对应的索引文件。


【对于生产者而言】

  • 生产者在生产数据时会将数据写入特定TopicPartition的数据文件内。

  • 生产者可以将消息发送到指定分区、自动轮询分区或根据指定的Key来确定发送到哪个分区。


【对于消费者而言,分区是什么】

  • 消费者需要知道要消费的具体分区才能获取数据。

  • 消费者可以手动指定分区或自动分配分区来进行消费。


【对于整个Kafka集群而言,分区的意义是什么】

在生产和消费Kafka中的Topic数据时,当数据量逐渐增加导致单台物理机的磁盘容量不足时,可以考虑横向扩容,即通过增加机器的方式来分散数据。这是分区设计的核心概念,通过将Topic的数据分片成多个Partition,并将这些Partition分布到集群中的不同Broker上,实现了分布式存储,


发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

Powered By Z-BlogPHP 1.7.3

版权:李翔
备案/许可证编号为:新ICP备2024006115号-1