在 Apache Geode 中,成员之间以点对点的网络形式互相连接形成分布式系统。Apache Geode 支持动态成员关系,集群中的成员能够随时加入和离开分布式系统而不影响其他成员正常运行。这种动态化成员关系的能力是非常重要的,在较高的 SLA 要求下,允许有状态的应用内置在分布式系统中而不影响高性能和高可靠性。成员关系的改变不引入锁和资源争抢。成员之间能够通过多播或者基于 TCP/IP 的位置发现服务互相发现,如果网络没有启用多播,系统将自动选举组成员协调者 - 一个成员负责授权其他成员加入到分布式系统中和把成员关系改变通告给系统中的所有成员。
Apache Geode 整体架构如下图所示:
(点击放大图像)
作为 Apache Geode 作为分布式内存管理平台,有着极强的弹性伸缩能力,这得益于 Apache Geode 内部复杂的分布式结构来保证。下面我们就来剖析一下 Apache Geode 分布式系统的内部结构。
Apache Geode 的核心是由几个单例系统组成:
Apache Geode 有自己的序列化框架,允许开发者实现自己的 DataSerializable 接口。序列化框架的示例可参考 https://github.com/theseusyang/geodedemo
其中 Data 类为 Geode 序列化的代码示例。
Data 类实现了 DataSerializable 接口, 开发了自己的 public void fromData(DataInput in) 和 public void toData(DataOutput out) 方法,将 Data 类的属性作为数据输入对象和数据输出对象进行序列化和反序列化进行传递。
内部类使用相同的框架进行序列化,但我们不使用公共的 Instantiator 框架。相反,内部类实现 DataSerializableFixedID。
Geode 使用 log4j2 进行日志记录。要获取记录器,请向类中添加一个静态记录器字段:
private static final Logger logger = LogService.getLogger();
在特定的分布式系统成员上,您可以通过使用 InternalDistributedMember 引用其他成员。当你发送一条消息给对等体,你传递 InternalDistributedMember 对象以标明目的地。该 DM 提供了获取所有当前正在运行的对等体的所有名单,并增加监听器监控分布式系统内对等体的加入和离开。最终从 DistributionAdvisor 获取成员列表。请参阅下面的顾问部分。
要创建新消息类型,首先创建子类 DistributionMessage。子类应该实现 DataSerializableFixedID,并且负责序列化需要作为消息的一部分发送的任何字段。实现过程方法。在远程端点,将调用过程方法,并且可以在远程端点执行所需的任何逻辑。
要发送消息,请创建您的 DistributionMessage 子类的实例,调用 setRecipient 或 setRecipients 以标明目标,并调用 DM.putOutgoing(yourMessage)。该消息将发送给收件人。
(点击放大图像)
如果 Geode 分布式系统能够发送消息,说明节点间可以正常通信,但您还需要能够发送获取响应的请求,或等待,直到你确定对等体已处理消息。
应答处理器模式用于处理需要应答的消息。这里是一个代码示例(这是从 RemovePersistentMemberMessage。
- Set members =// 从顾问或从 DM 获得此成员列表
- DM dm =// 分布式管理器. 我们能够从上下文中获得这个 DM
- // (例如, 一个区域能够找到它的 DM
- ReplyProcessor21 processor =newReplyProcessor21(dm, members);
- RemovePersistentMemberMessage msg =newRemovePersistentMemberMessage(regionPath,
- id, initializingId, processor.getProcessorId());
- msg.setRecipients(members);
- dm.putOutgoing(msg);
- processor.waitForRepliesUninterruptibly();
第一行创建一个 ReplyProcessor21 的实例,并向其传递一组收件人。这实际上做的是分配回答处理器一个唯一的 ID,并注册一个单身的 ProcessorKeeper。参见 ReplyProcessor21.keeper。处理器的唯一 ID 将添加到调用 processor.getProcessorId()的消息中。
通过 putOutgoing 的调用,消息被发送到收件人。然后线程在处理器. waitForRepliesUninterruptibly()中等待答复处理器。
在远程调用方面,这是过程方法的样子:
- protectedvoidprocess(DistributionManager dm) {
- try{
- // 删除持久化成员
- //...
- }finally{
- ReplyMessage replyMsg =newReplyMessage();
- replyMsg.setRecipient(getSender());
- replyMsg.setProcessorId(processorId);
- if(exception !=null) {
- replyMsg.setException(exception);
- }
- dm.putOutgoing(replyMsg);
- }
- }
这里,远程端发送具有相同唯一 ID 的 ReplyMessage。最后,当在消息的原始发送者上接收到 ReplyMessage 时,它查找 ReplyProcessor 并通知它。这会唤醒 processor.waitForRepliesUninterruptibly() 中的线程。
分发管理器跟踪所有对等体。在通常情况下,我们想要在分布式系统中形成子组。例如,Geode 仅向具有特定区域的对等体发送消息。
DistributionAdvisor 类提供创建群组的能力。每个成员创建一个 DistributionAdvisor 的实例,并执行初始化配置文件交换的操作。在配置文件交换期间,对等体给予配置文件的每个其他实例,其提供关于该对等体的信息。
在代码中,用户可以从 DistributionAdvisor 获取对等体列表。
不同的组件使用 DistributionAdvisor 和 Profile 的不同子类。例如,DistributedRegion 使用 CacheDistributionAdvisor 交换 CacheProfile 对象。CacheDistributionAdvisor 提供了特殊的方法来根据对象的配置文件内容获取对等体的子组。例如,CacheDistributionAdvisor.advisePersistentMembers() 仅返回对指定区域使用持久性的成员。
LocalRegion
LocalRegion 是其他类型区域的基类。本地区域实现公共区域接口并保存具有区域数据的 HashMap。
(点击放大图像)
DistributedRegion 类是我们用于复制区域的类。分布式区域扩展了本地区域,但是具有用于消息传递的顾问。
(点击放大图像)
当用户创建分区区域时,创建一个 PartitionedRegion 类的实例。本质上,分区区域的工作方式是,对于用户放置的每个键,我们使用密钥的哈希码来确定密钥应该去的桶。然后将桶分配给各个成员。分区区域确保我们制作足够的存储桶副本以满足冗余级别。
(点击放大图像)
当向一个成员分配存储桶时,该成员将创建一个 BucketRegion 的实例,该实例扩展了 DistributedRegion。bucket 区域存储 bucket 的实际数据。
(点击放大图像)
存储桶区域与分布区域稍有不同,因为它们具有主要区域。选择一个成员作为每个存储桶的主要成员。当用户执行 put 时,PartitionedRegion 类将 put 放置到主服务器。
(点击放大图像)
客户端服务器消息传递的实现方式与对等消息传递层不同。消息传递的服务器端部分在 com.gemstone.gemfire.internal.cache.tier.sockets.command 中实现每个消息类型都有自己的服务器端类,它读取特定消息,处理它和回复。
消息的客户端在 com.gemstone.gemfire.cache.client.internal 作为 AbstractOp 的子类。这些操作负责发送消息并读取回复。
服务器代码在 BridgeServerImpl 和 AcceptorImpl 中实现。AcceptorImpl 具有实际的服务器套接字和运行循环,它接受该套接字上的连接。
(点击放大图像)
实际上,服务器有两种不同的模式,基于 max-connections 和 max-threads 设置。如果允许服务器使用与连接一样多的线程,则每个套接字都会获得自己的专用线程。如果允许的连接多于连接,则连接由线程池和选择器处理。
从连接读取的线程在 ServerConnection 中实现。
在客户端,客户端到服务器的连接被管理 PoolImpl。每个客户区域保存一个区域可以调用操作的 ServerRegionProxy。代理使用池将操作发送到服务器端。
(点击放大图像)
客户端可以订阅具有 Region.register 兴趣或连续查询的事件。当服务器端的事件通过异步队列更改时,它们将接收更新。
(点击放大图像)
每个成员写入它自己的独立磁盘存储。成员从不共享磁盘工件。例如,如果新成员需要创建区域的副本,我们从一个成员读取磁盘文件,将逻辑键和值发送给另一个成员,该成员写入它自己的磁盘文件。
DiskStoreImpl 类拥有一个特定的磁盘存储。在磁盘上,磁盘存储的布局看起来像这样:
- BACKUPdiskStore1.if
- BACKUPdiskStore1_1.crf
- BACKUPdiskStore1_1.drf
- BACKUPdiskStore1_1.krf
- BACKUPdiskStore1_2.crf
- BACKUPdiskStore1_2.drf
- BACKUPdiskStore1_2.krf
- BACKUPdiskStore1_3.crf
- BACKUPdiskStore1_3.drf
.if 文件包含磁盘存储的元数据(磁盘存储中的什么区域,每个区域的成员资格信息等)。在内部,init 文件由 DiskInitFile 类管理。每当进行元数据更改时,逻辑更改将附加到. if 文件。在恢复时,读取整个文件,并应用所有更改,直到最终的元数据。当. if 文件变大时,我们将当前元数据转储为新文件,并删除旧的. if 文件。
_X.crf,_X.drf 和_X.krf 文件一起构成操作日志或 oplog。因此,在上面的示例中,有三个 oplog,编号为 1,2 和 3. Oplog 包含区域操作。.crf 文件包含创建或更新。.drf 文件包含任何删除。并且可选的. krf 文件包含 oplog 中的键的副本,以加速恢复。每个 oplog 由 Oplog 类管理。
当用户执行操作时,操作被记录到当前 oplog。当当前 oplog 达到给定的大小限制时,它关闭并且开始新的 oplog。在这一点上,我们可以为旧的 oplog 生成一个. krf 文件。
在恢复期间,我们读取所有的 oplogs 以建立该地区的内容。如果一个 krf 文件存在,我们可以读取而不是. crf。这意味着我们不需要在恢复期间读取值。
为了防止 oplogs 永远增长,老 oplogs 被压缩。当旧 oplog 中的超过 50%的数据是垃圾(因为对较新的 oplog 中的相同键有较新的更新)时,剩余数据被向前复制到当前 oplog,并且旧的 oplog 从磁盘中删除。
(点击放大图像)
Geode 提供了一种类似 SQL 的查询语言,称为 OQL,允许应用程序访问存储在区域中的数据。
OQL 允许用于查询复杂对象,对象属性和方法的附加功能。
查询 - 可以在 DefaultQuery 中找到查询的实现。这个类是查询执行的起点,是在调试器中开始跟踪的好地方。当执行查询时,引擎采用的第一步是解析查询。我们使用 antlr 来完成这个任务,生成一个 AST 树,然后编译树并执行。
查询分区区域 - 分区区域查询执行稍微复杂一些。存在用于确定向哪些节点发送查询,在远程节点上执行查询以及在协调节点上收集结果的逻辑,其中诸如排序结果和重试失败的节点。这样做的逻辑可以在 PRQueryProcessor 和 PartitionedRegionQueryEvaluator 中找到。
IndexManager - 当在区域上创建索引时,为每个区域创建一个 IndexManager。每个区域有一个索引管理器。索引管理器管理每个索引的创建,删除和更新。它保留索引表达式到索引的映射,以便可以避免重复的索引,并且还用于确定索引是否可用于查询。
索引 - 用于为经常被查询的字段提供速度改进,但是以保持索引的内存成本和非常小的成本。内部有各种类型的索引。这些包括 CompactRangeIndex,RangeIndex,HashIndex,MapRangeIndex,CompactMapRangeIndex,PartitionedIndex 和 PrimaryKeyIndex。
来源: http://www.infoq.com/cn/articles/internal-structure-of-apache-geode-distributed-system