本文以 TermQuery,GlobalOrdinalsStringTermsAggregator 为例, 通过代码, 分析 es,lucene 搜索及聚合流程.
1: 协调节点收到请求后, 将 search 任务发到相关的各个 shard.
相关代码:
- TransportSearchAction.executeSearch
- TransportSearchAction.searchAsyncAction.start
- AbstractSearchAsyncAction.executePhase(SearchQueryThenFetchAsyncAction)
- InitialSearchPhase.performPhaseOnShard
- SearchQueryThenFetchAsyncAction.executePhaseOnShard
2: 数据节点查询及聚合一个 shard.
相关代码:
SearchService.executeQueryPhase
2.1: 根据 request 构造 SearchContext.
SearchContext
包含 Query,Aggregator 等重要信息. 并将记录查询, 聚合结果.
Query
根据 request 创建具体的 query, 如:
TermQuery: 用于 keyword,text 字段. 索引结构为倒排.
PointRangeQuery: 用于数字, 日期, ip,point 等字段. 索引结构为 k-d tree.
Aggregator
此时仅根据 request 创建 AggregatorFactory, 用于后续创建 Aggregator.
相关代码:
SearchService.createAndPutContext
2.2: 根据 SearchContext 构造 Aggregator.
根据 SearchContext 构造具体的 Aggregator, 如:
GlobalOrdinalsStringTermsAggregator: 用于 keyword 字段, 开启 global ordinal 的 term 聚合.
StringTermsAggregator: 用于 keyword 字段, 关闭 global ordinal 的 term 聚合.
LongTermsAggregator: 用于 long 字段的 term 聚合.
TopScoreDocCollector: 用于为 doc 评分并取 topN.
相关代码:
AggregationPhase.preProcess
2.3: 创建 GlobalOrdinalsStringTermsAggregator, 如果 cache 中没有 GlobalOrdinals, 将创建 GlobalOrdinals, 并 cache. 当 shard 下数据发生变化时, 应当清空 cache.
GlobalOrdinals
将所有 segment , 指定 field 的所有 term 排序, 合并成一个 GlobalOrdinals, 并创建 OrdinalMap.collect 时, 使用 doc 的 segment ord 获取 global ord.
OrdinalMap
为每一个 segmentValueCount 小于 globalValueCount 的 segment, 保存了一份 segment ord 到 global ord 的 mapping(LongValues). 对于 segment valueCount 等于 globalValueCount 的 segment, 原本的 segment ord 就是 global ord, 后续获取 ord 时, 直接从 SortedSetDV(dvd)中读取.
value count
指的是不同 term 数量(term 集合的大小). 使用 globalValueCount 用来在 collect 时, 确定结果集的大小.
举例
- segment 1:{
- sorted terms: [aa, bb, cc],ord:[0, 1, 2]
- }.
- segment 2:{
- sorted terms: [bb, cc, dd],ord:[0, 1, 2]
- }.
- segment 3:{
- sorted terms: [aa, bb, cc, dd],ord:[0, 1, 2, 3]
- }.
- GlobalOrdinals:{
- sorted terms: [aa, bb, cc, dd],ord:[0, 1, 2, 3]
- }.
ordinalMap:segment1:[0, 1, 2]->[0, 1, 2],segment2:[0, 1, 2]->[1, 2, 3].segment3 则使用原始的 segment ord.
docCounts
int[globalValueCount], 用来记录 ord 对应的 count.
注: 经查询条件过滤后, 有些 ord 可能没有对应 doc.
bucketOrds
稀疏 (value count 多, 但 doc 少) 时使用, 缩减 docCounts size.
LongHash:globalOrd 与 id (size)映射. collect 时在 id 处 ++,build agg 时取出 id 对应的 count.
当父聚合是 BucketAggregator 聚合时, 子聚合只对父的某个 term 聚合, 所以 doc 会减少, 使用 bucketOrds.
注: 按照此逻辑, 如果 query 本身有 term 过滤条件, 也应该启用 bucketOrds(global_ordinals_hash).
相关代码:
- TermsAggregatorFactory.doCreateInternal.
- // 获取 globalValueCount 决定是否 global_ordinals_low_cardinality, global_ordinals_low_cardinality 中又因不是 ValuesSource.Bytes.FieldData, 创建 global_ordinals.
- ValuesSource$WithOrdinals.globalMaxOrd.
- // 通过获取一个 segment 的 globalOrdinals, 触发如果 cache 中没有一个 shardId+field 对应的 globalOrdinals,load 所有 segment ord, 建立 global ords.
- ValuesSource$FieldData.globalOrdinalsValues.
- SortedSetDVOrdinalsIndexFieldData.loadGlobal.
- IndicesFieldDataCache$IndexFieldCache.load
- SortedSetDVOrdinalsIndexFieldData.localGlobalDirect.
- GlobalOrdinalsBuilder.build.
- //globalOrdinals 主要类
- GlobalOrdinalsIndexFieldData.
- MultiDocValues$OrdinalMap
2.3.1: 从 docValues 中读取单个 segment, 指定 field 的 ordinals,term 等.
相关代码:
- SortedSetDVOrdinalsIndexFieldData.load.
- SortedSetDVBytesAtomicFieldData.getOrdinalsValues.
- // 获取 segment 指定 field 的 SortedSetDocValues
- DocValues.getSortedSet.
- // 获取 segment 的 docValuesReader
- SegmentReader.getDocValuesReader.
- // 读取 field 的 SortedDocValues
- Lucene54DocValuesProducer.getSortedSet.
2.3.2: 对多个 segment 的 SortedSetDocValues 排序, 创建 OrdinalMap.
具体为获取每个 segment 的 SortedDocValuesTermsEnum. 使用多个 SortedDocValuesTermsEnum 构建成小顶堆, 合并成一个.
相关代码:
- MultiDocValues$OrdinalMap.build.
- MultiTermsEnum
- TermMergeQueue
- // 获取一个 segment 的 segment ord 到 global ord 的 mapping.
- MultiDocValues$OrdinalMap.getGlobalOrds
2.4: 查询及聚合数据.
相关代码:
QueryPhase.execute.
2.4.1: 根据 Query 创建具体的 weight.
weigth 将用于 query segment, 并创建 scorer.
scorer 将用于评分和 collect.
如果需要评分, 读取 field 的 fst, 查询 term, 定位 postings 将提前到这里执行.
相关代码:
- IndexSearcher.createNormalizedWeight.
- TermQuery.createWeight.
2.4.2: 为每个 leafReader(segment)创建 leafCollector.
创建 LeafBucketCollector, 获取该 segment 的 globalOrds.
globalOrds
如果 segment 的 value count 等于 global value count, 则返回 segment ords(从 dvd 中读取);
如果不等, 则从 OrdinalMap 中获取该 segment 的 GlobalOrdinalMapping, 且该 segment 的 value count 改为获取 global value count.
singleValues
并判断该 field 的 docValues 是否为 singleValues(keyword single ord,text 则为多 term 多 ord).
相关代码:
- // 串行查询及聚合一个分片下的所有 segment.
- IndexSearcher.search.
- IndexSearcher.search.collector.getLeafCollector.
- GlobalOrdinalsStringTermsAggregator.getLeafCollector.
- // 获取指定 segment 的 globalOrdinals, 如果 cache 中没有该 shardId+field 对应的 globalOrdinals,load 所有 segment ord, 建立 global ords.
- ValuesSource$FieldData.globalOrdinalsValues
- // 获取一个 segment 的 global ords.
- GlobalOrdinalsIndexFieldData$Atomic.getOrdinalsValues
- // 提供获取该 segment ord 对应的 global ord, 使用 globalOrd 获取 termBytes 等方法.
- GlobalOrdinalMapping
- //singleValues
- SingletonSortedSetDocValues
2.4.3:query 该 segment, 获取 DocIdSetIterator, 并构造 scorer.
DocIdSetIterator 即查询出的 docId 集合, 对于倒排是 PostingsEnum, 对于数字使用的是 BitSetIterator.
相关代码:
- IndexSearcher.search.weight.bulkScorer.
- Weight.bulkScorer.
- // 构造 bulkScorer.
- TermQuery$TermWeight.scorer.
- // 查询 segment, 获取 TermsEnum, 并根据搜索关键字, 定位 PostingsEnum 位置.
- TermQuery$TermWeight.getTermsEnum.
query segment 流程如下:
1: 根据 field 读取. tip(fst 索引结构, term index)文件, 获取该 field 下所有 term 前缀构造的索引, 并缓存.
FST(Finite State Transducer, 有限状态传感器)其他用途: 阿里对 hbase rowkey 索引定位 block(类似 lucene tip 索引 term),
自然语言处理中一个单词或汉字下一个状态各个状态的概率.
相关代码:
- BlockTreeTermsReader.terms.
- FieldReader.
- //Load a previously saved FST
- FST.
注: 官方 lucene 在 open IndexReader(es recovery shard)时, 就要通过构造 SegmentReader,BlockTreeTermsReader, 构造 FieldReader, 读取 FST.
相关代码:
DirectoryReader.open
2: 从 fst 中查找 term, 如果能找到的 value(fst 正常结束),value 记录了
该 term 前缀对应的 term dict 所在的 block(.tim,term dictionary)位置, 读取该 block, 查找具体的 term, 获取 posting 所在. doc(postings)的位置.
相关代码:
- TermQuery$TermWeight.getTermsEnum.termsEnum.seekExact.
- SegmentTermsEnum.seekExact.
- SegmentTermsEnumFrame.scanToTerm.
- // 根据 termsEnum(已经设置 term)读取 postings.
- TermQuery$TermWeight.scorer.termsEnum.postings.
- SegmentTermsEnum.postings.
- // 根据 termsEnum 中的 term, 设置 postings 在. doc 中位置.
- SegmentTermsEnum.postings.currentFrame.decodeMetaData.
3: 从. doc 中读取 postings, 返回 PostingsEnum(BlockDocsEnum).
相关代码:
Lucene50PostingsReader.postings.
上述流程如下图:
- postings
- (docID, termFreq, positions), (docID, termFreq, positions),.....
- termFreq
term 在该文档出现的次数.
用于对文档频分.
positions
term 在该文档中每次的位置.
用于短语查询时, 多个 term 是否连续出现, 或者小于指定位置.
2.4.4: 遍历 PostingsEnum(过滤 deleted doc), 评分及 collect 数据.
相关代码:
- acceptDocs:getLiveDocs
- IndexSearcher.search.scorer.score.
- BulkScorer.score.
- DefaultBulkScorer.score.
- // 在查询结果中前进到>=target 的 docID, 并返回 docID.
- Lucene50PostingsReader$BlockDocsEnum.advance(target).
- // 遍历 BlockDocsEnum(PostingsEnum)中的查询结果, collect doc.
- DefaultBulkScorer.scoreRange.
- //collect 一个 doc.
- MultiCollector$MultiLeafCollector.collect.
TopScoreDocCollector 对 doc 评分, 并取 topN 的流程如下:
为该 doc 评分, 并基于 score 构建 N 节点的小顶堆, 用于保留 TopN.
相关代码:
TopScoreDocCollector$SimpleTopScoreDocCollector.collect.
1: 根据设置的 Similarity, 使用 BM25 或 TFIDF 等算法为 doc 评分.
BM25,TFIDF 都使用 freq,norms(NumericDocValues), 算法不同, 可能使用的 NumericDocValues 也不同.
相关代码:
- TermScorer.score.
- BM25Similarity$BM25DocScorer.score.
- TFIDFSimilarity$TFIDFSimScorer.score.
- IndexWriterConfig.setSimilarity.
- IndexSearcher.setSimilarity.
- NumericDocValues.
2: 根据 doc 得到的 score 构建 N 节点的小顶堆.
相关代码:
- TopScoreDocCollector$SimpleTopScoreDocCollector.collect.
- PriorityQueue.updateTop/downHeap/insertWithOverflow.
GlobalOrdinalsStringTermsAggregator 统计各 term doc 数的流程如下:
1: 根据 doc 是否为 singleValues, 获取 doc 的 ord 或 ords.
相关代码:
- //singleValues 获取 ord
- singleValues.getOrd(doc).
- // 获取 ords
- // 设置 doc.
- GlobalOrdinalsStringTermsAggregator$LeafBucketCollector.collect.globalOrds.setDocument(doc)
- AbstractRandomAccessOrds.setDocument(doc).
- // 获取 doc 对应的 term 基数.
- GlobalOrdinalsStringTermsAggregator$LeafBucketCollector.collect.globalOrds.cardinality().
- GlobalOrdinalMapping.cardinality().
- // 遍历 doc ords.
- GlobalOrdinalsStringTermsAggregator$LeafBucketCollector.collect.globalOrds.ordAt(i).
- GlobalOrdinalMapping.ordAt(i).
2:docCounts(IntArray)对应的 ord count++.
如果启用 bucketOrds(稀疏处理, 见 2.3), 则将 ord 映射到 bucketOrd,docCounts 的 bucketOrd 位置 count++.
相关代码:
- // 将 ord 对应 count++. 传入 doc, 用于 sub collect.
- GlobalOrdinalsStringTermsAggregator.collectGlobalOrd.
2.4.5: 取 topDocs.TopScoreDocCollector collect 时仅保留 topN. 在此每次取堆顶元素, 得到逆序的 topN.
相关代码:
TopDocsCollector.topDocs.
2.4.6: 根据聚合数据, 按 docCount 取 topN, 排序.
根据 aggregator 的数据, 按 docCount 构建小顶堆.
每次取走堆顶元素, 逆序放入数组, 得到降序的 topN.
设置 termBytes.
相关代码:
- AggregationPhase.execute.
- GlobalOrdinalsStringTermsAggregator.buildAggregation.
- PriorityQueue.updateTop/downHeap/insertWithOverflow.
- // 根据 globalOrd 从所有 segment 中获取第一个含有该 globalOrd 的 segment, 并从该 segment 中读取 term 值 BytesRef.
- GlobalOrdinalMapping.lookupOrd.
3: 协调节点 reduce 各个 shard 返回的结果.
使用各 shard 返回的有序结果, 构造堆, 合并聚合, 合并 TopDocs.
相关代码:
- InitialSearchPhase.onShardResult.
- InitialSearchPhase.onShardFailure.
- //reduce 结果
- FetchSearchPhase.innerRun.resultConsumer.reduce.
- SearchPhaseController.reducedQueryPhase.
- SearchPhaseController.sortDocs.
- //mergeTopDocs
- SearchPhaseController.mergeTopDocs.
- TopDocs.merge.
- TopDocs.mergeAux.
- PriorityQueue.
4:fetch 数据.
协调发送 fecth 请求到相关 shard, 数据节点从 stored field 中 fetch 结果.
相关代码:
FetchSearchPhase.innerRun.
参考:
- source code: Elasticsearch 5.6.12, lucene 6.6.1.
- https://www.elastic.co/blog/lucene-points-6.0
PointRangeQuery:abstract class 竟然可以有构造方法.
来源: https://www.cnblogs.com/vsop/p/12152207.html