李翔-大数据技术

Big data technology!

9 《智慧设备运行预测大屏》项目启动步骤

《智慧设备运行预测大屏》项目启动步骤


项目下载:https://share.feijipan.com/s/3Ubdme4i


一、系统架构

image.png

系统架构图


ChatGPT Image 2026年5月27日 11_13_13


二、环境准备(首次搭建必读)

详见《实训项目准备清单》,主要包括:

  • 镜像配置(Flink 1.15.4 / MySQL / ZooKeeper / SELinux)

  • 数据库建表(MySQL / Hive / ClickHouse)

  • Kafka Topic 创建

  • 历史数据上传到 /opt/datas/

  • JAR 包和脚本上传到 /opt/jars/


三、启动顺序总览


步骤服务节点
第 1 步Hadoop(HDFS + YARN)master / slave1
第 2 步ZooKeepermaster / slave1 / slave2
第 3 步Kafkamaster / slave1 / slave2
第 4 步Hive Metastoremaster
第 5 步Flink 集群master
第 6 步设备数据模拟采集master
第 7 步实时分析作业master
第 8 步离线分析作业master
第 9 步Spark ML 预测作业master
第 10 步Spring Boot 后端master
第 11 步Vue 前端本地 / master



✅ 第 1 步:启动 Hadoop集群

# master 节点:启动 HDFS
start-dfs.sh

# slave1 节点:启动 YARN
start-yarn.sh

若 HDFS 处于安全模式,执行以下命令退出:

hdfs dfsadmin -safemode get
hdfs dfsadmin -safemode leave

✅ 第 2 步:启动 ZooKeeper

master、slave1、slave2 上分别执行:

# 启动
zkServer.sh start

# 查看角色(应出现 leader / follower)
zkServer.sh status

✅ 第 3 步:启动 Kafka

master、slave1、slave2 上分别执行:

kafka-server-start.sh -daemon /opt/apps/kafka/config/server.properties

✅ 第 4 步:启动 Hive Metastore

# master 节点执行
nohup hive --service metastore &

Hive Metastore 默认监听端口 9083,Flink / Spark 连接 Hive 时依赖此服务。

✅ 第 5 步:启动 Flink 集群

# # master 节点执行
start-cluster.sh

✅ 第 6 步:启动 数据采集脚本

# 在 master 节点执行:
cd /opt/datas

# 执行 Python 程序持续产生模拟数据并写入 Kafka
python 01_device_status_sim.py | \
  kafka-console-producer.sh \
  --broker-list master:9092 \
  --topic device_status_topic


验证数据是否写入 Kafka(在slave1验证):

kafka-console-consumer.sh \
  --bootstrap-server master:9092 \
  --topic device_status_topic \
  --from-beginning



✅ 第 7 步:提交 Flink 实时分析作业

功能:统计在线设备数、告警数量、温度趋势、告警设备排名

方法 1:开发调试模式(IDEA 本地运行)

打开项目 flink-rt-metrics,运行启动类:

RtMetricsSqlJob

方法 2:集群部署模式

flink run -c com.demo.flink.RtMetricsSqlJob \
  /opt/jars/flink-rt-metrics-1.0.0.jar \
  --jobmanager.memory.process.size 512m \
  --taskmanager.memory.process.size 512m \
  --taskmanager.numberOfTaskSlots 1 \
  --parallelism 1

方法 3:YARN Per-Job 模式

flink run -t yarn-per-job \
  -c com.demo.flink.RtMetricsSqlJob \
  /opt/jars/flink-rt-metrics-1.0.0.jar

验证:查询 MySQL 是否有数据写入:

SELECT * FROM rt.rt_kpi_online_10s ORDER BY ts DESC LIMIT 5;

✅ 第 8 步:完成 离线分析作业

8.1 历史数据入湖

功能:将 /opt/datas/ 目录下的历史 CSV 数据导入 Hive ODS 分区表 iot.ods_device_status_di,为后续历史趋势分析与机器学习训练提供数据底座。

方法 1:开发调试模式(Hive 客户端运行)

进入 Hive 客户端,依次执行:

-- 1. 将 CSV 数据加载到临时表
LOAD DATA LOCAL INPATH '/opt/datas/device_status_2026_01.csv'
INTO TABLE iot.ods_device_status_csv_tmp;

-- 2. 开启动态分区
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;

-- 3. 从临时表写入 ODS 分区表(清洗 + 生成 ts / dt 字段)
INSERT OVERWRITE TABLE iot.ods_device_status_di PARTITION (dt)
SELECT
  event_type, device_id, status, temperature, `load`, voltage, event_time,
  from_unixtime(unix_timestamp(event_time, 'yyyy-MM-dd HH:mm:ss')) AS ts,
  substr(event_time, 1, 10) AS dt
FROM iot.ods_device_status_csv_tmp
WHERE event_type <> 'event_type';

验证: 查看 Hive ODS 表是否已生成分区数据:

SHOW PARTITIONS iot.ods_device_status_di;
SELECT COUNT(*) FROM iot.ods_device_status_di;


8.2 实时数据入湖

功能:通过 Spark Structured Streaming 从 Kafka 持续消费设备数据,每 10 秒写入一批到 Hive ODS 分区表 iot.ods_device_status_di,按日期自动分区。

方法 1:开发调试模式(IDEA 本地运行)

打开项目 spark-ods-ingest,运行启动类:

KafkaToHiveDeviceStatusJob

方法 2:本地模式(调试用)

spark-submit \
  --class com.demo.spark.KafkaToHiveDeviceStatusJob \
  --master local[1] \
  /opt/jars/spark-job-1.0.0.jar

方法 3:YARN Client 模式

spark-submit \
  --class com.demo.spark.KafkaToHiveDeviceStatusJob \
  --master yarn \
  --deploy-mode client \
  /opt/jars/spark-job-1.0.0.jar

