Java Kafka 消费积压监控
后端代码:
Monitor.java代码:


package com.suncreate.kafkaConsumerMonitor.service;
import com.suncreate.kafkaConsumerMonitor.model.ConsumerInfo;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
/**
* kafka消费监控
*
* @author suxiang
*/
/**
 * Monitors consumer lag for one topic / consumer-group pair.
 *
 * <p>Each call to {@link #monitor(boolean)} queries the broker for the log-end
 * offsets and the group's committed offsets of every partition of the topic,
 * computes the total lag, and derives produce/consume rates from the delta
 * against the previous call. Snapshots can optionally be appended to a bounded
 * in-memory history list (last {@code MAX_HISTORY} entries).
 *
 * <p>NOTE(review): {@link KafkaConsumer} is not thread-safe, and neither is
 * this class — call {@code monitor} from a single thread.
 *
 * @author suxiang
 */
public class Monitor {
    private static final Logger log = LoggerFactory.getLogger(Monitor.class);

    /** Maximum number of snapshots retained in {@code list}. */
    private static final int MAX_HISTORY = 500;

    private final String servers;
    private final String topic;
    private final String groupId;

    /** Timestamp (ms) of the previous monitor() pass; 0 until the first pass completes. */
    private long lastTime;
    private long lastTotalLag = 0L;
    private long lastLogSize = 0L;
    private long lastOffset = 0L;
    /** Consume/produce speed ratio in percent, e.g. 98.76. */
    private double lastRatio = 0;
    /** Messages produced per second between the last two passes. */
    private long speedLogSize = 0L;
    /** Messages consumed per second between the last two passes. */
    private long speedOffset = 0L;
    private String time;
    private List<ConsumerInfo> list;

    // DateTimeFormatter is immutable and thread-safe (unlike SimpleDateFormat).
    private static final DateTimeFormatter TIME_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    private static final DecimalFormat SECONDS_FORMAT = new DecimalFormat("0.000");

    private final KafkaConsumer<String, String> consumer;
    private final List<TopicPartition> topicPartitionList;
    private final DecimalFormat decimalFormat = new DecimalFormat("0.00");

    public String getTime() {
        return time;
    }

    public void setTime(String time) {
        this.time = time;
    }

    public long getLastTotalLag() {
        return lastTotalLag;
    }

    public double getLastRatio() {
        return lastRatio;
    }

    public String getTopic() {
        return topic;
    }

    public String getGroupId() {
        return groupId;
    }

    public long getSpeedLogSize() {
        return speedLogSize;
    }

    public long getSpeedOffset() {
        return speedOffset;
    }

    public List<ConsumerInfo> getList() {
        return list;
    }

    public void setList(List<ConsumerInfo> list) {
        this.list = list;
    }

