最近有一个项目, 其中某个功能单表数据在可预估的未来达到了亿级, 初步估算在 90 亿左右. 与同事详细讨论后, 决定采用一致性 Hash 算法来完成数据库的自动扩容和数据迁移. 整个程序细节由我同事完成, 我只是将其理解并成文, 供有相同问题的同行参考.
参看此文的兄弟, 默认各位已经熟悉一致性 hash 算法了. 此文仅仅阐述代码细节, 实现语言为 Java.
项目背景
项目是一个实验室项目
其中有一个表叫做试验表, 用于存储车型的试验数据, 每个试验大概有 6000 条数据
总计初期约有 2 万个车型, 每个车型初期包含超过 50 个试验. 后期还会动态增长
试验表中的数据仅需要根据车型试验 ID 能取出来即可, 没有其他更复杂的业务逻辑
方案决策
项目正式上线初期, 数据量不会直接爆发式增长到 90 亿, 需要时间上的积累 (逐步做实验), 最终可能达到 90 亿数据, 甚至超过 90 亿数据.
按照我们实际了解情况, oracle 存储数据量达到 1 千万的时候, 性能擅可. 而 Oracle 官方的说法, 如单表存储 1g 有分区 (大致 500 万数据), 查询效率非常高. 而试验表中仅四个字段, 每条数据数据量较小. 所以我们最终决定以 1000 万为节点, 水平拆表. 当表数据达到 1 千万时, 即增加下一波表. 进行数据自动迁移.
按照 90 亿的总量, 1000 万数据一个表的划分, 最终大致会产生 900 个左右的表. 所以我们最终使用了 4 个数据库. 1 个存储其他业务模块的表, 3 个存储此大数据表. 每个数据库大致有 300 张表. 性能上和数量上都可达到我们的要求.
相关表结构
试验信息表 (EXPERIMENT_MESSAGE), 挂接车型和试验的关系. 试验数据表 (EXPERIMENT_DATA), 存储试验数据
试验信息表:
字段 | 含义 |
---|---|
ID | 主键, 采用 UUID 生成 |
EXPERIMENT_ID | 试验表中的 ID |
CAR_ID | 车型表中的 ID |
... | 其余数十个字段省略 |
试验数据表:
字段 | 含义 |
---|---|
ID | 主键, 采用 UUID 生成 |
EXPERIMENT_MESSAGE_ID | 对应的实验信息 id |
X_VALUE | 试验数据 X 值 |
Y_VALUE | 试验数据 Y 值 |
我们采用作一致性 hash 的 key, 就是试验数据表中的 EXPERIMENT_MESSAGE_ID 字段. 也就是说, 每个试验数据表, 不存则以, 存则一次性大致有 6000 条数据. 取同理.
一致性 Hash 算法实现
一致性 Hash 算法的 hash 部分, 采用了著名的 ketama 算法. 在此, 我们不多讨论 ketama 算法的细节, 若各位有兴趣, 请查阅 ketama 算法 https://github.com/RJ/ketama
- public long hash(String key) {
- if (md5 == null) {
- try {
- md5 = MessageDigest.getInstance("MD5");
- } catch (NoSuchAlgorithmException e) {
- throw new IllegalStateException("no md5 algorythm found");
- }
- }
- md5.reset();
- md5.update(key.getBytes());
- byte[] bKey = md5.digest();
- long res = ((long) (bKey[3] & 0xFF) <<24) |
- ((long) (bKey[2] & 0xFF) << 16) |
- ((long) (bKey[1] & 0xFF) << 8) |
- (long) (bKey[0] & 0xFF);
- return res & 0xffffffffL;
- }
有了 Hash 的算法, 接下来就要构造 Hash 环了. Hash 环采用的 SortedMap 数据结构实现.
private final SortedMap<Long, T> circle = new TreeMap<Long, T>();
其中添加节点和移除节点部分, 需要根据 hash 算法得到节点在环上的位置, 具体代码如下:
- /**
- * 添加虚拟节点
- * numberOfReplicas 为虚拟节点的数量, 初始化 hash 环的时候传入, 我们使用 300 个虚拟节点
- * @param node
- */
- public void add(T node) {
- for (int i = 0; i <numberOfReplicas; i++) {
- circle.put(hashFunction.hash(node.toString() + i), node);
- }
- }
- /**
- * 移除节点
- * @param node
- */
- public void remove(T node) {
- for (int i = 0; i < numberOfReplicas; i++) {
- circle.remove(hashFunction.hash(node.toString() + i));
- }
- }
而 hash 环中得到节点部分比较特殊, 根据一致性 hash 算法的介绍, 得到 hash 环中的节点, 实际上是计算出的 hash 值顺时针找到的第一个节点.
- /**
- * 获得一个最近的顺时针节点
- * @param key 为给定键取 Hash, 取得顺时针方向上最近的一个虚拟节点对应的实际节点
- * @return
- */
- public T get(Object key) {
- if (circle.isEmpty()) {
- return null;
- }
- long hash = hashFunction.hash((String) key);
- if (!circle.containsKey(hash)) {
- // 返回此映射的部分视图, 其键大于等于 hash
- SortedMap<Long, T> tailMap = circle.tailMap(hash);
- hash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey();
- }
- return circle.get(hash);
- }
单表拆分实践
上面完成了一致性 hash 算法的实现, 包含了 hash 算法和 hash 环的实现. 接下来就要处理具体业务中, 如何使用这个 hash 环和算法了.
我们业务中, 主要操作这张表的数据, 也就是增删查. 然后我们数据库拆分成了 3 个, 所以需要增删查的操作基本一致, 都是先通过一致性 hash 得到库, 再通过一致性 hash 得到表.
获取数据库名的操作如下, 获取到数据库后, 根据数据库名到对应的连接池中获取连接.
- /**
- * 根据试验信息 id 获取其所在库名
- * DatabaseType 为我们数据的枚举
- * @return 数据库的名称
- **/
- private String getDataBase(String experimentMessageId) {
- // 获取数据源
- DatabaseType[] databasetype = DatabaseType.values();
- List<String> dataBaselist = new ArrayList<>();
- Map<String, DatabaseType> map = new HashMap<>();
- for (DatabaseType d:databasetype) {
- if (!d.equals(DatabaseType.KC)) {
- dataBaselist.add(d.toString());
- map.put(d.toString(), d);
- }
- }
- // 获取数据源 hash
- ConsistentHash<String> dataBaseCon = getConsistentHash(dataBaselist);
- // 获取 id 所在数据源
- String dataBase = dataBaseCon.get(experimentMessageId);
- return dataBase;
- }
获取表名的操作如下, 获取到数据库后, 在对应的数据库中找到需要的表, 再从该表中查询数据.
- /**
- * 根据试验信息 id 获取其试验数据所在表
- * @return
- **/
- public String getTableName(String experimentMessageId) {
- String dataBase = getDataBase(experimentMessageId);
- // 查询所有试验数据表
- List<String> tables = experimentDataEODao.queryTbaleNames(dataBase, tableName);
- ConsistentHash<String> consistentHash = getConsistentHash(tables);
- String tableName = consistentHash.get(experimentMessageId);
- return tableName;
- }
剩下的增删改操作和平常一致, 在此不多赘述.
数据迁移实践
一致性 hash 势必涉及到数据迁移问题, 我们采取的数据迁移方式为定时任务, 针对每个数据库在每天夜里全量扫描一次. 检查是否有数据量超过 1000 万的表, 若存在这样的表, 就把现有的表数量 double.
数据迁移只会在同库之间迁移, 不会涉及跨数据库的情况.
此方案为初步方案, 后续会改进的更加智能, 根据表的数量, 增加不同数量的表. 而不是简单的把表数量翻倍.
表创建后, 将需要迁移的表数据逐个迁移.
在连接到数据源后, 我们做了如下事情进行数据迁移
1. 获取库中所有的表
List<String> tables = getTables(connection, p, d.toString());
2. 遍历表, 检查表中数据是否超过边界线 (我们为 1000 万)
- for (int i = 0; i <tables.size(); i++) {
- // 查询表内数据量
- int num = countByTableName(connection, p, tables.get(i));
- //finalNum 为边界值, 此处为 1000 万
- if (num> finalNum) {
- ......
- }
- ......
- }
3. 根据所有的表计算现有的虚拟节点
ConsistentHash<String> consistentHashOld = getConsistentHash(tables);
4. 把表加倍
- List<String> tablesNew = deepCopy(tables); // 注意一定要采用深复制
- int tableSize = tablesNew.size();
- for (int y = 0; y <tableSize; y++) {
- String tableNameNew = tableName + (tablesNew.size() + 1);
- // 创建表
- createTable(connection, p, d.toString(), tableNameNew);
- tablesNew.add(tableNameNew);
- tableDelete.add(tableNameNew);
- }
5. 计算加倍后的虚拟节点
ConsistentHash<String> consistentHashNew = getConsistentHash(tablesNew);
6. 数据迁移
- for (int z = 0; z <tableSize; z++) {
- String tableNameOld = tablesNew.get(z);
- // 查询试验信息 id 不重复的试验数据信息
- List<String> disData = selectExperimentIdDis(connection, p, tableNameOld);
- List<String> deleteList = new LinkedList<>();
- for (String experimentId : disData) {
- // 如果数据 hash 计算 原所在表与新建表之后不一致, 执行转移
- if (!consistentHashNew.get(experimentId).equals(consistentHashOld.get(experimentId))) {
- // 新增到新表数据
- insertHash(connection, p, experimentId, consistentHashOld.get(experimentId),
- consistentHashNew.get(experimentId));
- // 删除数据集合
- deleteList.add(experimentId);
- // 删除旧表数据
- final int defaultDelNum = 1000;
- if (deleteList.size() == defaultDelNum) {
- deleteInbatch(connection, p, deleteList, tableNameOld);
- deleteList.clear();
- }
- }
- }
- // 删除旧表数据
- if (deleteList.size()> 0) {
- deleteInbatch(connection, p, deleteList, tableNameOld);
- }
- }
总结
以上为我们所做的一致性 hash 实践, 其中还存在很多问题, 比如迁移过程单线程导致迁移较慢, 自动扩容机制不智能, 迁移过程中数据访问不稳定等情况.
我们将会在后续的开发中逐步进行完善改进.
以上就是我们针对一致性 hash 在 oracle 分表中的实践
参考
一致性哈希算法原理
ketama 算法 https://github.com/RJ/ketama
来源: https://www.cnblogs.com/zer0Black/p/9650822.html