潭潭 2019-05-14 17:21:40 浏览 232 评论 0
消息系统
架构
消息队列
索引
表格存储
全文检索
存储
im
Feed 流
timeline
多元索引
摘要: 互联网快速发展的今天, 社交类应用, 消息类功能大行其道, 占据了大量网络流量. 大至钉钉, 微信, 微博, 知乎, 小至各类 App 的推送通知, 消息类功能几乎成为所有应用的标配. 根据场景特点, 我们可以将消息类场景归纳成三大类: IM(钉钉, 微信),Feed 流 (微博, 知乎) 以及常规消息队列.
背景
互联网快速发展的今天, 社交类应用, 消息类功能大行其道, 占据了大量网络流量. 大至钉钉, 微信, 微博, 知乎, 小至各类 App 的推送通知, 消息类功能几乎成为所有应用的标配. 根据场景特点, 我们可以将消息类场景归纳成三大类: IM(钉钉, 微信),Feed 流 (微博, 知乎) 以及常规消息队列. 因此, 如何开发一个简便而又高效 IM 或 Feed 流功能, 成为了很多架构师, 开发人员不得不面对的问题.
Timeline 1.0 版模型
针对消息类场景, 表格存储团队针对 JAVA 语言打造了一个 TableStore-Timeline 1.0 版数据模型模型(简称 Timeline 模型). 基于场景经验与理解, 将消息场景封装成一个数据模型, 提供了表结构设计, 读写方式等解决方案供需求者使用. 用户只需依托模型 API, 直接忽略 Timeline 到底层存储系统之间的架构方案, 直接基于接口实现业务逻辑. 它能满足消息数据场景对消息保序, 海量消息存储, 实时同步等特殊需求. Timeline 1.0 是定义在表格存储之上抽象出来的数据模型, 具体内容参见《TableStore Timeline: 轻松构建千万级 IM 和 Feed 流系统》.
全文检索, 模糊查询需求
在表格存储的 Timeline 模型受到广泛使用的过程中, 我们也逐渐发现消息类数据的全文检索, 模糊查询这一很强需求. 而原有模型的在线查询能力存在一定短板. 随着表格存储支持了 SearchIndex 能力, 使得 Timeline 模型支持在线全文检索, 模糊查询成为了可能. 所以我们基于原有的架构设计, 重新打造了 Timeline 2.0 模型, 引入了强大的查询能力与数据管理新方案.
项目代码目前已经开源在了 GitHub 上: Timeline@GitHub https://github.com/aliyun/tablestore-timeline .
2.0 时代到来
此次推出的 Timeline 模型 2.0 版, 没有直接基于 1.X 版本直接改造. 而是在兼容原有模型架构之上, 定义, 封装了新的使用接口. 重新打造升级新的模型, 增加了如下功能:
增加了 Timeline Meta 的管理能力;
基于多元索引功能, 增加了 Meta,Timeline 的全文检索, 多维组合查询能力;
支持 SequenceId 两种设置方式: 自增列, 手动设置;
支持多列的 Timeline Identifier 设置, 提供 Timeline 的分组管理能力;
兼容 Timeline 1.X 模型, 提供的 TimelineMessageForV1 样例可直接读, 写 1.X 版本消息, 用户也可仿照实现.
架构解析
Timeline 做为表格存储直接支持的一种数据模型, 以『简单』为设计目标, 其存核心模块构成比较清晰明了. Timeline 尽量提升用户的使用自由度, 让用户能够根据自身场景需求选择更为合适的实现方案. 模型的模块架构如上图, 主要包括如下重要部分:
Store: 存储库, 类似数据库的表的概念, 模型包含两类 Store 分别为 Meta Store,Timeline Store.
Identifier: 用于区分 Timeline 的唯一标识, 用以标识单行 Meta, 以及相应的 Timeline Queue.
Meta: 用于描述 Timeline 的元数据, 元数据描述采用 free-schema 结构, 可自由包含任意列.
Queue:Queue 为单 Identifier 对应的所有消息队列, 一个 Timeline 下 Message 按照 Queue 单元存储.
SequenceId:Queue 中消息体的序列号, 需保证递增, 唯一, 模型支持自增列, 自定义两种实现模式.
Message:Timeline 内传递的消息体, 是一个 free-schema 的结构, 可自由包含任意列.
Index: 基于 SearchIndex 实现的索引, 针对不同 Store 分为 Meta Index 和 Message Index 两类, 可对 Meta 或 Message 内的任意列自定义索引, 提供灵活的多条件组合查询和搜索.
性能优势
Timeline 模型是基于 Tablestore 抽象, 封装出的一类场景数据模型, 因而具有 Tablestore 自身的所有优点. 同时结合场景设计的接口, 让用户更直观, 清晰的实现业务逻辑, 总结如下:
支撑海量数据存储: 分布式架构, 高可扩展, 支持 10PB 级的消息.
低存储成本: 表格存储提供低成本的存储方式, 按量付费, 资源包, 预留 Cu.
数据生命周期管理: 不同类型 (表级别) 消息数据, 可自定义不同生命周期.
极高的写入吞吐: 具备极高的写入吞吐能力, 可应对 00M TPS 的消息写入.
低延迟的读: 查询消息延迟低, 毫秒量级.
接口设计: 可读性高, 接口功能全面, 清晰.
Maven 地址
- Timeline Lib
- <dependency>
- <groupId>com.aliyun.openservices.tablestore</groupId>
- <artifactId>Timeline</artifactId>
- <version>2.0.0</version>
- </dependency>
- TableStore Java SDK
Timeline 模型在 TableStore Java SDK>= 4.12.1 作为基本数据模型直接提供, 表格存储老用户可升级 SDK 直接使用
- <dependency>
- <groupId>com.aliyun.openservices</groupId>
- <artifactId>tablestore</artifactId>
- <version>4.12.1</version>
- </dependency>
入手指南
初始化
初始化 Factory
用户将 SyncClient 作为参数, 初始化 StoreFactory, 通过工厂创建 Meta 数据, Timeline 数据的管理 Store. 错误重试的实现依赖 SyncClient 的重试策略, 用户通过设置 SyncClient 实现重试. 如有特殊需求, 可自定义策略(只需实现 RetryStrategy 接口).
- /**
- * 重试策略设置
- * Code: configuration.setRetryStrategy(new DefaultRetryStrategy());
- * */
- ClientConfiguration configuration = new ClientConfiguration();
- SyncClient client = new SyncClient(
- "http://instanceName.cn-shanghai.ots.aliyuncs.com",
- "accessKeyId",
- "accessKeySecret",
- "instanceName", configuration);
- TimelineStoreFactory factory = new TimelineStoreFactoryImpl(client);
初始化 MetaStore
构建 meta 表的 Schema(包含 Identifier,MetaIndex 等参数), 通过 Store 工厂创建并获取 Meta 的管理 Store; 配置参数包含: Meta 表名, 索引, 表名, 主键字段, 索引名, 索引类型等参数.
- TimelineIdentifierSchema idSchema = new TimelineIdentifierSchema.Builder()
- .addStringField("timeline_id").build();
- IndexSchema metaIndex = new IndexSchema();
- metaIndex.addFieldSchema( // 配置索引字段, 类型
- new FieldSchema("group_name", FieldType.TEXT).setIndex(true).setAnalyzer(FieldSchema.Analyzer.MaxWord)
- new FieldSchema("create_time", FieldType.Long).setIndex(true)
- );
- TimelineMetaSchema metaSchema = new TimelineMetaSchema("groupMeta", idSchema)
- .withIndex("metaIndex", metaIndex); // 设置索引
- TimelineMetaStore timelineMetaStore = serviceFactory.createMetaStore(metaSchema);
初始化 TimelineStore
构建 timeline 表的 Schema 配置, 包含 Identifier,TimelineIndex 等参数, 通过 Store 工厂创建并获取 Timeline 的管理 Store; 配置参数包含: Timeline 表名, 索引, 表名, 主键字段, 索引名, 索引类型等参数.
消息的批量写入, 基于 Tablestore 的 DefaultTableStoreWriter 提升并发, 用户可以根据自己需求设置线程池数目.
- TimelineIdentifierSchema idSchema = new TimelineIdentifierSchema.Builder()
- .addStringField("timeline_id").build();
- IndexSchema timelineIndex = new IndexSchema();
- timelineIndex.setFieldSchemas(Arrays.asList(// 配置索引的字段, 类型
- new FieldSchema("text", FieldType.TEXT).setIndex(true).setAnalyzer(FieldSchema.Analyzer.MaxWord),
- new FieldSchema("receivers", FieldType.KEYWORD).setIndex(true).setIsArray(true)
- ));
- TimelineSchema timelineSchema = new TimelineSchema("timeline", idSchema)
- .autoGenerateSeqId() //SequenceId 设置为自增列方式
- .setCallbackExecuteThreads(5) // 设置 Writer 初始线程数为 5
- .withIndex("metaIndex", timelineIndex); // 设置索引
- TimelineStore timelineStore = serviceFactory.createTimelineStore(timelineSchema);
Meta 管理
Meta 管理提供了增, 删, 改, 单行读, 多条件组合查询等接口. 其中多条件组合查询功能基于多元索引, 只有设置了 IndexSchema 的 MetaStore 才支持组合查询功能. 索引类型支持 LONG,DOUBLE,BOOLEAN,KEYWORD,GEO_POINT 等类型, 属性包含 Index,Store 和 Array, 其含义与多元索引相同.
TimelineIdentifer 是区分 Timeline 的唯一标识, 重复的 Identifier 会被覆盖.
- /**
- * 接口使用参数
- * */
- TimelineIdentifier identifier = new TimelineIdentifier.Builder()
- .addField("timeline_id", "group")
- .build();
- TimelineMeta meta = new TimelineMeta(identifier)
- .setField("filedName", "fieldValue");
- /**
- * 创建 Meta 表(如果设置索引则会创建索引)
- * */
- timelineMetaStore.prepareTables();
- /**
- * 插入 Meta 数据
- * */
- timelineMetaStore.insert(meta);
- /**
- * 根据 id 读取单行 Meta 数据
- * */
- timelineMetaStore.read(identifier);
- /**
- * 更新 Meta 数据
- * */
- meta.setField("fieldName", "newValue");
- timelineMetaStore.update(meta);
- /**
- * 根据 id 删除单行 Meta 数据
- * */
- timelineMetaStore.delete(identifier);
- /**
- * 通过 SearchParameter 参数检索
- * */
- SearchParameter parameter = new SearchParameter(
- field("fieldName").equals("fieldValue")
- );
- timelineMetaStore.search(parameter);
- /**
- * 通过 SearchQuery 参数检索(SearchQuery 是 SDK 原生类型, 支持所有多元索引检索条件)
- * */
- TermQuery query = new TermQuery();
- query.setFieldName("fieldName");
- query.setTerm(ColumnValue.fromString("fieldValue"));
- SearchQuery searchQuery = new SearchQuery().setQuery(query);
- timelineMetaStore.search(searchQuery);
- /**
- * 删除 Meta 表(如果存在索引, 同时删除索引)
- * */
- timelineMetaStore.dropAllTables();
Timeline 管理
Timeline 管理提供了消息模糊查询, 多条件组合查询接口. 消息的全文检索依托多元索引, 用户只需将相应字段索引类型设置为 TEXT, 即可通过 Search 接口实现消息的全文检索. Timeline 管理包含消息表的创建, 检索, 删除等.
- /**
- * 接口使用参数
- * */
- SearchParameter searchParameter = new SearchParameter(
- field("text").equals("fieldValue")
- );
- TermQuery query = new TermQuery();
- query.setFieldName("text");
- query.setTerm(ColumnValue.fromString("fieldValue"));
- SearchQuery searchQuery = new SearchQuery().setQuery(query).setLimit(10);
- /**
- * 创建 Meta 表(如果设置索引则会创建索引)
- * */
- timelineStore.prepareTables();
- /**
- * 通过 SearchParameter 参数检索
- * */
- timelineStore.search(searchParameter);
- /**
- * 通过 SearchQuery 参数检索(SearchQuery 是 SDK 原生类型, 支持所有多元索引检索条件)
- * */
- timelineStore.search(searchQuery);
- /**
- * 将 Writer 队列中未发的请求主动触发, 同步等待直到所有消息完成存储
- * */
- timelineStore.flush();
- /**
- * 关闭 Writer 与 Writer 中的线程池
- * */
- timelineStore.close();
- /**
- * 删除 Timeline 表(如果存在索引, 同时删除索引)
- * */
- timelineStore.dropAllTables();
Queue 管理
Queue 是单个消息队列的抽象概念, 对应一个 Store 下单个 Identifier 的所有消息. 通过 Queue 实例管理相应 Identifer 的消息队列, 支持基本的增, 删, 改, 单行查, 范围查等接口.
- /**
- * 接口使用参数
- * */
- TimelineIdentifier identifier = new TimelineIdentifier.Builder()
- .addField("timeline_id", "group")
- .build();
- long sequenceId = 1557133858994L;
- TimelineMessage message = new TimelineMessage().setField("text", "Timeline is fine.");
- ScanParameter scanParameter = new ScanParameter().scanBackward(Long.MAX_VALUE, 0);
- TimelineCallback callback = new TimelineCallback() {
- @Override
- public void onCompleted(TimelineIdentifier i, TimelineMessage m, TimelineEntry t) {
- // do something when succeed.
- }
- @Override
- public void onFailed(TimelineIdentifier i, TimelineMessage m, Exception e) {
- // do something when failed.
- }
- };
- /**
- * 单个 Identifier 对应的消息队列
- * */
- timelineQueue = timelineStore.createTimelineQueue(identifier);
- /**
- * 存储消息
- * */
- // 同步
- timelineQueue.store(message);
- timelineQueue.store(sequenceId, message);
- // 异步, 支持 callback
- timelineQueue.storeAsync(message, callback);
- timelineQueue.storeAsync(sequenceId, message, callback);
- // 异步批量
- timelineQueue.batchStore(message);
- timelineQueue.batchStore(sequenceId, message);
- // 异步批量, 支持 callback
- timelineQueue.batchStore(message, callback);
- timelineQueue.batchStore(sequenceId, message, callback);
- /**
- * 单行读取, 获取最新一行, 获取最新 SequenceId
- * */
- timelineQueue.get(sequenceId);
- timelineQueue.getLatestTimelineEntry();
- timelineQueue.getLatestSequenceId();
- /**
- * 根据 SequenceId 更新消息
- * */
- message.setField("text", "newValue");
- timelineQueue.update(sequenceId, message);
- timelineQueue.updateAsync(sequenceId, message, callback);
- /**
- * 根据 SequenceId 删除消息
- * */
- timelineQueue.delete(sequenceId);
- /**
- * 根据范围参数, Filter 获取批量消息
- * */
- timelineQueue.scan(scanParameter);
专家服务
表格存储有一批精通 Timeline 领域的技术专家, 在打造 IM,Feed 流场景方面有着独到的见解. 如果您:
渴望寻觅 Timeline 领域高手过招;
调研 Timeline 场景解决方案;
准备入门 Timeline 场景;
对表格存储 (Tablestore) 产品感兴趣;
来源: https://yq.aliyun.com/articles/702419