简单介绍
flink 内部实现了一套 metric 数据收集库. 同时 flink 自身系统有一些固定的 metric 数据, 包括系统的一些指标, CPU, 内存, IO 或者各个 task 运行的一些指标. 具体包含那些指标可以查看官方文档: flink-metric
同时我们也可以利用系统的 metric 库在自己的代码中进行打点收集 metrics 数据. 此外, flink 提供了外部接口, 可以用来导出这些 metrics 数据.
flink-metric 库的使用
在官方的文档中有介绍, 需要继承 Richfunction 才能获得对应的 metric 对象, 用法如下:
- public class MyMapper extends RichMapFunction<String, String> {
- private transient Counter counter;
- @Override
- public void open(Configuration config) {
- this.counter = getRuntimeContext()
- .getMetricGroup()
- .counter("myCounter");
- }
- @Override
- public String map(String value) throws Exception {
- this.counter.inc();
- return value;
- }
- }
flink-metrics 导出到外部系统
在 flink 中, 提供了方便的 metric 数据导出的库, 通过实现自己的 reporter, 可以将 metrics 数据导出到不同的系统.
官方提供有多种 reporter 库, JMX, Graphite, Slf4j... 等等. 同时, 我们可以自定义实现 metric 库, 来导入到自己的系统.
自定义 reporter 类
实现 MetricReporter 类中的 open,close, notifyOfAddedMetric, notifyOfRemovedMetric 方法
实现 Scheduled 的 report 方法 , 在刚方法中实现写入到其他系统的逻辑
实现 CharacterFilter 的 filterCharacters 方法, 用于对 scope 进行过滤.
- public class FalconReporter implements MetricReporter, CharacterFilter, Scheduled {
- private static final Logger LOG =LoggerFactory.getLogger(FalconReporter.class);
- private final Map<Gauge<?>, MetricTag> gauges = new ConcurrentHashMap<>();
- private final Map<Counter, MetricTag> counters = new ConcurrentHashMap<>();
- private final Map<Histogram, MetricTag> histograms = new ConcurrentHashMap<>();
- private final Map<Meter, MetricTag> meters = new ConcurrentHashMap<>();
- @Override
- public String filterCharacters(String s) {
- return s;
- }
- @Override
- public void open(MetricConfig metricConfig) {
- }
- @Override
- public void close() {
- }
- @Override
- public void notifyOfAddedMetric(Metric metric, String s, MetricGroup metricGroup) {
- }
- @Override
- public void notifyOfRemovedMetric(Metric metric, String s, MetricGroup metricGroup) {
- }
- @Override
- public void report() {
- }
- }
配置 reporter
在 flink-conf.YAML 中配置即可, 配置如下
- metrics.reporters: slf4j, jmx
- metrics.reporter.slf4j.class: org.apache.flink.metrics.falcon.FalconReporter
- metrics.reporter.slf4j.interval: 60 SECONDS
metrics.reporters 用于配置类型名, 自定义即可
metrics.reporter.slf4j.class: 配置对应类型的 reporter 类
metrics.reporter.slf4j.interval: 60 SECONDS 消息上报的间隔
metrics.reporter.slf4j.* 可以自定义配置, 可以在 open(MetricConfig metricConfig) 中的获得对应的 config
来源: https://www.cnblogs.com/0x12345678/p/10561039.html