在物联网(IoT)爆发的今天,设备连接数呈指数级增长。根据 IDC 预测,到 2025 年全球物联网设备将突破 750 亿台。这些设备需要高效、可靠的通信协议,MQTT(Message Queuing Telemetry Transport)凭借轻量、低带宽占用的特性成为首选。但单机 MQTT 服务器能支撑的连接数有限(通常在 10 万级),如何突破这个瓶颈?答案就是MQTT 集群。
本文将从底层原理到实战部署,全方位解析 MQTT 集群的设计与实现,带你掌握支撑百万级设备连接的核心技术。
MQTT 是基于发布 - 订阅(Pub/Sub)模式的轻量级协议,核心组件包括:
客户端(Client):发送消息(发布者)或接收消息(订阅者)的设备 / 应用服务器(Broker):接收客户端连接,转发消息的中间节点主题(Topic):消息的分类标识,如 "sensor/temp/room1"QoS(Quality of Service):消息传输质量等级(0/1/2)即使是性能优异的 MQTT 服务器(如 EMQX、Mosquitto),单机也会面临以下瓶颈:
连接数上限:受内存和文件描述符限制吞吐量瓶颈:CPU 和网络 IO 的物理限制单点故障:一旦宕机,整个系统瘫痪地理限制:远距离设备连接延迟高集群化部署需实现三大目标:
高可用(High Availability):避免单点故障水平扩展(Horizontal Scaling):通过增加节点提升容量负载均衡(Load Balancing):均匀分配客户端连接构建 MQTT 集群面临三个关键问题:
会话共享:客户端重连到不同节点时需恢复会话状态消息路由:确保订阅者能收到所有相关消息,无论连接到哪个节点负载均衡:如何将客户端连接合理分配到集群节点
原理:所有节点通过共享存储(如 Redis、ZooKeeper)同步会话和订阅信息
优点:架构简单,易于扩展缺点:共享存储可能成为瓶颈,跨节点消息路由效率低

原理:节点间通过专用协议直接通信,同步订阅和路由信息
优点:无中心瓶颈,消息路由效率高缺点:节点发现和网络配置复杂

