Pinpoint (https://github.com/naver/pinpoint) 是用 Java 编写的大型分布式系统的 APM (应用程序性能管理) 工具. 受 Dapper 的启发, Pinpoint 提供了一种解决方案: 通过跟踪分布式应用程序中的事务, 帮助分析系统的整体结构以及各组件之间的相互关系.
pinpoint API:
/applications.pinpoint 获取 applications 基本信息
/getAgentList.pinpoint 获取对应 application agent 信息
/getServerMapData.pinpoint 获取对应 App 基本数据流信息
db.py
- import MySQL.connector
class MyDB(object):
    """Small wrapper that owns one MySQL connection and one cursor.

    ``db_connect()`` opens the connection and ``db_cursor()`` opens the
    cursor; both return the wrapper itself so the calls can be chained.
    """

    def __init__(self, host, user, passwd, db):
        """Remember the connection settings; nothing is opened yet."""
        self.host = host
        self.user = user
        self.passwd = passwd
        self.db = db
        self.connect = None  # connection object, set by db_connect()
        self.cursor = None   # cursor object, set by db_cursor()

    def db_connect(self):
        """Open the database connection and store it in ``self.connect``.

        :return: this MyDB instance
        """
        # Imported here so the class can be loaded (and its other methods
        # used) without the driver being importable at module load time.
        import mysql.connector

        self.connect = mysql.connector.connect(
            host=self.host, user=self.user, passwd=self.passwd,
            database=self.db)
        return self

    def db_cursor(self):
        """Open a cursor, connecting first when there is no live connection.

        :return: this MyDB instance
        """
        if self.connect is None or not self.connect.is_connected():
            # db_connect() stores the connection itself and returns the
            # wrapper, so its return value must not be put in self.connect.
            self.db_connect()
        self.cursor = self.connect.cursor()
        return self

    def get_rows(self, sql, params=None):
        """Run a query and return every result row.

        :param sql: SQL statement
        :param params: optional values for the statement's placeholders
        :return: whatever the cursor's ``fetchall()`` returns
        """
        if params is None:
            self.cursor.execute(sql)
        else:
            self.cursor.execute(sql, params)
        return self.cursor.fetchall()

    def db_execute(self, sql, params=None):
        """Run a statement and commit it.

        :param sql: SQL statement
        :param params: optional values for the statement's placeholders
        """
        if params is None:
            self.cursor.execute(sql)
        else:
            self.cursor.execute(sql, params)
        self.connect.commit()

    def db_close(self):
        """Close the cursor and the connection, if they were opened."""
        # The cursor goes first: it belongs to the connection, so it is
        # released while the connection is still open.
        if self.cursor:
            self.cursor.close()
            self.cursor = None
        if self.connect:
            self.connect.close()
            self.connect = None
pinpoint.py:
- # -*- coding: utf-8 -*-
- '''
- Copyright (c) 2018, mersap
- All rights reserved.
- 摘 要: pinpoint.py
- 创 建 者: mersap
- 创建日期: 2019-01-17
- '''
import datetime
import json
import sys
import time

import requests

sys.path.append('../Golf')
import db  # db.py
# Base URL of the Pinpoint web service.
PPURL = "https://pinpoint.*******.com"

# Default query window: the 60 seconds ending when this module is loaded.
# Both ends derive from a single clock reading so the datetimes and the
# timestamps below describe exactly the same window.
To_Time = datetime.datetime.now()
From_Time = To_Time - datetime.timedelta(seconds=60)

# The same window in epoch milliseconds (whole seconds * 1000), the unit the
# `from` / `to` request parameters are sent in.
From_TimeStamp = int(time.mktime(From_Time.timetuple())) * 1000
To_TimeStamp = int(time.mktime(To_Time.timetuple())) * 1000
class PinPoint(object):
    """Copy application and call-link data from the Pinpoint web API to MySQL.

    ``db`` is the database wrapper used for every write; this class uses its
    ``cursor`` and ``connect`` attributes.
    """

    def __init__(self, db):
        self.db = db
        super(PinPoint, self).__init__()

    def _get_json(self, path, params=None):
        """GET ``PPURL + path`` and return the decoded JSON body.

        Prints the error notice and returns None when the response status
        is not 200.
        """
        res = requests.get("{}{}".format(PPURL, path), params=params)
        if res.status_code != 200:
            print("请求异常, 请检查")
            return None
        return res.json()

    def _execute(self, sql, params):
        """Run one parameterized statement on the wrapper's cursor and commit.

        Values travel separately from the SQL text so the driver quotes them;
        names containing quote characters can then neither break nor alter
        the statement.
        """
        self.db.cursor.execute(sql, params)
        self.db.connect.commit()

    def get_applications(self):
        """Fetch the applications known to Pinpoint.

        :return: ``{"applicationList": [...]}`` holding the entries of the
            /applications.pinpoint response, or None when the request fails
        """
        apps = self._get_json("/applications.pinpoint")
        if apps is None:
            return None
        return {"applicationList": list(apps)}

    def getAgentList(self, appname):
        """Fetch the agent list of one application.

        :param appname: application name
        :return: ``(count, names)`` where ``names`` is the JSON-encoded list
            of the top-level keys of the /getAgentList.pinpoint response and
            ``count`` is how many there are; None when the request fails
        """
        agents = self._get_json("/getAgentList.pinpoint",
                                {'application': appname})
        if agents is None:
            return None
        names = list(agents.keys())
        return len(names), json.dumps(names)

    def update_servermap(self, appname, from_time=From_TimeStamp,
                         to_time=To_TimeStamp, serviceType='SPRING_BOOT'):
        """Store the caller/callee links of one application.

        :param appname: application name
        :param from_time: start of the query window, epoch milliseconds
        :param to_time: end of the query window, epoch milliseconds
        :param serviceType: service type name sent with the request

        Links whose source application name starts with ``test`` are
        skipped. Nothing is written when the request fails.
        """
        # Example request:
        # https://pinpoint.*****.com/getServerMapData.pinpoint?applicationName=test-App&from=1547721493000&to=1547721553000&callerRange=1&calleeRange=1&serviceTypeName=TOMCAT&_=1547720614229
        data = self._get_json("/getServerMapData.pinpoint", {
            'applicationName': appname,
            'from': from_time,
            'to': to_time,
            'callerRange': 1,
            'calleeRange': 1,
            'serviceTypeName': serviceType,
        })
        if data is None:
            return

        stamp = '%Y-%m-%d %H:%M:%S'
        update_time = time.strftime(stamp)
        # Record the window that was actually queried, not the module-level
        # default window, so explicit from_time / to_time values are kept.
        window_start = datetime.datetime.fromtimestamp(
            from_time / 1000.0).strftime(stamp)
        window_end = datetime.datetime.fromtimestamp(
            to_time / 1000.0).strftime(stamp)

        sql = """
            REPLACE INTO application_server_map (application, serviceType,
                agents, to_application, to_serviceType, to_agents, totalCount,
                errorCount, slowCount, update_time, from_time, to_time)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"""

        for link in data["applicationMapData"]["linkDataArray"]:
            source = link['sourceInfo']
            # Leave out applications whose name starts with "test".
            if source['applicationName'].startswith('test'):
                continue
            target = link['targetInfo']
            # Columns: application, its type, its agent count, downstream
            # application, its type, its agent count, then total / error /
            # slow request counts for calls from the former to the latter.
            # A missing agent list falls back to ' ', i.e. a count of 1.
            self._execute(sql, (
                source['applicationName'],
                source['serviceType'],
                len(link.get('fromAgent', ' ')),
                target['applicationName'],
                target['serviceType'],
                len(link.get('toAgent', ' ')),
                link['totalCount'],
                link['errorCount'],
                link['slowCount'],
                update_time,
                window_start,
                window_end,
            ))

    def update_app(self):
        """Refresh the ``application_list`` table.

        Applications whose name starts with ``test`` are skipped, as are
        applications whose agent list cannot be fetched.

        :return: True when the application list was processed, False when
            it could not be fetched
        """
        appdict = self.get_applications()
        if appdict is None:
            return False

        update_time = time.strftime('%Y-%m-%d %H:%M:%S')
        sql = """
            REPLACE INTO application_list (application_name,
                service_type, code, agents, agentlists, update_time)
            VALUES (%s, %s, %s, %s, %s, %s)"""

        for app in appdict.get("applicationList"):
            name = app['applicationName']
            if name.startswith('test'):
                continue
            agent_info = self.getAgentList(name)
            if agent_info is None:
                # The failed request was already reported; keep going with
                # the remaining applications.
                continue
            agents, agentlists = agent_info
            self._execute(sql, (name, app['serviceType'], app['code'],
                                agents, agentlists, update_time))
        return True

    def update_all_servermaps(self):
        """Refresh the links of every application, then prune old rows.

        Rows of ``application_server_map`` whose ``update_time`` is seven
        days old or older are deleted afterwards.

        :return: True when the application list was processed, False when
            it could not be fetched
        """
        appdict = self.get_applications()
        if appdict is None:
            return False

        for app in appdict.get("applicationList"):
            self.update_servermap(app['applicationName'],
                                  serviceType=app['serviceType'])

        # Drop data older than 7 days.
        cutoff = datetime.datetime.now() - datetime.timedelta(days=7)
        self._execute(
            "DELETE FROM application_server_map WHERE update_time <= %s",
            (cutoff.strftime('%Y-%m-%d %H:%M:%S'),))
        return True
def connect_db():
    """Create the MyDB wrapper for the ``pinpoint`` database and ready it.

    The wrapper is connected and given a cursor before it is returned.
    """
    settings = {
        "host": "rm-*****.mysql.rds.aliyuncs.com",
        "user": "user",
        "passwd": "passwd",
        "db": "pinpoint",
    }
    handle = db.MyDB(**settings)
    handle.db_connect()
    handle.db_cursor()
    return handle
def main():
    """Refresh the application list and the server map, then close the DB."""
    # Named `database` so the imported `db` module is not shadowed here.
    database = connect_db()
    try:
        collector = PinPoint(database)
        collector.update_app()
        collector.update_all_servermaps()
    finally:
        # Release the connection even when one of the updates raises.
        database.db_close()


if __name__ == '__main__':
    main()
附 sql 语句
-- One row per Pinpoint application, keyed by application name.
-- The primary key already enforces uniqueness of `application_name`, so no
-- separate UNIQUE index is declared.
-- NOTE(review): `agentlists` stores a JSON-encoded list of agent names;
-- 256 characters may be too short for applications with many agents — confirm.
CREATE TABLE `application_list` (
  `application_name` varchar(32) NOT NULL,
  `service_type` varchar(32) DEFAULT NULL COMMENT '服务类型',
  `code` int(11) DEFAULT NULL COMMENT '服务类型代码',
  `agents` int(11) DEFAULT NULL COMMENT 'agent 个数',
  `agentlists` varchar(256) DEFAULT NULL COMMENT 'agent list',
  `update_time` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`application_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='pinpoint app list';
-- One row per (application, downstream application) link.
-- `serviceType` is 32 characters wide, like `to_serviceType`: the collector's
-- default service type 'SPRING_BOOT' is 11 characters long and does not fit
-- in varchar(8).
-- The primary key already enforces uniqueness of the pair, so no separate
-- UNIQUE index is declared.
CREATE TABLE `application_server_map` (
  `application` varchar(32) NOT NULL COMMENT '应用名称',
  `serviceType` varchar(32) NOT NULL,
  `agents` int(2) NOT NULL COMMENT 'agent 个数',
  `to_application` varchar(32) NOT NULL COMMENT '下游服务名称',
  `to_serviceType` varchar(32) DEFAULT NULL COMMENT '下游服务类型',
  `to_agents` int(2) DEFAULT NULL COMMENT '下游服务 agent 数量',
  `totalCount` int(8) DEFAULT NULL COMMENT '总请求数',
  `errorCount` int(8) DEFAULT NULL,
  `slowCount` int(8) DEFAULT NULL,
  `update_time` datetime NOT NULL ON UPDATE CURRENT_TIMESTAMP,
  `from_time` datetime DEFAULT NULL,
  `to_time` datetime DEFAULT NULL,
  PRIMARY KEY (`application`,`to_application`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='应用链路数据';
来源: https://yq.aliyun.com/articles/690351