从单机到集群:MQTT 百万级连接的架构演进与实战指南

  • 时间:2025-11-15 18:55 作者: 来源: 阅读:0
  • 扫一扫,手机访问
摘要:引言:为什么 MQTT 集群是物联网时代的刚需? 在物联网(IoT)爆发的今天,设备连接数呈指数级增长。根据 IDC 预测,到 2025 年全球物联网设备将突破 750 亿台。这些设备需要高效、可靠的通信协议,MQTT(Message Queuing Telemetry Transport)凭借轻量、低带宽占用的特性成为首选。但单机 MQTT 服务器能支撑的连接数有限(通常在 10 万级),如何

引言:为什么 MQTT 集群是物联网时代的刚需?

在物联网(IoT)爆发的今天,设备连接数呈指数级增长。根据 IDC 预测,到 2025 年全球物联网设备将突破 750 亿台。这些设备需要高效、可靠的通信协议,MQTT(Message Queuing Telemetry Transport)凭借轻量、低带宽占用的特性成为首选。但单机 MQTT 服务器能支撑的连接数有限(通常在 10 万级),如何突破这个瓶颈?答案就是MQTT 集群

本文将从底层原理到实战部署,全方位解析 MQTT 集群的设计与实现,带你掌握支撑百万级设备连接的核心技术。

一、MQTT 基础:集群设计的前提认知

1.1 MQTT 协议核心概念

MQTT 是基于发布 - 订阅(Pub/Sub)模式的轻量级协议,核心组件包括:

客户端(Client):发送消息(发布者)或接收消息(订阅者)的设备 / 应用服务器(Broker):接收客户端连接,转发消息的中间节点主题(Topic):消息的分类标识,如 "sensor/temp/room1"QoS(Quality of Service):消息传输质量等级(0/1/2)

1.2 单机 Broker 的局限性

即使是性能优异的 MQTT 服务器(如 EMQX、Mosquitto),单机也会面临以下瓶颈:

连接数上限:受内存和文件描述符限制吞吐量瓶颈:CPU 和网络 IO 的物理限制单点故障:一旦宕机,整个系统瘫痪地理限制:远距离设备连接延迟高

1.3 MQTT 集群的核心目标

集群化部署需实现三大目标:

高可用(High Availability):避免单点故障水平扩展(Horizontal Scaling):通过增加节点提升容量负载均衡(Load Balancing):均匀分配客户端连接

二、MQTT 集群架构:从理论到实践

2.1 集群核心挑战

构建 MQTT 集群面临三个关键问题:

会话共享:客户端重连到不同节点时需恢复会话状态消息路由:确保订阅者能收到所有相关消息,无论连接到哪个节点负载均衡:如何将客户端连接合理分配到集群节点

2.2 主流集群架构对比

2.2.1 共享存储架构

原理:所有节点通过共享存储(如 Redis、ZooKeeper)同步会话和订阅信息

优点:架构简单,易于扩展缺点:共享存储可能成为瓶颈,跨节点消息路由效率低

2.2.2 集群互联架构

原理:节点间通过专用协议直接通信,同步订阅和路由信息

优点:无中心瓶颈,消息路由效率高缺点:节点发现和网络配置复杂

2.2.3 混合架构(主流方案)

原理:节点间直接通信,同时使用元数据存储维护集群状态

优点:兼顾扩展性和可靠性,主流 MQTT 服务器(如 EMQX)采用缺点:实现复杂度高

三、EMQX 集群实战:搭建高可用 MQTT 服务

EMQX 是目前最流行的开源 MQTT 服务器,支持百万级并发连接,我们以 EMQX 5.3.0(最新稳定版)为例,搭建一个高可用集群。

3.1 环境准备

节点IP 地址角色硬件配置
emqx-1192.168.1.101集群节点4 核 8G
emqx-2192.168.1.102集群节点4 核 8G
emqx-3192.168.1.103集群节点4 核 8G
lb192.168.1.100负载均衡器2 核 4G

操作系统:Ubuntu 22.04 LTSDocker 版本:24.0.7Docker Compose 版本:2.21.0

3.2 单节点 EMQX 部署

首先在每个节点安装 EMQX:



# 拉取EMQX镜像
docker pull emqx/emqx:5.3.0
 