原理:节点间直接通信,同时使用元数据存储维护集群状态
优点:兼顾扩展性和可靠性,主流 MQTT 服务器(如 EMQX)采用缺点:实现复杂度高
EMQX 是目前最流行的开源 MQTT 服务器,支持百万级并发连接,我们以 EMQX 5.3.0(最新稳定版)为例,搭建一个高可用集群。
| 节点 | IP 地址 | 角色 | 硬件配置 |
|---|---|---|---|
| emqx-1 | 192.168.1.101 | 集群节点 | 4 核 8G |
| emqx-2 | 192.168.1.102 | 集群节点 | 4 核 8G |
| emqx-3 | 192.168.1.103 | 集群节点 | 4 核 8G |
| lb | 192.168.1.100 | 负载均衡器 | 2 核 4G |
操作系统:Ubuntu 22.04 LTSDocker 版本:24.0.7Docker Compose 版本:2.21.0
首先在每个节点安装 EMQX:
# 拉取EMQX镜像
docker pull emqx/emqx:5.3.0
# 启动单节点EMQX
docker run -d
--name emqx
-p 1883:1883 # MQTT TCP端口
-p 8083:8083 # MQTT WebSocket端口
-p 8084:8084 # MQTT Secure WebSocket端口
-p 8883:8883 # MQTT TLS端口
-p 18083:18083 # 管理控制台端口
-e EMQX_NODE_NAME="emqx@192.168.1.101" # 节点名称,格式为"emqx@IP"
-e EMQX_DASHBOARD__DEFAULT_USER__PASSWORD="public" # 控制台密码
emqx/emqx:5.3.0
访问
http://192.168.1.101:18083,使用用户名
admin和密码
public登录控制台,确认单节点正常运行。
EMQX 支持多种集群发现方式,这里使用静态节点列表方式:
在 emqx-1 节点执行,将 emqx-2 和 emqx-3 加入集群:
# 进入emqx-1容器
docker exec -it emqx sh
# 加入emqx-2节点
emqx_ctl cluster join emqx@192.168.1.102
# 加入emqx-3节点
emqx_ctl cluster join emqx@192.168.1.103
验证集群状态:
emqx_ctl cluster status
预期输出:
Cluster status: #{running_nodes => ['emqx@192.168.1.101','emqx@192.168.1.102','emqx@192.168.1.103'],
stopped_nodes => []}
使用 Nginx 作为 TCP 负载均衡器,配置如下:
# /etc/nginx/nginx.conf
worker_processes auto;
events {
worker_connections 10240;
}
stream {
upstream mqtt_cluster {
# 轮询策略
server 192.168.1.101:1883 weight=1 max_fails=2 fail_timeout=30s;
server 192.168.1.102:1883 weight=1 max_fails=2 fail_timeout=30s;
server 192.168.1.103:1883 weight=1 max_fails=2 fail_timeout=30s;
}
server {
listen 1883;
proxy_pass mqtt_cluster;
# 保持长连接
proxy_timeout 120s;
proxy_connect_timeout 10s;
}
}
启动 Nginx:
docker run -d
--name nginx-mqtt-lb
-p 1883:1883
-v /etc/nginx/nginx.conf:/etc/nginx/nginx.conf
nginx:1.25.3
编写一个简单的 Java 程序验证集群功能,使用 Eclipse Paho MQTT 客户端(3.4.0 版本):
首先在 pom.xml 中添加依赖:
<dependencies>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>3.4.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.30</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.9</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.4.8</version>
</dependency>
</dependencies>
创建发布者类:
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
* MQTT消息发布者,用于验证集群消息同步
* @author ken
*/
@Slf4j
public class MqttPublisher {
// 负载均衡器地址
private static final String BROKER = "tcp://192.168.1.100:1883";
// 客户端ID,需唯一
private static final String CLIENT_ID = "publisher-" + System.currentTimeMillis();
// 主题
private static final String TOPIC = "cluster/test";
// QoS等级
private static final int QOS = 1;
public static void main(String[] args) throws MqttException, InterruptedException {
// 创建连接选项
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(false);
connOpts.setConnectionTimeout(10);
connOpts.setKeepAliveInterval(60);
// 创建客户端
MqttClient client = new MqttClient(BROKER, CLIENT_ID, new MemoryPersistence());
// 连接服务器
log.info("连接到MQTT服务器: {}", BROKER);
client.connect(connOpts);
log.info("连接成功");
// 发布消息
for (int i = 0; i < 10; i++) {
String content = "集群测试消息 " + i;
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(QOS);
client.publish(TOPIC, message);
log.info("已发布消息: {}", content);
Thread.sleep(1000);
}
// 断开连接
client.disconnect();
log.info("已断开连接");
client.close();
}
}
创建订阅者类:
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
* MQTT消息订阅者,用于验证集群消息同步
* @author ken
*/
@Slf4j
public class MqttSubscriber {
// 负载均衡器地址
private static final String BROKER = "tcp://192.168.1.100:1883";
// 客户端ID,需唯一
private static final String CLIENT_ID = "subscriber-" + System.currentTimeMillis();
// 主题
private static final String TOPIC = "cluster/test";
// QoS等级
private static final int QOS = 1;
public static void main(String[] args) throws MqttException {
// 创建连接选项
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(false);
connOpts.setConnectionTimeout(10);
connOpts.setKeepAliveInterval(60);
// 创建客户端
MqttClient client = new MqttClient(BROKER, CLIENT_ID, new MemoryPersistence());
// 设置回调
client.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
log.error("连接丢失", cause);
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
log.info("收到消息 - 主题: {}, 内容: {}", topic, new String(message.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// 对于订阅者,此方法不常用
}
});
// 连接服务器
log.info("连接到MQTT服务器: {}", BROKER);
client.connect(connOpts);
log.info("连接成功");
// 订阅主题
client.subscribe(TOPIC, QOS);
log.info("已订阅主题: {}", TOPIC);
// 保持连接
try {
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e) {
log.error("线程中断", e);
}
// 断开连接
client.disconnect();
log.info("已断开连接");
client.close();
}
}
测试步骤:
启动两个订阅者(连接到不同集群节点)启动一个发布者(连接到任意节点)观察两个订阅者是否都能收到所有消息如果两个订阅者都能收到消息,说明集群消息同步正常。
MQTT 集群的消息路由需解决一个核心问题:当发布者和订阅者连接到不同节点时,如何确保消息能正确送达。
EMQX 采用基于主题树的路由表实现跨节点消息路由:

