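Microservice monitoring has two main parts: monitoring the microservices themselves, and monitoring the entire call chain. We currently use Dubbo as our RPC framework, so the rest of this article focuses on Dubbo monitoring.
1. Dubbo monitoring
1.1 How it works
The Dubbo architecture is as follows: [figure: Dubbo architecture diagram]
Reading the Dubbo source shows that every RPC method call is intercepted by MonitorFilter: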
MonitorFilter.invoke():

    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        if (invoker.getUrl().hasParameter("monitor")) {
            RpcContext context = RpcContext.getContext();
            long start = System.currentTimeMillis();
            this.getConcurrent(invoker, invocation).incrementAndGet();
            Result var7;
            try {
                Result result = invoker.invoke(invocation);
                this.collect(invoker, invocation, result, context, start, false);
                var7 = result;
            } catch (RpcException var11) {
                this.collect(invoker, invocation, (Result)null, context, start, true);
                throw var11;
            } finally {
                this.getConcurrent(invoker, invocation).decrementAndGet();
            }
            return var7;
        } else {
            return invoker.invoke(invocation);
        }
    }
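The interception pattern above can be sketched without any Dubbo dependency. This is a minimal illustration, not Dubbo's implementation: `TimingInterceptor`, `Sample`, and the use of `Supplier` are hypothetical names chosen for the example, and it catches `RuntimeException` where MonitorFilter catches `RpcException`.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-in for MonitorFilter; none of these names come from Dubbo.
final class TimingInterceptor {

    // One record per call: the same three facts MonitorFilter hands to collect().
    static final class Sample {
        final long elapsedMillis;
        final int concurrent;
        final boolean error;

        Sample(long elapsedMillis, int concurrent, boolean error) {
            this.elapsedMillis = elapsedMillis;
            this.concurrent = concurrent;
            this.error = error;
        }
    }

    private final AtomicInteger inFlight = new AtomicInteger();
    private final List<Sample> samples = new CopyOnWriteArrayList<>();

    // Wraps a call: count it as in flight, time it, record success or failure.
    <T> T invoke(Supplier<T> call) {
        long start = System.currentTimeMillis();
        inFlight.incrementAndGet();
        try {
            T result = call.get();
            record(start, false);
            return result;
        } catch (RuntimeException e) {
            record(start, true);
            throw e; // the caller still sees the original failure
        } finally {
            inFlight.decrementAndGet(); // runs on both the success and the failure path
        }
    }

    // Called before the finally block runs, so the current call is still counted.
    private void record(long start, boolean error) {
        samples.add(new Sample(System.currentTimeMillis() - start, inFlight.get(), error));
    }

    List<Sample> samples() {
        return samples;
    }

    int inFlight() {
        return inFlight.get();
    }
}
```

As in the original filter, the sample is recorded before the in-flight counter is decremented, so the recorded concurrency includes the call being measured.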
For services that have monitoring configured, the filter collects basic statistics about each method call.
MonitorFilter.collect():

    private void collect(Invoker<?> invoker, Invocation invocation, Result result, RpcContext context, long start, boolean error) {
        try {
            long elapsed = System.currentTimeMillis() - start;
            int concurrent = this.getConcurrent(invoker, invocation).get();
            String application = invoker.getUrl().getParameter("application");
            String service = invoker.getInterface().getName();
            String method = RpcUtils.getMethodName(invocation);
            URL url = invoker.getUrl().getUrlParameter("monitor");
            Monitor monitor = this.monitorFactory.getMonitor(url);
            int localPort;
            String remoteKey;
            String remoteValue;
            if ("consumer".equals(invoker.getUrl().getParameter("side"))) {
                context = RpcContext.getContext();
                localPort = 0;
                remoteKey = "provider";
                remoteValue = invoker.getUrl().getAddress();
            } else {
                localPort = invoker.getUrl().getPort();
                remoteKey = "consumer";
                remoteValue = context.getRemoteHost();
            }
            String input = "";
            String output = "";
            if (invocation.getAttachment("input") != null) {
                input = invocation.getAttachment("input");
            }
            if (result != null && result.getAttachment("output") != null) {
                output = result.getAttachment("output");
            }
            monitor.collect(new URL("count", NetUtils.getLocalHost(), localPort, service + "/" + method,
                    new String[]{"application", application, "interface", service, "method", method,
                            remoteKey, remoteValue, error ? "failure" : "success", "1",
                            "elapsed", String.valueOf(elapsed), "concurrent", String.valueOf(concurrent),
                            "input", input, "output", output}));
        } catch (Throwable var21) {
            logger.error("Failed to monitor count service" + invoker.getUrl() + ", cause:" + var21.getMessage(), var21);
        }
    }
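To make the shape of one statistics record easier to see, here is a rough sketch that builds the same key/value pairs into a plain map. `CallStats.build` and its parameter list are hypothetical; the real code packs these values into a Dubbo `URL` instead.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper; Dubbo itself stores these pairs as URL parameters.
final class CallStats {

    static Map<String, String> build(String side, String service, String method,
                                     String remoteAddress, long elapsedMillis,
                                     int concurrent, boolean error) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("interface", service);
        params.put("method", method);
        // A consumer records which provider it called;
        // a provider records which consumer called it.
        params.put("consumer".equals(side) ? "provider" : "consumer", remoteAddress);
        // Exactly one of "success" / "failure" is present, always with the value 1.
        params.put(error ? "failure" : "success", "1");
        params.put("elapsed", String.valueOf(elapsedMillis));
        params.put("concurrent", String.valueOf(concurrent));
        return params;
    }
}
```

Because each record carries either `success=1` or `failure=1`, the aggregation step only has to add the values up to get call counts.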
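DubboMonitor aggregates the collected data with simple statistics (success count, failure count, elapsed time, and so on) and keeps the results locally, in the in-memory statisticsMap.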
DubboMonitor.collect():

    public void collect(URL url) {
        int success = url.getParameter("success", 0);
        int failure = url.getParameter("failure", 0);
        int input = url.getParameter("input", 0);
        int output = url.getParameter("output", 0);
        int elapsed = url.getParameter("elapsed", 0);
        int concurrent = url.getParameter("concurrent", 0);
        Statistics statistics = new Statistics(url);
        AtomicReference<long[]> reference = (AtomicReference)this.statisticsMap.get(statistics);
        if (reference == null) {
            this.statisticsMap.putIfAbsent(statistics, new AtomicReference());
            reference = (AtomicReference)this.statisticsMap.get(statistics);
        }
        long[] update = new long[10];
        long[] current;
        do {
            current = (long[])reference.get();
            if (current == null) {
                update[0] = (long)success;
                update[1] = (long)failure;
                update[2] = (long)input;
                update[3] = (long)output;
                update[4] = (long)elapsed;
                update[5] = (long)concurrent;
                update[6] = (long)input;
                update[7] = (long)output;
                update[8] = (long)elapsed;
                update[9] = (long)concurrent;
            } else {
                update[0] = current[0] + (long)success;
                update[1] = current[1] + (long)failure;
                update[2] = current[2] + (long)input;
                update[3] = current[3] + (long)output;
                update[4] = current[4] + (long)elapsed;
                update[5] = (current[5] + (long)concurrent) / 2L;
                update[6] = current[6] > (long)input ? current[6] : (long)input;
                update[7] = current[7] > (long)output ? current[7] : (long)output;
                update[8] = current[8] > (long)elapsed ? current[8] : (long)elapsed;
                update[9] = current[9] > (long)concurrent ? current[9] : (long)concurrent;
            }
        } while(!reference.compareAndSet(current, update));
    }
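The lock-free accumulation in collect() is a compare-and-set retry loop over an immutable snapshot array. The sketch below reduces it to four slots to show the idea; `CasStats` and its slot layout are assumptions made for this example, not Dubbo's ten-slot layout.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical reduction of DubboMonitor.collect() to four counters.
final class CasStats {

    // Assumed layout: [0]=success, [1]=failure, [2]=total elapsed, [3]=max elapsed.
    private final AtomicReference<long[]> ref = new AtomicReference<>();

    void add(int success, int failure, long elapsed) {
        long[] update = new long[4]; // private to this call until the CAS publishes it
        long[] current;
        do {
            current = ref.get();
            if (current == null) {
                update[0] = success;
                update[1] = failure;
                update[2] = elapsed;
                update[3] = elapsed;
            } else {
                update[0] = current[0] + success;
                update[1] = current[1] + failure;
                update[2] = current[2] + elapsed;
                update[3] = Math.max(current[3], elapsed);
            }
            // If another thread swapped the array in the meantime, recompute and retry.
        } while (!ref.compareAndSet(current, update));
    }

    // Returns a copy so callers cannot mutate the published array.
    long[] snapshot() {
        long[] current = ref.get();
        return current == null ? new long[4] : current.clone();
    }
}
```

A published array is never modified again; every update installs a fresh array, which is what makes the reference comparison in `compareAndSet` a valid conflict check.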
DubboMonitor runs an asynchronous thread that periodically (every minute by default) sends the collected data to the remote monitoring service.
    public DubboMonitor(Invoker<MonitorService> monitorInvoker, MonitorService monitorService) {
        this.monitorInvoker = monitorInvoker;
        this.monitorService = monitorService;
        this.monitorInterval = (long)monitorInvoker.getUrl().getPositiveParameter("interval", 60000);
        this.sendFuture = this.scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
            public void run() {
                try {
                    DubboMonitor.this.send();
                } catch (Throwable var2) {
                    DubboMonitor.logger.error("Unexpected error occur at send statistic, cause:" + var2.getMessage(), var2);
                }
            }
        }, this.monitorInterval, this.monitorInterval, TimeUnit.MILLISECONDS);
    }
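The scheduling part can be illustrated with the standard `ScheduledExecutorService` alone. This is only a sketch: `PeriodicReporter`, its single counter, and the `LongConsumer` sink are hypothetical and stand in for DubboMonitor, its statistics map, and the remote monitor service.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

// Hypothetical reporter: counts events and flushes the count on a fixed delay.
final class PeriodicReporter implements AutoCloseable {

    private final AtomicLong pending = new AtomicLong();
    private final LongConsumer sink; // stands in for the remote monitor service
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "periodic-reporter");
                t.setDaemon(true); // do not keep the JVM alive just for reporting
                return t;
            });

    PeriodicReporter(LongConsumer sink, long intervalMillis) {
        this.sink = sink;
        // First run after one interval, then one interval after each run finishes.
        scheduler.scheduleWithFixedDelay(
                this::flushQuietly, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }

    void record() {
        pending.incrementAndGet();
    }

    // Hands the accumulated count to the sink and resets it atomically.
    void flush() {
        sink.accept(pending.getAndSet(0));
    }

    // An exception escaping a scheduled task cancels all later runs,
    // so everything is caught here, as in the DubboMonitor constructor.
    private void flushQuietly() {
        try {
            flush();
        } catch (Throwable t) {
            System.err.println("flush failed: " + t.getMessage());
        }
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

The catch-all around the scheduled task matters: `scheduleWithFixedDelay` suppresses subsequent executions once a run throws, so a single failed send would otherwise stop reporting silently.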
send() calls the remote MonitorService.collect method and then resets the locally cached counters by subtracting the values it has just sent.
DubboMonitor.send():

    public void send() {
        if (logger.isInfoEnabled()) {
            logger.info("Send statistics to monitor" + this.getUrl());
        }
        String timestamp = String.valueOf(System.currentTimeMillis());
        Iterator i$ = this.statisticsMap.entrySet().iterator();
        while(i$.hasNext()) {
            Entry<Statistics, AtomicReference<long[]>> entry = (Entry)i$.next();
            Statistics statistics = (Statistics)entry.getKey();
            AtomicReference<long[]> reference = (AtomicReference)entry.getValue();
            long[] numbers = (long[])reference.get();
            long success = numbers[0];
            long failure = numbers[1];
            long input = numbers[2];
            long output = numbers[3];
            long elapsed = numbers[4];
            long concurrent = numbers[5];
            long maxInput = numbers[6];
            long maxOutput = numbers[7];
            long maxElapsed = numbers[8];
            long maxConcurrent = numbers[9];
            URL url = statistics.getUrl().addParameters(new String[]{"timestamp", timestamp,
                    "success", String.valueOf(success), "failure", String.valueOf(failure),
                    "input", String.valueOf(input), "output", String.valueOf(output),
                    "elapsed", String.valueOf(elapsed), "concurrent", String.valueOf(concurrent),
                    "max.input", String.valueOf(maxInput), "max.output", String.valueOf(maxOutput),
                    "max.elapsed", String.valueOf(maxElapsed), "max.concurrent", String.valueOf(maxConcurrent)});
            this.monitorService.collect(url);
            long[] update = new long[10];
            while(true) {
                long[] current = (long[])reference.get();
                if (current == null) {
                    update[0] = 0L;
                    update[1] = 0L;
                    update[2] = 0L;
                    update[3] = 0L;
                    update[4] = 0L;
                    update[5] = 0L;
                } else {
                    update[0] = current[0] - success;
                    update[1] = current[1] - failure;
                    update[2] = current[2] - input;
                    update[3] = current[3] - output;
                    update[4] = current[4] - elapsed;
                    update[5] = current[5] - concurrent;
                }
                if (reference.compareAndSet(current, update)) {
                    break;
                }
            }
        }
    }
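Subtracting the sent values, rather than overwriting the counters with zero, keeps any calls that were recorded while the remote call was in progress. The following sketch demonstrates that property with two counters; `DrainBySubtract` and the `duringSend` hook are hypothetical and exist only to simulate an update arriving mid-send.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical two-counter version of the reset step in DubboMonitor.send().
final class DrainBySubtract {

    // Assumed layout: [0]=success, [1]=failure. Published arrays are never mutated.
    private final AtomicReference<long[]> ref = new AtomicReference<>(new long[2]);

    void add(long success, long failure) {
        long[] current;
        long[] update;
        do {
            current = ref.get();
            update = new long[]{current[0] + success, current[1] + failure};
        } while (!ref.compareAndSet(current, update));
    }

    // Reads a snapshot, "sends" it, then subtracts exactly what was sent.
    long[] drain(Runnable duringSend) {
        long[] sent = ref.get();
        duringSend.run(); // stands in for the remote monitorService.collect(url) call
        long[] current;
        long[] update;
        do {
            current = ref.get();
            // Anything added since the snapshot survives the subtraction.
            update = new long[]{current[0] - sent[0], current[1] - sent[1]};
        } while (!ref.compareAndSet(current, update));
        return sent.clone();
    }

    long[] current() {
        return ref.get().clone();
    }
}
```

If `drain` simply stored zeros, the increments made during the send would be lost; with subtraction they remain in the counters and go out with the next report.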
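The mainstream open-source Dubbo monitoring projects all work by implementing the MonitorService interface. They differ only in how they store data and build reports; the underlying principle is largely the same.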
    public interface MonitorService {
        String APPLICATION = "application";
        String INTERFACE = "interface";
        String METHOD = "method";
        String GROUP = "group";
        String VERSION = "version";
        String CONSUMER = "consumer";
        String PROVIDER = "provider";
        String TIMESTAMP = "timestamp";
        String SUCCESS = "success";
        String FAILURE = "failure";
        String INPUT = "input";
        String OUTPUT = "output";
        String ELAPSED = "elapsed";
        String CONCURRENT = "concurrent";
        String MAX_INPUT = "max.input";
        String MAX_OUTPUT = "max.output";
        String MAX_ELAPSED = "max.elapsed";
        String MAX_CONCURRENT = "max.concurrent";
        void collect(URL var1);
        List<URL> lookup(URL var1);
    }
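To give a feel for what such a project implements, here is a dependency-free sketch of a collector with in-memory storage. It is an illustration under stated assumptions: `SimpleMonitorService` is a hypothetical stand-in that takes a parameter map instead of a Dubbo `URL`, and `InMemoryMonitorService` is not taken from any of the projects listed in this article.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for MonitorService, using a map instead of a Dubbo URL.
interface SimpleMonitorService {
    void collect(Map<String, String> stats);

    List<Map<String, String>> lookup(String serviceInterface);
}

// Keeps running totals per "interface/method" key in memory.
final class InMemoryMonitorService implements SimpleMonitorService {

    // Value layout (assumed): [0]=success, [1]=failure, [2]=elapsed.
    private final Map<String, long[]> totals = new ConcurrentHashMap<>();

    @Override
    public void collect(Map<String, String> stats) {
        String key = stats.get("interface") + "/" + stats.get("method");
        long[] delta = {num(stats, "success"), num(stats, "failure"), num(stats, "elapsed")};
        // merge() is atomic per key, so concurrent reports do not lose updates.
        totals.merge(key, delta, (a, b) -> new long[]{a[0] + b[0], a[1] + b[1], a[2] + b[2]});
    }

    @Override
    public List<Map<String, String>> lookup(String serviceInterface) {
        String prefix = serviceInterface + "/";
        List<Map<String, String>> rows = new ArrayList<>();
        // TreeMap copy gives a stable, method-name-sorted result.
        for (Map.Entry<String, long[]> e : new TreeMap<>(totals).entrySet()) {
            if (!e.getKey().startsWith(prefix)) {
                continue;
            }
            long[] v = e.getValue();
            Map<String, String> row = new LinkedHashMap<>();
            row.put("method", e.getKey().substring(prefix.length()));
            row.put("success", String.valueOf(v[0]));
            row.put("failure", String.valueOf(v[1]));
            row.put("elapsed", String.valueOf(v[2]));
            rows.add(row);
        }
        return rows;
    }

    // Missing parameters count as zero, like url.getParameter(name, 0).
    private static long num(Map<String, String> stats, String name) {
        return Long.parseLong(stats.getOrDefault(name, "0"));
    }
}
```

A real implementation would swap the in-memory map for a database and add report queries, which is where the projects mainly differ.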
- dubbo-monitor
- dubbo-d-monitor
- dubbokeeper
- dubbo-monitor-simple
- archive-tmp
- MongoDB-dubbokeeper-server
- MongoDB-dubbokeeper-ui
- MongoDB-dubbokeeper-server.tar.gz
    dubbo.application.name=MongoDB-monitor
    dubbo.application.owner=bieber
    dubbo.registry.address=zookeeper://*.*.*.*:2181?backup=*.*.*.*:2181,*.*.*.*:2181
    dubbo.protocol.name=dubbo
    dubbo.protocol.port=20884
    dubbo.protocol.dubbo.payload=20971520
    # Dubbo data collection interval, in milliseconds
    monitor.collect.interval=60000
    # use netty4
    dubbo.provider.transporter=netty4
    # Interval at which dubbokeeper writes to MongoDB, in seconds
    monitor.write.interval=60
    # MongoDB configuration
    dubbo.monitor.MongoDB.url=localhost
    dubbo.monitor.MongoDB.port=27017
    dubbo.monitor.MongoDB.dbname=dubbokeeper
    dubbo.monitor.MongoDB.username=
    dubbo.monitor.MongoDB.password=
    dubbo.monitor.MongoDB.storage.timeout=60000
    # Create query indexes on every dubbokeeper collection except "application".
    import pymongo
    from pymongo import MongoClient

    client = MongoClient('127.0.0.1', 27017)
    db = client['dubbokeeper']

    # Fields of each index; every field is indexed in descending order.
    index_fields = [
        ["timestamp"],
        ["serviceInterface"],
        ["method"],
        ["serviceInterface", "method", "timestamp"],
        ["serviceInterface", "timestamp"],
        ["concurrent", "timestamp"],
        ["elapsed", "timestamp"],
        ["failureCount", "timestamp"],
        ["successCount", "timestamp"],
        ["serviceInterface", "elapsed", "timestamp"],
        ["serviceInterface", "concurrent", "timestamp"],
        ["serviceInterface", "failureCount", "timestamp"],
        ["serviceInterface", "successCount", "timestamp"],
    ]

    for collection in db.list_collection_names():
        if collection != 'application':
            for fields in index_fields:
                db[collection].create_index([(field, pymongo.DESCENDING) for field in fields])
    print('success')
    # Delete records older than the given number of days from every dubbokeeper
    # collection except "application". Usage: python <script> <days>
    import sys
    import time
    from pymongo import MongoClient

    day = int(sys.argv[1])
    print(day)
    # Cutoff in epoch milliseconds: now minus `day` days.
    timestamp = time.time() * 1000 - 1000 * 24 * 3600 * day
    print(timestamp)

    client = MongoClient('127.0.0.1', 27017)
    db = client['dubbokeeper']
    for collection in db.list_collection_names():
        if collection != 'application':
            db[collection].delete_many({"timestamp": {"$lt": timestamp}})
    print('clean mongodb data success')
Source: https://juejin.im/post/5bf39739f265da615b711362