RocksDB 是一个高性能的嵌入式键值存储引擎,基于 Facebook 开源的 LevelDB 进行了大量的改进和优化。它专门为快速存储环境(如闪存和内存)设计,提供了优秀的读写性能和高效的存储空间利用率。
作为一个嵌入式的数据库,RocksDB 以内存映射文件的方式直接在应用程序进程中运行,避免了传统客户端-服务器架构的网络传输开销。其核心特性包括:
高性能:针对 SSD 和内存进行了专门优化可配置性强:提供了丰富的配置选项以适应不同场景持久化存储:数据持久化保存,支持崩溃恢复ACID 特性:支持原子性操作和一致性保证RocksDB 的发展历程体现了现代存储引擎的演进轨迹:
起源阶段:最初源于 Google 的 LevelDB,Facebook 团队在此基础上进行了大量改进,以满足其大规模分布式系统的存储需求。
快速发展阶段:随着 Facebook 业务的快速增长,RocksDB 在性能、稳定性和功能方面持续优化,逐渐成为业界领先的嵌入式存储引擎。
开源生态阶段:作为开源项目,RocksDB 获得了广泛的社区支持,被众多知名项目采用,如 Apache Kafka、CockroachDB、TiDB 等。
多样化应用阶段:从最初的消息队列存储,扩展到数据库存储引擎、缓存系统、时序数据库等多种应用场景。
选择 RocksDB 作为存储解决方案的原因主要包括:
卓越的写入性能:基于 LSM-Tree 结构,写入操作非常高效,特别适合写多读少的场景。
优秀的读取性能:通过精心设计的缓存机制和索引结构,读取性能同样出色。
灵活的配置选项:可以根据具体应用场景调整各种参数,达到最优性能表现。
成熟稳定的生态:经过多年大规模生产环境验证,稳定性和可靠性得到充分证明。
活跃的开源社区:持续的功能更新和问题修复,保证了技术的先进性。
RocksDB 在多个领域都有广泛的应用:
消息队列存储:Apache Kafka 使用 RocksDB 作为其底层存储引擎,处理海量消息数据。
分布式数据库:作为存储引擎支撑 CockroachDB、TiDB 等分布式数据库的底层数据存储。
缓存系统:在需要持久化特性的缓存场景中表现出色。
时序数据存储:适合存储监控指标、日志等时序数据。
区块链应用:以太坊等区块链项目使用 RocksDB 存储区块数据。
LSM-Tree(Log-Structured Merge-Tree)是 RocksDB 的核心数据结构,它通过将随机写转换为顺序写来大幅提升写入性能。
LSM-Tree 的工作原理:
写入优化:所有写操作首先写入内存中的
MemTable,避免磁盘随机 I/O批量刷盘:当
MemTable 达到一定大小后,批量顺序写入磁盘形成
SSTable分层合并:通过后台的
Compaction 过程,将多个层级的
SSTable 合并优化
这种结构的优势在于写入性能极高,但代价是读取时可能需要查询多个层级的数据。
MemTable与
SSTable这两个核心组件构成了 RocksDB 的存储层次:
MemTable(内存表):
Immutable MemTable
SSTable(有序字符串表):
Compaction 过程进行层级管理和优化
WAL(Write-Ahead Log)机制
WAL 是保证数据持久性和一致性的关键机制:
当写入请求到达时,RocksDB 会按以下顺序处理:
将写操作记录到
WAL 中更新内存中的
MemTable返回写入成功响应
这种机制确保了即使在系统崩溃的情况下,也可以通过重放
WAL 来恢复未持久化的数据,从而保证数据不丢失。
Compaction策略详解
Compaction 是 RocksDB 后台最重要的维护操作,负责:
主要的
Compaction 策略包括:
SSTable 合并Leveled Compaction:按层级组织数据,逐层向上合并
RocksDB 的架构设计体现了分层和模块化的特点:
应用程序层
↓
API接口层 (Java/C++/Python等)
↓
核心引擎层 (DBImpl, VersionSet, Compaction等)
↓
存储管理层 (MemTable, SSTFile, WAL等)
↓
操作系统层 (文件系统, 内存管理等)
各组件协同工作,提供了高性能、高可靠性的键值存储服务。
在开始使用 RocksDB 之前,需要准备好相应的开发环境:
系统要求:
Linux/Unix/macOS/Windows 等主流操作系统C++11 或更高版本编译器至少 4GB 内存(推荐 8GB 以上)依赖库安装:
# Ubuntu/Debian
sudo apt-get install build-essential libgflags-dev libsnappy-dev zlib1g-dev libbz2-dev liblz4-dev libzstd-dev
# CentOS/RHEL
sudo yum install gcc gcc-c++ gflags-devel snappy-devel zlib-devel bzip2-devel lz4-devel zstd-devel
从源码编译:
git clone https://github.com/facebook/rocksdb.git
cd rocksdb
make shared_lib -j4
sudo make install-shared
RocksDBJniDB)Java 应用可以通过 JNI 方式使用 RocksDB:
Maven 依赖配置:
<dependency>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
<version>7.10.2</version>
</dependency>
基本初始化:
import org.rocksdb.*;
public class RocksDBExample {
static {
RocksDB.loadLibrary();
}
public static void main(String[] args) throws RocksDBException {
Options options = new Options().setCreateIfMissing(true);
RocksDB db = RocksDB.open(options, "/tmp/rocksdb_example");
// 使用数据库...
db.close();
options.close();
}
}
掌握了基本环境配置后,可以开始进行简单的读写操作:
import org.rocksdb.*;
public class BasicOperations {
public static void main(String[] args) throws RocksDBException {
RocksDB.loadLibrary();
// 数据库配置
Options options = new Options().setCreateIfMissing(true);
RocksDB db = RocksDB.open(options, "/tmp/test_db");
try {
// 写入数据
db.put("key1".getBytes(), "value1".getBytes());
db.put("key2".getBytes(), "value2".getBytes());
// 读取数据
byte[] value1 = db.get("key1".getBytes());
System.out.println("key1: " + new String(value1)); // 输出: value1
// 删除数据
db.delete("key1".getBytes());
// 验证删除
byte[] deletedValue = db.get("key1".getBytes());
System.out.println("deleted key1: " + (deletedValue == null ? "null" : new String(deletedValue)));
} finally {
db.close();
options.close();
}
}
}
RocksDB 提供了丰富的配置选项来优化性能:
基础配置:
Options options = new Options()
.setCreateIfMissing(true) // 如果数据库不存在则创建
.setErrorIfExists(false) // 如果数据库存在则报错
.setIncreaseParallelism(4) // 设置并行度
.setAllowConcurrentMemtableWrite(true); // 允许并发写入MemTable
内存相关配置:
options.setWriteBufferSize(64 * 1024 * 1024) // MemTable大小: 64MB
.setMaxWriteBufferNumber(3) // 最大MemTable数量
.setMinWriteBufferNumberToMerge(1); // 触发合并的最小MemTable数
压缩配置:
options.setCompressionType(CompressionType.SNAPPY_COMPRESSION)
.setCompactionStyle(CompactionStyle.LEVEL);
通过基准测试可以了解 RocksDB 的基本性能表现:
import org.rocksdb.*;
import java.util.concurrent.ThreadLocalRandom;
public class PerformanceTest {
public static void main(String[] args) throws RocksDBException {
RocksDB.loadLibrary();
Options options = new Options()
.setCreateIfMissing(true)
.setWriteBufferSize(64 << 20) // 64MB
.setTargetFileSizeBase(64 << 20)
.setLevelZeroFileNumCompactionTrigger(4);
RocksDB db = RocksDB.open(options, "/tmp/performance_test");
try {
// 写入性能测试
long startTime = System.currentTimeMillis();
WriteOptions writeOpts = new WriteOptions();
for (int i = 0; i < 100000; i++) {
String key = "key_" + i;
String value = "value_" + ThreadLocalRandom.current().nextLong();
db.put(writeOpts, key.getBytes(), value.getBytes());
}
long writeTime = System.currentTimeMillis() - startTime;
System.out.println("写入10万条数据耗时: " + writeTime + "ms");
// 读取性能测试
startTime = System.currentTimeMillis();
for (int i = 0; i < 10000; i++) {
int randomKey = ThreadLocalRandom.current().nextInt(100000);
db.get(("key_" + randomKey).getBytes());
}
long readTime = System.currentTimeMillis() - startTime;
System.out.println("随机读取1万次耗时: " + readTime + "ms");
} finally {
db.close();
options.close();
}
}
}
为了提高操作效率,RocksDB 提供了批量操作和事务支持:
批量写入:
import org.rocksdb.*;
public class BatchOperations {
public static void batchWriteExample(RocksDB db) throws RocksDBException {
WriteBatch batch = new WriteBatch();
WriteOptions writeOpts = new WriteOptions();
try {
// 添加多个操作到批处理中
batch.put("batch_key1".getBytes(), "batch_value1".getBytes());
batch.put("batch_key2".getBytes(), "batch_value2".getBytes());
batch.delete("old_key".getBytes());
batch.merge("merge_key".getBytes(), "merged_data".getBytes());
// 一次性执行所有操作
db.write(writeOpts, batch);
System.out.println("批量操作执行完成");
} finally {
batch.close();
writeOpts.close();
}
}
}
事务支持:
import org.rocksdb.*;
public class TransactionExample {
public static void transactionExample() throws RocksDBException {
RocksDB.loadLibrary();
// 创建支持事务的数据库选项
TransactionDBOptions txnDbOptions = new TransactionDBOptions();
Options options = new Options().setCreateIfMissing(true);
WriteOptions writeOptions = new WriteOptions();
TransactionDB txnDb = TransactionDB.open(options, txnDbOptions, "/tmp/txn_db");
try {
// 开始事务
Transaction txn = txnDb.beginTransaction(writeOptions);
try {
// 在事务中执行操作
txn.put("txn_key1".getBytes(), "txn_value1".getBytes());
txn.put("txn_key2".getBytes(), "txn_value2".getBytes());
// 提交事务
txn.commit();
System.out.println("事务提交成功");
} catch (RocksDBException e) {
// 回滚事务
txn.rollback();
System.out.println("事务回滚: " + e.getMessage());
} finally {
txn.close();
}
} finally {
txnDb.close();
txnDbOptions.close();
options.close();
writeOptions.close();
}
}
}
RocksDB 提供了强大的迭代器功能,支持高效的范围查询:
import org.rocksdb.*;
public class IteratorExample {
public static void iteratorUsage(RocksDB db) throws RocksDBException {
// 创建迭代器
RocksIterator iterator = db.newIterator();
try {
// 正向遍历所有数据
System.out.println("正向遍历:");
for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) {
String key = new String(iterator.key());
String value = new String(iterator.value());
System.out.println(key + " => " + value);
}
// 反向遍历
System.out.println("
反向遍历:");
for (iterator.seekToLast(); iterator.isValid(); iterator.prev()) {
String key = new String(iterator.key());
String value = new String(iterator.value());
System.out.println(key + " => " + value);
}
// 范围查询
System.out.println("
范围查询 (key2 ~ key5):");
for (iterator.seek("key2".getBytes());
iterator.isValid() && compareKeys(iterator.key(), "key5".getBytes()) <= 0;
iterator.next()) {
String key = new String(iterator.key());
String value = new String(iterator.value());
System.out.println(key + " => " + value);
}
} finally {
iterator.close();
}
}
private static int compareKeys(byte[] key1, byte[] key2) {
return new String(key1).compareTo(new String(key2));
}
}
为了优化特定场景的查询性能,RocksDB 提供了前缀迭代和布隆过滤器功能:
前缀迭代:
import org.rocksdb.*;
public class PrefixIteratorExample {
public static void prefixIterator(RocksDB db) throws RocksDBException {
ReadOptions readOptions = new ReadOptions();
// 设置前缀_same_as_start为true,启用前缀迭代
readOptions.setPrefixSameAsStart(true);
RocksIterator iterator = db.newIterator(readOptions);
try {
// 查找以"user_"为前缀的所有键
byte[] prefix = "user_".getBytes();
for (iterator.seek(prefix);
iterator.isValid() && hasPrefix(iterator.key(), prefix);
iterator.next()) {
String key = new String(iterator.key());
String value = new String(iterator.value());
System.out.println(key + " => " + value);
}
} finally {
iterator.close();
readOptions.close();
}
}
private static boolean hasPrefix(byte[] key, byte[] prefix) {
if (key.length < prefix.length) return false;
for (int i = 0; i < prefix.length; i++) {
if (key[i] != prefix[i]) return false;
}
return true;
}
}
布隆过滤器配置:
import org.rocksdb.*;
public class BloomFilterExample {
public static Options configureBloomFilter() {
BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
// 设置布隆过滤器,减少不必要的磁盘I/O
tableConfig.setFilterPolicy(new BloomFilter(10, false));
Options options = new Options()
.setCreateIfMissing(true)
.setTableFormatConfig(tableConfig);
return options;
}
}
Column Family)管理列族是 RocksDB 中实现逻辑分离的重要特性:
import org.rocksdb.*;
import java.util.ArrayList;
import java.util.List;
public class ColumnFamilyExample {
public static void columnFamilyUsage() throws RocksDBException {
RocksDB.loadLibrary();
// 数据库选项
Options options = new Options().setCreateIfMissing(true);
// 列族选项
ColumnFamilyOptions cfOpts = new ColumnFamilyOptions();
// 默认列族描述符
List<ColumnFamilyDescriptor> cfDescriptors = new ArrayList<>();
cfDescriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, cfOpts));
// 自定义列族描述符
cfDescriptors.add(new ColumnFamilyDescriptor("user_data".getBytes(), cfOpts));
cfDescriptors.add(new ColumnFamilyDescriptor("order_data".getBytes(), cfOpts));
// 打开数据库和列族
List<ColumnFamilyHandle> cfHandles = new ArrayList<>();
DBOptions dbOptions = new DBOptions().setCreateIfMissing(true);
RocksDB db = RocksDB.open(dbOptions, "/tmp/cf_db", cfDescriptors, cfHandles);
try {
ColumnFamilyHandle userDataCF = cfHandles.get(1);
ColumnFamilyHandle orderDataCF = cfHandles.get(2);
// 在不同列族中存储数据
db.put(userDataCF, "user_001".getBytes(), "Alice".getBytes());
db.put(orderDataCF, "order_001".getBytes(), "Order details".getBytes());
// 从特定列族读取数据
byte[] user = db.get(userDataCF, "user_001".getBytes());
byte[] order = db.get(orderDataCF, "order_001".getBytes());
System.out.println("User: " + new String(user));
System.out.println("Order: " + new String(order));
// 创建新的列族
ColumnFamilyHandle newCF = db.createColumnFamily(
new ColumnFamilyDescriptor("new_data".getBytes(), cfOpts));
cfHandles.add(newCF);
} finally {
// 关闭所有列族句柄
for (ColumnFamilyHandle cfHandle : cfHandles) {
cfHandle.close();
}
db.close();
options.close();
cfOpts.close();
dbOptions.close();
}
}
}
Snapshot)机制快照机制提供了时间点一致性读取的能力:
import org.rocksdb.*;
public class SnapshotExample {
public static void snapshotUsage(RocksDB db) throws RocksDBException {
// 创建快照
Snapshot snapshot = db.getSnapshot();
ReadOptions readOptions = new ReadOptions();
readOptions.setSnapshot(snapshot);
try {
// 在快照时间点读取数据
byte[] valueBeforeChange = db.get(readOptions, "key1".getBytes());
System.out.println("快照读取值: " +
(valueBeforeChange != null ? new String(valueBeforeChange) : "null"));
// 修改数据
db.put("key1".getBytes(), "modified_value".getBytes());
// 快照读取仍能看到旧值
byte[] valueAfterChange = db.get(readOptions, "key1".getBytes());
System.out.println("快照读取值(修改后): " +
(valueAfterChange != null ? new String(valueAfterChange) : "null"));
// 直接读取可以看到新值
byte[] currentValue = db.get("key1".getBytes());
System.out.println("当前值: " +
(currentValue != null ? new String(currentValue) : "null"));
} finally {
// 释放快照资源
db.releaseSnapshot(snapshot);
readOptions.close();
}
}
}
合理的内存配置是发挥 RocksDB 性能的关键:
MemTable 相关配置:
Options options = new Options()
// 设置 MemTable 大小,默认 64MB
.setWriteBufferSize(128 * 1024 * 1024) // 128MB
// 设置最大 MemTable 数量,默认 2
.setMaxWriteBufferNumber(6)
// 设置触发 flush 的最小 MemTable 数量,默认 1
.setMinWriteBufferNumberToMerge(2)
// 设置 MemTable 内存预算
.setArenaBlockSize(16 * 1024 * 1024); // 16MB
缓存配置:
BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
// 设置块缓存大小
tableConfig.setBlockCache(new LRUCache(1024 * 1024 * 1024)) // 1GB
// 设置块大小
.setBlockSize(32 * 1024) // 32KB
// 设置布隆过滤器
.setFilterPolicy(new BloomFilter(10, false));
options.setTableFormatConfig(tableConfig);
Compaction参数调优
Compaction 策略直接影响读写性能和存储效率:
层级压缩配置:
Options options = new Options()
// 设置目标文件大小
.setTargetFileSizeBase(64 * 1024 * 1024) // 64MB
// 设置文件大小倍数
.setTargetFileSizeMultiplier(1)
// 设置最大后台 compaction 线程数
.setMaxBackgroundCompactions(4)
// 设置最大后台刷新线程数
.setMaxBackgroundFlushes(2)
// 设置 L0 层触发 compaction 的文件数
.setLevel0FileNumCompactionTrigger(4)
// 设置 L0 层停止写入的文件数
.setLevel0StopWritesTrigger(36)
// 设置 L0 层触发延迟写的文件数
.setLevel0SlowdownWritesTrigger(20);
压缩算法选择:
// 根据数据特性和性能要求选择压缩算法
options.setCompressionType(CompressionType.LZ4_COMPRESSION) // 默认推荐
// 或者为不同层级设置不同压缩算法
.setCompressionPerLevel(Arrays.asList(
CompressionType.NO_COMPRESSION, // L0 不压缩
CompressionType.LZ4_COMPRESSION, // L1 使用 LZ4
CompressionType.ZSTD_COMPRESSION // L2+ 使用 ZSTD
));
通过合理的配置和使用方式可以显著提升性能:
写入优化:
public class WriteOptimization {
public static void optimizedWrite(RocksDB db) throws RocksDBException {
// 批量写入
WriteBatch batch = new WriteBatch();
WriteOptions writeOpts = new WriteOptions()
.setDisableWAL(true) // 禁用 WAL 提升写入性能(有数据丢失风险)
.setSync(false); // 异步写入
try {
// 批量操作
for (int i = 0; i < 1000; i++) {
batch.put(("key_" + i).getBytes(), ("value_" + i).getBytes());
}
db.write(writeOpts, batch);
} finally {
batch.close();
writeOpts.close();
}
}
}
读取优化:
public class ReadOptimization {
public static void optimizedRead(RocksDB db) throws RocksDBException {
ReadOptions readOpts = new ReadOptions()
.setVerifyChecksums(false) // 跳过校验和验证
.setFillCache(true); // 填充块缓存
try {
// 使用迭代器进行批量读取
RocksIterator iterator = db.newIterator(readOpts);
try {
iterator.seekToFirst();
while (iterator.isValid()) {
// 处理数据
iterator.next();
}
} finally {
iterator.close();
}
} finally {
readOpts.close();
}
}
}
充分利用多核处理器提升并发性能:
public class ConcurrencyOptimization {
// 并发读取优化
public static void concurrentReads(RocksDB db) {
ExecutorService executor = Executors.newFixedThreadPool(8);
List<Future<String>> futures = new ArrayList<>();
for (int i = 0; i < 100; i++) {
final int taskId = i;
Future<String> future = executor.submit(() -> {
try {
byte[] value = db.get(("key_" + taskId).getBytes());
return value != null ? new String(value) : null;
} catch (RocksDBException e) {
throw new RuntimeException(e);
}
});
futures.add(future);
}
// 收集结果
futures.forEach(future -> {
try {
String result = future.get();
// 处理结果
} catch (Exception e) {
e.printStackTrace();
}
});
executor.shutdown();
}
// 并发写入优化
public static void concurrentWrites(RocksDB db) {
// 启用并发 MemTable 写入
Options options = new Options()
.setAllowConcurrentMemtableWrite(true)
.setEnableWriteThreadAdaptiveyield(true);
}
}
有效的监控和诊断是性能优化的基础:
内置统计信息:
public class PerformanceMonitoring {
public static Options enableStatistics() {
Statistics statistics = new Statistics();
Options options = new Options()
.setCreateIfMissing(true)
.setStatistics(statistics)
.setStatsDumpPeriodSec(60); // 每分钟输出统计信息
return options;
}
public static void printStatistics(Statistics statistics) {
System.out.println("累计写入次数: " + statistics.getTickerCount(TickerType.NUMBER_KEYS_WRITTEN));
System.out.println("累计读取次数: " + statistics.getTickerCount(TickerType.NUMBER_KEYS_READ));
System.out.println("累计压缩次数: " + statistics.getTickerCount(TickerType.COMPACT_WRITE_BYTES));
System.out.println("块缓存命中率: " + statistics.getTickerCount(TickerType.BLOCK_CACHE_HIT) * 100.0 /
(statistics.getTickerCount(TickerType.BLOCK_CACHE_HIT) +
statistics.getTickerCount(TickerType.BLOCK_CACHE_MISS)) + "%");
}
}
RocksDB 可以作为持久化缓存层,提供比内存缓存更大的容量:
public class PersistentCache {
private final RocksDB db;
private final Options options;
public PersistentCache(String path) throws RocksDBException {
RocksDB.loadLibrary();
this.options = new Options()
.setCreateIfMissing(true)
.setWriteBufferSize(256 * 1024 * 1024) // 256MB MemTable
.setTargetFileSizeBase(64 * 1024 * 1024) // 64MB SST文件
.setLevel0FileNumCompactionTrigger(2)
.setCompressionType(CompressionType.LZ4_COMPRESSION);
this.db = RocksDB.open(options, path);
}
public void put(String key, String value, long ttlSeconds) throws RocksDBException {
long expireTime = System.currentTimeMillis() + ttlSeconds * 1000;
String valueWithExpire = value + "|" + expireTime;
db.put(key.getBytes(), valueWithExpire.getBytes());
}
public String get(String key) throws RocksDBException {
byte[] data = db.get(key.getBytes());
if (data == null) return null;
String[] parts = new String(data).split("|");
if (parts.length != 2) return null;
long expireTime = Long.parseLong(parts[1]);
if (System.currentTimeMillis() > expireTime) {
db.delete(key.getBytes());
return null;
}
return parts[0];
}
public void close() {
if (db != null) db.close();
if (options != null) options.close();
}
}
RocksDB 可以为消息队列提供高性能的持久化存储:
public class MessageQueueStorage {
private final RocksDB db;
private final AtomicLong messageIdCounter;
public MessageQueueStorage(String path) throws RocksDBException {
RocksDB.loadLibrary();
Options options = new Options()
.setCreateIfMissing(true)
.setWriteBufferSize(128 * 1024 * 1024)
.setCompressionType(CompressionType.NO_COMPRESSION) // 消息数据通常已压缩
.setAllowConcurrentMemtableWrite(true);
this.db = RocksDB.open(options, path);
this.messageIdCounter = new AtomicLong(loadMaxMessageId());
}
public long storeMessage(String topic, byte[] messageData) throws RocksDBException {
long messageId = messageIdCounter.incrementAndGet();
String key = topic + ":" + String.format("%012d", messageId);
db.put(key.getBytes(), messageData);
return messageId;
}
public byte[] getMessage(String topic, long messageId) throws RocksDBException {
String key = topic + ":" + String.format("%012d", messageId);
return db.get(key.getBytes());
}
public List<byte[]> getMessages(String topic, long startId, int count) throws RocksDBException {
List<byte[]> messages = new ArrayList<>();
String startKey = topic + ":" + String.format("%012d", startId);
RocksIterator iterator = db.newIterator();
try {
iterator.seek(startKey.getBytes());
int retrieved = 0;
while (iterator.isValid() && retrieved < count) {
String key = new String(iterator.key());
if (!key.startsWith(topic + ":")) break;
messages.add(iterator.value());
iterator.next();
retrieved++;
}
} finally {
iterator.close();
}
return messages;
}
private long loadMaxMessageId() {
// 加载当前最大的消息ID
RocksIterator iterator = db.newIterator();
try {
iterator.seekToLast();
if (iterator.isValid()) {
String key = new String(iterator.key());
String[] parts = key.split(":");
if (parts.length == 2) {
return Long.parseLong(parts[1]);
}
}
} catch (Exception e) {
// 忽略错误,返回默认值
} finally {
iterator.close();
}
return 0;
}
}
RocksDB 非常适合存储时间序列数据:
public class TimeSeriesDatabase {
private final RocksDB db;
public TimeSeriesDatabase(String path) throws RocksDBException {
RocksDB.loadLibrary();
Options options = new Options()
.setCreateIfMissing(true)
.setPrefixExtractor(new FixedPrefixTransform(10)) // 按metric名称前缀分区
.setMemTablePrefixBloomSizeRatio(0.1)
.setTableFormatConfig(new BlockBasedTableConfig()
.setFilterPolicy(new BloomFilter(10, false)));
this.db = RocksDB.open(options, path);
}
public void writeMetric(String metricName, long timestamp, double value) throws RocksDBException {
// Key format: metric_name:timestamp
String key = metricName + ":" + String.format("%013d", timestamp);
ByteBuffer buffer = ByteBuffer.allocate(8);
buffer.putDouble(value);
db.put(key.getBytes(), buffer.array());
}
public List<MetricPoint> queryRange(String metricName, long startTime, long endTime) throws RocksDBException {
List<MetricPoint> points = new ArrayList<>();
String startKey = metricName + ":" + String.format("%013d", startTime);
String endKey = metricName + ":" + String.format("%013d", endTime);
RocksIterator iterator = db.newIterator();
try {
iterator.seek(startKey.getBytes());
while (iterator.isValid()) {
String key = new String(iterator.key());
if (!key.startsWith(metricName + ":") || key.compareTo(endKey) > 0) {
break;
}
String[] parts = key.split(":");
long timestamp = Long.parseLong(parts[1]);
ByteBuffer buffer = ByteBuffer.wrap(iterator.value());
double value = buffer.getDouble();
points.add(new MetricPoint(timestamp, value));
iterator.next();
}
} finally {
iterator.close();
}
return points;
}
public static class MetricPoint {
private final long timestamp;
private final double value;
public MetricPoint(long timestamp, double value) {
this.timestamp = timestamp;
this.value = value;
}
// getters...
}
}
在分布式系统中,RocksDB 可以作为本地存储组件:
public class DistributedStorageNode {
private final RocksDB localDB;
private final ConsistentHashRing hashRing;
private final NetworkClient networkClient;
public DistributedStorageNode(String nodeId, String dbPath) throws RocksDBException {
RocksDB.loadLibrary();
Options options = new Options()
.setCreateIfMissing(true)
.setCompressionType(CompressionType.LZ4_COMPRESSION)
.setLevelCompactionDynamicLevelBytes(true);
this.localDB = RocksDB.open(options, dbPath);
this.hashRing = new ConsistentHashRing();
this.networkClient = new NetworkClient();
}
public void put(String key, byte[] value) throws RocksDBException {
String responsibleNode = hashRing.getNodeForKey(key);
if (responsibleNode.equals(getCurrentNodeId())) {
// 本地存储
localDB.put(key.getBytes(), value);
replicateToPeers(key, value);
} else {
// 转发到负责节点
networkClient.forwardPut(responsibleNode, key, value);
}
}
public byte[] get(String key) throws RocksDBException {
String responsibleNode = hashRing.getNodeForKey(key);
if (responsibleNode.equals(getCurrentNodeId())) {
// 本地读取
return localDB.get(key.getBytes());
} else {
// 从负责节点获取
return networkClient.forwardGet(responsibleNode, key);
}
}
private void replicateToPeers(String key, byte[] value) {
// 异步复制到备份节点
List<String> backupNodes = hashRing.getBackupNodes(key, 2);
for (String node : backupNodes) {
networkClient.asyncReplicate(node, key, value);
}
}
private String getCurrentNodeId() {
// 返回当前节点ID
return "node-1";
}
}
在微服务架构中,RocksDB 可以作为轻量级的本地存储:
@Component
public class LocalDataService {
private RocksDB db;
private Options options;
@PostConstruct
public void init() throws RocksDBException {
RocksDB.loadLibrary();
this.options = new Options()
.setCreateIfMissing(true)
.setWriteBufferSize(64 * 1024 * 1024)
.setCompressionType(CompressionType.SNAPPY_COMPRESSION);
String dbPath = System.getProperty("user.home") + "/.myapp/data";
this.db = RocksDB.open(options, dbPath);
}
public void cacheUserData(String userId, UserData userData) throws RocksDBException {
String key = "user:" + userId;
String value = serializeUserData(userData);
db.put(key.getBytes(), value.getBytes());
}
public UserData getCachedUserData(String userId) throws RocksDBException {
String key = "user:" + userId;
byte[] data = db.get(key.getBytes());
return data != null ? deserializeUserData(new String(data)) : null;
}
public List<UserSession> getActiveSessions(String userId) throws RocksDBException {
List<UserSession> sessions = new ArrayList<>();
String prefix = "session:" + userId + ":";
RocksIterator iterator = db.newIterator();
try {
iterator.seek(prefix.getBytes());
while (iterator.isValid() &&
new String(iterator.key()).startsWith(prefix)) {
UserSession session = deserializeSession(new String(iterator.value()));
if (session.isActive()) {
sessions.add(session);
}
iterator.next();
}
} finally {
iterator.close();
}
return sessions;
}
private String serializeUserData(UserData userData) {
// 实现序列化逻辑
return userData.toString();
}
private UserData deserializeUserData(String data) {
// 实现反序列化逻辑
return new UserData(data);
}
private UserSession deserializeSession(String data) {
// 实现会话反序列化
return new UserSession(data);
}
@PreDestroy
public void cleanup() {
if (db != null) db.close();
if (options != null) options.close();
}
}