路由表维护了 "主题 - 节点" 映射关系,当节点收到新订阅时,会将订阅信息同步到整个集群,更新所有节点的路由表。
MQTT 客户端通过
cleanSession标志控制会话是否持久化:
cleanSession=true:客户端断开连接后,会话信息(订阅、未确认消息)会被删除
cleanSession=false:客户端断开连接后,会话信息会被保留
在集群环境中,会话信息需要在节点间共享,EMQX 通过以下机制实现:
会话归属:每个客户端会话由固定节点负责(基于客户端 ID 哈希)会话同步:会话创建节点会将会话信息同步到其他节点会话迁移:当客户端重连到新节点时,新节点会从原归属节点获取会话信息
集群节点间通过心跳检测彼此状态,当网络故障导致集群分裂成多个子集群(脑裂)时,EMQX 采用投票机制处理:
每个节点定期向其他节点发送心跳当节点超过阈值时间未收到多数节点的心跳,会认为自己处于 minority 分区minority 分区的节点会自动停止服务,避免数据不一致
EMQX 基于 Erlang VM 运行,合理配置 VM 参数对性能至关重要:
# emqx/etc/emqx.conf
node.process_limit = 2097152 # 最大进程数
node.max_ports = 1048576 # 最大端口数
vm.args += +A 128 # 异步线程池大小
vm.args += +Q 65536 # 端口队列大小
vm.args += +P 2097152 # 最大进程数
vm.args += -env ERL_MAX_ETS_TABLES 20000 # 最大ETS表数量
调整 MQTT 连接相关参数,提高并发能力:
# 最大连接数
listener.tcp.default.max_connections = 1024000
# 心跳超时时间
listener.tcp.default.keepalive = 300s
# 连接握手超时
listener.tcp.default.handshake_timeout = 15s
# 消息队列长度
zone.external.max_mqueue_len = 10000
# 消息队列溢出策略
zone.external.mqueue_overload_policy = drop_oldest
EMQX 支持多种持久化后端,根据场景选择:
内置数据库(默认):适合中小规模部署Redis:适合需要高吞吐量的场景MySQL/PostgreSQL:适合需要事务支持的场景配置 Redis 持久化:
# 启用Redis持久化
persistence.redis.enabled = true
# Redis服务器地址
persistence.redis.server = "redis://192.168.1.200:6379"
# Redis数据库编号
persistence.redis.database = 0
# 密码(如有)
persistence.redis.password = "your_password"
# 连接池大小
persistence.redis.pool_size = 32
使用 EMQX 提供的
emqx_bench工具进行负载测试:
# 测试10万个连接
emqx_bench conn -c 100000 -h 192.168.1.100 -p 1883
# 测试消息吞吐量(100个客户端,每个客户端每秒发送10条消息)
emqx_bench pub -c 100 -I 100 -t "bench/%c" -h 192.168.1.100 -p 1883
预期在 3 节点集群(每节点 4 核 8G)下,可支持:
并发连接:100 万 +消息吞吐量:10 万条 / 秒 +构建完善的监控指标体系,关键指标包括:
| 类别 | 关键指标 | 阈值 |
|---|---|---|
| 连接 | 总连接数、新连接速率、断开连接速率 | 不超过最大连接数的 80% |
| 消息 | 发布 / 订阅速率、消息延迟、消息丢弃数 | 延迟 < 100ms,丢弃数 = 0 |
| 系统 | CPU 使用率、内存使用率、网络 IO | CPU<80%,内存 < 80% |
| 集群 | 节点状态、网络分区、数据同步延迟 | 无网络分区,同步延迟 < 1s |
# emqx/etc/emqx.conf
prometheus.enable = true
prometheus.interval = 15s
prometheus.listen_address = 0.0.0.0:18083
配置 Prometheus 抓取规则:
# prometheus.yml
scrape_configs:
- job_name: 'emqx-cluster'
static_configs:
- targets: ['192.168.1.101:18083', '192.168.1.102:18083', '192.168.1.103:18083']
scrape_interval: 15s
导入 EMQX 官方 Grafana 仪表盘(ID: 10441),可视化监控数据。
配置集中式日志收集,使用 ELK 栈(Elasticsearch+Logstash+Kibana):
配置 EMQX 日志输出格式:
# emqx/etc/emqx.conf
log.to = file
log.level = warning
log.file.dir = /var/log/emqx
log.file.name = emqx.log
log.file.rotation.count = 5
log.file.rotation.size = 100MB
log.format = json
使用 Filebeat 收集日志并发送到 Elasticsearch:
# filebeat.yml
filebeat.inputs:
- type: log
paths:
- /var/log/emqx/*.log
output.elasticsearch:
hosts: ["192.168.1.201:9200"]
index: "emqx-logs-%{+yyyy.MM.dd}"
在 Kubernetes 环境下,可配置 HPA(Horizontal Pod Autoscaler)实现自动扩缩容:
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: emqx-cluster
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: StatefulSet
name: emqx
minReplicas: 3
maxReplicas: 10
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 80
当多个订阅者订阅同一主题时,共享订阅可确保消息只被其中一个订阅者接收,避免重复处理:

使用方式:订阅
$share/{group}/{topic}格式的主题,例如
$share/sensor_group/sensor/temp
Java 代码示例:
// 共享订阅示例
client.subscribe("$share/sensor_group/sensor/temp", QOS);
延迟发布允许消息在指定时间后才被送达订阅者,适用于定时任务场景:

使用方式:发布到
$delayed/{delay}/{topic}格式的主题,例如
$delayed/60/sensor/alarm表示 60 秒后发布到
sensor/alarm
Java 代码示例:
// 延迟发布示例(60秒后发布)
client.publish("$delayed/60/sensor/alarm", "高温警报".getBytes(), QOS, false);
MQTT 桥接可连接多个 MQTT 集群,实现跨集群消息同步:

配置 EMQX 桥接:
# 北京集群配置连接上海集群
bridge.mqtt.shanghai_bridge.address = "192.168.2.100:1883"
bridge.mqtt.shanghai_bridge.clientid = "beijing_bridge"
bridge.mqtt.shanghai_bridge.username = "bridge_user"
bridge.mqtt.shanghai_bridge.password = "bridge_pass"
bridge.mqtt.shanghai_bridge.forwards = ["sensor/#"]
bridge.mqtt.shanghai_bridge.subscribe.qos = 1
bridge.mqtt.shanghai_bridge.reconnect_interval = 30s
问题现象:执行
emqx_ctl cluster join命令失败
可能原因:
节点间网络不通(检查 1883、4369、5369 端口)节点 Cookie 不一致(所有节点必须使用相同的 Cookie)节点名称格式错误(必须是 "emqx@IP 地址" 格式)解决方案:
# 检查节点Cookie
emqx_ctl status | grep Cookie
# 修改节点Cookie(需重启)
echo "mysecretcookie" > /opt/emqx/data/configs/cluster_cookie
问题现象:发布的消息未被订阅者收到
可能原因:
QoS 等级设置不当(QoS 0 不保证送达)订阅者离线且未设置持久会话消息队列溢出(超过 max_mqueue_len)集群网络分区解决方案:
# 检查消息丢弃统计
emqx_ctl metrics show | grep "messages.dropped"
# 调整消息队列大小
emqx_ctl zone update external max_mqueue_len 20000
问题现象:客户端连接频繁断开,出现 "Connection lost" 错误
可能原因:
心跳设置不合理(客户端与服务器不一致)网络不稳定服务器资源耗尽(文件描述符、内存等)负载均衡器超时设置过短解决方案:
# 检查文件描述符使用情况
emqx_ctl listeners show | grep "max_conn"
# 检查连接断开原因统计
emqx_ctl metrics show | grep "connections.closed"
MQTT 集群是支撑大规模物联网部署的关键技术,通过本文的讲解,你应该已经掌握了:
MQTT 集群的核心架构与工作原理如何搭建高可用的 EMQX 集群集群性能优化与监控运维的关键实践解决常见问题的方法与技巧希望本文能帮助你构建稳定、高效的 MQTT 集群,为你的物联网项目提供坚实的通信基础。