# 启动单节点EMQX
docker run -d 
  --name emqx 
  -p 1883:1883   # MQTT TCP端口
  -p 8083:8083   # MQTT WebSocket端口
  -p 8084:8084   # MQTT Secure WebSocket端口
  -p 8883:8883   # MQTT TLS端口
  -p 18083:18083   # 管理控制台端口
  -e EMQX_NODE_NAME="emqx@192.168.1.101"   # 节点名称,格式为"emqx@IP"
  -e EMQX_DASHBOARD__DEFAULT_USER__PASSWORD="public"   # 控制台密码
  emqx/emqx:5.3.0

访问 http://192.168.1.101:18083,使用用户名 admin和密码 public登录控制台,确认单节点正常运行。

3.3 构建 EMQX 集群

EMQX 支持多种集群发现方式,这里使用静态节点列表方式:

在 emqx-1 节点执行,将 emqx-2 和 emqx-3 加入集群:


# 进入emqx-1容器
docker exec -it emqx sh
 
# 加入emqx-2节点
emqx_ctl cluster join emqx@192.168.1.102
 
# 加入emqx-3节点
emqx_ctl cluster join emqx@192.168.1.103
验证集群状态:

emqx_ctl cluster status

预期输出:



Cluster status: #{running_nodes => ['emqx@192.168.1.101','emqx@192.168.1.102','emqx@192.168.1.103'],
                  stopped_nodes => []}

3.4 配置负载均衡器

使用 Nginx 作为 TCP 负载均衡器,配置如下:



# /etc/nginx/nginx.conf
worker_processes auto;
events {
    worker_connections  10240;
}
 
stream {
    upstream mqtt_cluster {
        # 轮询策略
        server 192.168.1.101:1883 weight=1 max_fails=2 fail_timeout=30s;
        server 192.168.1.102:1883 weight=1 max_fails=2 fail_timeout=30s;
        server 192.168.1.103:1883 weight=1 max_fails=2 fail_timeout=30s;
    }
    
    server {
        listen 1883;
        proxy_pass mqtt_cluster;
        # 保持长连接
        proxy_timeout 120s;
        proxy_connect_timeout 10s;
    }
}

启动 Nginx:



docker run -d 
  --name nginx-mqtt-lb 
  -p 1883:1883 
  -v /etc/nginx/nginx.conf:/etc/nginx/nginx.conf 
  nginx:1.25.3

3.5 集群数据同步验证

编写一个简单的 Java 程序验证集群功能,使用 Eclipse Paho MQTT 客户端(3.4.0 版本):

首先在 pom.xml 中添加依赖:



<dependencies>
    <dependency>
        <groupId>org.eclipse.paho</groupId>
        <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
        <version>3.4.0</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.30</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>2.0.9</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>1.4.8</version>
    </dependency>
</dependencies>

创建发布者类:



import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 
/**
 * MQTT消息发布者,用于验证集群消息同步
 * @author ken
 */
@Slf4j
public class MqttPublisher {
    // 负载均衡器地址
    private static final String BROKER = "tcp://192.168.1.100:1883";
    // 客户端ID,需唯一
    private static final String CLIENT_ID = "publisher-" + System.currentTimeMillis();
    // 主题
    private static final String TOPIC = "cluster/test";
    // QoS等级
    private static final int QOS = 1;
 
    public static void main(String[] args) throws MqttException, InterruptedException {
        // 创建连接选项
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setCleanSession(false);
        connOpts.setConnectionTimeout(10);
        connOpts.setKeepAliveInterval(60);
 
        // 创建客户端
        MqttClient client = new MqttClient(BROKER, CLIENT_ID, new MemoryPersistence());
        
        // 连接服务器
        log.info("连接到MQTT服务器: {}", BROKER);
        client.connect(connOpts);
        log.info("连接成功");
 
        // 发布消息
        for (int i = 0; i < 10; i++) {
            String content = "集群测试消息 " + i;
            MqttMessage message = new MqttMessage(content.getBytes());
            message.setQos(QOS);
            client.publish(TOPIC, message);
            log.info("已发布消息: {}", content);
            Thread.sleep(1000);
        }
 
        // 断开连接
        client.disconnect();
        log.info("已断开连接");
        client.close();
    }
}

