参考:千问大模型
在现代仓储物流(WMS)、订单管理(OMS)或电商平台中,首页仪表盘、运营看板、搜索列表等“读多写少”场景极为常见。面对“既要快、又要准、还得稳”的业务诉求,技术团队常陷入一个灵魂拷问:
到底该用实时查库?上缓存?跑定时任务?还是建数据仓库?
本文将结合真实业务场景(如“每小时各仓拣货完成率”),系统梳理四种主流技术路径的适用边界、组合策略与避坑指南,并回答一个高频疑问:什么时候该上 Binlog 实时同步到 Elasticsearch?
在动手前,先回答以下五个问题:
| 问题 | 决策影响 |
|---|---|
| 1. 数据更新频率高吗? | 秒级变更 → 倾向实时;小时级 → 可预计算 |
| 2. 用户能容忍多久延迟? | <1s → 实时查;≤5分钟 → 缓存/批处理;≥1小时 → 数仓 |
| 3. 查询是否复杂? | 多表 JOIN、模糊匹配、多维筛选 → ES 或数仓 |
| 4. QPS 高不高?是否重复查询? | 高频重复 → 必须缓存 |
| 5. 是否需要历史趋势分析? | 需要钻取、对比 → 数仓是唯一解 |
✅ 示例:
GET /order/{orderId}→ 直接查 OMS 主库
⚠️ 注意:命中率低 ≠ TTL 太长!
此时缩短 TTL(如从 60s → 15s)是“主动换血”策略——加速冷数据淘汰,让热点更快进缓存,本质是用稍低的实时性换取更高的可用性。
✅ 示例:WMS 首页“今日拣货完成率” → Redis 缓存,TTL=60s,后台每5分钟预加载
@Scheduled 每 N 分钟跑一次SQL 聚合写入扁平化中间表(如
stats_picking_hourly)同步写入 Redis 供前端快速读取
优势:
将复杂查询压力转移到后台查询接口只需
SELECT * FROM flat_table WHERE ...
容灾设计:
查询范围扩大(如查最近2小时)防漏算提供“手动刷新”按钮应急
✅ 示例:“每小时各仓拣货完成率” → 每5分钟聚合一次,结果存 MySQL + Redis
sql
-- 中间表结构
CREATE TABLE picking_completion_hourly (
warehouse_code VARCHAR(20),
hour_slot DATETIME, -- '2025-11-23 14:00:00'
total_tasks INT,
completed_tasks INT,
completion_rate DECIMAL(5,2) AS (completed_tasks * 100.0 / NULLIF(total_tasks, 0)) STORED,
UNIQUE KEY uk_wh_hour (warehouse_code, hour_slot)
);
✅ 示例:“近7天各仓拣货人效趋势(按班次)” → 同步到数仓,BI 工具展示
| 场景 | 需求特点 | 为什么必须实时? |
|---|---|---|
| 1. 商品/订单搜索 | 关键词 + 多属性筛选 | 用户刚上架商品,5分钟后才能搜到?体验崩坏 |
| 2. 运营复杂筛选 | “近1小时未发货+金额>1000+广东” | DB 无合适索引,查不动;定时同步看不到最新单 |
| 3. 状态变更即时生效 | 商品下架立即消失、订单取消不再展示 | 延迟导致超卖、客诉 |
| 方案 | 适用场景 |
|---|---|
| Flink CDC | 新项目首选,支持 exactly-once、SQL ETL |
| Canal + 自研消费者 | 轻量老系统,需自己处理写入逻辑 |
| Debezium + Kafka + Logstash | 复杂数据管道,需多下游消费 |
✅ 示例:商品搜索同步
sql
编辑
-- Flink SQL 同步 system_item 表到 ES
INSERT INTO es_goods_index
SELECT id, sku, name, barcode, customer_code, status
FROM mysql_system_item
WHERE status = 'ON_SHELF';
delete by id批量写入:用 Bulk API,控制并发防 ES 雪崩最终一致:重要操作(如下单)仍需回查 DB 校验
以“每小时各仓拣货完成率”为例,推荐架构:
text
编辑
[MySQL picking_task]
↓ (每5分钟聚合)
[MySQL picking_completion_hourly] ←→ [Redis Hash + ZSet]
↑ ↓
(降级查询) [Spring Boot API]
↓
[前端 ECharts 轮询]
实时性:≤5 分钟性能:99% 请求命中 Redis容灾:Redis 挂了自动查 MySQL(带限流)扩展性:
要秒级?→ 改 Flink CDC 实时计算要多维分析?→ 同步一份到数仓
| 需求特征 | 推荐方案 |
|---|---|
| 强实时 + 点查 | 实时查 DB(读写分离) |
| 高频重复 + 汇总值 | Redis 缓存(TTL=30~60s) |
| 固定维度 + 分钟级延迟 | 定时任务 + 中间表 |
| 多维分析 + 历史趋势 | 数据仓库(T+1) |
| 全文检索 + 多条件筛选 | Binlog → Elasticsearch |
💡 黄金法则:
缓存只存“结果”,不存“过程”数仓只做“分析”,不做“操作”业务库只服务“交易”,不扛“报表”
最后建议:不要追求“一步到位”。先用 缓存 + 定时任务 快速上线,再根据监控指标(命中率、DB 负载、用户反馈)逐步演进。架构没有银弹,只有不断权衡的艺术。
你有具体的业务指标想优化吗?欢迎留言,我可以为你定制技术路径!
问题:一般WMS首页有一些图表展示单量,状态单量等等,用定时任务提前计算好,做数据仓库,实时性差,直接查又怕请求多,想这样行不行?或者有没有更好的方案,查询到放缓存里,缓存时间1分钟,优先查缓存,缓存查不到查数据库并会写缓存?
可行,但要加三道“保险”才能既保证 1 分钟级实时性,又不把数据库打崩。
分层缓存
• 本地缓存(Caffeine/Guava)放高频访问的“今日单量”“异常单量”等汇总值,TTL 30 秒。
• 分布式缓存(Redis)放需要跨节点共享的明细或 1 分钟级快照,TTL 60 秒。
两级缓存都命中失败时,才落到数据库。
异步刷新 + 预加载
另起一个定时线程,每 30 秒把最新汇总结果预写进 Redis,页面请求直接读缓存即可。这样即使 60 秒 TTL 未到,也能把“冷”查询提前变成“热”数据。
热点探测与降级
• 监控缓存命中率,低于 90 % 时自动把 TTL 缩短到 15 秒,并告警提示扩容。
• 数据库查询加熔断:连续 3 次超时就返回上一次缓存值,并后台异步重刷。
• 对“明细查询”这类低频请求,直接走数据库,不做缓存,避免缓存爆炸。
一句话总结:
“本地 + Redis”两级缓存 + 异步预加载,能把首页图表的 99 % 请求挡在数据库之外,同时保持 1 分钟级实时;再配熔断与降级,系统稳如老狗。
监控命中率低于90%就自动缩短TTL时间,为什么?为什么要告警扩容?
命中率 <90% 说明缓存里“查不到、得回源”的请求比例过高,继续沿用 60 秒 TTL 会导致:
短 TTL → 更快过期 → 更多请求直接打到数据库,雪崩风险陡增;长 TTL → 冷数据滞留 → 热点数据被挤出,命中率进一步下降。把 TTL 主动缩短到 15 秒,相当于“快进快出”:
让新数据更快进入缓存,提高“命中”概率;同时把数据库回源的冲击分散到更短的时间窗口,避免瞬间打满。告警扩容则是“治本”:
低命中率往往意味着热数据集已经超过当前缓存容量;只靠缩短 TTL 只是“把大水漫灌变成小水勤浇”,最终还是要加机器或调大缓存池才能根本解决。一句话:缩短 TTL 是“应急止血”,扩容才是“根治贫血”。
你所说的缓存命中率是指最新数据命中率?
不是。缓存命中率统计的是“所有被缓存过的数据”中,被成功读到的比例,并不区分“新”还是“旧”。
如果想单独衡量“最新数据”的命中情况,需要额外埋点:
这样就能知道“热点最新数据”的命中率,而不只是整体缓存的平均命中率。
命中率低于90%,这不是说明过期太快了吗?为什么还要缩短过期时间?
命中率低于 90 % 并不等于“过期太快”,真正的原因可能是:
缓存空间不足,热点数据被 LRU/LFU 淘汰;过期时间虽然长,但数据更新频率更高,缓存里的内容已经陈旧,业务侧宁愿回源也不用;查询模式分散,大量 Key 只被访问一次,缓存利用率低。此时把 TTL 从 60 秒缩到 15 秒,目的不是“让数据更快过期”,而是:
加快“冷数据”清理,把空间腾给真正高频的热点;降低缓存污染,避免大量低频或陈旧数据占着茅坑;用更短的窗口把“最新数据”快速刷进缓存,提高后续请求的命中概率。换句话说:
“缩短 TTL”是牺牲一点实时性,换取更高命中率的主动换血策略,而不是默认“当前 TTL 已经过期太快”。
读多的,什么时候实时查,什么时候上缓存,什么时候上定时任务提前清洗表,什么时候上数据仓库?
这是一个非常经典且关键的架构决策问题。对于“读多”的场景,选择 实时查、缓存、定时任务预计算、数据仓库,本质上是在 实时性、一致性、性能、成本、复杂度 之间做权衡。
下面用一张表 + 四个维度帮你清晰判断:
| 场景特征 | 实时查询(直连 DB) | 缓存(Redis/Caffeine) | 定时任务预计算(清洗中间表) | 数据仓库(DW/BI) |
|---|---|---|---|---|
| 数据更新频率 | 高频(秒级) | 中高频(秒~分钟) | 低频(分钟~小时) | 极低频(小时~天) |
| 实时性要求 | 强(<1s) | 中(秒~分钟) | 弱(分钟级可接受) | 弱(T+1 或 T+0.5) |
| 查询复杂度 | 简单(点查、简单聚合) | 简单(Key-Value 或汇总值) | 中等(多表 JOIN、窗口函数) | 复杂(多维分析、钻取、指标派生) |
| 并发压力 | 低 | 高 | 低(由后台任务承担) | 中低(通常走 BI 工具或异步报表) |
| 典型例子 | 查订单详情、库存余量 | 首页统计、用户画像快照 | 日报汇总、分仓出库趋势 | 销售分析、履约效率看板 |
💡 原则:操作类看实时,分析类看延迟
💡 缓存适合“热数据复用”,预计算适合“重逻辑轻查询”
💡 DW 不是为了“快”,而是为了“准 + 全 + 可分析”
| 需求 | 推荐方案 |
|---|---|
| 首页今日订单数、异常单量 | 两级缓存 + 异步预加载(如前文代码) |
| 查某订单当前在哪个环节 | 实时查 OMS/WMS 主库(带 ID 的点查) |
| 昨日各仓库出库量排名 | 定时任务每天凌晨汇总到 stats_daily 表 |
| 近 30 天履约时效趋势(按客户等级) | 同步到数据仓库,用 BI 工具展示 |
| 实时监控“当前未拣货订单积压” | Flink 流处理 or Redis SortedSet + TTL(特殊场景) |
实时查用于“操作”,缓存用于“热点聚合”,定时任务用于“轻量汇总”,数据仓库用于“深度分析”。四者不是互斥,而是分层协作。
根据你的具体指标类型、更新频率、用户容忍延迟,就能精准选择。如果提供一个具体场景(比如“我想展示每小时各仓拣货完成率”),我可以给你定制技术路径。
如果提供一个具体场景(比如“我想展示每小时各仓拣货完成率”),我可以给你定制技术路径。
太好了!我们就以你提到的这个典型场景为例:
“我想展示每小时各仓拣货完成率”
我们来一步步拆解需求,并给出端到端的技术路径,涵盖数据采集、计算、存储、查询、展示和容灾。
拣货完成率 = 已完成拣货单数 / 应拣货单数 × 100%
= SUM(已拣商品件数) / SUM(应拣商品件数)维度:
时间:按小时(如 2025-11-23 14:00 ~ 15:00)仓库:WH001, WH002, ...
实时性要求:近实时(延迟 ≤ 5 分钟)查询频率:高(运营/调度人员频繁查看)数据量级:每小时数万~百万级拣货任务
| 层级 | 技术方案 | 原因 |
|---|---|---|
| 1. 数据源 | WMS 拣货任务表(
picking_task)状态字段:
status IN ('CREATED', 'PICKING', 'COMPLETED') | 业务系统是唯一真相源 |
| 2. 实时计算 | Flink SQL 流处理 或 定时批处理(每5分钟) | 要求 5 分钟延迟 → 批处理足够,成本更低 |
| 3. 结果存储 | Redis SortedSet + Hash(用于实时看板) + MySQL 中间汇总表(用于历史追溯) | Redis 支撑高并发读,MySQL 保底可查 |
| 4. 查询接口 | Spring Boot 接口,优先读 Redis,降级读 MySQL | 快 + 稳 |
| 5. 可视化 | 前端 ECharts / AntV,每 30 秒轮询 | 用户体验流畅 |
✅ 结论:不用上数据仓库(DW),因为这是运营监控类指标,非分析型,且要求近实时。
sql
编辑
CREATE TABLE picking_completion_hourly (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
warehouse_code VARCHAR(20) NOT NULL,
hour_slot DATETIME NOT NULL COMMENT '格式: 2025-11-23 14:00:00',
total_tasks INT NOT NULL DEFAULT 0,
completed_tasks INT NOT NULL DEFAULT 0,
completion_rate DECIMAL(5,2) AS (completed_tasks * 100.0 / NULLIF(total_tasks, 0)) STORED,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
UNIQUE KEY uk_wh_hour (warehouse_code, hour_slot)
);
java
编辑
@Component
public class PickingCompletionAggregator {
@Autowired
private JdbcTemplate jdbcTemplate;
// 每5分钟执行,计算最近1小时的数据(覆盖当前小时)
@Scheduled(fixedDelay = 300_000) // 5分钟
public void aggregateHourlyCompletion() {
String sql = """
INSERT INTO picking_completion_hourly (warehouse_code, hour_slot, total_tasks, completed_tasks)
SELECT
warehouse_code,
DATE_FORMAT(create_time, '%Y-%m-%d %H:00:00') as hour_slot,
COUNT(*) as total_tasks,
SUM(CASE WHEN status = 'COMPLETED' THEN 1 ELSE 0 END) as completed_tasks
FROM picking_task
WHERE create_time >= DATE_SUB(NOW(), INTERVAL 2 HOUR) -- 防止漏算边界
AND create_time < DATE_FORMAT(NOW(), '%Y-%m-%d %H:00:00') + INTERVAL 1 HOUR
GROUP BY warehouse_code, hour_slot
ON DUPLICATE KEY UPDATE
total_tasks = VALUES(total_tasks),
completed_tasks = VALUES(completed_tasks),
updated_at = NOW()
""";
jdbcTemplate.update(sql);
}
}
💡 为什么查 2 小时?防止任务跨小时创建导致漏算。
在聚合完成后,把最新几小时的数据写入 Redis:
java
编辑
// 在 aggregateHourlyCompletion() 最后追加
String redisKey = "picking:completion:" + warehouseCode + ":" + hourSlotStr;
Map<String, String> data = Map.of(
"total", String.valueOf(total),
"completed", String.valueOf(completed),
"rate", String.format("%.2f", rate)
);
redisTemplate.opsForHash().putAll(redisKey, data);
redisTemplate.expire(redisKey, Duration.ofHours(25)); // 保留1天
// 同时维护一个 SortedSet 便于按时间范围查询
String zsetKey = "picking:completion:zset:" + warehouseCode;
zsetKey.add(hourSlotTimestamp, hourSlotStr); // score = timestamp
java
编辑
@GetMapping("/picking/completion")
public List<PickingCompletionVO> getCompletionRate(
@RequestParam String warehouseCode,
@RequestParam @DateTimeFormat(pattern = "yyyy-MM-dd HH") LocalDateTime startHour,
@RequestParam @DateTimeFormat(pattern = "yyyy-MM-dd HH") LocalDateTime endHour) {
List<PickingCompletionVO> result = new ArrayList<>();
for (LocalDateTime hour = startHour; !hour.isAfter(endHour); hour = hour.plusHours(1)) {
String key = "picking:completion:" + warehouseCode + ":" + hour.format(HOUR_FORMATTER);
// 1. 先查 Redis
Map<Object, Object> cache = redisTemplate.opsForHash().entries(key);
if (!cache.isEmpty()) {
result.add(buildVO(cache));
} else {
// 2. 降级查 MySQL
PickingCompletionVO dbVO = queryFromDb(warehouseCode, hour);
if (dbVO != null) result.add(dbVO);
}
}
return result;
}
js
编辑
// 前端每30秒请求:/api/picking/completion?warehouseCode=WH001&startHour=2025-11-23+14&endHour=2025-11-23+14
// 展示柱状图 + 数字卡片
| 风险 | 应对方案 |
|---|---|
| 聚合任务失败 | 加日志告警 + 下次执行自动补算(查2小时范围) |
| Redis 宕机 | 自动降级读 MySQL(虽然慢点,但可用) |
| 高峰期 DB 压力大 | 聚合任务错峰执行;WMS 表加索引
(warehouse_code, create_time, status) |
| 数据不准 | 提供“手动刷新”按钮,触发立即聚合 |
“每小时各仓拣货完成率”属于高频、近实时、轻分析的运营指标,采用「MySQL 定时聚合 + Redis 缓存 + 接口降级」三层架构,即可低成本、高可用地支撑,无需引入复杂流计算或数据仓库。
如果你有更多细节(比如是否要分拣货员、是否要排除异常单),我可以进一步细化逻辑。
什么上实时消费binlog 到ES?
这是一个非常关键的架构问题:什么时候应该用“实时消费 Binlog 同步到 Elasticsearch(ES)”?
我们先说结论,再展开场景、原理和替代方案。
当你需要将数据库中的业务数据,以秒级延迟、高一致性的方式,同步到 ES 用于复杂查询(如全文检索、多维聚合、模糊匹配)时,就该上 Binlog 实时同步。
WHERE province='广东' AND amount>1000 AND status='PAID' 没有合适索引 → 慢
定时同步?
延迟高,看不到最新订单
✅ 用 Binlog 实时写入 ES,利用其倒排索引 + 列存加速多维过滤
UPDATE status = 'CANCELLED',秒级更新 ES
| 场景 | 为什么不推荐 Binlog → ES |
|---|---|
| 纯报表类指标(如日销总额) | 用定时聚合 + MySQL 或数仓更简单 |
| 数据只读不搜(如配置表) | 直接查 DB 即可,ES 是过度设计 |
| 写多读少 or 无查询需求 | 同步了也用不上,浪费资源 |
| 对一致性要求极低(如埋点日志) | 用 MQ 异步批量写更划算 |
| 方案 | 工具 | 特点 |
|---|---|---|
| 1. Canal + 自研消费者 | Alibaba Canal | 轻量,需自己写 ES 写入逻辑 |
| 2. Flink CDC + Flink Job | Flink SQL | 支持 exactly-once,可做 ETL 转换 |
| 3. Debezium + Kafka + Logstash | Debezium (Kafka Connect) | 生态成熟,适合复杂管道 |
| 4. Elasticsearch JDBC River(已废弃) | ❌ 不要用 | 官方已弃用,性能差 |
✅ 推荐:Flink CDC(新项目)或 Canal(轻量老系统)
text
编辑
MySQL (goods表)
│
↓ Binlog
Flink CDC Job
│ (可做字段清洗、join category 表)
↓
Elasticsearch (goods_index)
│
↓
Search API → 返回给前端
sql
编辑
-- 读取 MySQL Binlog
CREATE TABLE goods (
id BIGINT,
name STRING,
price DECIMAL(10,2),
stock INT,
status STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'user',
'password' = 'pwd',
'database-name' = 'gts',
'table-name' = 'system_item'
);
-- 写入 ES
INSERT INTO es_goods_index
SELECT id, name, price, stock, status
FROM goods
WHERE status = 'ON_SHELF'; -- 下架商品不索引
delete by id
避免 ES 写入风暴
加 buffer、批量写(bulk)、控制并发
保证最终一致性
ES 本质是最终一致,重要操作(如下单)仍要回查 DB
监控同步延迟
用 Prometheus 监控 Flink/Canal 的 lag
问自己以下问题,如果 ≥3 个回答 “是”,就该上 Binlog → ES:
是否需要 全文检索 or 模糊匹配? 是否要求 变更后 10 秒内可见? 查询条件是否 多维、动态、无法靠 DB 索引覆盖? 数据是否来自 核心业务表(订单/商品/用户)? 是否已有 ES 集群 or 愿意运维?如果答案多为“否”,请优先考虑:
缓存(Redis)定时聚合(MySQL 中间表)读写分离(DB 从库扛查询)需要我针对你的 SystemItemImport 商品表 设计一个 Binlog → ES 的同步方案吗?比如支持“按 SKU、名称、条码、客户编码联合搜索”?