移动互联网时代,微信和微博已经成为这个时代的两大支柱类社交应用。
这两类应用,其中一个是 IM 产品,一个是 Feed 流产品,微信的朋友圈也属于 Feed 流。
如果再细心去发现,会发现基本所有移动 App 都有 Feed 流的功能:消息广场、个人关注、通知、新闻聚合和图片分享等等。各种各样的 Feed 流产品占据了我们生活的方方面面。
IM 和 Feed 流功能已经基本成为所有 App 标配,如何开发一个 IM 或者 Feed 流功能是很多架构师、工程师要面临的问题。虽然是一个常见功能,但仍然是一个巨大的挑战,要考虑的因素非常多,比如:
为了解决上述问题,我们之前推出了三篇文章来阐述:
上述三篇文章推出后,用户反响很好,在各个平台的传播很广,为很多用户提供了设计一款 IM 和 Feed 流产品的架构思路,但是从这里到完全实现一个可靠的 IM、Feed 流系统平台还有很长的路,比如:
针对上述三个问题,在 《现代 IM 系统中消息推送和存储架构的实现》 中引入了一个逻辑模型概念:Timeline 模型。
在《现代 IM 系统中消息推送和存储架构的实现》中基于 IM 系统提出了 Timeline 模型,进一步会发现 Timeline 模型适用场景可以更广泛:
IM 和 Feed 流产品完全匹配上述四个特征,所以 Timeline 模型可以完全适用于 IM 和 Feed 流场景中。
下面我们来看看如何在各个场景中使用 Timeline:
每个用户只有一个同步库 Timeline,就算用户 A 在 10 个群里面,那么这个 10 个群的同步消息都是发送给用户 A 的这一个同步库 Timeline 中。
如果是微博,粉丝可能会达到上亿级别,这时候会比朋友圈稍微复杂些:
从上面分析可以看出来,不管是 IM,还是 Feed 流产品都可以将底层的存储、同步逻辑抽象成一个对多个 Timeline 进行读写的模型。
有了 Timeline 概念模型后,从 IM/Feed 流应用映射到 Timeline 就比较容易了,但是从 Timeline 映射到存储、同步系统仍然很复杂,主要体现在:
这些问题涉及的内容光,细节多,深度大,坑较多等,整体上很繁杂,这一部分在耗费了大量人力之后,结果可能并不理想。
针对上述问题,只要存储系统和推送系统确定后,剩余的工作都是类似的,可以完全将经验封装起来成为一个 LIB,将表结构设计,读写方式,隐患等等都解决好,然后供后来者使用,后来者可以不用再关心 Timeline 到底层存储系统之间的事情了。
所以,我们基于 JAVA 语言实现了一个 TableStore-Timeline LIB,简称 Timeline LIB。
目前已经开源在了 GitHub 上: Timeline@GitHub 。
Timeline LIB 的结构如下:
整个 Timeline 分为两层,上层的 Timeline 层和下层的 Store 层。
Timeline 层,提供最终的读写接口,用户操作的也是 Timeline 的接口。
Store 层,负责存储系统的交互,目前 Timeline LIB 中提供了 DistributeTimelineStore,基于 Table Store,同时实现了分布式的存储和同步。后续会继续实现 GlobalTimelineStore 等。如果有用户有其他系统需求,比如 MySQL,Redis,可以通过实现 IStore 接口来新增 MySQLStore 和 RedisStore。
也欢迎大家将自己实现的 Store 通过 GitHub 的 PullRequest 共享出来。
有了 Timeline LIB 之后,如果要实现一个 IM 或者 Feed 流,只需要创建两种类型 Timeline(存储类,同步类),然后调用 Timeline 的读写接口即可。
接下来,我们看下 Timeline LIB 的 API。
Timeline LIB 中面向最终用户的是 Timeline 类,用于对每个 Timeline 做读写操作。
Timeline 的接口主要分为三类:
- /**
- * Timeline的构造函数。
- * @param timelineID 此Timeline对应的ID。唯一标识一个Timeline,需要全局唯一,如果业务场景中需要多个字段才能唯一标识一个TimelineID,此时可以将多个字段拼接成一个字段。
- * @param store 此Timeline关联的Store,一般为存储Store或同步Store。实现了IStore接口类的对象,目前LIB中默认实现了DistributeTimelineStore,可以使用此store。除此之外,用户还可以自己实现自己的Store类,用于适配其他系统。
- */
- public Timeline(String timelineID, IStore store);
- /**
- * 写入一个消息到此Timeline中。
- * @param message 消息对象,需实现IMessage接口。用户需要通过实现IMessage接口创造符合自己业务场景的消息类,LIB中默认实现了StringMessage类。
- * @return 完整的TimelineEntry,包括消息和顺序ID。
- */
- public TimelineEntry store(IMessage message);
- /**
- * 异步写入消息接口。
- * @param message 消息对象,需实现IMessage接口。
- * @param callback 回调函数。
- * @return Future对象,异步模式下,Future和callback需要二选一。
- */
- public Future < TimelineEntry > storeAsync(IMessage message, TimelineCallback < IMessage > callback);
- /**
- * 批量写入消息接口。
- * 此接口只是把消息加入到本地的一个buffer中,当buffer满或者超时(默认10s,可配置)才会统一写入。
- * 此接口返回时并不一定消息已经写入成功。
- * @param message 消息对象,需实现IMessage接口。
- */
- public void batch(IMessage message) {
- /**
- * 同步读取接口,通过制定一个唯一的顺序ID读取目标TimelineEntry。
- * @param sequenceID 顺序ID。此顺序ID可由store或scan接口获取到。
- * @return 完整的TimelineEntry,包括消息和顺序ID。
- */
- public TimelineEntry get(Long sequenceID);
- /**
- * 异步读取接口,通过制定一个唯一的顺序ID读取目标TimelineEntry。
- * @param sequenceID 顺序ID。
- * @param callback 读取结束后的回调函数。
- * @return Future对象,异步模式下,Future和Callback需要二选一。
- */
- public Future < TimelineEntry > getAsync(Long sequenceID, TimelineCallback < Long > callback);
- /**
- * 顺序读取一段范围内或固定数目的消息,支持逆序,正序。
- * @param parameter 顺序读取的参数,包括方向、from、to和maxCount。
- * @return TimelineEntry的迭代器,通过迭代器可以遍历到待读取的所有消息。
- */
- public Iterator < TimelineEntry > scan(ScanParameter parameter);
在 Maven 工程中使用 Timeline LIB 只需在 pom.xml 中加入相应依赖即可:
- <dependency>
- <groupId>com.aliyun.openservices.tablestore</groupId>
- <artifactId>timeline</artifactId>
- <version>1.0.0</version>
- </dependency>
使用之前,需要先实现一个满足自己业务特点的 Message 类,此 Message 类能表示业务中的一条完整消息。
需要实现 IMessage 的下列接口:
在一个 IM 或 Feed 流产品中,一般会有两个子系统,一个是存储系统,一个是同步系统。
需要为这两个系统各自生成一个 Store 对象。
Store 生成好后就可以构造最终的 Timeline 对象了,Timeline 对象分为两类,一类是存储库 Timeline,一个是同步库 Timeline。
当在 IM 中发布消息或者 Feed 流产品中发布状态时,就是对相应存储库 Timeline 和同步库 Timeline 的消息写入 (store/storeAsync)。
当在 IM 或 Feed 流产品中读取最新消息时,就是对相应同步库 Timeline 的范围读取 (scan)。
当在 IM 或 Feed 流产品中读取历史消息时,就是对相应存储库 Timeline 的范围读取 (scan)。
如果是推拉结合的微博模式,则读取最新消息时,就是对相应存储库 Timeline 和同步库 Timeline 的同时范围读取 (scan)。
Timeline LIB 中会抛出 TimelineException,TimelineException 提供了两种接口:getType() 和 getMessage(),getType() 返回此 TimelineException 的类型,包括了 TET_ABORT,TET_RETRY,TET_INVALID_USE,TET_UNKNOWN:
这一节会演示下如何使用 Timeline LIB 实现 IM 的群组功能。
构造两个 store,一个用来存储,一个用例同步。
- IStore store = new DistributeTimelineStore(storeConfig);
- IStore sync = new DistributeTimelineStore(syncConfig);
- // 构造群成员列表,群成员列表可以存储在Table Store。
- List < String > groupMembers = Arrays.asList("user_A", "user_B", "user_C");
user_A 发一条群消息:"有人吗"。
- // 存储会话消息
- Timeline timeline1 = new Timeline("11789671", store);
- timeline1.store(new StringMessage("user_A:有人吗"));
- // 发送同步消息
- for (String user: groupMembers) {
- Timeline timeline = new Timeline(user, sync);
- timeline.store(new StringMessage("user_A:有人吗"));
- }
user_C 读取自己最新的同步消息
- Timeline timeline = new Timeline("user_C", sync);
- ScanParameter scanParameter = ScanParameterBuilder
- .scanForward()
- .from(last_sequence_id)
- .to(Long.MAX_VALUE)
- .maxCount(100)
- .build();
- Iterator<TimelineEntry> iterator = timeline.scan(scanParameter);
- while(iterator.hasNext()) {
- TimelineEntry entry = iterator.next();
- // 处理消息
- }
上面的示例演示了如何用 Timeline LIB 实现 IM 中的群组功能。其他的朋友圈,微博等也类似,这里就不赘述了。
我们目前在 Timeline Samples@GitHub 上实现两个场景的实例:
也欢迎大家共享其他场景的实现代码。
我们使用阿里云 ECS 做了性能测试,效果较理想。
在阿里云共享型 1 核 1G 的 ECS 机器上,使用 DistributeTimelineStore,Timeline LIB 的 storeAsync 接口可以完成每秒 1.2 万消息的写入,如果使用 batch 批量接口,则可以完成每秒 5.3 万消息的写入。
如果使用一台 8 核的 ECS,只需要 3 秒钟就可以完成 100 万条消息的写入。
由于 DistributeTimelineStore 使用了 Table Store 作为存储和同步系统,Table Store 是阿里云的一款服务化 NoSQL 服务,支持的 TPS 在理论上无上限,实际中仅受限于集群大小,所以整个 Timeline LIB 的写入能力和压力器的 CPU 成正比。
下面的图展示了不同机型上完成 1000 万条消息写入的延迟:
在一台 8 核 ECS 上只需要 27 秒就可以完成 1000 万写入,由于写入能力和 CPU 成线线性关系,如果用两台 16 核的,则只需要 7 秒就可以完成 1000 万消息的写入。
我们再来看一下 scan 读取的性能,读取 20 条 1KB 长度消息,LIB 端延迟一直稳定在 3.4ms,Table Store 服务端延迟稳定在 2ms。
这个量级和能力,可以撑得住目前所有的 IM 和 Feed 流产品的压力。
Timeline LIB 的想法来源于 Table Store 的真实场景需求,并且为了用户可以更加简单的使用,增加了主键列自增功能。
目前 Timeline LIB 的 store 层实现了 DistributeTimelineStore 类,DistributeTimelineStore 可同时适用于存储 store 和同步 store。
DistributeTimelineStore 是基于 Table Store 的,但是为了便于用户使用其他系统,在 Timeline LIB 中将 Store 层独立了出来。
如果用户希望使用其他系统,比如 MySQL 作为存储系统,可以实现 IStore 接口构造自己的 Store 类。我们也欢迎大家提供自己的各种 Store 层实现,最终希望为社交场景的架构师和开发者提供一套完整的易用性开发框架。
如果在使用过程中有任何问题或者建议,可以通过下列途径联系我们:
来源: https://yq.aliyun.com/articles/319138