在第六章《路由表》中,客户端进行会话时,首先要获取对方的Session实例。获取Session实例的方法,是先查找本地路由表,若找不到,则通过路由表中的缓存数据,由定位器获取。
路由表中的缓存,如下:
- public RoutingTableImpl() {
- super("Routing table");
- serversCache = CacheFactory.createCache(S2S_CACHE_NAME);
- componentsCache = CacheFactory.createCache(COMPONENT_CACHE_NAME);
- usersCache = CacheFactory.createCache(C2S_CACHE_NAME);
- anonymousUsersCache = CacheFactory.createCache(ANONYMOUS_C2S_CACHE_NAME);
- usersSessions = CacheFactory.createCache(C2S_SESSION_NAME);
- localRoutingTable = new LocalRoutingTable();
- }
这些缓存中,存储了整个集群内的所有Session信息。Openfire实现了对集群的支持接口,可能通过插件的形式构建集群。具体如何实现,下面来分析。本文使用的集群插件为Hazelcast。
- 1、interface:
- RemoteSessionLocator ----> Session远程定位器,用于从集群中获取Session
- ClusterEventListener ----> 监听集群加入、离开事件
- CacheFactoryStrategy ----> 缓存策略接口
- 2、class:
- ClusterManager ----> 集群管理类,管理自身而非集群。集群内的Master、缓存同步等由插件处理
- CacheFactory ----> 缓存工厂类
- DefaultLocalCacheStrategy ----> 本地缓存策略,实现CacheFactoryStrategy接口
- ClusteredCacheFactory ----> 集群缓存策略,实现CacheFactoryStrategy接口
- // 插件启动
- initializePlugin():ClusterManager.startup();
- // 插件销毁
- destroyPlugin():ClusterManager.shutdown();
CluterManager中提供了集群事件的处理方法,主要如下两个队列进行:
- private static Queue < ClusterEventListener > listeners = new ConcurrentLinkedQueue < >();
- private static BlockingQueue < Event > events = new LinkedBlockingQueue < >(10000);
listeners:用于通知所有注册了ClusterEventListener事件的组件
events:用于存储集群中所有设备进、出集群的事件
并相应的提供了如下几个方法,用于操作这两个队列:
- public static void fireJoinedCluster(byte[] nodeID, boolean asynchronous) {
- try {
- Event event = new Event(EventType.joined_cluster, nodeID);
- events.put(event);
- if (!asynchronous) {
- while (!event.isProcessed()) {
- Thread.sleep(50);
- }
- }
- } catch(InterruptedException e) {
- // Should never happen
- Log.error(e.getMessage(), e);
- }
- }
- public static void fireLeftCluster(byte[] nodeID) {
- try {
- Event event = new Event(EventType.left_cluster, nodeID);
- events.put(event);
- } catch(InterruptedException e) {
- // Should never happen
- Log.error(e.getMessage(), e);
- }
- }
- public static void addListener(ClusterEventListener listener) {
- if (listener == null) {
- throw new NullPointerException();
- }
- listeners.add(listener);
- }
- public static void removeListener(ClusterEventListener listener) {
- listeners.remove(listener);
- }
集群的启动方法
- public static synchronized void startup() {
- if (isClusteringEnabled() && !isClusteringStarted()) {
- initEventDispatcher();
- CacheFactory.startClustering();
- }
- }
上面代码中, initEventDispatcher()方法,启动一个线程,根据events事件队列,完成事件调度。
- private static void initEventDispatcher() {
- if (dispatcher == null || !dispatcher.isAlive()) {
- dispatcher = new Thread("ClusterManager events dispatcher") {@Override public void run() {
- // exit thread if/when clustering is disabled
- while (ClusterManager.isClusteringEnabled()) {
- try {
- Event event = events.take();
- EventType eventType = event.getType();
- // Make sure that CacheFactory is getting this events first (to update cache structure)
- if (event.getNodeID() == null) {
- // Replace standalone caches with clustered caches and migrate data
- if (eventType == EventType.joined_cluster) {
- CacheFactory.joinedCluster();
- } else if (eventType == EventType.left_cluster) {
- CacheFactory.leftCluster();
- }
- }
- // Now notify rest of the listeners
- for (ClusterEventListener listener: listeners) {
- try {
- switch (eventType) {
- case joined_cluster:
- {
- if (event.getNodeID() == null) {
- listener.joinedCluster();
- } else {
- listener.joinedCluster(event.getNodeID());
- }
- break;
- }
- case left_cluster:
- {
- if (event.getNodeID() == null) {
- listener.leftCluster();
- } else {
- listener.leftCluster(event.getNodeID());
- }
- break;
- }
- case marked_senior_cluster_member:
- {
- listener.markedAsSeniorClusterMember();
- break;
- }
- default:
- break;
- }
- } catch(Exception e) {
- Log.error(e.getMessage(), e);
- }
- }
- // Mark event as processed
- event.setProcessed(true);
- } catch(Exception e) {
- Log.warn(e.getMessage(), e);
- }
- }
- }
- };
- dispatcher.setDaemon(true);
- dispatcher.start();
- }
- }
集群的关闭方法:
- public static synchronized void shutdown() {
- if (isClusteringStarted()) {
- Log.debug("ClusterManager: Shutting down clustered cache service.");
- CacheFactory.stopClustering();
- }
- }
由上过程可以看出,集群功能的具体实现,主要集中CacheFactory类中。
- private static Map<String, Cache> caches = new ConcurrentHashMap<>();
通过调用指定的缓存策略构造缓存,并存入队列中:
- @SuppressWarnings("unchecked") public static synchronized < T extends Cache > T createCache(String name) {
- T cache = (T) caches.get(name);
- if (cache != null) {
- return cache;
- }
- cache = (T) cacheFactoryStrategy.createCache(name);
- log.info("Created cache [" + cacheFactoryStrategy.getClass().getName() + "] for " + name);
- return wrapCache(cache, name);
- }
Openfire定义的缓存策略有两种,本地缓存、集群缓存。这两种缓存策略对应的类名由Openfire预先定好。本地缓存由Openfire自身实现,集群缓存由集群插件按定好的类名规范实现。
两种缓存机制的类名如下:
- static {
- localCacheFactoryClass = JiveGlobals.getProperty(LOCAL_CACHE_PROPERTY_NAME,
- "org.jivesoftware.util.cache.DefaultLocalCacheStrategy");
- clusteredCacheFactoryClass = JiveGlobals.getProperty(CLUSTERED_CACHE_PROPERTY_NAME,
- "org.jivesoftware.openfire.plugin.util.cache.ClusteredCacheFactory");
- }
无集群的情况,使用本地缓存:
- public static synchronized void initialize() throws InitializationException {
- try {
- localCacheFactoryStrategy = (CacheFactoryStrategy) Class.forName(localCacheFactoryClass).newInstance();
- cacheFactoryStrategy = localCacheFactoryStrategy;
- } catch(Exception e) {
- log.error("Failed to instantiate local cache factory strategy: " + localCacheFactoryClass, e);
- throw new InitializationException(e);
- }
- }
当加入集群时,切换为集群缓存:
- @SuppressWarnings("unchecked") public static synchronized void joinedCluster() {
- cacheFactoryStrategy = clusteredCacheFactoryStrategy;
- // Loop through local caches and switch them to clustered cache (copy content)
- for (Cache cache: getAllCaches()) {
- // skip local-only caches
- if (localOnly.contains(cache.getName())) continue;
- CacheWrapper cacheWrapper = ((CacheWrapper) cache);
- Cache clusteredCache = cacheFactoryStrategy.createCache(cacheWrapper.getName());
- clusteredCache.putAll(cache);
- cacheWrapper.setWrappedCache(clusteredCache);
- }
- clusteringStarting = false;
- clusteringStarted = true;
- log.info("Clustering started; cache migration complete");
- }
切换的方法是将本地缓存使用集群缓存策略重新生成一次,这时,本地的缓存将会被同步到集群中的各个机器上。
当离开集群时,又会切换为本地缓存:
- @SuppressWarnings("unchecked") public static synchronized void leftCluster() {
- clusteringStarted = false;
- cacheFactoryStrategy = localCacheFactoryStrategy;
- // Loop through clustered caches and change them to local caches (copy content)
- for (Cache cache: getAllCaches()) {
- // skip local-only caches
- if (localOnly.contains(cache.getName())) continue;
- CacheWrapper cacheWrapper = ((CacheWrapper) cache);
- Cache standaloneCache = cacheFactoryStrategy.createCache(cacheWrapper.getName());
- standaloneCache.putAll(cache);
- cacheWrapper.setWrappedCache(standaloneCache);
- }
- log.info("Clustering stopped; cache migration complete");
- }
集群缓存策略,是Openfire与集群组件的过渡层。由Openfire制定了接口规范CacheFactoryStrategy,且包名必须为org.jivesoftware.openfire.plugin.util.cache.ClusteredCacheFactory,其中的方法,由具体的集群插件来完成。
集群缓存的创建:
- public Cache createCache(String name) {
- // Check if cluster is being started up
- while (state == State.starting) {
- // Wait until cluster is fully started (or failed)
- try {
- Thread.sleep(250);
- } catch(InterruptedException e) {
- // Ignore
- }
- }
- if (state == State.stopped) {
- throw new IllegalStateException("Cannot create clustered cache when not in a cluster");
- }
- return new ClusteredCache(name, hazelcast.getMap(name));
- }
其中,CluteredCache对象的生成,是实现数据同步的关键:
- return new ClusteredCache(name, hazelcast.getMap(name));
表明该缓存队列是Hazelcast中定义的,当队列发生变更时,实际上是更新了Hazelcast中的内容。
启动集群的方法
- public boolean startCluster() {
- state = State.starting;
- // Set the serialization strategy to use for transmitting objects between node clusters
- serializationStrategy = ExternalizableUtil.getInstance().getStrategy();
- ExternalizableUtil.getInstance().setStrategy(new ClusterExternalizableUtil());
- // Set session locator to use when in a cluster
- XMPPServer.getInstance().setRemoteSessionLocator(new RemoteSessionLocator());
- // Set packet router to use to deliver packets to remote cluster nodes
- XMPPServer.getInstance().getRoutingTable().setRemotePacketRouter(new ClusterPacketRouter());
- ClassLoader oldLoader = null;
- // Store previous class loader (in case we change it)
- oldLoader = Thread.currentThread().getContextClassLoader();
- ClassLoader loader = new ClusterClassLoader();
- Thread.currentThread().setContextClassLoader(loader);
- int retry = 0;
- do {
- try {
- Config config = new ClasspathXmlConfig(HAZELCAST_CONFIG_FILE);
- config.setInstanceName("openfire");
- config.setClassLoader(loader);
- if (JMXManager.isEnabled() && HAZELCAST_JMX_ENABLED) {
- config.setProperty("hazelcast.jmx", "true");
- config.setProperty("hazelcast.jmx.detailed", "true");
- }
- hazelcast = Hazelcast.newHazelcastInstance(config);
- cluster = hazelcast.getCluster();
- // Update the running state of the cluster
- state = cluster != null ? State.started: State.stopped;
- // Set the ID of this cluster node
- XMPPServer.getInstance().setNodeID(NodeID.getInstance(getClusterMemberID()));
- // CacheFactory is now using clustered caches. We can add our listeners.
- clusterListener = new ClusterListener(cluster);
- lifecycleListener = hazelcast.getLifecycleService().addLifecycleListener(clusterListener);
- membershipListener = cluster.addMembershipListener(clusterListener);
- break;
- } catch(Exception e) {
- if (retry < CLUSTER_STARTUP_RETRY_COUNT) {
- logger.warn("Failed to start clustering (" + e.getMessage() + "); " + "will retry in " + CLUSTER_STARTUP_RETRY_TIME + " seconds");
- try {
- Thread.sleep(CLUSTER_STARTUP_RETRY_TIME * 1000);
- } catch(InterruptedException ie) {
- /* ignore */
- }
- } else {
- logger.error("Unable to start clustering - continuing in local mode", e);
- state = State.stopped;
- }
- }
- } while ( retry ++< CLUSTER_STARTUP_RETRY_COUNT );
- if (oldLoader != null) {
- // Restore previous class loader
- Thread.currentThread().setContextClassLoader(oldLoader);
- }
- return cluster != null;
- }
停止集群的方法
- public void stopCluster() {
- // Stop the cache services.
- cacheStats = null;
- // Update the running state of the cluster
- state = State.stopped;
- // Stop the cluster
- Hazelcast.shutdownAll();
- cluster = null;
- if (clusterListener != null) {
- // Wait until the server has updated its internal state
- while (!clusterListener.isDone()) {
- try {
- Thread.sleep(100);
- } catch(InterruptedException e) {
- // Ignore
- }
- }
- hazelcast.getLifecycleService().removeLifecycleListener(lifecycleListener);
- cluster.removeMembershipListener(membershipListener);
- lifecycleListener = null;
- membershipListener = null;
- clusterListener = null;
- }
- // Reset the node ID
- XMPPServer.getInstance().setNodeID(null);
- // Reset packet router to use to deliver packets to remote cluster nodes
- XMPPServer.getInstance().getRoutingTable().setRemotePacketRouter(null);
- // Reset the session locator to use
- XMPPServer.getInstance().setRemoteSessionLocator(null);
- // Set the old serialization strategy was using before clustering was loaded
- ExternalizableUtil.getInstance().setStrategy(serializationStrategy);
- }
集群的启动、停止两个方法,下面做一个综合分析,主要执行了如下操作:
(1)设置缓存序列化策略,序列化是为了使数据能够在集群之间复制。
设置之前,先对原有的序列化策略做备份
- serializationStrategy = ExternalizableUtil.getInstance().getStrategy();
- ExternalizableUtil.getInstance().setStrategy(new ClusterExternalizableUtil());
在集群停止的时候,重置为原来的策略
- ExternalizableUtil.getInstance().setStrategy(serializationStrategy);
(2)设置远程Session定位器。集群中的每台机器,都只保存了连接到本机的Session实例。当连接到不同机器的两个客户端发生通信时,就需要用定位器从集群中找到对方。
- XMPPServer.getInstance().setRemoteSessionLocator(new RemoteSessionLocator());
在集群停止的时候,置空即可
- XMPPServer.getInstance().setRemoteSessionLocator(null);
(3)添加远程包路由器到路由表中,主要是用于数据同步。
- XMPPServer.getInstance().getRoutingTable().setRemotePacketRouter(new ClusterPacketRouter());
离开集群时,置空
- XMPPServer.getInstance().getRoutingTable().setRemotePacketRouter(null);
(4)根据配置文件,加载Hazelcast的实例
- Config config = new ClasspathXmlConfig(HAZELCAST_CONFIG_FILE);
- config.setInstanceName("openfire");
- config.setClassLoader(loader);
- if (JMXManager.isEnabled() && HAZELCAST_JMX_ENABLED) {
- config.setProperty("hazelcast.jmx", "true");
- config.setProperty("hazelcast.jmx.detailed", "true");
- }
- hazelcast = Hazelcast.newHazelcastInstance(config);
- cluster = hazelcast.getCluster();
(5)设置节点ID号
- XMPPServer.getInstance().setNodeID(NodeID.getInstance(getClusterMemberID()));
(6)设置监听,当集群中状态变化、成员变化时,实现回调
- clusterListener = new ClusterListener(cluster);
- lifecycleListener = hazelcast.getLifecycleService().addLifecycleListener(clusterListener);
- membershipListener = cluster.addMembershipListener(clusterListener);
ClusterListener中实现了MembershipListener,LifecycleListener接口,当收到回调时,会触发集群管理CluterManager更新事件队列events,并进行事件调度、建立集群缓存等工作,以此实现了集群的响应与管理。
对集群响应的流程总体做一个描述
1、初始状态,Openfire系统启动,并加载了集群插件,第一台完成启动的机器,会被Hazelcast标记为master节点,此时的集群环境,与单机没什么差别
2、当Openfire系统陆续完成启动,新的设备陆续加入、移出集群,Hazelcast本身会完成集群内各种数据同步,然后通过ClusterListener会回调到如下两个方法:
- public void memberAdded(MembershipEvent event) {
- .......
- ClusterManager.fireJoinedCluster(StringUtils.getBytes(event.getMember().getUuid()), true);
- ......
- }
- public void memberRemoved(MembershipEvent event) {
- ......
- ClusterManager.fireLeftCluster(nodeID);
- ......
- }
3、CluterManager中的fireJoinedCluster()与fireLeftCluster()方法会触发事件队列的events的更新
4、CluterManager事件调度线程dispatcher中,在事件队列更新时将执行CacheFactory.joinedCluster()或CacheFactory.leftCluster()方法更新缓存数据,并通知其他相关组件更新数据,如SessionManager、RouteTableIpml等
5、当有新的客户端发出登录请求,在资源绑定时针将该客户端的Session信息放入集群缓存队列中,由Hazelcast完成数据同步。
6、当集群内客户端发生通信时,使用RemoteSessionLocator获得对方的session实例,再由路由表完成消息路由。
在第四章《消息路由》中,在路由表中,如果是远程消息,将调用routeToRemoteDomain()方法实现消息路由。
RouteTableImpl.routeToRemoteDomain()方法:
- private boolean routeToRemoteDomain(JID jid, Packet packet,
- boolean routed) {
- byte[] nodeID = serversCache.get(jid.getDomain());
- if (nodeID != null) {
- if (server.getNodeID().equals(nodeID)) {
- // This is a route to a remote server connected from this node
- try {
- localRoutingTable.getRoute(jid.getDomain()).process(packet);
- routed = true;
- } catch (UnauthorizedException e) {
- Log.error("Unable to route packet " + packet.toXML(), e);
- }
- }
- else {
- // This is a route to a remote server connected from other node
- if (remotePacketRouter != null) {
- routed = remotePacketRouter.routePacket(nodeID, jid, packet);
- }
- }
- }
- else {
- // Return a promise of a remote session. This object will queue packets pending
- // to be sent to remote servers
- OutgoingSessionPromise.getInstance().process(packet);
- routed = true;
- }
- return routed;
- }
在集群启动中,设置了ClusterPacketRouter作为路由器RemotePacketRouter,ClusterPacketRouter类:
- public class ClusterPacketRouter implements RemotePacketRouter {
- private static Logger logger = LoggerFactory.getLogger(ClusterPacketRouter.class);
- public boolean routePacket(byte[] nodeID, JID receipient, Packet packet) {
- // Send the packet to the specified node and let the remote node deliver the packet to the recipient
- try {
- CacheFactory.doClusterTask(new RemotePacketExecution(receipient, packet), nodeID);
- return true;
- } catch(IllegalStateException e) {
- logger.warn("Error while routing packet to remote node: " + e);
- return false;
- }
- }
- public void broadcastPacket(Message packet) {
- // Execute the broadcast task across the cluster
- CacheFactory.doClusterTask(new BroadcastMessage(packet));
- }
- }
使用集群中的计算任务,指定一个节点完成消息路由:
- CacheFactory.doClusterTask(new RemotePacketExecution(receipient, packet), nodeID);
而RemotePacketExecution实际是一个线程,其run()方法:
- public void run() {
- XMPPServer.getInstance().getRoutingTable().routePacket(recipient, packet, false);
- }
也就是说,集群中的消息路由,如果通信双方是分处于两台机器上,那么将使用集群将消息指定由对应的主机执行消息路由。
Over!
来源: http://www.cnblogs.com/Fordestiny/p/7694294.html