package com.zjs.mic.metrics.stream;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.actuate.endpoint.mvc.MetricsMvcEndpoint;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.util.Assert;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
@EnableScheduling
public class MetriCSStreamTask {private final static Logger log = LoggerFactory.getLogger(MetricsStreamTask.class);
private MessageChannel outboundChannel;
private ServiceInstance registration;
private MetricsStreamProperties properties;
private MetricsMvcEndpoint mme;
// Visible for testing
final LinkedBlockingQueue<String> jsonMetrics;
private final JsonFactory jsonFactory = new JsonFactory();
public MetricsStreamTask(MessageChannel outboundChannel,
ServiceInstance registration, MetricsStreamProperties properties, MetricsMvcEndpoint mme) {
Assert.notNull(outboundChannel, "outboundChannel may not be null");
Assert.notNull(registration, "registration may not be null");
Assert.notNull(properties, "properties may not be null");
Assert.notNull(mme, "properties may not be null");
this.outboundChannel = outboundChannel;
this.registration = registration;
this.properties = properties;
this.jsonMetrics = new LinkedBlockingQueue<>(properties.getSize());
this.mme=mme;
}
// TODO: use integration to split this up?
@Scheduled(fixedRateString = "${metrics.stream.queue.sendRate:1000}")
public void sendMetrics() {
log.info("推送 metrics 信息");
ArrayList<String> metrics = new ArrayList<>();
this.jsonMetrics.drainTo(metrics);
if (!metrics.isEmpty()) {
if (log.isTraceEnabled()) {
log.trace("sending stream Metrics metrics size:" + metrics.size());
}
for (String json : metrics) {
// TODO: batch all metrics to one message
try {
// TODO: remove the explicit content type when s-c-stream can handle
// that for us
this.outboundChannel.send(MessageBuilder.withPayload(json)
.setHeader(MessageHeaders.CONTENT_TYPE,
this.properties.getContentType())
.build());
}
catch (Exception ex) {
if (log.isTraceEnabled()) {
log.trace("failed sending stream Metrics metrics:" + ex.getMessage());
}
}
}
}
}
@Scheduled(fixedRateString = "${metrics.stream.queue.gatherRate:1000}")
public void gatherMetrics() {
log.info("开始获取 metrics 信息");
try {
StringWriter jsonString = new StringWriter();
JsonGenerator json = this.jsonFactory.createGenerator(jsonString);
json.writeStartObject();
json.writeObjectField("instanceId",registration.getServiceId() + ":" + registration.getHost() + ":"
+ registration.getPort());
json.writeObjectField("type", "metrics");
json.writeObjectField("currentTime",System.currentTimeMillis());
@SuppressWarnings("unchecked")
Map<String, Object> map = (Map<String, Object>) mme.value(this.properties.getPathTail());
for (String str : map.keySet()) {
json.writeObjectField(str, map.get(str));
}
json.writeEndObject();
json.close();
// output to stream
this.jsonMetrics.add(jsonString.getBuffer().toString());
}
catch (Exception ex) {
log.error("Error adding metrics metrics to queue", ex);
}
}
}
// Source: https://www.cnblogs.com/zhyg/p/9377406.html