目录
- Intro
- Basics
- Configuration
数据模型(Schema)
- Gremlin
- JanusGraph Server
服务部署
- ConfiguredGraphFactory
- Multi-node
- Indexing
Transaction 事务
缓存机制(JanusGraph Cache)
事务日志(Transaction Log)
其他常见问题
技术上的限制(Technical Limitations)
后端存储(Storage Backends)
- Apache Cassandra
- Apache HBase
- InMemory Storage Backend
后端索引(Index Backends)
查询谓语和数据类型
索引参数和全局搜索
- Direct Index Query
- Elasticsearch
高级功能
- Advanced Schema
- Eventually-Consistent Storage Backends
- Failure & Recovery
索引的管理
大规模导入(Bulk Loading)
- Graph Partitioning
- JanusGraph with TinkerPop's Hadoop-Gremlin
注 2: 此手册根据 2018 年 10 月 JanusGraph 的官方文档进行整理, 后续会陆续更新新知识
注 3: 本博文原始地址, 两个博客由同一博主管理.
JanusGraph 作为 Titan 的继承者, 汲取了 Titan 和 TinkerPop 框架诸多的优点, 成为了当前热门的图数据库领域的佼佼者, 然后, JanusGraph 的中文文档却相对匮乏, 成为了 JanusGraph 入门学习者的障碍之一. 本文以 JanusGraph 中文文档为基础, 结合作者个人的思考, 总计了 JanusGraph 中文文档中的关键点, 希望给大家带来帮助.
Intro
JanusGraph is a graph database engine. JanusGraph itself is focused on compact graph serialization, rich graph data modeling, and efficient query execution. In addition, JanusGraph utilizes Hadoop for graph analytics and batch graph processing. JanusGraph implements robust, modular interfaces for data persistence, data indexing, and client access. JanusGraph's modular architecture allows it to interoperate with a wide range of storage, index, and client technologies; it also eases the process of extending JanusGraph to support new ones.
总的来说, JanusGraph 的一大特性是和其他组件的交互能力强, 比如像存储的 Hbase, full-text search 的 Elasticsearch 等, 也可以很方便地和 graphx 等 olap 引擎交互.
Broadly speaking, applications can interact with JanusGraph in two ways:
Embed JanusGraph inside the application executing Gremlin queries directly against the graph within the same JVM. Query execution, JanusGraph's caches, and transaction handling all happen in the same JVM as the application while data retrieval from the storage backend may be local or remote.
Interact with a local or remote JanusGraph instance by submitting Gremlin queries to the server. JanusGraph natively supports the Gremlin Server component of the Apache TinkerPop stack.
应用程序可以通过两种方式使用 JanusGraph,
直接把 JanusGraph 集成到自己的系统中, 在同一个 JVM 中执行 Query
通过将 query submit 到 JanusGraph 的 instance 来执行
架构图:
- Basics
- Configuration
要启动一个 Janus Instance, 必须要提供一个 configuration 文件.
configuration 里面至少要说明 storage backend 是什么, 参考此.
如果需要高级功能(e.g full-text search, geo search, or range queries), 就需要一个 indexing backend.
一个基于 Cassandra+Elasticsearch 的配置例子:
- storage.backend=cql
- storage.hostname=localhost
- index.search.backend=Elasticsearch
- index.search.hostname=100.100.101.1, 100.100.101.2
- index.search.Elasticsearch.client-only=true
数据模型(Schema)
数据的 Schema 是 JanusGraph 的一个很重要的部分, 数据 schema 主要定义在三个元素上: 顶点, 边和属性. 听起来可能有点绕口, schema 是干什么呢? 其实就是去定义顶点的 label, 边的 label 的 Property 的 key 有哪些, 再通俗点讲, 就是有哪些类型的点(比如 god 和 human), 有哪些类型的边(比如 trade 和 friends), 然后点和边的属性列表都有哪些 key(比如 human 有一个 property key 是 name 和 age).
需要注意的几点:
schema 可以显示地定义也不可以不显示地定义, 但还是建议提前定义好.
Schema 可以在数据库使用的过程中更改和进化, 这样并不会影响数据库正常的服务.
Vertex label
vertice label 描述的是 vertice 的语义, 不过 vertice label 是 optional 的, 用来区分不同类型的 vertice, 比如 user 和 product.
利用
makeVertexLabel(String).make()
来创建 vertice label
vertice label 必须保持全局唯一性
下面是个创建 vertex label 的例子:
- mgmt = graph.openManagement()
- person = mgmt.makeVertexLabel('person').make()
- mgmt.commit()
- // Create a labeled vertex
- person = graph.addVertex(label, 'person')
- // Create an unlabeled vertex
- v = graph.addVertex()
- graph.tx().commit()
注意, 需要显示地调用 builder 的 make() 函数来完成 vertex label 的定义.
Edge label
Edge label 主要描述的是 relationship 的语义(比如 friends)
在一个事务中, 用
makeEdgeLabel(String)
来定义一个 edge label, 注意, edge label 必须是唯一的, 这个方法会返回一个 builder, 这个 builder 可以用来获取这种 edge label 的多度关系 multiplicity, 这个指标限定了每对 (pair) 之间 edge 最大的数量.
multiplicity 包含的类型有: multi, simple, many2one, one2many, one2one (结合数据库 E-R model 这个概念不难理解).
默认的 multiplicity 是 MULTI.
下面是创建 edge label 的一个例子:
- mgmt = graph.openManagement()
- follow = mgmt.makeEdgeLabel('follow').multiplicity(MULTI).make()
- mother = mgmt.makeEdgeLabel('mother').multiplicity(MANY2ONE).make()
- mgmt.commit()
注意, 需要显示地调用 builder 的 make() 函数来完成 edge label 的定义.
Property keys
属性 Property 定义在顶点和边上, 当然也可以用来定义其他东西, property 是 key-value 对的形式. 举个例子,'name' = 'John' 这就可以看做一个属性, 它定义了 name 这个属性, 这个属性的 value 是'John'.
通过 makePropertyKey(String) 方法来常见 Property key. 属性名应该尽可能避免使用空格和特殊字符.
属性需要关注两点:
Property Key Data Type
每个属性都有自己的数据类型, 也就是说它的 value 必须是某种支持的数据类型, 可以通过 dataType(Class) 方法来定义数据类型. JanusGraph 会保证加入进来的属性数据都满足该数据类型.
属性数据类型可以申明为 Object.class, 但这并不推荐, 最好控制好数据类型, 一来可以节省空间, 二来可以让 JanusGraph 来帮我们检查导入的数据类型是不是符合要求.
数据类型必须使用具体的类, 不能是接口或者抽象类. JanusGraph 使用的是完全相当去判断数据类型是否符合, 所以使用子类的数据去导入到父类的属性中也是不会成功的.
Property Key Cardinality
用 cardinality(Cardinality) 方法来定义某一个顶点的某个属性的基底(cardinality).
基底有三种情况:
SINGLE: 每一个元素对于这个 key 只能有一个 value, 比如 birthDate 就是一个 single 基底, 因为每个人最多只能有一个生日.
LIST: 每个元素对于这个 key 可以有任意数量的值, 比如我们建模传感器(sensor), 其有一个属性是 sensorRecords, 那么, 对于这个属性, 可能有一系列的值. 注意, LIST 是允许有重复元素的.
SET: 与 LIST 相同, 不同的是, SET 不允许有重复的元素.
默认的 cardinality 是 single. 注意, 对于边属性来说, property key 的基底始终是 single.
下面是在定义 property key 的时候定义对应的 cardinality 的例子:
- mgmt = graph.openManagement()
- birthDate = mgmt.makePropertyKey('birthDate').dataType(Long.class).cardinality(Cardinality.SINGLE).make()
- name = mgmt.makePropertyKey('name').dataType(String.class).cardinality(Cardinality.SET).make()
- sensorReading = mgmt.makePropertyKey('sensorReading').dataType(Double.class).cardinality(Cardinality.LIST).make()
- mgmt.commit()
- Relation types
Edge label 和 property key 共同地被称为 relation type.
可以通过 containsRelationType() 方法来检测 relation type 是否存在.
下面是个例子:
- mgmt = graph.openManagement()
- if (mgmt.containsRelationType('name'))
- name = mgmt.getPropertyKey('name')
- mgmt.getRelationTypes(EdgeLabel.class)
- mgmt.commit()
改变 schema 的元素(Changing Schema Elements)
一旦我们创建好 Vertex label, edge label 和 property key 后, 就不能更改了, 我们唯一能改的是 schema 的名字, 比如下面这个例子:
- mgmt = graph.openManagement()
- place = mgmt.getPropertyKey('place')
- mgmt.changeName(place, 'location')
- mgmt.commit()
上面这个例子中, 我们把 property key 的名字从 place 改成了 location.
Schema Constraints
多个 property 可以绑定到同一个 vertex 或者 edge 上面, 用 JanusGraphManagement.addProperties(VertexLabel, PropertyKey...) 方法:
- // 例子 1
- mgmt = graph.openManagement()
- person = mgmt.makeVertexLabel('person').make()
- name = mgmt.makePropertyKey('name').dataType(String.class).cardinality(Cardinality.SET).make()
- birthDate = mgmt.makePropertyKey('birthDate').dataType(Long.class).cardinality(Cardinality.SINGLE).make()
- mgmt.addProperties(person, name, birthDate)
- mgmt.commit()
- // 例子 2
- mgmt = graph.openManagement()
- follow = mgmt.makeEdgeLabel('follow').multiplicity(MULTI).make()
- name = mgmt.makePropertyKey('name').dataType(String.class).cardinality(Cardinality.SET).make()
- mgmt.addProperties(follow, name)
- mgmt.commit()
有一种很通俗的定义关系的方法:
- mgmt = graph.openManagement()
- person = mgmt.makeVertexLabel('person').make()
- company = mgmt.makeVertexLabel('company').make()
- works = mgmt.makeEdgeLabel('works').multiplicity(MULTI).make()
- mgmt.addConnection(works, person, company)
- mgmt.commit()
这里, 用到是 addConnection() 方法.
Gremlin
Gremlin 是 JanusGraph 默认的 query language.
Gremlin 用来从 JanusGraph 里面查询数据, 修改数据, Gremlin 是一种 traversal 查询语言, 可以很方便地查询此类查询:
从小明开始, 遍历到他的父亲, 再从他的父亲遍历到他父亲的父亲, 然后返回他父亲的父亲的名字
Gremlin 是 Apache TinkerPop 项目的一部分.
关于 Gremlin 这里就不多做介绍了, 汪文星的文档里做了很详细的说明.
更多关于 JanusGraph 中 Gremlin 相关的说明, 参考这个文档 https://docs.janusgraph.org/latest/gremlin.html .
JanusGraph Server
JanusGrpah Server 其实就是 Gremlin server
两种交互方式: 分别是 webSocket 和 HTTP
使用方式
使用预先打好的包
这种方式主要是用来学习使用 JanusGraph, 生产环境下不建议用这种配置.
./bin/janusgraph.sh start
就可以启动了, 会自动 fork cassandra 的包, Elasticsearch 的包, gremlin-server 的包, 并连接到对应的服务器
- $ bin/janusgraph.sh start
- Forking Cassandra...
- Running `nodetool statusthrift`.. OK (returned exit status 0 and printed string "running").
- Forking Elasticsearch...
- Connecting to Elasticsearch (127.0.0.1:9300)... OK (connected to 127.0.0.1:9300).
- Forking Gremlin-Server...
- Connecting to Gremlin-Server (127.0.0.1:8182)... OK (connected to 127.0.0.1:8182).
- Run gremlin.sh to connect.
使用完毕后关系 JanusGraph, 使用
./bin/janusgraph.sh stop
命令就可以了
- $ bin/janusgraph.sh stop
- Killing Gremlin-Server (pid 59687)...
- Killing Elasticsearch (pid 59494)...
- Killing Cassandra (pid 58809)...
使用 WebSocket 的方式
上面 pre-package 的方式其实是使用的内置的 backend 和 index backend(在这个例子里面, 分别是 cassandra 和 Elasticsearch), 其实我们也可以把 JanusGraph 连接到自己的 HBase 等 Backend.
使用 Http 的方式
配置方式和使用 WebSocket 的方式基本相同, 可以用 Curl 命令来 test 服务的可用性:
curl -XPOST -Hcontent-type:application/JSON -d '{"gremlin":"g.V().count()"}' http://[IP for JanusGraph server host]:8182
同时使用 WebSocket 和 Http 的方式
修改 gremlin-server.YAML 文件, 更改 channelizer 为:
channelizer: org.apache.tinkerpop.gremlin.server.channel.WsAndHttpChannelizer
一些高级用法
还有一些高级用法, 比如认证(Authentication over HTTP), 具体这里就不细说了, 可以参考官方文档 https://docs.janusgraph.org/latest/server.html .
服务部署
JanusGraph Server 本身是个服务器服务, 这个服务背后很多 Backends(es, Hbase 等等), 客户端应用 (application) 主要通过 Gremlin 查询语句和 JanusGraph instance 交互, JanusGraph instance 交互然后和对应的后端交互来执行对应的 query.
没有 master JanusGraph server 的概念, application 可以连接任何 JanusGraph instance, 当然, 也可以用负载均衡的方法来分流到不同的 JanusGraph instance.
各个 JanusGraph instance 之间并不相互交互.
部署方式:
方式 1: 每个 server 上起一个 JanusGraph Instance, 并在同一个 server 起对应的 backend 和 index
方式 2:JanusServe 和 backend server, index server 分离
方法 3: 直接 embedded 到客户端 appplication 中, 相当于引入了一个 jar 包
ConfiguredGraphFactory
这个概念比较难以理解.
ConfiguredGraphFactory 是一个 singleton, 和 JanusGraphFactory 一样.
它们提供了一套 API(methods, 方法)来动态地操作服务器上的图.
在 gremlin-console 下我们可以直接用这个接口去操作图, 如下:
gremlin> :remote connect tinkerpop.server conf/remote.YAML
- ==>Configured localhost/127.0.0.1:8182
- gremlin> :remote console
- ==>All scripts will now be sent to Gremlin Server - [localhost/127.0.0.1:8182] - type ':remote console' to return to local mode
- gremlin> ConfiguredGraphFactory.open("graph1")
- ==>standardjanusgraph[cassandrathrift:[127.0.0.1]]
- gremlin> graph1
- ==>standardjanusgraph[cassandrathrift:[127.0.0.1]]
- gremlin> graph1_traversal
- ==>graphtraversalsource[standardjanusgraph[cassandrathrift:[127.0.0.1]], standard]
先来谈一谈 JanusGraphFactory, 它是我们在 gremlin-console 里面操作一个图的时候的 entry-point, 每当我们访问一个图的时候, 系统就为我们创建了一个 Configuration 类的实例.
可以将这个东西和 spark-shell 里面的 sparkSession 做类比来方便理解.
ConfiguredGraphFactory 不太一样, 它也是我们访问, 操作一个图的时候的 entry-point, 但配置是通过另一个 singleton 来实现的, 叫 ConfigurationManagementGraph.
ConfigurationManagementGraph 使我们可以很方便地管理图的配置.
就像上面例子一样, 我们可以通过下面两种方法来访问一个图:
ConfiguredGraphFactory.create("graphName")
或者
ConfiguredGraphFactory.open("graphName")
可以通过下面的方法来罗列所有配置好了的图. 配置好是指之前有用 ConfigurationManagementGraph 的 API 配置过:
ConfiguredGraphFactory.getGraphNames()
用下面的放来来 drop 一个 graph database:
ConfiguredGraphFactory.drop("graphName")
如果想使用 ConfiguredGraphFactory 这个接口, 比如在启动前 JanusGraph server 前配置好. 修改 gremlin-server.YAML 文件, 在 graphs 这个 section 下面, 添加一行:
- graphs: {
- ConfigurationManagementGraph: conf/JanusGraph-configurationmanagement.properties
- }
在这个例子中, ConfigurationManagementGraph 这个 graph 便是使用位于 onf/JanusGraph-configurationmanagement.properties 下的配置文件来配置, 下面是配置文件的一个例子:
- gremlin.graph=org.janusgraph.core.JanusGraphFactory
- storage.backend=cql
- graph.graphname=ConfigurationManagementGraph
- storage.hostname=127.0.0.1
具体 ConfigurationManagementGraph 怎么用呢? 下面是一个例子(在 gremlin-console 下):
- map = new HashMap<String, Object>();
- map.put("storage.backend", "cql");
- map.put("storage.hostname", "127.0.0.1");
- map.put("graph.graphname", "graph1");
- ConfiguredGraphFactory.createConfiguration(new MapConfiguration(map));
- // then access the graph
- ConfiguredGraphFactory.open("graph1");
graph.graphname 这个属性指定了上述配置是针对哪张 graph 的.
Multi-node
如果我们希望在 JanusGraph server 启动后去动态地创建图, 就要用到上面章节提到的
ConfiguredGraphFactory
, 这要求 JanusGraph 集群的每个 server 的配置文件里面都声明了使用 JanusGraphManager 和
ConfigurationManagementGraph
, 具体方法, 见这个链接.
为了保证 graph 的 consistency, 每个 server node 都要使用
- ConfiguredGraphFactory
- (保持集群上每个 node server 的配置一致性).
- Indexing
支持两类 graph indexing: Graph Index 和 Vertex-centric Index.
graph index 包含两类: Composite Index 和 Mixed Index.
Composite Index
通过下面的方法创建 Composite Index:
- graph.tx().rollback() //Never create new indexes while a transaction is active
- mgmt = graph.openManagement()
- name = mgmt.getPropertyKey('name')
- age = mgmt.getPropertyKey('age')
- mgmt.buildIndex('byNameComposite', Vertex.class).addKey(name).buildCompositeIndex()
- mgmt.buildIndex('byNameAndAgeComposite', Vertex.class).addKey(name).addKey(age).buildCompositeIndex()
- mgmt.commit()
- //Wait for the index to become available
- ManagementSystem.awaitGraphIndexStatus(graph, 'byNameComposite').call()
- ManagementSystem.awaitGraphIndexStatus(graph, 'byNameAndAgeComposite').call()
- //Reindex the existing data
- mgmt = graph.openManagement()
- mgmt.updateIndex(mgmt.getGraphIndex("byNameComposite"), SchemaAction.REINDEX).get()
- mgmt.updateIndex(mgmt.getGraphIndex("byNameAndAgeComposite"), SchemaAction.REINDEX).get()
- mgmt.commit()
Composite Index 方式不需要特殊地去配置底层的存储引擎(比如 cassandra), 主要的底层存储引擎都支持这种方式.
通过 Composite Index 可以来保证 Property key 的唯一性, 用下面这种方法:
- graph.tx().rollback() //Never create new indexes while a transaction is active
- mgmt = graph.openManagement()
- name = mgmt.getPropertyKey('name')
- mgmt.buildIndex('byNameUnique', Vertex.class).addKey(name).unique().buildCompositeIndex()
- mgmt.commit()
- //Wait for the index to become available
- ManagementSystem.awaitGraphIndexStatus(graph, 'byNameUnique').call()
- //Reindex the existing data
- mgmt = graph.openManagement()
- mgmt.updateIndex(mgmt.getGraphIndex("byNameUnique"), SchemaAction.REINDEX).get()
- mgmt.commit()
通过下面的方式来创建 Mixed Index:
- graph.tx().rollback() //Never create new indexes while a transaction is active
- mgmt = graph.openManagement()
- name = mgmt.getPropertyKey('name')
- age = mgmt.getPropertyKey('age')
- mgmt.buildIndex('nameAndAge', Vertex.class).addKey(name).addKey(age).buildMixedIndex("search")
- mgmt.commit()
- //Wait for the index to become available
- ManagementSystem.awaitGraphIndexStatus(graph, 'nameAndAge').call()
- //Reindex the existing data
- mgmt = graph.openManagement()
- mgmt.updateIndex(mgmt.getGraphIndex("nameAndAge"), SchemaAction.REINDEX).get()
- mgmt.commit()
Mixed Index 方式需要特殊地去配置底层的存储引擎 (比如 cassandra) 的索引.
Mixed Index 比 Composite Index 更加灵活, 但是对于含有相等关系的谓语关系的查询效率更慢.
buildMixedIndex 方法的参数 string 要和 properties 文件中配置的一致, 比如:
index.search.backend
这个配置中间的部分 search 就是 buildMixedIndex 方法的参数.
有了 Mixed Index, 这面这些 query 就支持用索引来加速了:
- g.V().has('name', textContains('hercules')).has('age', inside(20, 50))
- g.V().has('name', textContains('hercules'))
- g.V().has('age', lt(50))
- g.V().has('age', outside(20, 50))
- g.V().has('age', lt(50).or(gte(60)))
- g.V().or(__.has('name', textContains('hercules')), __.has('age', inside(20, 50)))
总结两种 Graph Index 的区别:
Use a composite index for exact match index retrievals. Composite indexes do not require configuring or operating an external index system and are often significantly faster than mixed indexes.
a. As an exception, use a mixed index for exact matches when the number of distinct values for query constraint is relatively small or if one value is expected to be associated with many elements in the graph (i.e. in case of low selectivity).
- Use a mixed indexes for numeric range, full-text or geo-spatial indexing. Also, using a mixed index can speed up the order().by() queries.
- Vertex-centric Indexes
- graph.tx().rollback() //Never create new indexes while a transaction is active
- mgmt = graph.openManagement()
- time = mgmt.getPropertyKey('time')
- rating = mgmt.makePropertyKey('rating').dataType(Double.class).make()
- battled = mgmt.getEdgeLabel('battled')
- mgmt.buildEdgeIndex(battled, 'battlesByRatingAndTime', Direction.OUT, Order.decr, rating, time)
- mgmt.commit()
- //Wait for the index to become available
- ManagementSystem.awaitRelationIndexStatus(graph, 'battlesByRatingAndTime', 'battled').call()
- //Reindex the existing data
- mgmt = graph.openManagement()
- mgmt.updateIndex(mgmt.getRelationIndex(battled, 'battlesByRatingAndTime'), SchemaAction.REINDEX).get()
- mgmt.commit()
注意上面的 Index 是有顺序的, 先对 rating 做 index, 再对 time 做 index, 因此:
- h = g.V().has('name', 'hercules').next()
- g.V(h).outE('battled').property('rating', 5.0) //Add some rating properties
- g.V(h).outE('battled').has('rating', gt(3.0)).inV() // query-1
- g.V(h).outE('battled').has('rating', 5.0).has('time', inside(10, 50)).inV() // query-2
- g.V(h).outE('battled').has('time', inside(10, 50)).inV() // query-3
上述 3 个 query 中, 前两个被加速了, 但第三没并没有.
JanusGraph automatically builds vertex-centric indexes per edge label and property key. That means, even with thousands of incident battled edges, queries like g.V(h).out('mother') or g.V(h).values('age') are efficiently answered by the local index.
Transaction 事务
JanusGraph 具有事务安全性, 可以在多个并行的线程中同时使用.
通常, 使用:
graph.V(...) and graph.tx().commit()
这样的 ThreadLocal 接口来执行一次事务.
事务处理的一个例子
根据 TinkerPop 框架事务机制的描述, 每一个线程在它执行第一个操作的时候开启一个事务:
- graph = JanusGraphFactory.open("berkeleyje:/tmp/janusgraph")
- juno = graph.addVertex() //Automatically opens a new transaction
- juno.property("name", "juno")
- graph.tx().commit() //Commits transaction
在上面这个例子中, addVertex()函数执行的时候, 事务被开启, 然后当我们显示执行 graph.tx().commit()的时候, 事务关闭.
当事务还没有完成, 却调用了 graph.close()关闭了数据库, 那么这个事务的后期行为是不得而知的, 一般情况下, 事务应该会被回滚. 但执行 close()的线程所对应的事务会被顺利地 commit.
事务处理范围(scope)
根据 TinkerPop 框架事务机制的描述, 每一个线程在它执行第一个操作的时候开启一个事务, 所有的图操作元素 (顶点, 边, 类型等变量等) 均和该事务自动绑定了, 当我们使用 commit()或者 rollback()关闭 / 回滚一个事务的时候, 这些图操作元素就会失效, 但是, 顶点和类型会被自动地转移到下一个事务中, 如下面的例子:
- graph = JanusGraphFactory.open("berkeleyje:/tmp/janusgraph")
- juno = graph.addVertex() //Automatically opens a new transaction
- graph.tx().commit() //Ends transaction
- juno.property("name", "juno") //Vertex is automatically transitioned
但是, 边不能自动转移到下一个事务中, 需要显式地刷新, 如下面的例子:
- e = juno.addEdge("knows", graph.addVertex())
- graph.tx().commit() //Ends transaction
- e = g.E(e).next() //Need to refresh edge
- e.property("time", 99)
事务失败(failure)
当我们在创建一个事务, 并做一系列操作的时候, 应该事先考虑到事务失败的可能性(IO exceptions, network errors, machine crashes or resource unavailability...), 所以推荐用下面的方式处理异常:
- try {
- if (g.V().has("name", name).iterator().hasNext())
- throw new IllegalArgumentException("Username already taken:" + name)
- user = graph.addVertex()
- user.property("name", name)
- graph.tx().commit()
- } catch (Exception e) {
- //Recover, retry, or return error message
- println(e.getMessage())
- }
上面的例子描述了一个注册功能, 先检查名字存不存在, 如果不存在, 则创建该 user, 然后 commit.
如果事务失败了, 会抛出 JanusGraphException 异常, 事务失败有很多种可能, JanusGraph 区分了两种常见的 failure:
potentially temporary failure
potentially temporary failure 主要与是 IO 异常 (IO hiccups (e.g. network timeouts)) 和资源可用情况 (resource unavailability) 有关.
JanusGrpah 会自动去尝试从 temporary failure 中恢复, 重新尝试 commit 事务, retry 的次数可以配置 https://docs.janusgraph.org/latest/config-ref.html .
permanent failure
permanent failure 主要与完全的连接失败, 硬件故障和锁挣脱有关(complete connection loss, hardware failure or lock contention).
锁的争夺, 比如两个人同时同时以 "juno" 的用户名去注册, 其中一个事务必然失败. 根据事务的语义, 可以通过重试失败的事务来尝试从锁争夺中恢复(比如使用另一个用户名).
一般有两种典型的情形
PermanentLockingException(Local lock contention)
: 另一个线程已经被赋予了竞争锁
- PermanentLockingException(Expected value mismatch for X: expected=Y vs actual=Z):
- Tips: The verification that the value read in this transaction is the same as the one in the datastore after applying for the lock failed. In other words, another transaction modified the value after it had been read and modified.
多线程事务(Multi-Threaded Transactions)
多线程事务指的是同一个事务可以充分利用机器的多核架构来多线程执行, 见 TinkerPop 关于 threaded transaction 的描述.
可以通过 createThreadedTx()方法来创建一个线程独立的事务:
- threadedGraph = graph.tx().createThreadedTx(); // Create a threaded transaction
- threads = new Thread[10];
- for (int i=0; i<threads.length; i++) {
- threads[i]=new Thread({
- println("Do something with'threadedGraph''");
- });
- threads[i].start();
- }
- for (int i=0; i<threads.length; i++) threads[i].join();
- threadedGraph.tx().commit();
createThreadedTx()方法返回了一个新的 Graph 对象, tx()对象在创建线程的时候, 并没有为每个线程创建一个事务, 也就是说, 所有的线程都运行在同一个事务中, 这样我们就实现了 threaded transaction.
JanusGraph 可以支持数百个线程运行在同一个事务中.
通过 createThreadedTx()接口可以很轻松的创建并行算法(Concurrent Algorithms), 尤其是那些适用于并行计算的算法.
createThreadedTx()接口的另一个应用是创建嵌套式的事务(Nested Transactions), 具体的例子见这里 https://docs.janusgraph.org/latest/tx.html , 这对于那些 long-time running 的事务尤其有作用.
事务处理的一些常见问题
再次强调一下, JanusGraph 的逻辑是, 当我们对一个 graph 进行第一次操作的时候, 事务就自动被打开了, 我们并不需要去手动的创建一个事务, 除非我们希望创建一个 multi-threaded transaction.
每个事务都要显式地用 commit()和 rollback()方法去关闭.
一个事务在开始的时候, 就会去维持它的状态, 在多线程应用的环境中, 可能会出现不可预知的问题, 比如下面这个例子:
- v = g.V(4).next() // Retrieve vertex, first action automatically starts transaction
- g.V(v).bothE()
- >> returns nothing, v has no edges
- //thread is idle for a few seconds, another thread adds edges to v
- g.V(v).bothE()
- >> still returns nothing because the transactional state from the beginning is maintained
这种情况在客户端应用端很常见, server 会维持多个线程去相应服务器的请求. 比较好的一个习惯是, 在没完成一部分工作后, 就去显式地 terminate 任务, 如下面这个例子:
- v = g.V(4).next() // Retrieve vertex, first action automatically starts transaction
- g.V(v).bothE()
- graph.tx().commit()
- //thread is idle for a few seconds, another thread adds edges to v
- g.V(v).bothE()
- >> returns the newly added edge
- graph.tx().commit()
多线程事务 (Multi-Threaded Transactions) 还可以通过 newTransaction()方法来创建, 但要注意的是, 在该事务中创建的点和边只在该事务中可用, 事务被关闭以后, 访问这些点和边就会抛出异常, 如果还想使用这些点和边怎么办? 答案是显式地去刷新这些点和边, 如下面这个例子:
- g.V(existingVertex)
- g.E(existingEdge)
事务的相关配置(configuration)
JanusGraph.buildTransaction() 也可以启动一个多线程的事务, 因此它和上面提到的 newTransaction()方法其实是一样的功能, 只不过 buildTransaction()方法提供了附加的配置选项.
buildTransaction()会返回一个 TransactionBuilder 实例, TransactionBuilder 实例可以配置选项, 这里就详述了.
配置好后, 接着可以调用 start()方法来启动一个线程, 这样会返回一个 JanusGraphTransaction 实例.
缓存机制(JanusGraph Cache)
Transaction-level caching
通过 graph.buildTransaction().setVertexCacheSize(int)可以来设置事务的缓存大小(cache.tx-cache-size).
当一个事务被打开的时候, 维持了两类缓存:
- Vertex cache
- Index cache
- Vertex cache
Vertex cache 主要包含了事务中使用到的点, 以及这些点的邻接点列表 (adjacency list) 的一个子集, 这个子集包括了在同一个事务中被取出的该点的邻接点. 所以, heap 中 vertex cache 使用的空间不仅与事务中存在的顶点数有关, 还与这些点的邻接点数目有关.
Vertex cache 中能维持的最多的顶点数等于 transaction cache size.Vertex cache 能显著地提高 iteractive traversal 的速度. 当然, 如果同样的 vertex 在后面并没有被重新访问, 那 vertex cache 就没有起到作用.
Index cache
如果在一个事务中, 前面部分的 query 用到了某个索引(index), 那么后面使用这个 query 的时候, 获取结果的速度会大大加快, 这就是 index cache 的作用, 当然, 如果后面并没有再使用相同的 index 和相同的 query, 那 Index query 的作用也就没有体现了.
Index cache 中的每个条目都会被赋予一个权重, 这个权重等与 2 + result set size, 整个 index cache 的权重不会超过 transaction cache size 的一半.
Database-level caching
Database-level caching 实现了多个 transaction 之间 cache 的共享, 从空间利用率上来说, database-lvel caching 更经济, 但访问速度比 transaction-level 稍慢.
Database-level cache 不会在一个事务结束后马上失效, 这使得处于多个事务中的读操作速度显著地提高.
Cache Expiration Time
Cache expiration time 通过 cache.db-cache-time (单位为: milliseconds)参数来调整.
这里有一个 trick, 如果我们只启动了一个 JanusGraph instance, 因为没有另一个 instance 去改变玩我们的图(graph),cache expiration time 便可以设置为 0, 这样的话, cache 就会无限期保留处于 cache 中的元素(除非因为空间不足被顶替).
如果我们启动了多个 JanusGraph 实例, 这个时间应该被设置成当一个 instance 修改了图, 另一个 instance 能够看到修改所需要等待的最大时间. 如果希望修改被其他的 instance 迅速能够看到, 那么应该停止使用 Database-level cache. 允许的时延越长, cache 的效率越高.
当然, 一个 JanusGraph instance 始终能够马上看到它自己做出的修改.
Cache Size
Cache size 通过 cache.db-cache-size 参数来控制 Database-level 的 cache 能使用多少 heap space,cache 越大, 效率越高, 但由此带来的 GC 性能问题却不容小觑.
cache size 也可以配置成百分比(占整个剩余的 heap 空间的比例).
注意 cache size 的配置是排他性的, 也就是说 cache 会独占你配置的空间, 其他资源 (e.g. Gremlin Server, embedded Cassandra, etc) 也需要 heap spave, 所以不要把 cache size 配置得太过分, 否则可能会引起 out of memory 错误或者 GC 问题.
Clean Up Wait Time
还有一个需要注意的参数是 cache.db-cache-clean-wait, 当一个顶点被修改后, 所有与该顶点有关的 Database-level 的 cache 都会失效并且会被驱逐. JanusGraph 会从底层的 storage backend 重新 fetch 新的顶点数据, 并 re-populate 缓存.
cache.db-cache-clean-wait 这个参数可以控制, cache 会等待 cache.db-cache-clean-wait milliseconds 时间再 repopulate 新的 cache.
Storage Backend Caching
底层的 storage backend 也会有自己的 cache 策略, 这个就要参考对应的底层存储的文档了.
事务日志(Transaction Log)
可以启动记录事务的变化日志, 日志可以用来在后期进行处理, 或者作为一个 record 备份.
在启动一个事务的时候指定是否需要采集日志:
- tx = graph.buildTransaction().logIdentifier('addedPerson').start()
- u = tx.addVertex(label, 'human')
- u.property('name', 'proteros')
- u.property('age', 36)
- tx.commit()
这里有个 log processor 的概念, 其实就是内置的日志监听处理器. 例如, 可以统计在一个 transaction 里面, 某一个 label 下被添加的顶点的数目.
其他常见问题
Accidental type creation
默认地, 当遇到一个新的 type 的时候, janusGrpah 会自动地去创建 property keys 和边的 label. 对于 schema 这个问题, 还是建议用户自己去定义 schema, 而不要使用自动发现 schema 的方式, 可以在配置文件里面如下申明关闭自动 infer schema 的功能:
schema.default = none
创建 type 的操作也不宜放在不同的线程中, 因为这会引起不可知的后果, 最好放到一个 batch 操作中把所有的 type 都事先创建好, 然后再去做其他的图操作.
Custom Class Datatype
可以自己去创建 class, 作为 value 的类别.
Transactional Scope for Edges
边应该先取出来, 再操作, 每个 transaction 都是有 scope 的, 如果超出了这个 scope, 去访问之前的边, 会报错.
Ghost Vertices
这个概念比较新奇, 指的是: 我们在一个事务中删除了一个 vertex, 却同时在另一个 transaction 中修改了它. 这种情况下, 这个 vertex 还是会依然存在的, 我们称这种 vertex 为 ghost vertex.
对于这个问题的解决办法最好是暂时允许 ghost vertices, 然后定期地写脚本去删除它们.
Debug-level Logging Slows Execution
Log level 如果被设置成了 DEBUG, 输出可能会很大, 日志中会包括一个 query 如何被编译, 优化和执行的过程, 这会显著地影响处理的性能, 在生产环境下, 不建议使用 DEBUG level, 建议是用 INFO level.
Elasticsearch OutOfMemoryException
当有很多客户端连接到 Elasticsearch 的时候, 可能会报 OutOfMemoryException 异常, 通常这不是一个内存溢出的问题, 而是 OS 不允许运行 ES 的用户起太多的进程.
可以通过调整运行 es 的用户可运行的进程数 (number of allowed processes) 也许可以解决这个问题.
Dropping a Database
删除一个 graph instance, 可以使用:
JanusGraphFactory.drop(graph)
接口
- graph = JanusGraphFactory.open('path/to/configuration.properties')
- JanusGraphFactory.drop(graph);
- ConfiguredGraphFactory
接口
- graph = ConfiguredGraphFactory.open('example')
- ConfiguredGraphFactory.drop('example');
- Note:
0.3.0 以前的版本除了需要执行上面的命令, 还需要显示地调用 JanusGraphFactory.close(graph)和 ConfiguredGraphFactory.close("example")来关闭这个 graph, 以防 cache 中还存在这个 graph, 造成错误.
技术上的限制(Technical Limitations)
这个部分可以理解 JanusGrpah 还存在的一些可改进或者无法改进的地方.
设计上的限制
下面这些缺陷是 JanusGraph 天然设计上的一些缺陷, 这些缺陷短期内是得不到解决的.
- Size Limitation
- JanusGraph can store up to a quintillion edges (2^60) and half as many vertices. That limitation is imposed by JanusGraph's id scheme.
- DataType Definitions
当我们用 dataType(Class) 接口去定义 property key 的数据类型的时候, JanusGraph 会 enforce 该 key 的所有属性都严格是该类型. 这是严格的类型判断, 子类也不可行. 比如, 如果我们定义的数据类型是 Number.class, 使用的却是 Integer 或者 Long 型, 这种情况大多数情况下会报错.
用 Object.class 或许可以解决这个问题, 比较灵活, 但带来的问题也显而易见, 那就是性能上会降低, 同时数据类型的 check 也会失效.
Type Definitions cannot be changed
Edge label, vertex label 和 property key 一旦创建就不能改变了, 当然, type 可以被重命名, 新的类型也可以在 runtime 中创建, 所以 schema 是支持 envolving 的.
保留的关键字(Reserved Keywords)
下面是 JanusGraph 保留的关键字, 不要使用这些关键字作为变量名, 函数名等.
- vertex
- element
- edge
- property
- label
- key
临时的缺陷
下面这些缺陷在将来的 release 中会被逐渐解决.
Limited Mixed Index Support
Mixed Index 只支持 JanusGraph 所支持的数据类型 (Data type) 的一个子集, Mixed Index 目前也不支持 SET 和 LIST 作为基数 (cardinality) 的 property key.
Batch Loading Speed
可以通过下面的 configuration 来开启 batch loading mode:
Name | Description | Datatype | Default Value | Mutability |
---|---|---|---|---|
storage.batch-loading | Whether to enable batch loading into the storage backend | Boolean | false | LOCAL |
这个 trick 其实并没有使用底层的 backend 的 batch loading 技术.
另一个限制是, 如果同时向一个顶点导入数百万条边, 这种情况下很可能 failure, 我们称这种 loading 为 supernode loading, 这种 loading 之所以失败是受到了后端存储的限制, 具体这里就不细数了.
后端存储(Storage Backends)
Apache Cassandra
JanusGraph 后端通过四种方式来支持整合 Cassandra:
- cql - CQL based driver (推荐使用)
- cassandrathrift - JanusGraph's Thrift connection pool driver (v2.3 以后退休了, 也不建议使用)
cassandra - Astyanax driver. The Astyanax project is retired.
- embeddedcassandra - Embedded driver for running Cassandra and JanusGraph within the same JVM(测试可以, 但生产环境不建议使用这种方式)
- Local Server Mode
Cassandra 可以作为一个 standalone 的数据库与 JanusGraph 一样运行在 localhost, 在这种情况下, JanusGraph 和 Cassandra 之间通过 socket 通信.
通过下面的步骤配置 JanusGraph on Cassandra:
- Download Cassandra http://cassandra.apache.org/download/ , unpack it, and set filesystem paths in conf/cassandra.YAML and
- conf/log4j-server.properties
Connecting Gremlin Server to Cassandra using the default configuration files provided in the pre-packaged distribution requires that Cassandra Thrift is enabled. To enable Cassandra Thrift open conf/cassandra.YAML and update start_rpc: false to start_rpc: true. If Cassandra is already running Thrift can be started manually with bin/nodetool enablethrift. the Thrift status can be verified with bin/nodetool statusthrift.
- Start Cassandra by invoking bin/cassandra -f on the command line in the directory where Cassandra was unpacked. Read output to check that Cassandra started successfully.
- Now, you can create a Cassandra JanusGraph as follows
- JanusGraph g = JanusGraphFactory.build().
- set("storage.backend", "cql").
- set("storage.hostname", "127.0.0.1").
- open();
- Local Container Mode
通过 docker 安装 Cassandra, 去 https://github.com/JanusGraph/janusgraph/releases 界面看一下 JanusGraph 版本测试通过的 Cassandra 版本, 使用下面的 docker 命令运行 Cassandra:
docker run --name jg-cassandra -d -e CASSANDRA_START_RPC=true -p 9160:9160 -p 9042:9042 -p 7199:7199 -p 7001:7001 -p 7000:7000 cassandra:3.11
Note: Port 9160 is used for the Thrift client API. Port 9042 is for CQL native clients. Ports 7000, 7001 and 7099 are for inter-node communication.
Remote Server Mode
集群模式下, 有一个 Cassandra 集群, 然后所有的 JanusGraph 的 instance 通过 socket 的方式去读写 Cassandra 集群, 客户端应用程序也可以和 JanusGraph 的实例运行在同一个 JVM 中.
举个例子, 我们启动了一个 Cassandra 的集群, 其中一个机器的 IP 是 77.77.77.77, 我们可以通过以下方式连接:
- JanusGraph graph = JanusGraphFactory.build().
- set("storage.backend", "cql").
- set("storage.hostname", "77.77.77.77").
- open();
- Remote Server Mode with Gremlin Server
可以在 JanusGraph server 外面包上一层 Gremlin server, 这样, 不仅可以和 JanusGraph server 交互, 也可以和 Gremlin server 交互.
通过:
bin/gremlin-server.sh
启动 Gremlin server, 然后通过 bin/gremlin.sh 打开 Gremlin 的终端, 然后运行:
:plugin use tinkerpop.server
:remote connect tinkerpop.server conf/remote.YAML
:> g.addV()
便可以了.
JanusGraph Embedded Mode
Cassandra 也可以整合到 JanusGraph 中, 在这种情况下, JanusGraph 和 Cassandra 运行在同一个 JVM 中, 本次通过进程间通信而不是网络传输信息, 这种情况通过在 performance 上有很大的帮助.
如果想使用 Cassandra 的 embedded mode, 需要配置 embeddedcassandra 作为存储后端.
这种 embedded 模式推荐通过 Gremlin server 来暴露 JanusGraph.
需要注意的是, embedded 方式需要 GC 调优.
- Apache HBase
- Local Server Mode
从此处 http://www.apache.org/dyn/closer.cgi/hbase/stable/ 下载一个 HBase 的 stable release.
Start HBase by invoking the start-hbase.sh script in the bin directory inside the extracted HBase directory. To stop HBase, use stop-hbase.sh.
然后通过:
- JanusGraph graph = JanusGraphFactory.build()
- .set("storage.backend", "hbase")
- .open();
连接到 HBase.
Remote Server Mode
集群模式, JanusGraph 的实例通过 socket 与 HBase 建立连接, 并进行读写操作.
假设我们启动了一个 HBase 并使用 zookeeper 作为协调器, zk 的 IP 是 77.77.77.77, 77.77.77.78 和 77.77.77.79, 那么我们通过下面的方式连接到 HBase:
- JanusGraph g = JanusGraphFactory.build()
- .set("storage.backend", "hbase")
- .set("storage.hostname", "77.77.77.77, 77.77.77.78, 77.77.77.79")
- .open();
- Remote Server Mode with Gremlin Server
和 Cassandra 章节讲的一样, 我们可以在 JanusGraph server 外面再包一层 Gremlin server:
- http://gremlin-server.janusgraph.machine1/mygraph/vertices/1
- http://gremlin-server.janusgraph.machine2/mygraph/tp/gremlin?script=g.v(1).out('follows').out('created')
Gremlin-server 的配置文件也要做响应的修改, 下面是个例子:
- ...
- graphs: {
- g: conf/janusgraph-hbase.properties
- }
- scriptEngines: {
- gremlin-groovy: {
- plugins: { org.janusgraph.graphdb.tinkerpop.plugin.JanusGraphGremlinPlugin: {},
- org.apache.tinkerpop.gremlin.server.jsr223.GremlinServerGremlinPlugin: {},
- org.apache.tinkerpop.gremlin.tinkergraph.jsr223.TinkerGraphGremlinPlugin: {},
- org.apache.tinkerpop.gremlin.jsr223.ImportGremlinPlugin: {classImports: [java.lang.Math], methodImports: [java.lang.Math#*]},
- org.apache.tinkerpop.gremlin.jsr223.ScriptFileGremlinPlugin: {files: [scripts/empty-sample.groovy]}}}}
- ...
HBase 的配置
在配置说明 https://docs.janusgraph.org/latest/config-ref.html 章节, 有一系列配置选项, 要注意 storage.hbase.table 参数, 默认 table 的名字是 janusgraph.
Gloabl Graph Operations
JanusGraph over HBase 支持全局的点和边的遍历, 但这种情况下, 会把所有的点和边导入到内存中, 可能会报 OutOfMemoryException 错误. 可以使用 Gremlin-hadoop https://docs.janusgraph.org/latest/hadoop-tp3.html 的方式去遍历.
InMemory Storage Backend
JanusGraph 支持使用纯内存, 可以通过下面的属性配置:
storage.backend=inmemory
可以在 Gremlin-console 直接打开内存中的图:
graph = JanusGraphFactory.build().set('storage.backend', 'inmemory').open()
这种情况下, 如果关闭该图或者停止 graph 的进程, 图的所有数据都会丢失. 这种模式也只支持单点模式, 不支持多个 JanusGraph 的图实例共享.
这种存储策略不适合在生产中使用.
后端索引(Index Backends)
查询谓语和数据类型
比较谓语
- eq (equal)
- neq (not equal)
- gt (greater than)
- gte (greater than or equal)
- lt (Less than)
- lte (Less than or equal)
文本操作谓语(Text Predicate)
主要可以用这些 operator 做 full-text search, 常见的有两类:
String 中以词为粒度的
- textContains
- textContainsPrefix
- textContainsRegex
- textContainsFuzzy
以整个 String 为粒度的
- textPrefix
- textRegex
- textFuzzy
区间操作谓语(Geo Predicate)
区间操作谓语包括:
- geoIntersect
- geoWithin
- geoDisjoint
- geoContains
查询样例
- g.V().has("name", "hercules")
- // 2) Find all vertices with an age greater than 50
- g.V().has("age", gt(50))
- // or find all vertices between 1000 (inclusive) and 5000 (exclusive) years of age and order by increasing age
- g.V().has("age", inside(1000, 5000)).order().by("age", incr)
- // which returns the same result set as the following query but in reverse order
- g.V().has("age", inside(1000, 5000)).order().by("age", decr)
- // 3) Find all edges where the place is at most 50 kilometers from the given latitude-longitude pair
- g.E().has("place", geoWithin(Geoshape.circle(37.97, 23.72, 50)))
- // 4) Find all edges where reason contains the Word "loves"
- g.E().has("reason", textContains("loves"))
- // or all edges which contain two words (need to chunk into individual words)
- g.E().has("reason", textContains("loves")).has("reason", textContains("breezes"))
- // or all edges which contain words that start with "lov"
- g.E().has("reason", textContainsPrefix("lov"))
- // or all edges which contain words that match the regular expression "br[ez]*s" in their entirety
- g.E().has("reason", textContainsRegex("br[ez]*s"))
- // or all edges which contain words similar to "love"
- g.E().has("reason", textContainsFuzzy("love"))
- // 5) Find all vertices older than a thousand years and named "saturn"
- g.V().has("age", gt(1000)).has("name", "saturn")
数据类型(Data Type Support)
Composite index 可以支持任何类型的 index,mixed index 只支持下面的数据类型:
- Byte
- Short
- Integer
- Long
- Float
- Double
- String
- Geoshape
- Date
- Instant
- UUID
- Geoshape Data Type
只有 mixed indexes 支持 Geoshape Data Type, 支持的数据类型有 point, circle, box, line, polygon, multi-point, multi-line 和 multi-polygon.
集合类型(Collections)
如果使用 Elasticsearch, 可以索引 cardinality 为 SET 或者 LIST 的属性, 如下面的例子:
- mgmt = graph.openManagement()
- nameProperty = mgmt.makePropertyKey("names").dataType(String.class).cardinality(Cardinality.SET).make()
- mgmt.buildIndex("search", Vertex.class).addKey(nameProperty, Mapping.STRING.asParameter()).buildMixedIndex("search")
- mgmt.commit()
- //Insert a vertex
- person = graph.addVertex()
- person.property("names", "Robert")
- person.property("names", "Bob")
- graph.tx().commit()
- //Now query it
- g.V().has("names", "Bob").count().next() //1
- g.V().has("names", "Robert").count().next() //1
索引参数和全局搜索
当我们定义一个 Mixed index 的时候, 每一个被添加到索引中的 property key 都有一系列参数可以设置.
Full-Text Search
全局索引, 这是一个很重要的功能. 当我们去索引字符串类型的 property key 的时候, 我们可以选择从 character 层面后者 text 层面去索引, 这需要改变 mapping 参数.
当我们从 text 层面去索引的时候, 字符串会被 tokenize 成 bag of words, 用户便可以去 query 是否包含一个或多个词, 这叫做 full-text search.
当我们从 char 层面去索引的时候, string 会直接和 char 串做 match, 不会有 futher analysis 后者 tokenize 操作. 这可以方便我们去查找是否包含某个字符序列, 这也叫做 string search.
下面分开讲:
Full-Text Search
默认地, string 会使用 text 层面的索引, 可以通过下面的方式显示地去创建:
- mgmt = graph.openManagement()
- summary = mgmt.makePropertyKey('booksummary').dataType(String.class).make()
- mgmt.buildIndex('booksBySummary', Vertex.class).addKey(summary, Mapping.TEXT.asParameter()).buildMixedIndex("search")
- mgmt.commit()
可以看到, 这和普通的创建索引唯一的一个区别是我们在调用 addKey() 方法的时候, 多添加了一个 Mapping.TEXT 映射参数.
前面我们提到过, 如果是使用 text 层面的 index,JanusGraph 会自己去维护一个 bag of words,JanusGraph 默认的 tokenization 方案是: 它会使用非字母数字的字段去 split, 然后会移除到所有小于 2 个字符的 token.
当我们使用 text 层面的 index 的时候, 只有全局索引的谓语才真正用到了我们创建的索引, 包括 textContains 方法, textContainsPrefix 方法, textContainsRegex 方法和 textContainsFuzzy 方法, 注意, full-text search 是 case-insensitive 的, 下面是具体的例子:
- import static org.janusgraph.core.attribute.Text.*
- g.V().has('booksummary', textContains('unicorns'))
- g.V().has('booksummary', textContainsPrefix('uni'))
- g.V().has('booksummary', textContainsRegex('.*corn.*'))
- g.V().has('booksummary', textContainsFuzzy('unicorn'))
- String Search
首先要明确的是, string search 会把数据 load 到内存中, 这其实是非常 costly 的.
可以通过下面的方式去显示地创建 string search:
- mgmt = graph.openManagement()
- name = mgmt.makePropertyKey('bookname').dataType(String.class).make()
- mgmt.buildIndex('booksBySummary', Vertex.class).addKey(name, Mapping.STRING.asParameter()).buildMixedIndex("search")
- mgmt.commit()
这种 bookname 会按照 as-is 的方式去分析, 包括 stop Word 和 no-letter character.
当我们使用 string 层面的 index 的时候, 只有下面的谓语才真正用到了我们创建的索引, 包括 eq,neq,textPrefix,textRegex 和 textFuzzy. 注意, string search 是 case-insensitive 的, 下面是具体的例子:
- import static org.apache.tinkerpop.gremlin.process.traversal.P.*
- import static org.janusgraph.core.attribute.Text.*
- g.V().has('bookname', eq('unicorns'))
- g.V().has('bookname', neq('unicorns'))
- g.V().has('bookname', textPrefix('uni'))
- g.V().has('bookname', textRegex('.*corn.*'))
- g.V().has('bookname', textFuzzy('unicorn'))
同时使用 text 和 string 层面的 full-text search
如果我们使用 Elasticsearch 作为后端, 这样就可以用所有的谓语去做精确或者模糊的查询了.
通过下面的方式创建这种叫做 Mapping.TEXTSTRING 的 full-text search 方案:
- mgmt = graph.openManagement()
- summary = mgmt.makePropertyKey('booksummary').dataType(String.class).make()
- mgmt.buildIndex('booksBySummary', Vertex.class).addKey(summary, Mapping.TEXTSTRING.asParameter()).buildMixedIndex("search")
- mgmt.commit()
- Geo Mapping
默认地, JanusGraph 支持索引点 (point) 的索引, 并且去查询 circle 或者 box 类型的 property, 如果想索引一个非 - 点类型的 property, 需要使用 Mapping.PREFIX_TREE:
- mgmt = graph.openManagement()
- name = mgmt.makePropertyKey('border').dataType(Geoshape.class).make()
- mgmt.buildIndex('borderIndex', Vertex.class).addKey(name, Mapping.PREFIX_TREE.asParameter()).buildMixedIndex("search")
- mgmt.commit()
- Direct Index Query
可以直接向 index backend 发送 query, 下面是个例子:
- ManagementSystem mgmt = graph.openManagement();
- PropertyKey text = mgmt.makePropertyKey("text").dataType(String.class).make();
- mgmt.buildIndex("vertexByText", Vertex.class).addKey(text).buildMixedIndex("search");
- mgmt.commit();
- // ... Load vertices ...
- for (Result<Vertex> result : graph.indexQuery("vertexByText", "v.text:(farm uncle berry)").vertices()) {
- System.out.println(result.getElement() + ":" + result.getScore());
- }
需要指明两个元素:
想要查询的 index backend 的 index 名字, 在上面的例子中是 vertexByText .
查询语句, 在上面的例子中是
- v.text:(farm uncle berry)
- .
- Elasticsearch
- Running Elasticsearch
下载包里面本身包含兼容的 Elasticsearch 的 distribution, 通过:
Elasticsearch/bin/Elasticsearch
来运行 Elasticsearch. 要注意的是, es 不能使用 root 运行.
ES 配置
JanusGraph 支持通过 HTTP 客户端连接到正在运行的 ES 集群.
在配置文件中, Elasticsearch client 需要通过下面这一行指明:
index.search.backend=Elasticsearch
通过 index.[X].hostname 指明某一个或者一系列 es 的实例的地址:
- index.search.backend=Elasticsearch
- index.search.hostname=10.0.0.10:9200
可以通过下面的方式绑定要一段连续的 IP:PORT 对:
- index.search.backend=Elasticsearch
- index.search.hostname=10.0.0.10, 10.0.0.20:7777
- REST Client
REST client 可以通过 index.[X].bulk-refresh 参数控制改变多久能被索引到.
REST Client 既可以配置成 HTTP 的方式也可以配置成 HTTPS 的方式.
HTTPS authentification
可以通过 index.[X].Elasticsearch.ssl.enabled 开启 HTTP 的 SSL 支持. 注意, 这可能需要修改 index.[X].port 参数, 因为 ES 的 HTTPS 服务的端口号可能和通常意义的 REST API 端口 (9200) 不一样.
HTTP authentification
可以通过配置 index.[X].Elasticsearch.http.auth.basic.realm 参数来通过 HTTP 协议做认证.
- index.search.Elasticsearch.http.auth.type=basic
- index.search.Elasticsearch.http.auth.basic.username=httpuser
- index.search.Elasticsearch.http.auth.basic.password=httppassword
- tips:
可以自己实现 class 来实现认证:
- index.search.Elasticsearch.http.auth.custom.authenticator-class=fully.qualified.class.Name
- index.search.Elasticsearch.Elasticsearch.http.auth.custom.authenticator-args=arg1,arg2,...
自己实现的 class 必须实现 org.janusgraph.diskstorage.es.REST.util.RestClientAuthenticator 接口.
高级功能
- Advanced Schema
- Static Vertex
Vertex label 可以定义为 static 的, 一旦创建, 就不能修改了.
- mgmt = graph.openManagement()
- tweet = mgmt.makeVertexLabel('tweet').setStatic().make()
- mgmt.commit()
- Edge and Vertex TTL
边和顶点可以配置对应的 time-to-live(TTL), 这个概念有点类似于数据库中的临时表的概念, 用这种方式创建的点和边在使用一段时间以后会被自动移除掉.
- Edge TTL
- mgmt = graph.openManagement()
- visits = mgmt.makeEdgeLabel('visits').make()
- mgmt.setTTL(visits, Duration.ofDays(7))
- mgmt.commit()
需要注意的是, 这种方法后端数据库必须支持 cell level TTL, 目前只有 Cassandra 和 HBase 支持.
- Property TTL
- mgmt = graph.openManagement()
- sensor = mgmt.makePropertyKey('sensor').cardinality(Cardinality.LIST).dataType(Double.class).make()
- mgmt.setTTL(sensor, Duration.ofDays(21))
- mgmt.commit()
- Vertex TTL
- mgmt = graph.openManagement()
- tweet = mgmt.makeVertexLabel('tweet').setStatic().make()
- mgmt.setTTL(tweet, Duration.ofHours(36))
- mgmt.commit()
- Undirected Edges
- mgmt = graph.openManagement()
- mgmt.makeEdgeLabel('author').unidirected().make()
- mgmt.commit()
这种 undirected edge 只能通过 out-going 的方向去遍历, 这有点像万维网.
Eventually-Consistent Storage Backends
底层数据的最终一致性问题.
Eventually consistent storage backend 有哪些? Apache Cassandra 或者 Apache HBase 其实都是这种数据库类型.
数据的一致性
通过 JanusGraphManagement.setConsistency(element, ConsistencyModifier.LOCK) 方法去定义数据的一致性问题, 如下面的例子:
- mgmt = graph.openManagement()
- name = mgmt.makePropertyKey('consistentName').dataType(String.class).make()
- index = mgmt.buildIndex('byConsistentName', Vertex.class).addKey(name).unique().buildCompositeIndex()
- mgmt.setConsistency(name, ConsistencyModifier.LOCK) // Ensures only one name per vertex
- mgmt.setConsistency(index, ConsistencyModifier.LOCK) // Ensures name uniqueness in the graph
- mgmt.commit()
使用锁其实开销还是很大的, 在对数据一致性要求不高的情形, 最好不用锁, 让后期数据库自己在读操作中去解决数据一致性问题.
当有两个事务同时对一个元素进行写操作的时候, 怎么办呢? 我们可以先让写操作成功, 然后后期再去解决一致性问题, 具体有两种思路解决这个问题:
Forking Edges
思想就是, 每一个事务 fork 一个对应的要修改的 edge, 再根据时间戳去在后期修改.
下面是个例子:
- mgmt = graph.openManagement()
- related = mgmt.makeEdgeLabel('related').make()
- mgmt.setConsistency(related, ConsistencyModifier.FORK)
- mgmt.commit()
这里, 我们创建了一个 edge label, 叫做 related, 然后我们把一致性属性设置成了 ConsistencyModifier.FORK.
这个策略只对 MULTI 类别的边适用. 其他的 multiplicity 并不适用, 因为其它 multiplicity 显式地应用了锁.
Failure & Recovery
失败和恢复, 主要是两个部分:
事务的失败和恢复
实例的宕机和恢复
事务的失败和恢复
事务如果在调用 commit() 之前失败, 是可以恢复的. commit() 之前的改变也会被回滚.
有时候, 数据 persist 到存储系统的过程成功了, 但创建 index 的的过程却失败了. 这种情况下, 该事务会被认为成功了, 因为底层存储才是 source of graph.
但这样会带来数据和索引的不一致性. JanusGraph 维护了一份 transaction write-ahead log, 对应的有两个参数可以调整:
- tx.log-tx = true
- tx.max-commit-time = 10000
如果一个事务的 persistance 过程超过了 max-commit-time,JanusGrpah 会尝试从中恢复. 与此同时, 另外有一个进程去扫描维护好的这份 log, 去 identify 那些只成功了一半的事务. 建议使用另一台机器专门去做失败恢复, 运行:
recovery = JanusGraphFactory.startTransactionRecovery(graph, startTime, TimeUnit.MILLISECONDS);
transaction write-ahead log 本身也有维护成本, 因为涉及到大量的写操作. transaction write-ahead log 自动维护的时间是 2 天, 2 天前的数据会被自动删除.
对于这样的系统, 如何 fine tune log system 也是需要仔细考虑的因素.
实例的恢复
如果某个 JanusGraph instance 宕机了, 其他的实例应该不能受影响. 如果涉及到 schema 相关的操作, 比如创建索引, 这就需要不同 instance 保持协作了, JanusGraph 会自动地去维护一份 running instance 的列表, 如果某一个实例被意外关闭了, 创建索引的操作就会失败.
在这种情况下, 有一个方案是去手动地 remove 某一个实例:
- mgmt = graph.openManagement()
- mgmt.getOpenInstances() //all open instances
- ==>7f0001016161-dunwich1(current)
- ==>7f0001016161-atlantis1
- mgmt.forceCloseInstance('7f0001016161-atlantis1') //remove an instance
- mgmt.commit()
但这样做有数据不一致的风险, 应该尽量少使用这种方式.
索引的管理
重新索引
一般来讲, 我们在创建 schema 的时候, 就应该把索引建立好, 如果事先没有创建好索引, 就需要重新索引了.
可以通过两种方式来执行重索引:
- MapReduce
- JanusGraphManagement
具体的代码可以参考:
删除索引
删除索引分两步:
JanusGraph 通知所有其他的实例, 说明索引即将被删除, 索引便会标记成 DISABLED 状态, 此时 JanusGraph 便会停止使用该索引去回答查询, 或者更新索引, 索引相关的底层数据还保留但会被忽略.
根据索引是属于 composite 索引还是 mixed 索引, 如果是 composite 索引, 可以直接用
JanusGraphManagement
或者 MapReduce 去删除, 如果是 mixed 索引就比较麻烦了, 因为这涉及到后端存储的索引, 所以需要手动地去后端 drop 掉对应的索引.
重建索引的相关问题 v
当一个索引刚刚被建立, 就执行重索引的时候, 可能会报如下错误:
- The index mixedExample is in an invalid state and cannot be indexed.
- The following index keys have invalid status: desc has status INSTALLED
- (status must be one of [REGISTERED, ENABLED])
这是因为建立索引后, 索引信息会被慢慢地广播到集群中其他的 Instances, 这需要一定的时间,所以, 最好不要在索引刚刚建立以后就去执行重索引任务.
大规模导入(Bulk Loading)
大规模导入需要的配置
通过 storage.batch-loading 参数来支持 Bulk loading.
如果打开了 Builk loading, 最好关闭自动创建 schema 的功能(schema.default = none). 因为 automatic type creation 会不断地 check 来保证数据的一致性和完整性, 对于 Bulk loading 的场合, 这或许是不需要的.
另外一个需要关注的参数是 ids.block-size, 可以通过增大这个参数来减少 id 获取过程的数量(id block acquisition process), 但这会造成大量的 id 浪费, 这个参数需要根据每台机器添加的顶点的数量来做调整, 默认值已经比较合理了, 如果不行, 可以适当地增大这个数值(10 倍, 100 倍, 比如).
对于这个参数, 有个技巧: Rule of thumb: Set ids.block-size to the number of vertices you expect to add per JanusGraph instance per hour.
Note: 要保证所有 JanusGraph instance 这个参数的配置都一样, 如果需要调整这个参数, 最好先关闭所有的 instance, 调整好后再上线.
如果有多个实例, 这些实例在不断地分配 id, 可能会造成冲突问题, 有时候甚至会报出异常, 一般来说, 对于这个问题, 可以调整下面几个参数:
ids.authority.wait-time
: 单位是 milliseconds,id pool mamager 在等待 id block 应用程序获得底层存储所需要等待的时间, 这个时间越短, 越容易出问题.
Rule of thumb: Set this to the sum of the 95th percentile read and write times measured on the storage backend cluster under load. Important: This value should be the same across all JanusGraph instances.
ids.renew-timeout: 单位是 milliseconds,JanusGraph 的 id pool manager 在获取新一个 id 之前会等待的总时间.
Rule of thumb: Set this value to be as large feasible to not have to wait too long for unrecoverable failures. The only downside of increasing it is that JanusGraph will try for a long time on an unavailable storage backend cluster.
还有一些需要注意的读写参数:
storage.buffer-size: 我们执行很多 query 的时候, JanusGraph 会把它们封装成一个个的小 batch, 然后推送到后端的存储执行, 当我们在短时间内执行大量的写操作的时候, 后端存储可能承受不了这么大的压力. 在这种情况下, 我们可以增大这个 buffer 参数, 但与此相对的代价是每秒中可以发送的 request 数量会减小. 这个参数不建议在用事务的方式导入数据的时候进行修改.
storage.read-attempts 和 storage.write-attempts 参数, 这个参数指的是每个推送到后端的 batch 会被尝试多少次(直至认为这个 batch fail), 如果希望在导数据的时候支持 high load, 最好调大这几个参数.
storage.attempt-wait 参数指定了 JanusGraph 在重新执行一次失败的操作之前会等待的时间(millisecond), 这个值越大, 后端能抗住的 load 越高.
Graph Partitioning
分区策略, 主要是两种:
Edge Cut
砍边策略, 经常一起遍历到的点尽量放在同一个机器上.
Vertex Cut
砍点策略. 砍边策略的目的是减小通信量, 砍点策略主要是为了处理 hotspot 问题(超级点问题), 比如有的点, 入度非常大, 这种情况下, 用邻接表的方式 + 砍边的方式存储的话, 势必造成某一个分区上某一个点的存储量过大(偏移), 这个时候, 利用砍点策略, 把这种点均匀地分布到不同的 partition 上面就显得很重要了.
一个典型的场景是 User 和 Product 的关系, product 可能只有几千个, 但用户却有上百万个, 这种情况下, product 最好就始终砍点策略.
对与分区这个问题, 如果数据量小, 就用随机分区 (默认的) 就好, 如果数据量过大, 就要好好地去 fine tune 分区的策略了.
JanusGraph with TinkerPop's Hadoop-Gremlin
JanusGraph 和 TinkerPop 的 Hadoop 框架的整合问题. JanusGraph 和 Apache Spark 还有 Hadoop 的整合主要是依靠社区的力量.
来源: http://www.bubuko.com/infodetail-3108267.html