Mapreduce任务实现邮件监控

Mapreduce任务实现邮件监控


这里主要使用Java自带邮件类实现Mapreduce任务的监控,如果Mapreduce任务报错则发送报错邮件。Mapreduce的报错信息通过hdfs中的日志获取,里面的报错日志是json格式,这里讲json转换成xml格式发送到邮件。具体代码如下

import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.net.URI;
import java.util.Properties;
import java.util.StringTokenizer;

import javax.mail.Authenticator;
import javax.mail.Message;
import javax.mail.MessagingException;
import javax.mail.PasswordAuthentication;
import javax.mail.Session;
import javax.mail.Transport;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;

import net.sf.json.JSONArray;
import net.sf.json.JSONObject;
import net.sf.json.xml.XMLSerializer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;

public class Email {

    private static final String USERNAME = "[email protected]";//发送邮件的用户名
    private static final String PASSWORD = "123456789";//发送邮件的用户名对应的密码
    private static final String EMAIL_HOST = "smtp.qq.com";//邮件服务器host

    public static void main(String args[]) {
        try {
            sendEmail("测试邮件", "测试邮件内容!", "[email protected]");
            System.out.println("email ok !");
        } catch (MessagingException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    /**
     * @category 发送邮件方法,该方法实现发送Mapreduce任务报错信息,具体的报错信息通过hdfs的报错日志获取
     * @param to 目标邮箱(可以多个邮箱,用,号隔开)
     * @param job 通过mapreduce的job获取jobID
     * @param time 通过时间戳访问错误日志路径
     * @throws Exception
     */
    public static void sendErrMail(String to, Job job, String time)
            throws Exception {
        String subject = job.getJobName();
        String message = getErr(job, time);
        LoginMail lm = new LoginMail(USERNAME, PASSWORD);
        // 创建session
        Properties props = new Properties();
        props.put("mail.smtp.auth", "true");
        props.put("mail.smtp.host", EMAIL_HOST);
        Session session = Session.getDefaultInstance(props, lm);

        // 创建 message
        Message msg = new MimeMessage(session);

        // 设置发送源地址
        msg.setFrom(new InternetAddress(USERNAME));

        // 多用户分解
        StringTokenizer st = new StringTokenizer(to, ",");
        String[] recipients = new String[st.countTokens()];
        int rc = 0;
        while (st.hasMoreTokens())
            recipients[rc++] = st.nextToken();
        InternetAddress[] addressTo = new InternetAddress[recipients.length];
        for (int i = 0; i < recipients.length; i++) {
            addressTo[i] = new InternetAddress(recipients[i]);
        }
        msg.setRecipients(Message.RecipientType.TO, addressTo);

        // 设置邮件主题并发送邮件
        msg.setSubject(subject);
        msg.setContent(message, "text/html;charset=utf-8");
        Transport.send(msg);
    }

    /**
     * @category 自定义主题内容发送,这里的邮件内容不一定是Mapreduce的,可以任意填写
     * @param subject 主题
     * @param body 内容
     * @param to 目标邮箱
     * @throws MessagingException
     */
    public static void sendEmail(String subject, String body, String to)
            throws MessagingException {
        LoginMail lm = new LoginMail(USERNAME, PASSWORD);
        // 创建session
        Properties props = new Properties();
        props.put("mail.smtp.auth", "true");
        props.put("mail.smtp.host", EMAIL_HOST);
        Session session = Session.getDefaultInstance(props, lm);

        // 创建 message
        Message msg = new MimeMessage(session);

        // 设置发送源地址
        msg.setFrom(new InternetAddress(USERNAME));

        // 多用户分解
        StringTokenizer st = new StringTokenizer(to, ",");
        String[] recipients = new String[st.countTokens()];
        int rc = 0;
        while (st.hasMoreTokens())
            recipients[rc++] = st.nextToken();
        InternetAddress[] addressTo = new InternetAddress[recipients.length];
        for (int i = 0; i < recipients.length; i++) {
            addressTo[i] = new InternetAddress(recipients[i]);
        }
        msg.setRecipients(Message.RecipientType.TO, addressTo);

        // 设置邮件主题并发送邮件
        msg.setSubject(subject);
        msg.setContent(body, "text/html;charset=utf-8");
        Transport.send(msg);

    }

    /**
     * @category 获取日志文件
     * @param job
     * @param time
     * @return FSDataInputStream
     * @throws IOException
     */
    public static FSDataInputStream getFile(Job job, String time)
            throws IOException {
        String year = time.substring(0, 4);
        String month = time.substring(4, 6);
        String day = time.substring(6, 8);
        String dst = "hdfs://192.168.1.100:9000/tmp/hadoop-yarn/staging/history/done/"
                + year + "/" + month + "/" + day + "/000000";
        FileSystem fs = FileSystem.get(URI.create(dst), new Configuration());
        FileStatus[] status = fs.listStatus(new Path(dst));
        FSDataInputStream in = null;
        for (int i = 0; i < status.length; i++) {
            if (status[i].getPath().getName()
                    .contains(job.getJobID().toString())
                    && status[i].getPath().getName().endsWith("jhist")) {
                in = new FSDataInputStream(fs.open(status[i].getPath()));
            }
        }
        return in;
    }

    /**
     * @category 解析文件类容为xml
     * @param job
     * @param time
     * @return xml
     * @throws IOException
     * @throws InterruptedException
     */
    public static String getErr(Job job, String time) throws IOException,
            InterruptedException {
        FSDataInputStream in = getFile(job, time);
        Thread t1 = new Thread();
        while (in == null) {
            t1.sleep(20000);//由于hdfs每个job的日志不是实时生成,所以需要每隔20秒检查一次hdfs该job日志是否已生成
            t1.join();
            in = getFile(job, time);
        }
        BufferedReader br = new BufferedReader(new InputStreamReader(in));

        String line = "";
        JSONObject jo;
        JSONArray jsa = new JSONArray();
        String xml = "";
        XMLSerializer xmlSerializer = new XMLSerializer();
        while ((line = br.readLine()) != null) {
            if (line.toUpperCase().indexOf("error".toUpperCase()) > -1) {
                jo = JSONObject.fromObject(line);
                jsa.add(jo);
            }
        }
        xml = xmlSerializer.write(jsa);
        in.close();
        br.close();
        return xml;

    }

    /**
     * @category 获取try-catch中的异常内容
     * @param e Exception
     * @return 异常内容
     */
    public static String getException(Exception e) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        PrintStream pout = new PrintStream(out);
        e.printStackTrace(pout);
        String ret = new String(out.toByteArray());
        pout.close();
        try {
            out.close();
        } catch (Exception ex) {
        }
        return ret;
    }
}

class LoginMail extends Authenticator {