验证: 查询 Hive ODS 表是否有当天最新数据:

-- 查看当天分区(最后一行应为今日日期)
SHOW PARTITIONS iot.ods_device_status_di;

-- 查看当天最新写入的 5 条数据
SELECT * FROM iot.ods_device_status_di
WHERE dt = '2026-05-27'
ORDER BY ts DESC
LIMIT 5;

⚠️ 实时数据入湖为常驻运行作业,启动后持续消费 Kafka 数据,Structured Streaming 会按日期自动写入对应分区(今天写 dt=今天,跨天后自动切换写 dt=明天),无需人工干预。运行前请确保 Python 模拟脚本已持续写入 Kafka。


8.3 离线数据分析(历史 7 天温度趋势)

功能:基于 Hive ODS 表中的历史明细数据,按天聚合计算每日平均温度与最高温度,结果写入 ClickHouse 的 dws_temp_trend_day 表,供大屏"历史 7 天温度趋势"图表使用。

方法 1:开发调试模式(IDEA 本地运行)

打开项目 spark-ods-ingest,运行启动类:

TempTrendBatchJob

方法 2:本地模式(调试用)

spark-submit \
  --class com.demo.spark.TempTrendBatchJob \
  --master local[1] \
  /opt/jars/spark-job-1.0.0.jar

方法 3:YARN Client 模式

spark-submit \
  --class com.demo.spark.TempTrendBatchJob \
  --master yarn \
  --deploy-mode client \
  /opt/jars/spark-job-1.0.0.jar

验证: 查询 ClickHouse 是否有历史趋势数据:

# 进入 ClickHouse 多行模式
clickhouse-client -m
-- 查看历史温度趋势数据(按日期升序)
SELECT * FROM iot_report.dws_temp_trend_day ORDER BY dt;

-- 查看近 7 天数据
SELECT * FROM iot_report.dws_temp_trend_day
ORDER BY dt DESC
LIMIT 7;

⚠️ 该作业为一次性批量任务,每次运行会先清空 ClickHouse 目标表再全量写入。生产环境通过 DolphinScheduler / Azkaban 等调度工具每天凌晨定时触发,本项目教学环境中手动运行即可。


✅ 第 9 步:启动 Spark ML 预测作业

9.1 上传训练数据
# 在HDFS上创建数据目录
hdfs dfs -mkdir  /datas

# 把master上的历史数据上传到HDFS上
hdfs dfs -put /opt/datas/device_status_with_label_2026_01.csv /datas/


9.2 模型训练作业

功能:使用历史数据(带标签)训练随机森林分类模型,并将训练好的模型保存到 HDFS,供后续实时预测加载使用。

方法 1:开发调试模式(IDEA 本地运行)

打开项目 spark-ods-ingest,运行启动类:

DeviceTrainModelJob

方法 2:本地模式(调试用)

spark-submit \
  --class com.demo.spark.DeviceAbnormalPredictJob \
  --master local[1] \
  /opt/jars/spark-job-1.0.0.jar

方法 3:YARN Client 模式

spark-submit \
  --class com.demo.spark.DeviceAbnormalPredictJob \
  --master yarn \
  --deploy-mode client \
  /opt/jars/spark-job-1.0.0.jar

⚠️ 训练作业为一次性任务,模型训练完成后即退出。模型只需训练一次,后续实时预测会自动加载该模型,无需每次重新训练。


9.3 实时预测作业

功能:对设备状态进行实时预测,结果写入 ClickHouse

方法 1:开发调试模式(IDEA 本地运行)

打开项目 spark-ods-ingest,运行启动类:

DevicePredictStreamJob

方法 2:本地模式(调试用)

spark-submit \
  --class com.demo.spark.DevicePredictStreamJob \
  --master local[1] \
  /opt/jars/spark-job-1.0.0.jar

方法 3:YARN Client 模式

spark-submit \
  --class com.demo.spark.DevicePredictStreamJob \
  --master yarn \
  --deploy-mode client \
  /opt/jars/spark-job-1.0.0.jar

验证:查询 ClickHouse 是否有预测数据:

# 进入 ClickHouse 多行模式
clickhouse-client -m
SELECT *
FROM iot_report.dws_device_pred_detail
ORDER BY created_at DESC
LIMIT 5;

✅ 第 10 步:启动 Spring Boot 后端服务

方法 1:开发调试模式(IDEA 本地运行)

打开项目 smart-screen-backend,运行启动类:

SmartScreenBackendApplication

方法 2:集群部署模式

# master 节点,新开终端执行
java -jar /opt/jars/smart-screen-backend-1.0.0-SNAPSHOT.jar

接口验证


接口地址
KPI 汇总接口http://localhost:8080/api/kpi/summary
温度趋势http://localhost:8080/api/kpi/tempTrend?limit=60
告警 Tophttp://localhost:8080/api/kpi/alarmTop?limit=3
历史 7 天温度http://localhost:8080/api/history/temp7d
风险环形图http://localhost:8080/api/predict/riskPie
高风险 Top3http://localhost:8080/api/predict/topRisk?limit=3



✅ 第 11 步:启动 Vue 前端

方法 1:本地开发模式(Vite)

# 在IDEA的 smart-screen-frontend 项目根目录执行
npm run dev

浏览器访问:http://localhost:5173

方法 2:服务器部署模式(Nginx)

前提:Spring Boot 后端必须在集群中运行

浏览器访问:http://192.168.36.100


四、一键运行项目

准备工作: 将以下文件统一放置到 master 节点的 /opt/jars/ 目录下:

文件说明
flink-rt-metrics-1.0.0.jarFlink 实时计算作业
spark-job-1.0.0.jarSpark 离线分析 + ML 预测作业
smart-screen-backend-1.0.0-SNAPSHOT.jarSpring Boot 后端服务
start-all.sh一键启动脚本
stop-all.sh一键停止脚本