创建订阅者类:



import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 
/**
 * MQTT消息订阅者,用于验证集群消息同步
 * @author ken
 */
@Slf4j
public class MqttSubscriber {
    // 负载均衡器地址
    private static final String BROKER = "tcp://192.168.1.100:1883";
    // 客户端ID,需唯一
    private static final String CLIENT_ID = "subscriber-" + System.currentTimeMillis();
    // 主题
    private static final String TOPIC = "cluster/test";
    // QoS等级
    private static final int QOS = 1;
 
    public static void main(String[] args) throws MqttException {
        // 创建连接选项
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setCleanSession(false);
        connOpts.setConnectionTimeout(10);
        connOpts.setKeepAliveInterval(60);
 
        // 创建客户端
        MqttClient client = new MqttClient(BROKER, CLIENT_ID, new MemoryPersistence());
        
        // 设置回调
        client.setCallback(new MqttCallback() {
            @Override
            public void connectionLost(Throwable cause) {
                log.error("连接丢失", cause);
            }
 
            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                log.info("收到消息 - 主题: {}, 内容: {}", topic, new String(message.getPayload()));
            }
 
            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
                // 对于订阅者,此方法不常用
            }
        });
 
        // 连接服务器
        log.info("连接到MQTT服务器: {}", BROKER);
        client.connect(connOpts);
        log.info("连接成功");
 
        // 订阅主题
        client.subscribe(TOPIC, QOS);
        log.info("已订阅主题: {}", TOPIC);
 
        // 保持连接
        try {
            Thread.sleep(Integer.MAX_VALUE);
        } catch (InterruptedException e) {
            log.error("线程中断", e);
        }
 
        // 断开连接
        client.disconnect();
        log.info("已断开连接");
        client.close();
    }
}

测试步骤:

启动两个订阅者(连接到不同集群节点)启动一个发布者(连接到任意节点)观察两个订阅者是否都能收到所有消息

如果两个订阅者都能收到消息,说明集群消息同步正常。

四、集群核心机制:消息路由与会话共享

4.1 消息路由原理

MQTT 集群的消息路由需解决一个核心问题:当发布者和订阅者连接到不同节点时,如何确保消息能正确送达。

EMQX 采用基于主题树的路由表实现跨节点消息路由:

路由表维护了 "主题 - 节点" 映射关系,当节点收到新订阅时,会将订阅信息同步到整个集群,更新所有节点的路由表。

4.2 会话共享机制

MQTT 客户端通过 cleanSession标志控制会话是否持久化:

cleanSession=true:客户端断开连接后,会话信息(订阅、未确认消息)会被删除 cleanSession=false:客户端断开连接后,会话信息会被保留

在集群环境中,会话信息需要在节点间共享,EMQX 通过以下机制实现:

会话归属:每个客户端会话由固定节点负责(基于客户端 ID 哈希)会话同步:会话创建节点会将会话信息同步到其他节点会话迁移:当客户端重连到新节点时,新节点会从原归属节点获取会话信息

4.3 集群分区与脑裂处理

集群节点间通过心跳检测彼此状态,当网络故障导致集群分裂成多个子集群(脑裂)时,EMQX 采用投票机制处理:

每个节点定期向其他节点发送心跳当节点超过阈值时间未收到多数节点的心跳,会认为自己处于 minority 分区minority 分区的节点会自动停止服务,避免数据不一致

五、性能优化:让集群支撑百万级连接

5.1 JVM 参数优化

EMQX 基于 Erlang VM 运行,合理配置 VM 参数对性能至关重要:



# emqx/etc/emqx.conf
node.process_limit = 2097152  # 最大进程数
node.max_ports = 1048576      # 最大端口数
vm.args += +A 128             # 异步线程池大小
vm.args += +Q 65536           # 端口队列大小
vm.args += +P 2097152         # 最大进程数
vm.args += -env ERL_MAX_ETS_TABLES 20000  # 最大ETS表数量

5.2 连接参数调优

调整 MQTT 连接相关参数,提高并发能力:



# 最大连接数
listener.tcp.default.max_connections = 1024000
 
# 心跳超时时间
listener.tcp.default.keepalive = 300s
 