    private String username;
    private String password;

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    @Override
    protected PasswordAuthentication getPasswordAuthentication() {
        return new PasswordAuthentication(username, password);
    }

    public LoginMail(String username, String password) {
        this.username = username;
        this.password = password;
    }
}
时间: 2024-11-24 13:38:19

Mapreduce任务实现邮件监控的相关文章

邮件监控存储卷空间的脚本

邮件监控存储卷空间的脚本: 说明: 1.显示存储名.ip.卷名.total空间.free空间.1天用空间.已用百分比 2.对卷名字符数的统计(echo aa1 | wc -m) 3.对卷名部分的排除,只保留数值部分(/usr/lib64/nagios/plugins/check-netapp-ng2.pl -H 10.0.0.16 -C public -T DISKUSED -v /vol/$eNas/ -w 80 -c 90 | awk -F[:" "]+ '{print $5}'

局域网监控软件——邮件监控

随着互联网的飞速发展,电子邮件的使用越来越普遍,不仅成为企业内部的沟通桥梁,也是企业和外部进行各类业务往来的重要管道,为保护商业秘密不通过邮件外泄,成为企业的一件大事,邮件监控也就越来越重要. 比起传统信件,E-MAIL速度快,还更有效率,在许多企业,电子邮件已渐渐具备正式公文的性质,企业档案资料的管理已不仅限于各类纸张文挡,也包括各类来往的电子邮件.企业对电子邮件进行监控和备份,可对企业对内对外的档案进行更有效的管理. 方便的E-MAIL,也可能被员工当作有意或无意泄漏机密的主要管道.据调查美

用C#写个邮件监控服务(一)

监控服务,首先当然是个服务了.至于什么是windows服务,这里就不多说了.正题 1. 创建服务项目 打开VS编程环境,在C#中创建windows服务项目 2.创建后属性中更改名称和服务名. 3.增加一个定时器 (这里的timer控件一定要是 System.Timers命名空间下的) 4. 增加安装 在设计页面点右键增加安装,之后你会看到以下的样子,并分别进行设定. 注意设定你的显示信息和服务名称,不是控件名. 同时也要设定StartType,我设为自动,这样一开机就会自动启用. 注意使用Loc

zabbix邮件监控配置

参考博客:http://blog.chinaunix.net/uid/26118446.html  小钻风                  http://www.iyunv.com/thread-61736-1-1.html 运维网论坛帖子 搭建好zabbix监控服务系统后,就是增加邮件报警功能,还是费了点时间,因为先是能发送到163邮箱,不能发送到QQ邮箱,后又是以附件的形式发送邮件,而不是直接显示其报警内容: 用163做的测试: 使用外部邮箱账号发送报警邮件设置 一.关闭sendmail或

zabbix之本地邮件监控报警安装详解

文记: 写这篇博文之前的时候我接到了一个不幸的消息,跟我一起的一位同事要走,准备换工作了,于是,我的第一反应,我的蜜月期结束了,即将开始一个被蹂躏的时代,心里默默的祝愿他找一份15K的工作,又默默的想说对他说一句,你真JB坑.... 哎~! 我即将跳进一个无底洞,默默的被蹂躏,默默的被践踏...也许这也是一个新的开始.... 开始正文,估计他会默默的陪伴我这仅有的半个月时间,为此,这半个月,可能都会以zabbix 为主,如不出意外,每天一博.在半月后的某天,我估计我能出书了   <<zabbi

基于HBase的MapReduce实现大量邮件信息统计分析

一:概述 在大多数情况下,如果使用MapReduce进行batch处理,文件一般是存储在HDFS上的,但这里有个很重要的场景不能忽视,那就是对于大量的小文件的处理(此处小文件没有确切的定义,一般指文件大小比较小,比如5M以内的文件),而HDFS的文件块一般是64M,这将会影响到HDFS的性能,因为小文件过多,那么NameNode需要保存的文件元信息将占用更多的空间,加大NameNode的负载进而影响性能,假如对于每个文件,在NameNode中保存的元数据大小是100字节,那么1千万这样的小文件,

上网行为管理软件如何监控客户端方式收发的邮件内容?

客户端邮件是指用邮件客户端来收发邮件. 邮件客户端一般采用SMTP.POP3和IMAP协议,随着SSL加密的广泛应用,后来又发展了SSL加密的邮件收发. 配置客户端时,如果勾选了"此链接需要SSL加密"或者"STARTTLS"的传输方式,都意外着该链接已经被加密.目前,SSL加密邮件已经得到了广泛的应用. 在"WFilter上网行为管理软件"中,无需配置即可监控到不加密的客户端邮件. 1. 在"所有在线"中点击"邮件

监控,你为啥总是断断续续发邮件?

前言: 一般服务器都会搭建监控,既然监控有了,那么报警肯定是必不可少了.监控不管是cacti,nagios,zabbx等,报警不管飞信,微信,邮件,短信.只要适合管理员,帮助其及时掌握服务器的状态,那肯定能省不少的事情.笔者用的是邮件监控,而且是异地的.然后在qq里设置了代理报警.也就是邮件可以收到,只要qq或者微信都在线,也可以收到,用起来算是方便了. 问题: 事情并没有想象中的那么好啊!!!最近总是断断续续发邮件,特别郁闷.然而,今天花了一上午来排查.居然是一个可笑的问题导致的.网络带宽!

通过Powershell 来监控华为无线设备

############################################ #Author:Lixiaosong #Email;[email protected] #For:监控无线AP运行状态 #Version:1.0 2015年6月1日 ############################################ $aps=0,1,2,3,4,5,6,7,12,13,14,15 $aplist = @() foreach($ap in $aps){ $secpass