运行方式:

## 首次运行(需要先赋予执行权限):
# 赋予启动脚本执行权限
chmod +x /opt/jars/start-all.sh
# 赋予关闭脚本执行权限
chmod +x /opt/jars/stop-all.sh
## 后续运行(权限已设置,直接执行):
# 一键启动项目
/opt/jars/start-all.sh
# 一键关闭项目
/opt/jars/stop-all.sh



1.一键启动项目脚本

#!/bin/bash
# =============================================================
#  《智慧设备运行预测大屏》一键启动脚本
#  运行节点:master
#  使用方式:chmod +x start-all.sh && ./start-all.sh
#  重复执行安全:已运行的服务自动跳过
#
#  启动顺序:
#    1. Hadoop(HDFS + YARN)
#    2. ZooKeeper
#    3. Kafka
#    4. Hive Metastore
#    5. Flink 集群 + 实时分析作业
#    6. 设备数据模拟采集
#    7. 离线分析作业(询问执行)
#       └─ 7.1 历史数据入湖(可选)
#       └─ 7.2 实时数据入湖(必执行)
#       └─ 7.3 离线温度趋势分析(可选,等 7.2 产生数据后执行)
#    8. Spark ML 预测作业(询问执行)
#       └─ 8.1 上传训练数据(可选)
#       └─ 8.2 模型训练(可选)
#       └─ 8.3 实时预测(必执行)
#    9. Spring Boot 后端
# =============================================================

# ---------- 颜色定义 ----------
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
CYAN='\033[0;36m'
MAGENTA='\033[0;35m'
NC='\033[0m'

# ---------- 工具函数 ----------
log_step()  { echo ""; echo -e "${BLUE}========================================${NC}"; echo -e "${CYAN}  $1${NC}"; echo -e "${BLUE}========================================${NC}"; }
log_sub()   { echo -e "${MAGENTA}  ▸ $1${NC}"; }
log_info()  { echo -e "${YELLOW}[INFO]${NC}  $1"; }
log_ok()    { echo -e "${GREEN}[OK]${NC}    $1"; }
log_skip()  { echo -e "${CYAN}[SKIP]${NC}  $1"; }
log_error() { echo -e "${RED}[ERROR]${NC} $1"; }
log_fatal() { echo -e "${RED}[FATAL]${NC} $1"; exit 1; }
wait_sec()  { log_info "等待 $1 秒..."; sleep "$1"; }

# 询问 Y/N,默认 N(直接回车 = 不执行)
ask_yes_no() {
    local question="$1"
    local answer
    echo ""
    read -p "$(echo -e ${YELLOW}[ASK]${NC}  $question [y/N]: )" answer
    [ "$answer" = "y" ] || [ "$answer" = "Y" ]
}

# 检查本地 Java 进程
is_running_local() {
    source /etc/profile && jps 2>/dev/null | grep -qw "$1" 2>/dev/null \
        || pgrep -f "$1" > /dev/null 2>&1
}

# 检查远程节点 Java 进程
is_running_remote() {
    local result
    result=$(ssh "$1" "source /etc/profile && jps 2>/dev/null | grep -w '$2'" 2>/dev/null)
    [ -n "$result" ]
}

# 检查本地端口是否占用
port_in_use() { ss -tlnp 2>/dev/null | grep -q ":$1 "; }

# 等待今天的 Hive 分区出现并有数据(最多等 2 分钟)
# 用于确保 7.2 实时入湖已经处理了 Kafka 中积压的数据
wait_for_today_partition() {
    local today=$(date +%Y-%m-%d)
    local hdfs_path="/user/hive/warehouse/iot.db/ods_device_status_di/dt=$today"
    local max_wait=24   # 24 次 x 5 秒 = 2 分钟
    local i

    log_info "等待 7.2 实时入湖把今天($today)的数据写入 Hive..."

    for i in $(seq 1 $max_wait); do
        # 检查今天的分区目录是否存在
        if hdfs dfs -test -e "$hdfs_path" 2>/dev/null; then
            # 检查分区内是否有文件
            local file_count=$(hdfs dfs -ls "$hdfs_path" 2>/dev/null | grep -c "^-")
            if [ "$file_count" -gt 0 ]; then
                log_ok "检测到今天的数据已写入 Hive(共 $file_count 个文件)"
                return 0
            fi
        fi
        log_info "  等待中... ($((i*5))/$((max_wait*5)) 秒)"
        sleep 5
    done

    log_error "等待超时,7.2 可能未正常运行;7.3 将基于现有数据执行"
    return 1
}

# ---------- 配置 ----------
SSH_ENV="source /etc/profile && "
SLAVE1="slave1"
SLAVE2="slave2"
KAFKA_CONFIG="/opt/apps/kafka/config/server.properties"
DATA_DIR="/opt/datas"
SIM_SCRIPT="01_device_status_sim.py"
HISTORY_CSV="device_status_2026_01.csv"
TRAIN_CSV="device_status_with_label_2026_01.csv"
HDFS_TRAIN_DIR="/datas"
KAFKA_TOPIC="device_status_topic"
KAFKA_BROKER="master:9092"
FLINK_JAR="/opt/jars/flink-rt-metrics-1.0.0.jar"
FLINK_CLASS="com.demo.flink.RtMetricsSqlJob"
SPARK_JAR="/opt/jars/spark-job-1.0.0.jar"
SPARK_INGEST_CLASS="com.demo.spark.KafkaToHiveDeviceStatusJob"
SPARK_TREND_CLASS="com.demo.spark.TempTrendBatchJob"
SPARK_TRAIN_CLASS="com.demo.spark.DeviceTrainModelJob"
SPARK_PREDICT_CLASS="com.demo.spark.DevicePredictStreamJob"
BACKEND_JAR="/opt/jars/smart-screen-backend-1.0.0-SNAPSHOT.jar"

