引言
对数据进行处理的同学,经常会遇到排序需求,无论是内存数据还是磁盘数据.
对于单点的数据,我们的处理比较简单,比如:
存储服务的处理流程一般可抽象如下:
select field_a from table_b order by field_a limit 100, 10;
db.collection_b.find().sort({"field_a":1}).skip(100).limit(10);
信息爆炸的时代,数据早已不是单点所能承载的了,数据一般分布在大量节点上,假设某库中的数据均匀地分布在以下的所有节点上.
这时 sort, limit 的一般方法是选择一个中间节点或者中间件来做合并处理:
一般处理流程的动态表示如下:
我们将过程抽象,流程简化如下:
注意第三步在数据节点中的查询结果范围为 [0,skip+limit].当我们想查询 [skip=1000000, limit=200] 的数据,意味着需要在各节点
上先查询 [skip=0, limit=1000000+200] 的数据,再由归并服务对结果进行 [skip=1000000, limit=200] 的排序, 对存储 IO 与网络 IO 的处理量级与 skip 成正比,
对于 T 级以上规模所数据处理,无法做到实时处理.
下面来讨论另一种方式
分布式框架大牛学习交流群:697579751(没有开发经验勿加!)
理论基础
在一般对数据的处理方法中,我们基于一个共同的假设:各数据存储节点只具备简单的对外查询功能,相互之间的连接功能是很弱的,主要有主从,选举,更一步的功能就少了.
现在我们要改变这一假设.
理论描述
假设各存储节点具备相互对话的能力.比如,"hey, 你那里 skip 为 100 的数据是哪个", "好的,我这里 skip 为 100 的数为 m."
对话分成几种,第一种是扩散请求,当其中一节点收到一次请求后, 此节点会将请求迅速扩散到所有其他相关的节点.
第二种对话是应答式,简单的你问我答型.
假设有一个排序全网排序请求,在某一节点获得请求后,扩散给网络需要对此请求处理的请求,各节点在进经 n 次对话后,产生最终的结果.
概念定义
在一堆数据中,数据 m 前面有 n-1 个数,则 m 的排序索引为 n.
通过问答式查询,我们可以轻而易举地获得某个数在全网中的排序索引,只需要将各节点上排在此节点前的个数相加即可.
推导
简单点,如果我们要在一批数据中查询 skip=100, limit=20 的数据有哪些,我们的目标是在全网数据中获取 b,e.
b 的索引为第 100,
e 的索引为 120.
则所有在 [b,e] 之前的数都是我们的目标数据.
实际上还要考虑数据重复,即在 b 的索引为 98 个, b 个数为 4,e 的索引 118, e 个数为 5,则目标数据以 [b, b] 开头, 以 [e,e,e] 结尾.
技术使用
某节点想知道数据 m 前面有多少个数, 则直接向其他数据节点发送对话,所有节点(包含自身)只需要返回本节点中在 m 前面的数据个数, 假设各节点上的查询结果个数为 n1,n2,n3,...n10,
则全量数据中数据 m 前面的数据个数 n=(n1+n2+n3+.....+n10).
以数据 m 做为递度对象, n 做为结果向 skip, skip+limit 逼近,在全量数据中获取最终的 b,e.
架构设计
请求处理流程
如图所示,对于一次请求我们会分成三个部分
节点确认阶段
此阶段确认哪些节点参与发现.
结果同步阶段
同步的过程是相互的,相互猜测,查询对应数据的索引.
这一步是处理的核心步骤,通过相互确认,最终逼近索引在 [100,120] 之间的数是哪些.
结果合并
各数据节点将数据结果同步出去,如果 skip=100 万, limit=20, 最多也就同步 20 条数据,
不再于 skip 正成比.
模型假设
假设存在 m 个节点,各节点上的数据都是各自排好序的,各节点间平均来回时间为 t1,
单次查询确认程序执行时间为 t2,
每次确认的数据个数为 p,假设结果确认阶段平均某节点的对外请求次数不在于 s.
节点确认时间为 t1
结果确认阶段时间 <=s*(t1+t2)
结果合并阶段时间为 t1
则总共所需时间为 (2 + s)t1 + st2
从上面结果得出,请求所需时间与节点个数不成正比,与节点间的平均网络时间及算法次数相关.
假设各节点在同一个局域网中,相互间的来回网络时间 t1<1ms, 程序执行时间 t2 < 1ms,单节点对外请求次数不超过 100,
则总共所需求时间不超过 (2+100)+100=202ms
理论要求
如果希望在 200ms 内完成一次查询,则平均某个节点对外请求次数不超过 100,对应的查询数据总次数则不超过 100*p, 假设 p 为 100, 则总次确认的总次数可以达到 10000 次.
下面我们来模拟一次真实的操作吧.
模拟操作
数据准备
假设对应的数据为正整型,在 10 个节点中查询 skip=100, limit=20 的所有数据.
则我们要通过对话确认索引分别为 100,120 的数为哪个.
考虑到数据重复,我们为各个数建立向量 (数据,索引,个数),假设索引为 100 的数为 b, 个数为 c(b), 索引为 120 的数为 e, 个数为 c(e), 则我们所要获得的向量为 (b,100,c(b)), (e, 120, c(e)).
首轮
由于所有数据都是正整型,则我们知道最小的数为 0, 最大的数为 2^31,
因此第一次待确认队列里可以包含 [0, max=2*31] 在所有节点上的情况,得到 (0, i(0), c(0)), (max, i(max), c(max)).
同时为了更好地得到逼近效果,先做一次全范围猜测,比如 max/100 做猜测,
以得到(max/100, i(max/100), c(max/100), (max2/100, i(max2/100), c(max*2/100)), .......(max^99/100,i(max^99/100),c(max^99/100)).
其实 0 可以认为是 max0, 则第一次做逼近的数据可以是 (nmax/100), n~[0,100].
目标逼近
经过第一轮猜测后,全网络都知道了 (n*max/100), n~[0,100] 对应的向量.
存在 2 个数 n1, n2 满足 i(n1max/100=s1) + c(s1)<=100, i(n2max/100=s2) + c(s2)>=100.(如果不存在 n2, 则表明不存在这个数,其全局索引 >=100, 因此结果为空,直接跳到数据合并阶段)
存在 2 个数 n3, n4 满足 i(n3max/100=s3) + c(s3)<=120, i(n4max/100=s4) + c(s4)>=120.(如果不存在 n4, 则表明不存在这个数,其全局索引 >=120, 假设存在 n2, 则数据大于等于 n2*max/100 的数都是目标数据.)
则有 s1<= b <=s2, s3<=e<=s4,
我们对再 [s1, s2], [s3, s4] 做相应的逼近,直至获取到最终 b,e, 满足 c(b) + i(b) <= 100, c(e) + i(e) <= 120.
提升规模
从上面的结论来看,数据规模对时间的影响不大,假设数据模块为 T 级或者 P 级, 直接影响的是查询某个数在此数据节点上前面有多少个数.
为了降低响应时间,我们只需要设计好数据结构,以支持快速的向量查询.
假设单个节点的数据是 G 级,假设我们用红黑树存储,是 T/P 级,我们用 B + 数,假设每颗节点存储着其子节点的个数.
我们以经黑树为例:
如果需要获取小于数 150 前面的个数,则只需要找到其所在左支遍历的个数加上根节点的左侧子节点个数.
转化为代码即为:
结论
function getCount(node, child){
if (node.right == child) {
return { node:node.parent, count:node.leftCount + 1 };
}
else {
return { node:node.parent, count:0};
}
}
node=node_150;
var count = node.leftCount;
var nc = {node:node, count:0};
while(node != null){
nc = getCount(node.parent, node);
node = nc.node;
count += nc.count;
}
console.log("node 150's left count:"+count);
我们的理论目标环境:
1. 数据分布在大量的数据节点上,并且在节点上是有序的.
2. 各节点间的网络延时不超过 1mm.
在此分布式环境下可以实现对 T/P 级数据进行最多 200ms 延时的实时快速排序.
分布式框架大牛学习交流群:697579751(没有开发经验勿加!)
来源: https://juejin.im/post/5a6c20626fb9a01c9c1f41b5