最近在做压测引擎相关的开发, 需要将聚合数据发送到 InfluxDB 保存以便实时分析和控制 QPS.
下面介绍对 InfluxDB 的使用.
什么是 InfluxDB
InfluxDB 是一款用 Go 语言编写的开源分布式时序, 事件和指标数据库, 无需外部依赖. 该数据库现在主要用于存储涉及大量的时间戳数据, 如 DevOps 监控数据, App metrics, loT 传感器数据和实时分析数据.
InfluxDB 特征:
无结构 (无模式): 可以是任意数量的列 (tags).
可以设置 metric 的保存时间.
支持与时间有关的相关函数 (如 min,max,sum,count,mean,median 等), 方便统计.
支持存储策略: 可以用于数据的删改 (influxDB 没有提供数据的删除与修改方法).
支持连续查询: 是数据库中自动定时启动的一组语句, 和存储策略搭配可以降低 InfluxDB 的系统占用量.
原生的 HTTP 支持, 内置 HTTP API.
支持类似 SQL 语法.
支持设置数据在集群中的副本数.
支持定期采样数据, 写入另外的 measurement, 方便分粒度存储数据.
自带 web 管理界面, 方便使用 (登入方式: http://<InfluxDB-IP>:8083).
支持 Grafana 画图展示.
PS: 有了 InfluxDB+Grafana 后, 你就可以写一些简单的程序了, 可以只负责写后端逻辑部分, 数据都可以存入 InfluxDB, 然后通过 Grafana 展示出来.
Mac 安装 InfluxDB
- # 安装
- brew install influxdb
- # 启动
- influxd -config /usr/local/etc/influxdb.conf
- # 查看 influxdb 运行配置
- influxd config
- # 启动客户端
- influx -precision rfc3339
InfluxDB 开启 UDP 配置
VIM /usr/local/etc/influxdb.conf
开启 udp 配置, 其他为默认值
- [[udp]]
- enabled = true
udp 配置含义:
[[udp]] - udp 配置
enabled: 是否启用该模块, 默认值: false.
bind-address: 绑定地址, 默认值:":8089″.
database: 数据库名称, 默认值:"udp".
retention-policy: 存储策略, 无默认值.
batch-size: 默认值: 5000.
batch-pending: 默认值: 10.
read-buffer:udp 读取 buffer 的大小, 0 表示使用操作系统提供的值, 如果超过操作系统的默认配置则会出错. 该配置的默认值: 0.
batch-timeout: 超时时间, 默认值:"1s".
precision: 时间精度, 无默认值.
Java 发送 UDP 数据报
我们知道 InfluxDB 是支持 Http 的, 为什么我们还要采用 UDP 方式发送数据呢?
基于下列原因:
TCP 数据传输慢, UDP 数据传输快.
网络带宽需求较小, 而实时性要求高.
InfluxDB 和服务器在同机房, 发生数据丢包的可能性较小, 即使真的发生丢包, 对整个请求流量的收集影响也较小.
我们采用了 worker 线程调用 addMetric 方法将数据存储到缓存 map 中, send 线程池来进行每个指定时间发送数据到 Influxdb.
代码如下 (也可参考 Jmeter 的 UdpMetriCSSender 类):
- @Slf4j
- public class InfluxDBClient implements Runnable {
- private String measurement = "example";
- private final Object lock = new Object();
- private InetAddress hostAddress;
- private int udpPort;
- private volatile Map<String, List<Response>> metrics = new HashMap<>();
- private long time;
- private String transaction;
- public InfluxDBClient(String influxdbUrl, String transaction) {
- this.transaction = transaction;
- try {
- log.debug("Setting up with url:{}", influxdbUrl);
- String[] urlComponents = influxdbUrl.split(":");
- if (urlComponents.length == 2) {
- hostAddress = InetAddress.getByName(urlComponents[0]);
- udpPort = Integer.parseInt(urlComponents[1]);
- } else {
- throw new IllegalArgumentException("InfluxDBClient url'" + influxdbUrl + "'is wrong. The format shoule be <host/ip>:<port>");
- }
- } catch (Exception e) {
- throw new IllegalArgumentException("InfluxDBClient url'" + influxdbUrl + "'is wrong. The format shoule be <host/ip>:<port>", e);
- }
- }
- public void addMetric(Response response) {
- synchronized (lock) {
- if (metrics.containsKey(response.getLabel())) {
- metrics.get(response.getLabel()).add(response);
- } else {
- metrics.put(response.getLabel(), new ArrayList<>(Collections.singletonList(response)));
- }
- }
- }
- @Override
- public void run() {
- sendMetrics();
- }
- private void sendMetrics() {
- Map<String, List<Response>> tempMetrics;
- // 复制数据到 tempMetrics, 清空原来 metrics 并初始化上次的大小
- synchronized (lock) {
- if (isEmpty(metrics)) {
- return;
- }
- time = System.currentTimeMillis();
- tempMetrics = metrics;
- metrics = new HashMap<>();
- for (Map.Entry<String, List<Response>> entry : tempMetrics.entrySet()) {
- metrics.put(entry.getKey(), new ArrayList<>(entry.getValue().size()));
- }
- }
- final Map<String, List<Response>> copyMetrics = tempMetrics;
- final List<MetricTuple> aggregateMetrics = aggregate(copyMetrics);
- StringBuilder sb = new StringBuilder(aggregateMetrics.size() * 200);
- // 发送 tempMetrics, 生成一行数据, 然后换行
- for (MetricTuple metric : aggregateMetrics) {
- sb.append(metric.getMeasurement()).append(metric.getTag()).append(" ")
- .append(metric.getField()).append("").append(metric.getTimestamp() +"000000").append("\n");
- }
- //udp 发送数据到 Influxdb
- try (DatagramSocket ds = new DatagramSocket()) {
- byte[] buf = sb.toString().getBytes();
- DatagramPacket dp = new DatagramPacket(buf, buf.length, this.hostAddress, this.udpPort);
- ds.send(dp);
- log.debug("send {} to influxdb", sb.toString());
- } catch (SocketException e) {
- log.error("Cannot open udp port!", e);
- } catch (IOException e) {
- log.error("Error in transferring udp package", e);
- }
- }
- /**
- * 得到聚合数据
- *
- * @param metrics
- * @return
- */
- private List<MetricTuple> aggregate(Map<String, List<Response>> metrics) {
- }
- public boolean isEmpty(Map<String, List<Response>> map) {
- for (Map.Entry<String, List<Response>> entry : map.entrySet()) {
- if (!entry.getValue().isEmpty()) {
- return false;
- }
- }
- return true;
- }
- }
参考文档:
InfluxDB 中文文档
玩转时序数据库 InfluxDB http://www.ywnds.com/?p=10763
来源: https://www.cnblogs.com/morethink/p/9693561.html