图书馆Q是一家大型图书馆,图书馆藏书众多,纸质图书600多万册,电子图书7000多万册,总数有八千多万册,这些图书之前都是人工检索维护的,现在需要做一个系统来存储管理这些图书信息。
需求如下:
根据上面这些需求特点,要完成这个管理系统,需要两类系统支持:
如果使用阿里云产品,那么对应的产品就是:
在管理系统中使用上述两个系统的时候,目前需要双写,当新增一本书的时候,需要将详细书本信息写入Table Store,将书本ID和作者,书名写入Elasticsearch,并且对书名,作者建索引。查询的时候,如果是根据书本ID,则直接查询Table Store。如果是根据书名模糊查询,则先查Elasticsearch,获取到匹配的书本的ID后,再到Table Store中查询详细信息。
如果Table Store到Elasticsearch有自动同步通道,那么只需要将新书信息写入Table Store即可,不再需要写Elasticsearch。减少了一次写入操作,且不用再考虑数据一致性问题,系统架构大大简化。那么如何才能实现这个自动同步通道呢?
类似于上面的场景,有很多系统都有这样的需求:拥有PB级海量数据需要持久化存储,同时有一两个字段需要做模糊查询,比如姓名,手机号码等,目前很多解决方案需要双写分布式数据库和Elasticsearch,但这样不仅会带来开发、运维复杂度,而且还有数据不一致的问题。
针对上述问题,Table Store团队联合数据集成(CDP)和Elasticsearch团队上线了近实时的数据同步方案,用户只需要将数据写入Table Store,Table Store会负责将数据在10分钟内自动发送给Elasticsearch建索引。
Table Store:阿里云分布式NoSQL数据库,专注于海量数据的存储服务,目前单表可支持10PB级,10万亿行以上的数据量,且数据量增大后性能仍然保持稳定。Table Store Stream功能是一种增量实时通道服务,类似于MySQL的binlog,可以通过Stream接口实时读取到最新的变化数据(Put/Update/Delete)。
数据集成 :阿里云数据管理平台,支持数据同步等众多数据功能。
Elasticsearch :阿里云Elasticsearch是刚推出的一项新服务,提供基于开源Elasticsearch及商业版X-Pack插件,致力于数据分析、数据搜索等场景服务。在开源Elasticsearch基础上提供企业级权限管控、安全监控告警、自动报表生成等功能。
三种产品在新解决方案中的角色如下:
产品 | Table Store | 数据集成 | Elasticsearch |
---|---|---|---|
角色 | 数据存储 | 数据同步通道 | 查询增强 |
由于Table Store和Elasticsearch不是完全对等的产品,所以如果需要将数据导入Elasticsearch,那么在使用Table Store的时候有一些注意的地方:
无须配置
无须配置
在配置界面,已经提前嵌入了OTSStreamReader和ElasticsearchWriter的模板,每一项配置后面都做了解释。
- {
- "type": "job",
- "version": "1.0",
- "configuration": {
- "setting": {
- "errorLimit": {
- "record": "0"#允许出错的个数,当错误超过这个数目的时候同步任务会失败。
- },
- "speed": {
- "mbps": "1",
- #每次同步任务的最大流量。"concurrent": "1"#每次同步任务的并发度。
- }
- },
- "reader": {
- "plugin": "otsstream",
- #Reader插件的名称。"parameter": {
- "endpoint": "",
- #TableStore中实例的endpoint。"accessId": "",
- #阿里云的AccessKeyID。"accessKey": "",
- #阿里云的AccessKeySecret。"instanceName": "",
- #TableStore的实例名,如果使用DataSource,则需要新增配置项datasource,不再需要配置endpoint,accessId,accessKey和instanceName。"dataTable": "",
- #TableStore中的表名。"statusTable": "TableStoreStreamReaderStatusTable",
- #存储TableStore Stream状态的表,一般不需要修改。"startTimestampMillis": "",
- #开始导出的时间点。"endTimestampMillis": "",
- #结束导出的时间点。"date": "yyyyMMdd",
- #导出哪一天的数据,功能和startTimestampMillis、endTimestampMillis重复,这一项需要删除。"mode": "single_version_and_update_only",
- #TableStore Stream导出数据的格式,目前ElasticSearch只能接收这种格式的,这个不需要修改。如果配置模板中没有则需要增加。"column": [#需要导出TableStore中的哪些列到ElasticSearch中去,如果配置模板中没有则需要增加。 {
- "name": "uid"
- },
- {
- "name": "name"
- },
- {
- "name": "phone"
- }],
- "isExportSequenceInfo": false,
- ##single_version_and_update_only模式下只能是false。"maxRetries": 30#最大重试次数。
- }
- },
- "writer": {
- "plugin": "elasticsearch",
- #Writer插件的名称:ElasticSearchWriter,不需要修改。"parameter": {
- "endpoint": "",
- #ElasticSearch的endpoint,控制台上有。"accessId": "",
- #如果使用了X - PACK插件,则这里需要填写username,如果没使用,则这里填空字符串即可。阿里云Elasticsearch使用了X - PACK插件,这里需要填写username。"accessKey": "",
- #如果使用了X - PACK插件,则这里需要填写password,如果没使用,则这里填空字符串即可。阿里云Elasticsearch使用了X - PACK插件,这里需要填写password。"index": "",
- #ElasticSearch的索引名称,如果之前没有,插件会自动创建。"indexType": "",
- #ElasticSearch中相应索引下的类型名称"cleanup": true,
- #是否在每次导入数据到ElasticSearch的时候清空原有数据,全量导入 / 重建索引的时候需要设置为true,同步增量的时候必须为false,这里因为是同步,则需要设置为false。"discovery": false,
- #是否自动发现,设置为true "batchSize": 1000,
- #每批导出的个数"splitter": ",",
- #如果插入数据是array,就使用指定分隔符。"column": [#ElasticSearch中的列名,顺序和Reader中的Column顺序一致 {
- "name": "uid",
- #TableStore中的主键列是uid,这里也有同名uid,用type:id表示这一列是主键"type": "id"#id表示这一列是主键,id不是ElasticSearch的内置类型,是ElasticSearchWriter提供的虚拟类型
- },
- {
- "name": "name",
- #对应于TableStore中的属性列:name "type": "text"#文本类型,采用默认分词
- }]
- }
- }
- }
- }
其他配置项可以参考:ElasticsearchWriter配置项
即可获取到UUID。
- bash dmidecode | grep UUID
- curl -XGET http://endpoint/index_name/type_name/_count?pretty -d '
- {
- "query": {
- "match_all": {}
- }
- }'
结果类似下面:
- {
- "count" : 1000, # ElasticSearch中index_name索引的type_name类型中的doc数
- "_shards" : { # 这个是ElasticSearch返回数据相关的meta值,表示总共有5个shard,全部成功返回了结果
- "total" : 5,
- "successful" : 5,
- "skipped" : 0,
- "failed" : 0
- }
- }
来源: https://yq.aliyun.com/articles/257489