# 连接握手超时
listener.tcp.default.handshake_timeout = 15s
 
# 消息队列长度
zone.external.max_mqueue_len = 10000
 
# 消息队列溢出策略
zone.external.mqueue_overload_policy = drop_oldest

5.3 持久化优化

EMQX 支持多种持久化后端,根据场景选择:

内置数据库(默认):适合中小规模部署Redis:适合需要高吞吐量的场景MySQL/PostgreSQL:适合需要事务支持的场景

配置 Redis 持久化:



# 启用Redis持久化
persistence.redis.enabled = true
 
# Redis服务器地址
persistence.redis.server = "redis://192.168.1.200:6379"
 
# Redis数据库编号
persistence.redis.database = 0
 
# 密码(如有)
persistence.redis.password = "your_password"
 
# 连接池大小
persistence.redis.pool_size = 32

5.4 负载测试

使用 EMQX 提供的 emqx_bench工具进行负载测试:



# 测试10万个连接
emqx_bench conn -c 100000 -h 192.168.1.100 -p 1883
 
# 测试消息吞吐量(100个客户端,每个客户端每秒发送10条消息)
emqx_bench pub -c 100 -I 100 -t "bench/%c" -h 192.168.1.100 -p 1883

预期在 3 节点集群(每节点 4 核 8G)下,可支持:

并发连接:100 万 +消息吞吐量:10 万条 / 秒 +

六、监控与运维:保障集群稳定运行

6.1 监控指标体系

构建完善的监控指标体系,关键指标包括:

类别关键指标阈值
连接总连接数、新连接速率、断开连接速率不超过最大连接数的 80%
消息发布 / 订阅速率、消息延迟、消息丢弃数延迟 < 100ms,丢弃数 = 0
系统CPU 使用率、内存使用率、网络 IOCPU<80%,内存 < 80%
集群节点状态、网络分区、数据同步延迟无网络分区,同步延迟 < 1s

6.2 基于 Prometheus+Grafana 的监控方案

配置 EMQX 暴露 Prometheus 指标:


# emqx/etc/emqx.conf
prometheus.enable = true
prometheus.interval = 15s
prometheus.listen_address = 0.0.0.0:18083
配置 Prometheus 抓取规则:


# prometheus.yml
scrape_configs:
  - job_name: 'emqx-cluster'
    static_configs:
      - targets: ['192.168.1.101:18083', '192.168.1.102:18083', '192.168.1.103:18083']
    scrape_interval: 15s
导入 EMQX 官方 Grafana 仪表盘(ID: 10441),可视化监控数据。

6.3 日志管理

配置集中式日志收集,使用 ELK 栈(Elasticsearch+Logstash+Kibana):

配置 EMQX 日志输出格式:


# emqx/etc/emqx.conf
log.to = file
log.level = warning
log.file.dir = /var/log/emqx
log.file.name = emqx.log
log.file.rotation.count = 5
log.file.rotation.size = 100MB
log.format = json
使用 Filebeat 收集日志并发送到 Elasticsearch:


# filebeat.yml
filebeat.inputs:
- type: log
  paths:
    - /var/log/emqx/*.log
output.elasticsearch:
  hosts: ["192.168.1.201:9200"]
  index: "emqx-logs-%{+yyyy.MM.dd}"

6.4 自动扩缩容

在 Kubernetes 环境下,可配置 HPA(Horizontal Pod Autoscaler)实现自动扩缩容:



apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: emqx-cluster
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: StatefulSet
    name: emqx
  minReplicas: 3
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80

七、高级特性:提升集群能力的关键功能

7.1 共享订阅

当多个订阅者订阅同一主题时,共享订阅可确保消息只被其中一个订阅者接收,避免重复处理:

使用方式:订阅 $share/{group}/{topic}格式的主题,例如 $share/sensor_group/sensor/temp

Java 代码示例:



// 共享订阅示例
client.subscribe("$share/sensor_group/sensor/temp", QOS);

7.2 延迟发布

延迟发布允许消息在指定时间后才被送达订阅者,适用于定时任务场景:

使用方式:发布到 $delayed/{delay}/{topic}格式的主题,例如 $delayed/60/sensor/alarm表示 60 秒后发布到 sensor/alarm

Java 代码示例:



// 延迟发布示例(60秒后发布)
client.publish("$delayed/60/sensor/alarm", "高温警报".getBytes(), QOS, false);

7.3 桥接功能

MQTT 桥接可连接多个 MQTT 集群,实现跨集群消息同步:

配置 EMQX 桥接:



# 北京集群配置连接上海集群
bridge.mqtt.shanghai_bridge.address = "192.168.2.100:1883"
bridge.mqtt.shanghai_bridge.clientid = "beijing_bridge"
bridge.mqtt.shanghai_bridge.username = "bridge_user"
bridge.mqtt.shanghai_bridge.password = "bridge_pass"
bridge.mqtt.shanghai_bridge.forwards = ["sensor/#"]
bridge.mqtt.shanghai_bridge.subscribe.qos = 1
bridge.mqtt.shanghai_bridge.reconnect_interval = 30s

八、常见问题与解决方案

8.1 集群节点无法加入

问题现象:执行 emqx_ctl cluster join命令失败

可能原因

节点间网络不通(检查 1883、4369、5369 端口)节点 Cookie 不一致(所有节点必须使用相同的 Cookie)节点名称格式错误(必须是 "emqx@IP 地址" 格式)

解决方案



# 检查节点Cookie
emqx_ctl status | grep Cookie
 
# 修改节点Cookie(需重启)
echo "mysecretcookie" > /opt/emqx/data/configs/cluster_cookie

8.2 消息丢失

问题现象:发布的消息未被订阅者收到

可能原因

QoS 等级设置不当(QoS 0 不保证送达)订阅者离线且未设置持久会话消息队列溢出(超过 max_mqueue_len)集群网络分区

解决方案



# 检查消息丢弃统计
emqx_ctl metrics show | grep "messages.dropped"
 
# 调整消息队列大小
emqx_ctl zone update external max_mqueue_len 20000

8.3 连接频繁断开

问题现象:客户端连接频繁断开,出现 "Connection lost" 错误

可能原因

心跳设置不合理(客户端与服务器不一致)网络不稳定服务器资源耗尽(文件描述符、内存等)负载均衡器超时设置过短

解决方案



# 检查文件描述符使用情况
emqx_ctl listeners show | grep "max_conn"
 
# 检查连接断开原因统计
emqx_ctl metrics show | grep "connections.closed"

九、总结与展望

MQTT 集群是支撑大规模物联网部署的关键技术,通过本文的讲解,你应该已经掌握了:

MQTT 集群的核心架构与工作原理如何搭建高可用的 EMQX 集群集群性能优化与监控运维的关键实践解决常见问题的方法与技巧

希望本文能帮助你构建稳定、高效的 MQTT 集群,为你的物联网项目提供坚实的通信基础。

附录:参考资料

EMQX 官方文档:https://docs.emqx.com/zh/emqx/latest/MQTT 协议规范:https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html《MQTT 实战》(Katherine Oliver)《物联网通信技术:MQTT 与 CoAP》(刘川意)
  • 全部评论(0)
最新发布的资讯信息
【系统环境|】Docker安装部署Oracle/Sql Server(2025-11-15 19:07)
【系统环境|】Python教程(五):字符串操作 — 从基础到格式化(2025-11-15 19:07)
【系统环境|】Python极速入门指南(附避坑攻略与实战清单)(2025-11-15 19:06)
【系统环境|】GeoServer安装及使用教程(2025-11-15 19:06)
【系统环境|】【干货】微信搜一搜的排名规则解析+微信SEO操作指南(2025-11-15 19:05)
【系统环境|】通过anaconda安装python及人工智能框架pytorch安装(2025-11-15 19:05)
【系统环境|】【Dify系】30分钟部署生产级Dify全攻略:避坑指南(2025-11-15 19:04)
【系统环境|】Nuxt @nuxt/icon 模块完全指南:轻松集成图标系统(2025-11-15 19:04)
【系统环境|】yarn 的安装和使用(2025-11-15 19:03)
【系统环境|】Spring Batch避坑指南:精准规避常见雷区,让批处理性能飙升!(2025-11-15 19:03)
手机二维码手机访问领取大礼包
返回顶部