    /**
     * Creates a monitor and eagerly resolves the topic's partitions.
     *
     * @param servers Kafka bootstrap servers, e.g. {@code host1:9092,host2:9092}
     * @param topic   topic whose lag is monitored
     * @param groupId consumer group whose committed offsets are read
     */
    public Monitor(String servers, String topic, String groupId) {
        this.servers = servers;
        this.topic = topic;
        this.groupId = groupId;
        this.list = new ArrayList<>();

        // Metadata-only consumer: it never subscribes or polls, and auto-commit
        // is disabled explicitly so it can never touch the group's offsets.
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.servers);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId);
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumer = new KafkaConsumer<>(properties);

        // Resolve the topic's partitions once up front.
        topicPartitionList = new ArrayList<>();
        List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
        if (partitionInfoList == null) {
            // partitionsFor may return null when the topic does not exist (and
            // auto-creation is off); keep the partition list empty instead of NPE-ing.
            log.error("Topic:{} not found, no partitions to monitor", topic);
        } else {
            for (PartitionInfo partitionInfo : partitionInfoList) {
                topicPartitionList.add(
                        new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
            }
        }
    }

    /**
     * Performs one monitoring pass: fetches log-end and committed offsets,
     * accumulates the lag, derives per-second rates from the previous pass,
     * and optionally records a snapshot.
     *
     * @param addToList whether to append this pass's snapshot to the history list
     */
    public void monitor(boolean addToList) {
        try {
            long startTime = System.currentTimeMillis();

            Map<Integer, Long> endOffsetMap = fetchLogEndOffsets();
            Map<Integer, Long> commitOffsetMap = fetchCommittedOffsets();

            long endTime = System.currentTimeMillis();
            log.info("查询logSize和offset耗时:{} 秒", SECONDS_FORMAT.format((endTime - startTime) / 1000.0));

            startTime = System.currentTimeMillis();

            // Accumulate lag across partitions.
            long totalLag = 0L;
            long logSize = 0L;
            long offset = 0L;
            if (endOffsetMap.size() == commitOffsetMap.size()) {
                for (Map.Entry<Integer, Long> entry : endOffsetMap.entrySet()) {
                    long endOffset = entry.getValue();
                    long commitOffset = commitOffsetMap.get(entry.getKey());
                    totalLag += endOffset - commitOffset;
                    logSize += endOffset;
                    offset += commitOffset;
                }
            } else {
                log.error("Topic:{} consumer:{} topic partitions lost", topic, consumer);
            }
            log.info("Topic:{} logSize:{} offset:{} totalLag:{}", topic, logSize, offset, totalLag);

            updateSpeeds(logSize, offset);

            // Remember this pass as the baseline for the next one.
            lastTime = System.currentTimeMillis();
            lastTotalLag = totalLag;
            lastLogSize = logSize;
            lastOffset = offset;

            endTime = System.currentTimeMillis();
            log.info("计算耗时:{} 秒", SECONDS_FORMAT.format((endTime - startTime) / 1000.0));

            if (addToList) {
                recordSnapshot();
            }
        } catch (Exception e) {
            log.error("Monitor error", e);
        }
    }

    /** Queries the broker for each partition's log-end offset, keyed by partition id. */
    private Map<Integer, Long> fetchLogEndOffsets() {
        Map<Integer, Long> endOffsetMap = new HashMap<>();
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitionList);
        for (Map.Entry<TopicPartition, Long> entry : endOffsets.entrySet()) {
            endOffsetMap.put(entry.getKey().partition(), entry.getValue());
        }
        return endOffsetMap;
    }

    /** Queries the group's committed offset per partition, keyed by partition id. */
    private Map<Integer, Long> fetchCommittedOffsets() {
        Map<Integer, Long> commitOffsetMap = new HashMap<>();
        for (TopicPartition topicPartition : topicPartitionList) {
            OffsetAndMetadata committed = consumer.committed(topicPartition);
            // committed() returns null when the group has never committed for this
            // partition; treat that as offset 0 instead of throwing an NPE.
            commitOffsetMap.put(topicPartition.partition(), committed == null ? 0L : committed.offset());
        }
        return commitOffsetMap;
    }

    /**
     * Derives produce/consume rates (msgs/sec) and their percentage ratio from
     * the delta against the previous pass. No-op on the very first pass.
     */
    private void updateSpeeds(long logSize, long offset) {
        if (lastTime <= 0) {
            return; // first pass — no baseline yet
        }
        // Capture "now" once so both rates use the same elapsed interval.
        long now = System.currentTimeMillis();
        if (now - lastTime > 0) {
            double elapsedSeconds = (now - lastTime) / 1000.0;
            speedLogSize = (long) ((logSize - lastLogSize) / elapsedSeconds);
            speedOffset = (long) ((offset - lastOffset) / elapsedSeconds);
        }
        if (speedLogSize > 0) {
            String strRatio = decimalFormat.format(speedOffset * 100 / (speedLogSize * 1.0));
            lastRatio = Double.parseDouble(strRatio);
            log.info("Topic:{} speedLogSize:{} speedOffset:{} 百分比:{}%", topic, speedLogSize, speedOffset, strRatio);
        }
    }

    /** Appends the current state to the bounded history list (oldest entry evicted). */
    private void recordSnapshot() {
        this.setTime(TIME_FORMAT.format(LocalDateTime.now()));
        this.list.add(new ConsumerInfo(this.getTopic(), this.getGroupId(), this.getLastTotalLag(),
                this.getLastRatio(), this.getSpeedLogSize(), this.getSpeedOffset(), this.getTime()));
        if (this.list.size() > MAX_HISTORY) {
            this.list.remove(0);
        }
    }
}