PID_FILE="/tmp/bigdata-pids.env"
> "$PID_FILE"


# =============================================================
#  第 1 步:启动 Hadoop 集群(HDFS + YARN)
# =============================================================
log_step "第 1 步:启动 Hadoop 集群(HDFS + YARN)"

if is_running_local "NameNode"; then
    log_skip "HDFS NameNode(已在运行)"
else
    log_info "启动 HDFS..."
    source /etc/profile && start-dfs.sh \
        && log_ok "HDFS 启动成功" \
        || log_error "HDFS 启动失败"
fi

if is_running_remote "$SLAVE1" "ResourceManager"; then
    log_skip "YARN ResourceManager(已在运行)"
else
    log_info "在 slave1 启动 YARN..."
    ssh "$SLAVE1" "${SSH_ENV}start-yarn.sh" \
        && log_ok "YARN 启动成功" \
        || log_error "YARN 启动失败"
fi

wait_sec 5


# =============================================================
#  第 2 步:启动 ZooKeeper(3 个节点)
# =============================================================
log_step "第 2 步:启动 ZooKeeper(master / slave1 / slave2)"

for NODE in master "$SLAVE1" "$SLAVE2"; do
    if [ "$NODE" = "master" ]; then
        STATUS=$(source /etc/profile && zkServer.sh status 2>&1)
    else
        STATUS=$(ssh "$NODE" "${SSH_ENV}zkServer.sh status 2>&1")
    fi

    if echo "$STATUS" | grep -q "Mode:"; then
        log_skip "[$NODE] ZooKeeper(已在运行)"
    else
        log_info "[$NODE] 启动 ZooKeeper..."
        if [ "$NODE" = "master" ]; then
            source /etc/profile && zkServer.sh start \
                && log_ok "[$NODE] ZooKeeper 启动完成" \
                || log_error "[$NODE] ZooKeeper 启动失败"
        else
            ssh "$NODE" "${SSH_ENV}zkServer.sh start" \
                && log_ok "[$NODE] ZooKeeper 启动完成" \
                || log_error "[$NODE] ZooKeeper 启动失败"
        fi
    fi
done

wait_sec 3


# =============================================================
#  第 3 步:启动 Kafka(3 个节点)
# =============================================================
log_step "第 3 步:启动 Kafka(master / slave1 / slave2)"

for NODE in master "$SLAVE1" "$SLAVE2"; do
    if [ "$NODE" = "master" ]; then
        if [ -n "$(source /etc/profile && jps 2>/dev/null | grep -w Kafka)" ]; then
            log_skip "[$NODE] Kafka(已在运行)"
        else
            log_info "[$NODE] 启动 Kafka..."
            source /etc/profile && kafka-server-start.sh -daemon "$KAFKA_CONFIG" \
                && log_ok "[$NODE] Kafka 启动完成" \
                || log_error "[$NODE] Kafka 启动失败"
        fi
    else
        if [ -n "$(ssh "$NODE" "source /etc/profile && jps 2>/dev/null | grep -w Kafka")" ]; then
            log_skip "[$NODE] Kafka(已在运行)"
        else
            log_info "[$NODE] 启动 Kafka..."
            ssh "$NODE" "${SSH_ENV}kafka-server-start.sh -daemon $KAFKA_CONFIG" \
                && log_ok "[$NODE] Kafka 启动完成" \
                || log_error "[$NODE] Kafka 启动失败"
        fi
    fi
done

wait_sec 8


# =============================================================
#  第 4 步:启动 Hive Metastore
# =============================================================
log_step "第 4 步:启动 Hive Metastore"

if port_in_use 9083; then
    log_skip "Hive Metastore(端口 9083 已占用)"
else
    log_info "后台启动 Hive Metastore..."
    nohup hive --service metastore > /tmp/hive-metastore.log 2>&1 &
    HIVE_PID=$!
    echo "HIVE_PID=$HIVE_PID" >> "$PID_FILE"
    log_ok "Hive Metastore 已后台启动,PID=$HIVE_PID"
fi

wait_sec 10


# =============================================================
#  第 5 步:启动 Flink 集群 + 提交实时分析作业
# =============================================================
log_step "第 5 步:启动 Flink 集群 + 实时分析作业"

if port_in_use 8081; then
    log_skip "Flink 集群(端口 8081 已占用)"
else
    log_info "启动 Flink 集群..."
    source /etc/profile && start-cluster.sh \
        && log_ok "Flink 集群启动成功" \
        || log_error "Flink 集群启动失败"
fi

wait_sec 5

if [ ! -f "$FLINK_JAR" ]; then
    log_error "Flink JAR 不存在:$FLINK_JAR,跳过作业提交"
else
    RUNNING_COUNT=$(curl -s http://localhost:8081/jobs/overview 2>/dev/null \
        | grep -o '"state":"RUNNING"' | wc -l)
    if [ "$RUNNING_COUNT" -gt 0 ]; then
        log_skip "Flink 作业(已有 $RUNNING_COUNT 个作业在运行)"
    else
        log_info "提交 Flink 作业..."
        source /etc/profile && flink run -c "$FLINK_CLASS" "$FLINK_JAR" \
            --jobmanager.memory.process.size 512m \
            --taskmanager.memory.process.size 512m \
            --taskmanager.numberOfTaskSlots 1 \
            --parallelism 1 \
            && log_ok "Flink 作业提交成功" \
            || log_error "Flink 作业提交失败"
    fi
fi

wait_sec 3


# =============================================================
#  第 6 步:启动设备数据模拟采集(Python -> Kafka)
# =============================================================
log_step "第 6 步:启动设备数据模拟采集"

