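The RPC frameworks we commonly see generally consist of three parts: a registry, the server (service provider), and the client (service consumer).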
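I won't go into further detail here, since I haven't studied Raft yet.
After starting Consul, open its management page.
Extract two interfaces, one for service registration and one for service discovery, and implement them with Consul. The implementation here mainly uses consul-client (consul-api would work as well), which has to be added to the pom: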
    <dependency>
        <groupId>com.orbitz.consul</groupId>
        <artifactId>consul-client</artifactId>
        <version>0.14.1</version>
    </dependency>
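    public interface RegistryService {

        // Registers the service described by url with the registry.
        void register(RpcURL url);

        // Removes the service described by url from the registry.
        void unregister(RpcURL url);
    }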
    import com.google.common.net.HostAndPort;
    import com.orbitz.consul.Consul;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class AbstractConsulService {

        private static final Logger logger = LoggerFactory.getLogger(AbstractConsulService.class);

        protected static final String CONSUL_NAME = "consul_node_jim";
        protected static final String CONSUL_ID = "consul_node_id";
        protected static final String CONSUL_TAGS = "v3";
        protected static final String CONSUL_HEALTH_INTERVAL = "1s";

        // Builds a client for the Consul agent at registryHost:registryPort.
        protected Consul buildConsul(String registryHost, int registryPort) {
            return Consul.builder()
                    .withHostAndPort(HostAndPort.fromString(registryHost + ":" + registryPort))
                    .build();
        }
    }
Service removal (unregister) is not implemented yet.
    import com.orbitz.consul.AgentClient;
    import com.orbitz.consul.Consul;
    import com.orbitz.consul.model.agent.ImmutableRegCheck;
    import com.orbitz.consul.model.agent.ImmutableRegistration;

    public class ConsulRegistryService extends AbstractConsulService implements RegistryService {

        private static final int CONSUL_CONNECT_PERIOD = 1 * 1000;

        @Override
        public void register(RpcURL url) {
            Consul consul = this.buildConsul(url.getRegistryHost(), url.getRegistryPort());
            AgentClient agent = consul.agentClient();

            // TCP health check: Consul connects to host:port every CONSUL_HEALTH_INTERVAL.
            ImmutableRegCheck check = ImmutableRegCheck.builder()
                    .tcp(url.getHost() + ":" + url.getPort())
                    .interval(CONSUL_HEALTH_INTERVAL)
                    .build();

            ImmutableRegistration.Builder builder = ImmutableRegistration.builder();
            builder.id(CONSUL_ID)
                    .name(CONSUL_NAME)
                    .addTags(CONSUL_TAGS)
                    .address(url.getHost())
                    .port(url.getPort())
                    .addChecks(check);

            agent.register(builder.build());
        }

        @Override
        public void unregister(RpcURL url) {
            // Not implemented yet.
        }
    }
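Because my RPC implementation runs over TCP, the health check registered with the service is also a TCP check: Consul opens a connection to the given IP and port and uses the result to judge whether the service is healthy. For an HTTP service, call the http method instead and pass the health-check URL.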
    ImmutableRegCheck check = ImmutableRegCheck.builder()
            .tcp(url.getHost() + ":" + url.getPort())
            .interval(CONSUL_HEALTH_INTERVAL)
            .build();
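To make the idea of a TCP check concrete, here is a minimal sketch of the kind of probe it performs: try to open a connection to the host and port, and treat success as "healthy". This is not Consul's implementation; the class name `TcpHealthProbe`, the method `isReachable`, and the timeout parameter are assumptions made for illustration.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Illustrative only: "healthy" here means the port accepts a TCP connection. */
final class TcpHealthProbe {

    private TcpHealthProbe() {
    }

    /**
     * Returns true if a TCP connection to host:port can be opened
     * within timeoutMillis, false otherwise.
     */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or the host could not be resolved.
            return false;
        }
    }
}
```

A provider could call something like `TcpHealthProbe.isReachable(url.getHost(), url.getPort(), 1000)` before registering, to confirm that its own port is already listening.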
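The monitoring output in the backend looks like this:
Although only a TCP check was specified, an HTTP health-check request is still issued, possibly because of some internal mechanism; see the first request log entry in the figure above.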
    public interface DiscoveryService {

        // Returns the addresses of the services known to the registry at registryHost:registryPort.
        List<RpcURL> getUrls(String registryHost, int registryPort);
    }
    List<RpcURL> urls = Lists.newArrayList();
    Consul consul = this.buildConsul(registryHost, registryPort);
    HealthClient client = consul.healthClient();
    String name = CONSUL_NAME;

    // Fetch every registered instance of the service, regardless of health status.
    ConsulResponse<List<ServiceHealth>> response = client.getAllServiceInstances(name);
    for (ServiceHealth serviceHealth : response.getResponse()) {
        RpcURL url = new RpcURL();
        url.setHost(serviceHealth.getService().getAddress());
        url.setPort(serviceHealth.getService().getPort());
        urls.add(url);
    }
Listening for service updates: when the list of available services changes, the callers need to be notified.
    try {
        ServiceHealthCache serviceHealthCache = ServiceHealthCache.newCache(client, name);
        serviceHealthCache.addListener(new ConsulCache.Listener<ServiceHealthKey, ServiceHealth>() {
            @Override
            public void notify(Map<ServiceHealthKey, ServiceHealth> map) {
                // The service list changed: drop the cached invokers so they get rebuilt.
                logger.info("serviceHealthCache.addListener notify");
                RpcClientInvokerCache.clear();
            }
        });
        serviceHealthCache.start();
    } catch (Exception e) {
        logger.error("serviceHealthCache.start error:", e);
    }
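Because the client's Invokers were cached earlier, the cached entries have to be updated whenever the service list changes.
Here the cache is simply cleared. A better approach would be to handle only the entries that actually changed.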
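As a rough sketch of the "only handle what changed" idea, the snippet below compares the previous and current sets of service addresses and reports which ones were added and which were removed. It is not part of the original project: the class `ServiceListDiff` and the use of plain `host:port` strings as keys are assumptions made for illustration.

```java
import java.util.HashSet;
import java.util.Set;

/** Illustrative only: difference between two snapshots of service addresses. */
final class ServiceListDiff {

    final Set<String> added;    // present now, absent before
    final Set<String> removed;  // present before, absent now

    private ServiceListDiff(Set<String> added, Set<String> removed) {
        this.added = added;
        this.removed = removed;
    }

    /** Compares two snapshots without modifying either of them. */
    static ServiceListDiff between(Set<String> previous, Set<String> current) {
        Set<String> added = new HashSet<>(current);
        added.removeAll(previous);

        Set<String> removed = new HashSet<>(previous);
        removed.removeAll(current);

        return new ServiceListDiff(added, removed);
    }
}
```

With a diff like this, the listener could close and drop only the invokers whose address appears in `removed` and create invokers for the addresses in `added`, leaving healthy connections untouched.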
    import java.util.concurrent.CopyOnWriteArrayList;

    public class RpcClientInvokerCache {

        // volatile so that a newly swapped-in list is visible to all threads.
        private static volatile CopyOnWriteArrayList<RpcClientInvoker> connectedHandlers =
                new CopyOnWriteArrayList<>();

        @SuppressWarnings("unchecked")
        public static CopyOnWriteArrayList<RpcClientInvoker> getConnectedHandlersClone() {
            return (CopyOnWriteArrayList<RpcClientInvoker>) RpcClientInvokerCache.getConnectedHandlers().clone();
        }

        // Adds the handler to a copy of the list, then swaps the copy in.
        public static void addHandler(RpcClientInvoker handler) {
            CopyOnWriteArrayList<RpcClientInvoker> newHandlers = getConnectedHandlersClone();
            newHandlers.add(handler);
            connectedHandlers = newHandlers;
        }

        public static CopyOnWriteArrayList<RpcClientInvoker> getConnectedHandlers() {
            return connectedHandlers;
        }

        public static RpcClientInvoker get(int i) {
            return connectedHandlers.get(i);
        }

        public static int size() {
            return connectedHandlers.size();
        }

        // Swaps in an empty list so that invokers are rebuilt on the next call.
        public static void clear() {
            CopyOnWriteArrayList<RpcClientInvoker> newHandlers = getConnectedHandlersClone();
            newHandlers.clear();
            connectedHandlers = newHandlers;
        }
    }
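The method that fetches the service list in this code has a small flaw: it does not look services up by interface. I will finish that later.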
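One possible shape for a lookup by interface is sketched below with a plain in-memory map, so the idea can be seen without a running Consul agent. Everything in it is hypothetical and not taken from the original project: the class `InterfaceKeyedRegistry`, its method names, and the choice of the interface name as the lookup key. With Consul, a comparable effect might be achieved by registering each provider under its interface name or by tagging it, but that is an assumption rather than what the code above does.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical in-memory registry keyed by interface name. */
final class InterfaceKeyedRegistry {

    // interface name -> "host:port" addresses of the providers exposing it
    private final Map<String, List<String>> providers = new ConcurrentHashMap<>();

    void register(String interfaceName, String address) {
        providers.computeIfAbsent(interfaceName, key -> new CopyOnWriteArrayList<>())
                .add(address);
    }

    void unregister(String interfaceName, String address) {
        List<String> addresses = providers.get(interfaceName);
        if (addresses != null) {
            addresses.remove(address);
        }
    }

    /** Returns only the providers of the requested interface, or an empty list. */
    List<String> lookup(String interfaceName) {
        List<String> addresses = providers.get(interfaceName);
        if (addresses == null) {
            return Collections.<String>emptyList();
        }
        return Collections.unmodifiableList(addresses);
    }
}
```

A client that needs, say, `com.example.OrderService` (a made-up name) would then receive only the addresses registered for that interface instead of every node under a single shared service name.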
    import java.util.concurrent.atomic.AtomicInteger;

    public class RoundRobinLoadbalanceService implements LoadbalanceService {

        private static final int MAX_VALUE = 1000;
        private static final int MIN_VALUE = 1;

        private final AtomicInteger roundRobin = new AtomicInteger(0);

        // Advances the counter and wraps it back to MIN_VALUE once it passes MAX_VALUE,
        // in one atomic step so concurrent callers cannot interleave with the reset.
        private int nextRoundRobinValue() {
            return this.roundRobin.updateAndGet(v -> v > MAX_VALUE ? MIN_VALUE : v + 1);
        }

        @Override
        public int index(int size) {
            return (this.nextRoundRobinValue() + size) % size;
        }
    }
https://github.com/jiangmin168168/jim-framework
Source: http://www.cnblogs.com/ASPNET2008/p/6892137.html