为了监控主动轮询进程是否崩溃退出，采用心跳包机制进行检测：被监控进程定时生成心跳文件，由 jnotify 监听文件事件并将其发送到 Kafka，再由消费者解析消息；若发现不符合规则的情况（如心跳超时），则发送邮件、短信报警。
Java代码
- package heartbeat.monitor;
- import java.io.FileInputStream;
- import java.io.IOException;
- import java.io.InputStream;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Properties;
- import java.util.concurrent.ExecutionException;
- import org.apache.commons.io.IOUtils;
- import heartbeat.monitor.sendmessage.SendMailUtil;
- import heartbeat.monitor.util.HeartBeatMonitorConstant;
- /**
- * @author Switching
- * @version 1.0, 2017-01-01
- * @since heartbeat.monitor 1.0
- */
- public class HeartBeatMonitorAccess {
- private List<HeartBeatMonitor> consumers;
- private static String heartbeatConsumerProperties;
- public HeartBeatMonitorAccess() throws IOException {
- Properties properties = new Properties();
- if (heartbeatConsumerProperties == null) {
- properties.load(ClassLoader.getSystemResourceAsStream("heartBeatMonitor.properties"));
- } else {
- String propurl = heartbeatConsumerProperties;
- InputStream in = new FileInputStream(propurl);
- properties.load(in);
- IOUtils.closeQuietly(in);
- }
- int num = 1;
- String topic = properties.getProperty(HeartBeatMonitorConstant.MONITOR_TOPIC);
- consumers = new ArrayList<HeartBeatMonitor>(num);
- for (int i = 0; i < num; i++) {
- consumers.add(new HeartBeatMonitor(properties, topic));
- new SendMailUtil(properties, topic);
- }
- }
- public void exeute() throws InterruptedException, ExecutionException {
- for (HeartBeatMonitor consumer : consumers) {
- new Thread(consumer).start();
- }
- }
- public static void main(String[] args) throws Exception {
- if (args.length > 0)
- heartbeatConsumerProperties = args[0];
- HeartBeatMonitorAccess consumerGroup = new HeartBeatMonitorAccess();
- consumerGroup.exeute();
- }
- }
消费者按相应的规则对消息进行解析和判断。
后台线程轮询消费 Kafka 中的消息：
Java代码
- package heartbeat.monitor;
- import java.io.IOException;
- import java.text.DateFormat;
- import java.text.ParseException;
- import java.text.SimpleDateFormat;
- import java.util.Collections;
- import java.util.Date;
- import java.util.Map;
- import java.util.Properties;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- import org.codehaus.jackson.map.ObjectMapper;
- import heartbeat.monitor.sendmessage.SendMailUtil;
- import heartbeat.monitor.util.HeartBeatMonitorConstant;
- import heartbeat.monitor.util.MessageUtil;
- /**
- * @author Switching
- * @version 1.0, 2017-01-01
- * @since heartbeat.monitor 1.0
- */
- public class HeartBeatMonitor implements Runnable {
- private final KafkaConsumer<String, String> consumer;
- public static String lastMonitorTime;
- public static String lastMonitorInfo;
- public static DateFormat dfMonitorTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- private long timeMonitor = 20000;
- private long maxTriesTime = 3;
- private String monitorKeyWord = "FILE_WRITTEN_CLOSED";
- private String monitorLogLvls = "EXCEPTION";
- public HeartBeatMonitor(Properties properties, String topic) throws IOException {
- if (!MessageUtil.isEmpty(properties.getProperty(HeartBeatMonitorConstant.MONITOR_OFFSET_TIME)))
- timeMonitor = Long.parseLong(properties.getProperty(HeartBeatMonitorConstant.MONITOR_OFFSET_TIME));
- if (!MessageUtil.isEmpty(properties.getProperty(HeartBeatMonitorConstant.MONITOR_TRIES)))
- maxTriesTime = Long.parseLong(properties.getProperty(HeartBeatMonitorConstant.MONITOR_TRIES));
- if (!MessageUtil.isEmpty(properties.getProperty(HeartBeatMonitorConstant.MONITOR_KEY_WORD)))
- monitorKeyWord = properties.getProperty(HeartBeatMonitorConstant.MONITOR_KEY_WORD);
- if (!MessageUtil.isEmpty(properties.getProperty(HeartBeatMonitorConstant.MONITOR_LOG_LVLS)))
- monitorLogLvls = properties.getProperty(HeartBeatMonitorConstant.MONITOR_LOG_LVLS);
- consumer = new KafkaConsumer<String, String>(properties);
- consumer.subscribe(Collections.singletonList(topic));
- }
- public void close() {
- consumer.close();
- }
- public void run() {
- long triesTime = maxTriesTime - 1;
- System.out.println("Heartbeat Monitor Start!");
- if (lastMonitorTime == null) {
- resetTime();
- }
- String name = Thread.currentThread().getName();
- while (true) {
- Date dtMonitor;
- Date dtLocal = new Date();
- ConsumerRecords<String, String> records = consumer.poll(2);
- for (ConsumerRecord<String, String> record : records) {
- ObjectMapper objectMapper = new ObjectMapper();
- try {
- Map<String, String> recordMap = objectMapper.readValue(record.value(), Map.class);
- if (recordMap.get(HeartBeatMonitorConstant.FILE_EVENT).toString().equals(monitorKeyWord)) {
- lastMonitorTime = recordMap.get(HeartBeatMonitorConstant.FILE_TIME).toString();
- lastMonitorInfo = recordMap.get(HeartBeatMonitorConstant.SFTP_HOST_NAME) + ":"
- + recordMap.get(HeartBeatMonitorConstant.SFTP_HOST_IP);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- System.out.println(name + "---" + record.partition() + ":" + record.offset() + " = " + record.key()
- + ":" + record.value());
- }
- try {
- dtMonitor = dfMonitorTime.parse(lastMonitorTime);
- if ((dtLocal.getTime() - timeMonitor) > dtMonitor.getTime()) {
- if (triesTime > 0) {
- triesTime--;
- } else {
- resetTime();
- triesTime = maxTriesTime - 1;
- }
- if (HeartBeatMonitorConstant.LOG_LVL_EXCEP.equals(monitorLogLvls)
- || HeartBeatMonitorConstant.LOG_LVL_ALL.equals(monitorLogLvls)) {
- String content = "[" + dtLocal + "] " + lastMonitorInfo
- + " Exception! manual intervention please! " + (triesTime + 1) + " times! \n\t\t\t\tNow("
- + dtLocal + ") is beyond " + timeMonitor / 1000 + "(s) lastMonitorTime(" + dtMonitor
- + ").";
- System.out.println(content);
- if (triesTime == 0) {
- SendMailUtil.sendMailAccess(lastMonitorInfo, content);
- }
- }
- } else {
- if (HeartBeatMonitorConstant.LOG_LVL_NOEXCEP.equals(monitorLogLvls)
- || HeartBeatMonitorConstant.LOG_LVL_ALL.equals(monitorLogLvls)) {
- System.out.println("[" + dtLocal + "] " + name + "NoException! \n\t\t\t\tNow(" + dtLocal
- + ") is Range " + timeMonitor / 1000 + " (s) lastMonitorTime(" + dtMonitor + ").");
- }
- }
- } catch (ParseException e) {
- e.printStackTrace();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- public void resetTime() {
- lastMonitorTime = dfMonitorTime.format(new Date());
- if (lastMonitorInfo == null) {
- lastMonitorInfo = "Monitor";
- }
- }
- }
通过配置文件对各项监控参数进行配置：
配置文件（heartBeatMonitor.properties）
- #Created by Switching
- auto.commit.interval.ms=1000
- auto.offset.reset=earliest
- bootstrap.servers=192.168.102.10\:9092
- enable.auto.commit=true
- group.id=monitor1
- key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
- monitorTopic=monitor
- value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
- #Heartbeat monitor: number of exception notifications per timeout (default: 3)
- monitorTries=3
- #Heartbeat monitor: maximum allowed age of the last heartbeat, in ms (default: 20000)
- monitorOffsetTime=20000
- #Heartbeat monitor log level (default: EXCEPTION): ALL|EXCEPTION|NOEXCEPTION|NONE
- monitorLogLvls=EXCEPTION
- #Heartbeat monitor: alert mail settings
- sendEmailAccount=xxxx@jdongtech.com
- sendEmailPassword=xxxxx
- sendEmailSMTPHost=smtp.xxxx.com
- receEMailAccount=xxx@jdongtech.com
时间: 2024-10-21 11:33:29