if is_running_local "$SIM_SCRIPT"; then
    log_skip "数据采集(已在运行)"
else
    if [ ! -f "$DATA_DIR/$SIM_SCRIPT" ]; then
        log_error "模拟脚本不存在:$DATA_DIR/$SIM_SCRIPT"
    else
        log_info "后台启动 Python 模拟 -> Kafka 管道..."
        nohup bash -c "cd $DATA_DIR && python $SIM_SCRIPT | \
            kafka-console-producer.sh \
            --broker-list $KAFKA_BROKER \
            --topic $KAFKA_TOPIC" \
            > /tmp/kafka-producer.log 2>&1 &
        PRODUCER_PID=$!
        echo "PRODUCER_PID=$PRODUCER_PID" >> "$PID_FILE"
        log_ok "数据采集已后台启动,PID=$PRODUCER_PID"
    fi
fi

wait_sec 3


# =============================================================
#  第 7 步:完成离线分析作业
# =============================================================
log_step "第 7 步:完成离线分析作业"

# 标记本轮是否启动了 7.2(用于决定 7.3 之前是否需要等待)
INGEST_JUST_STARTED=0

if ask_yes_no "是否进行离线数据分析?(包含历史数据入湖 + 离线温度趋势分析)"; then

    # ---------- 7.1 历史数据入湖 ----------
    log_sub "7.1 历史数据入湖"

    # 检查 CSV 文件是否存在
    if [ ! -f "$DATA_DIR/$HISTORY_CSV" ]; then
        log_fatal "历史数据文件不存在:$DATA_DIR/$HISTORY_CSV,请先上传后重试"
    fi

    # 检查 Hive 表是否已有数据
    HIVE_COUNT=$(hive -e "SELECT COUNT(*) FROM iot.ods_device_status_di;" 2>/dev/null | tail -1)
    if [ -n "$HIVE_COUNT" ] && [ "$HIVE_COUNT" -gt 0 ] 2>/dev/null; then
        log_skip "历史数据已入湖(共 $HIVE_COUNT 条),跳过 7.1"
    else
        log_info "执行历史数据入湖(CSV -> Hive ODS)..."
        hive -e "
            LOAD DATA LOCAL INPATH '$DATA_DIR/$HISTORY_CSV'
            INTO TABLE iot.ods_device_status_csv_tmp;

            SET hive.exec.dynamic.partition=true;
            SET hive.exec.dynamic.partition.mode=nonstrict;

            INSERT OVERWRITE TABLE iot.ods_device_status_di PARTITION (dt)
            SELECT
              event_type, device_id, status, temperature, \`load\`, voltage, event_time,
              from_unixtime(unix_timestamp(event_time, 'yyyy-MM-dd HH:mm:ss')) AS ts,
              substr(event_time, 1, 10) AS dt
            FROM iot.ods_device_status_csv_tmp
            WHERE event_type <> 'event_type';
        " > /tmp/hive-load.log 2>&1 \
            && log_ok "7.1 历史数据入湖完成" \
            || log_error "7.1 历史数据入湖失败,日志:/tmp/hive-load.log"
    fi

    # ---------- 7.2 实时数据入湖(必执行)----------
    log_sub "7.2 实时数据入湖"

    if is_running_local "KafkaToHiveDeviceStatusJob"; then
        log_skip "实时数据入湖(已在运行)"
    elif [ ! -f "$SPARK_JAR" ]; then
        log_error "Spark JAR 不存在:$SPARK_JAR,跳过 7.2"
    else
        log_info "后台启动 Spark 实时入湖作业..."
        nohup spark-submit \
            --class "$SPARK_INGEST_CLASS" \
            --master local[1] \
            "$SPARK_JAR" \
            > /tmp/spark-ingest.log 2>&1 &
        INGEST_PID=$!
        echo "INGEST_PID=$INGEST_PID" >> "$PID_FILE"
        log_ok "7.2 实时入湖作业已后台启动,PID=$INGEST_PID"
        INGEST_JUST_STARTED=1
    fi

    # ---------- 7.3 离线温度趋势分析 ----------
    # 关键:如果本轮刚启动了 7.2,必须等它把今天的数据写入 Hive 后再跑 7.3,
    # 否则 7.3 读不到今天的最新分区,ClickHouse 会缺少最新数据。
    log_sub "7.3 离线温度趋势分析"

    if [ "$INGEST_JUST_STARTED" = "1" ]; then
        wait_for_today_partition
    else
        log_info "7.2 已在运行中,跳过等待,直接执行 7.3"
    fi

    if [ ! -f "$SPARK_JAR" ]; then
        log_error "Spark JAR 不存在:$SPARK_JAR,跳过 7.3"
    else
        log_info "执行离线温度趋势分析(Hive -> ClickHouse)..."
        spark-submit \
            --class "$SPARK_TREND_CLASS" \
            --master local[4] \
            --driver-memory 2g \
            --conf spark.sql.shuffle.partitions=4 \
            --conf spark.ui.enabled=false \
            "$SPARK_JAR" \
            > /tmp/spark-trend.log 2>&1 \
            && log_ok "7.3 离线温度趋势分析完成" \
            || log_error "7.3 离线温度趋势分析失败,日志:/tmp/spark-trend.log"
    fi

else
    # ---------- 未选择离线分析,但仍需启动实时入湖(常驻服务)----------
    log_info "跳过离线分析(7.1、7.3),仅启动实时入湖(7.2)"
    log_sub "7.2 实时数据入湖"

    if is_running_local "KafkaToHiveDeviceStatusJob"; then
        log_skip "实时数据入湖(已在运行)"
    elif [ ! -f "$SPARK_JAR" ]; then
        log_error "Spark JAR 不存在:$SPARK_JAR,跳过 7.2"
    else
        log_info "后台启动 Spark 实时入湖作业..."
        nohup spark-submit \
            --class "$SPARK_INGEST_CLASS" \
            --master local[1] \
            "$SPARK_JAR" \
            > /tmp/spark-ingest.log 2>&1 &
        INGEST_PID=$!
        echo "INGEST_PID=$INGEST_PID" >> "$PID_FILE"
        log_ok "7.2 实时入湖作业已后台启动,PID=$INGEST_PID"
    fi
fi

wait_sec 3


# =============================================================
#  第 8 步:Spark ML 预测作业
# =============================================================
log_step "第 8 步:Spark ML 预测作业"

if ask_yes_no "是否进行模型训练?(包含上传训练数据 + 训练模型)"; then

    # ---------- 8.1 上传训练数据 ----------
    log_sub "8.1 上传训练数据到 HDFS"

    # 检查本地训练数据是否存在
    if [ ! -f "$DATA_DIR/$TRAIN_CSV" ]; then
        log_fatal "训练数据文件不存在:$DATA_DIR/$TRAIN_CSV,请先上传后重试"
    fi

    # 检查 HDFS 上是否已存在
    if hdfs dfs -test -e "$HDFS_TRAIN_DIR/$TRAIN_CSV" 2>/dev/null; then
        log_skip "训练数据已存在于 HDFS:$HDFS_TRAIN_DIR/$TRAIN_CSV,跳过 8.1"
    else
        log_info "创建 HDFS 目录 $HDFS_TRAIN_DIR..."
        hdfs dfs -mkdir -p "$HDFS_TRAIN_DIR" 2>/dev/null
        log_info "上传训练数据到 HDFS..."
        hdfs dfs -put "$DATA_DIR/$TRAIN_CSV" "$HDFS_TRAIN_DIR/" \
            && log_ok "8.1 训练数据上传成功" \
            || log_error "8.1 训练数据上传失败"
    fi

    # ---------- 8.2 模型训练 ----------
    log_sub "8.2 模型训练"

    if [ ! -f "$SPARK_JAR" ]; then
        log_error "Spark JAR 不存在:$SPARK_JAR,跳过 8.2"
    else
        log_info "执行模型训练(DeviceTrainModelJob)..."
        spark-submit \
            --class "$SPARK_TRAIN_CLASS" \
            --master local[1] \
            "$SPARK_JAR" \
            > /tmp/spark-train.log 2>&1 \
            && log_ok "8.2 模型训练完成" \
            || log_error "8.2 模型训练失败,日志:/tmp/spark-train.log"
    fi
else
    log_info "跳过模型训练(8.1、8.2),直接启动实时预测(8.3)"
fi

# ---------- 8.3 实时预测(必执行)----------
log_sub "8.3 实时预测"

if [ ! -f "$SPARK_JAR" ]; then
    log_error "Spark JAR 不存在:$SPARK_JAR,跳过 8.3"
elif is_running_local "DevicePredictStreamJob"; then
    log_skip "实时预测作业(已在运行)"
else
    log_info "后台启动 Spark 实时预测作业..."
    nohup spark-submit \
        --class "$SPARK_PREDICT_CLASS" \
        --master local[1] \
        "$SPARK_JAR" \
        > /tmp/spark-predict.log 2>&1 &
    PREDICT_PID=$!
    echo "PREDICT_PID=$PREDICT_PID" >> "$PID_FILE"
    log_ok "8.3 实时预测作业已后台启动,PID=$PREDICT_PID"
fi

wait_sec 3


# =============================================================
#  第 9 步:启动 Spring Boot 后端
# =============================================================
log_step "第 9 步:启动 Spring Boot 后端"

if [ ! -f "$BACKEND_JAR" ]; then
    log_error "后端 JAR 不存在:$BACKEND_JAR,跳过本步骤"
elif port_in_use 8080; then
    log_skip "Spring Boot 后端(端口 8080 已占用)"
else
    log_info "后台启动 Spring Boot..."
    nohup java -jar "$BACKEND_JAR" \
        > /tmp/backend.log 2>&1 &
    BACKEND_PID=$!
    echo "BACKEND_PID=$BACKEND_PID" >> "$PID_FILE"
    log_ok "后端服务已后台启动,PID=$BACKEND_PID"
fi


# =============================================================
#  启动完成汇总
# =============================================================
echo ""
echo -e "${GREEN}============================================${NC}"
echo -e "${GREEN}  ✅  所有服务启动完毕!${NC}"
echo -e "${GREEN}============================================${NC}"
echo ""
echo -e "  ${CYAN}后台进程 PID:${NC}"
cat "$PID_FILE" | while IFS='=' read -r NAME PID; do
    echo -e "    ${CYAN}${NAME}${NC} = $PID"
done
echo ""
echo -e "  ${CYAN}日志文件:${NC}"
echo -e "    Hive Metastore    -> /tmp/hive-metastore.log"
echo -e "    Kafka 数据采集    -> /tmp/kafka-producer.log"
echo -e "    Spark 实时入湖    -> /tmp/spark-ingest.log"
echo -e "    Spark 离线趋势    -> /tmp/spark-trend.log"
echo -e "    Spark 模型训练    -> /tmp/spark-train.log"
echo -e "    Spark 实时预测    -> /tmp/spark-predict.log"
echo -e "    Spring Boot       -> /tmp/backend.log"
echo ""
echo -e "  ${CYAN}服务验证:${NC}"
echo -e "    Flink UI   -> http://master:8081"
echo -e "    YARN UI    -> http://slave1:8088"
echo -e "    后端接口   -> http://master:8080/api/history/temp7d"
echo -e "    前端大屏   -> http://192.168.36.100"
echo ""


2.一键关闭项目脚本

#!/bin/bash
# =============================================================
#  大数据项目一键关闭脚本
#  运行节点:master
#  使用方式:chmod +x /opt/jars/stop-all.sh && ./opt/jars/stop-all.sh
#  停止顺序:与启动顺序相反,避免依赖服务先被关闭
# =============================================================

# ---------- 颜色定义 ----------
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
CYAN='\033[0;36m'
NC='\033[0m'

# ---------- 工具函数 ----------
log_step()  { echo ""; echo -e "${BLUE}========================================${NC}"; echo -e "${CYAN}  $1${NC}"; echo -e "${BLUE}========================================${NC}"; }
log_info()  { echo -e "${YELLOW}[INFO]${NC}  $1"; }
log_ok()    { echo -e "${GREEN}[OK]${NC}    $1"; }
log_skip()  { echo -e "${CYAN}[SKIP]${NC}  $1(未在运行,跳过)"; }
log_error() { echo -e "${RED}[ERROR]${NC} $1"; }
wait_sec()  { log_info "等待 $1 秒..."; sleep "$1"; }

# 检查本地是否有某个 jps 关键字对应的进程(用 jps 避免 pgrep 长命令行截断问题)
is_running_local() {
    # 先尝试 jps 精确匹配(适用于 Java 进程)
    # 若 jps 找不到则回退 pgrep(适用于 Python/Shell 进程)
    source /etc/profile && jps 2>/dev/null | grep -qw "$1" 2>/dev/null ||     pgrep -f "$1" > /dev/null 2>&1
}

# 检查远程节点是否有某个 jps 关键字对应的进程
is_running_remote() {
    local result
    result=$(ssh "$1" "source /etc/profile && jps 2>/dev/null | grep -w '$2'" 2>/dev/null)
    [ -n "$result" ]
}

# 用 pkill 停止本地进程,带超时等待确认
kill_local() {
    local keyword="$1"
    local name="$2"
    if is_running_local "$keyword"; then
        log_info "停止 $name..."
        pkill -f "$keyword"
        local i=0
        while is_running_local "$keyword" && [ $i -lt 10 ]; do
            sleep 1; i=$((i+1))
        done
        if is_running_local "$keyword"; then
            log_info "$name 未响应,强制终止..."
            pkill -9 -f "$keyword"
        fi
        log_ok "$name 已停止"
    else
        log_skip "$name"
    fi
}

# 用 pkill 停止远程进程
kill_remote() {
    local node="$1"
    local keyword="$2"
    local name="$3"
    if is_running_remote "$node" "$keyword"; then
        log_info "[$node] 停止 $name..."
        ssh "$node" "pkill -f '$keyword' 2>/dev/null; sleep 2; pkill -9 -f '$keyword' 2>/dev/null; true"
        log_ok "[$node] $name 已停止"
    else
        log_skip "[$node] $name"
    fi
}

# ---------- 配置(与 start-all.sh 保持一致) ----------
SSH_ENV="source /etc/profile && "
SLAVE1="slave1"
SLAVE2="slave2"
KAFKA_CONFIG="/opt/apps/kafka/config/server.properties"
SIM_SCRIPT="01_device_status_sim.py"
SPARK_CLASS="com.demo.spark.DevicePredictStreamJob"
BACKEND_JAR="/opt/jars/smart-screen-backend-1.0.0-SNAPSHOT.jar"
FLINK_CLASS="com.demo.flink.RtMetricsSqlJob"

PID_FILE="/tmp/bigdata-pids.env"


# =============================================================
#  第 8 步(逆序):停止 Spring Boot 后端
# =============================================================
log_step "第 1 步(逆序):停止 Spring Boot 后端"

kill_local "$BACKEND_JAR" "Spring Boot 后端"


# =============================================================
#  第 7 步(逆序):停止 Spark ML 作业
# =============================================================
log_step "第 2 步(逆序):停止 Spark ML 作业"

kill_local "$SPARK_CLASS" "Spark ML 作业"
# 同时清理残留的 spark-submit 进程
is_running_local "spark-submit" && pkill -9 -f "spark-submit" 2>/dev/null


# =============================================================
#  第 6 步(逆序):取消 Flink 作业并停止 Flink 集群
# =============================================================
log_step "第 3 步(逆序):停止 Flink 作业 & 集群"

# 先通过 REST API 取消正在运行的 Flink 作业
log_info "查询并取消所有 Flink 作业..."
# 用 jq 解析,若无 jq 则用 grep 匹配 "jid":"xxxxx" 格式
JOBS=$(curl -s http://localhost:8081/jobs/overview 2>/dev/null \
    | grep -o '"jid":"[^"]*"' | grep -o '[0-9a-f]\{32\}')

if [ -z "$JOBS" ]; then
    log_skip "Flink 作业(无运行中的作业)"
else
    for JOB_ID in $JOBS; do
        log_info "取消 Flink 作业 ID: $JOB_ID"
        curl -s -X PATCH "http://localhost:8081/jobs/${JOB_ID}?mode=cancel" > /dev/null
        log_ok "作业 $JOB_ID 已取消"
    done
    wait_sec 3
fi

wait_sec 3

# 再停止 Flink 集群
if is_running_local "StandaloneSessionClusterEntrypoint"; then
    log_info "停止 Flink 集群..."
    source /etc/profile && stop-cluster.sh \
        && log_ok "Flink 集群已停止" \
        || log_error "Flink 集群停止失败"
else
    log_skip "Flink 集群"
fi


# =============================================================
#  第 5 步(逆序):停止 Kafka 数据采集
# =============================================================
log_step "第 4 步(逆序):停止 Kafka 数据采集"

kill_local "$SIM_SCRIPT" "Kafka 数据采集(Python 模拟)"
# 同时停止 kafka-console-producer
kill_local "kafka-console-producer" "kafka-console-producer"


# =============================================================
#  第 3 步(逆序):停止 Hive Metastore
# =============================================================
log_step "第 5 步(逆序):停止 Hive Metastore"

kill_local "RunJar" "Hive Metastore"


# =============================================================
#  第 2 步(逆序):停止 Kafka 和 ZooKeeper
# =============================================================
log_step "第 6 步(逆序):停止 Kafka"

for NODE in master "$SLAVE1" "$SLAVE2"; do
    if [ "$NODE" = "master" ]; then
        KAFKA_PID=$(jps 2>/dev/null | grep -w 'Kafka' | awk '{print $1}')
        if [ -n "$KAFKA_PID" ]; then
            log_info "[$NODE] 停止 Kafka(PID=$KAFKA_PID)..."
            kill -15 "$KAFKA_PID" 2>/dev/null
            sleep 3
            # 若还存活则强制终止
            jps 2>/dev/null | grep -w 'Kafka' | awk '{print $1}' | xargs -r kill -9 2>/dev/null
            sleep 1
            STILL=$(jps 2>/dev/null | grep -w 'Kafka')
            if [ -n "$STILL" ]; then
                log_error "[$NODE] Kafka 停止失败,请手动处理"
            else
                log_ok "[$NODE] Kafka 已停止"
            fi
        else
            log_skip "[$NODE] Kafka"
        fi
    else
        KAFKA_PID=$(ssh "$NODE" "source /etc/profile && jps 2>/dev/null | grep -w 'Kafka' | awk '{print \$1}'" 2>/dev/null)
        if [ -n "$KAFKA_PID" ]; then
            log_info "[$NODE] 停止 Kafka(PID=$KAFKA_PID)..."
            ssh "$NODE" "kill -15 $KAFKA_PID 2>/dev/null; sleep 3; kill -9 $KAFKA_PID 2>/dev/null; true"
            log_ok "[$NODE] Kafka 已停止"
        else
            log_skip "[$NODE] Kafka"
        fi
    fi
done

wait_sec 3

log_step "第 7 步(逆序):停止 ZooKeeper"

# ZooKeeper 必须在 Kafka 完全停止后再关,否则 Kafka offset 可能丢失
for NODE in master "$SLAVE1" "$SLAVE2"; do
    if [ "$NODE" = "master" ]; then
        if is_running_local "QuorumPeerMain"; then
            log_info "[$NODE] 停止 ZooKeeper..."
            source /etc/profile && zkServer.sh stop \
                && log_ok "[$NODE] ZooKeeper 已停止" \
                || log_error "[$NODE] ZooKeeper 停止失败"
        else
            log_skip "[$NODE] ZooKeeper"
        fi
    else
        if is_running_remote "$NODE" "QuorumPeerMain"; then
            log_info "[$NODE] 停止 ZooKeeper..."
            ssh "$NODE" "${SSH_ENV}zkServer.sh stop" \
                && log_ok "[$NODE] ZooKeeper 已停止" \
                || log_error "[$NODE] ZooKeeper 停止失败"
        else
            log_skip "[$NODE] ZooKeeper"
        fi
    fi
done

wait_sec 3


# =============================================================
#  第 1 步(逆序):停止 YARN 和 HDFS
# =============================================================
log_step "第 8 步(逆序):停止 YARN 和 HDFS"

if is_running_remote "$SLAVE1" "ResourceManager"; then
    log_info "在 slave1 停止 YARN..."
    ssh "$SLAVE1" "${SSH_ENV}stop-yarn.sh" \
        && log_ok "YARN 已停止" \
        || log_error "YARN 停止失败"
else
    log_skip "YARN ResourceManager"
fi

wait_sec 3

if is_running_local "NameNode"; then
    log_info "停止 HDFS..."
    source /etc/profile && stop-dfs.sh \
        && log_ok "HDFS 已停止" \
        || log_error "HDFS 停止失败"
else
    log_skip "HDFS NameNode"
fi


# =============================================================
#  清理 PID 文件
# =============================================================
if [ -f "$PID_FILE" ]; then
    rm -f "$PID_FILE"
    log_info "已清理 PID 文件:$PID_FILE"
fi


# =============================================================
#  回滚完成汇总
# =============================================================
echo ""
echo -e "${GREEN}============================================${NC}"
echo -e "${GREEN}  ✅  所有服务已停止!${NC}"
echo -e "${GREEN}============================================${NC}"
echo ""
echo -e "  各节点最终进程状态(通过 SSH 检查):"
for NODE in master "$SLAVE1" "$SLAVE2"; do
    if [ "$NODE" = "master" ]; then
        PROCS=$(source /etc/profile && jps 2>/dev/null | grep -v Jps)
    else
        PROCS=$(ssh "$NODE" "source /etc/profile && jps 2>/dev/null | grep -v Jps")
    fi
    echo -e "  ${CYAN}[$NODE]${NC}"
    if [ -z "$PROCS" ]; then
        echo -e "    (无 Java 进程)✅"
    else
        echo "$PROCS" | while read -r line; do
            echo -e "    ${YELLOW}$line${NC}"
        done
    fi
done
echo ""


五、常见问题排查


现象排查方向
大屏无数据Kafka 未启动 / 模拟脚本未运行
MySQL 无数据Flink 作业未提交或已失败
ClickHouse 预测为空Spark ML 作业未运行
接口请求报错Spring Boot 后端未启动
前端白屏后端端口配置错误(默认 8080)
Flink 提交报类冲突检查 flink-conf.yaml 类加载配置
Hive 连接失败Metastore 端口 9083 未启动



六、系统数据流向

ChatGPT Image 2026年5月27日 11_48_34



发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

Powered By Z-BlogPHP 1.7.3

版权:李翔
备案/许可证编号为:新ICP备2024006115号-1