spark数据监控实战

转载请注明出处。更多文章请访问 http://bigdataer.net

1.概述

数据准确性,稳定性,时效性是数据开发中需要重点关注的,一般称之为数据质量。保证数据质量往往会占用数据开发工程师的很多精力,所以一个好的数据监控系统或者一个合理的数据监控方案对于数据质量的保证至关重要。本文将展示一种实际生产中使用过的数据监控方案,并给出相关的代码。
数据计算采用spark,报警形式采用邮件报警。涉及到的内容有使用springMVC构建一个支持发送html和文件的邮件接口;在spark计算任意过程中调用邮件接口;在spark中通过邮件接口发送hdfs上的结果数据。

2.架构图

说明:通常情况下公司内部的hadoop/spark集群和外网隔离,直接在spark作业里发送邮件显然不现实。所以需要构建一个邮件发送服务,暴露内网接口给spark作业调用,同时也能访问外网,把邮件发送到用户邮箱中。

3.基于springMVC构建的邮件服务

3.1 设计目标

(1)支持自定义邮件发件人昵称,主题,内容等
(2)支持发送html以及文件

3.2技术方案

springMVC,JavaMail

3.3核心代码

邮件发送工具类EmailSendUtil

java    98行

import java.io.File;
import java.util.Date;
import java.util.Properties;

import javax.activation.CommandMap;
import javax.activation.DataHandler;
import javax.activation.FileDataSource;
import javax.activation.MailcapCommandMap;
import javax.mail.Authenticator;
import javax.mail.Message;
import javax.mail.MessagingException;
import javax.mail.PasswordAuthentication;
import javax.mail.Session;
import javax.mail.Transport;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeBodyPart;
import javax.mail.internet.MimeMessage;
import javax.mail.internet.MimeMultipart;

import org.springframework.stereotype.Service;

@Service
public class EmailSendUtil {

public void sendEmail(String nick,String subject,String content,String receivers,File file) throws Exception {

		Properties proper = new Properties();
		proper.setProperty("mail.transport.protocol", "smtp");
		proper.setProperty("mail.stmp.auth", "true");

		//账号密码认证
		Session session = Session.getInstance(proper);
		MimeMessage msg = new MimeMessage(session);

		try {

			MailcapCommandMap mc = (MailcapCommandMap) CommandMap.getDefaultCommandMap();
	        mc.addMailcap("text/html;; x-Java-content-handler=com.sun.mail.handlers.text_html");
	        mc.addMailcap("text/xml;; x-java-content-handler=com.sun.mail.handlers.text_xml");
	        mc.addMailcap("text/plain;; x-java-content-handler=com.sun.mail.handlers.text_plain");
	        mc.addMailcap("multipart/*;; x-java-content-handler=com.sun.mail.handlers.multipart_mixed");
	        mc.addMailcap("message/rfc822;; x-java-content-handler=com.sun.mail.handlers.message_rfc822");
	        CommandMap.setDefaultCommandMap(mc);
			//设置发件人
			String nickname=javax.mail.internet.MimeUtility.encodeText(nick);
			msg.setFrom(new InternetAddress(nickname+"发件人邮箱地址"));
			//设置收件人
			msg.setRecipients(Message.RecipientType.TO, InternetAddress.parse(receivers));
			//设置邮件主题
			msg.setSubject(subject);
			MimeMultipart  msgMimePart = new MimeMultipart ("mixed");
			//正文内容
			MimeBodyPart contents = getBodyPart(content);
			msgMimePart.addBodyPart(contents);
			//附件
			if(file!=null){
				MimeBodyPart attachment = getAttachPart(file);
				msgMimePart.addBodyPart(attachment);
			}

			//设置邮件消息体
			msg.setContent(msgMimePart);
			//设置发送时间
			msg.setSentDate(new Date());
			msg.saveChanges();

			Transport trans=session.getTransport();
			trans.connect("smtp.exmail.qq.com", "发件人邮箱地址", "密码");
			trans.sendMessage(msg, msg.getRecipients(Message.RecipientType.TO));
			trans.close();
		} catch (Exception e) {
			throw new Exception("email send error:"+e.getMessage());
		}finally{
			if(file!=null&&file.exists()){
				file.delete();
			}
		}
	}

	private static MimeBodyPart getBodyPart(String content) throws MessagingException{
		MimeBodyPart body = new MimeBodyPart();
		MimeMultipart mmp = new MimeMultipart("related");
		MimeBodyPart contents = new MimeBodyPart();
		contents.setContent(content, "text/html;charset=utf-8");
		mmp.addBodyPart(contents);
		body.setContent(mmp);
		return body;
	}

	private static MimeBodyPart getAttachPart(File file) throws MessagingException{
		MimeBodyPart attach = new MimeBodyPart();
		FileDataSource fds = new FileDataSource(file);
		attach.setDataHandler(new DataHandler(fds));
		attach.setFileName(file.getName());
		return attach;
	}
}

controller类,写的比较粗糙,提供了两个接口,一个发纯html,一个可以发送混合格式的邮件。

java    47行

import java.io.File;

import net.bigdataer.api.weixin.utils.EmailSendUtil;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.multipart.MultipartFile;

@Controller
@RequestMapping("/email_api")
public class EmailSendController extends DmBaseController{

	@Autowired
	private EmailSendUtil est;

	//只发送html
	@RequestMapping("/send")
	public @ResponseBody String sendEmail(@RequestParam("nickname") String nickname,@RequestParam("subject") String subject,@RequestParam("content") String content,@RequestParam("receivers") String receivers){
		String result = "{\"status\":\"0\",\"msg\":\"success\"}";
		try{
			est.sendEmail(nickname,subject, content, receivers,null);
		}catch(Exception e){
			result = "{\"status\":\"-1\",\"msg\":\""+e.getMessage()+"\"}";
		}
		return result;
	}

	//发送混合格式的邮件
	@RequestMapping("/sendattachment")
	public @ResponseBody String sendAttachment(@RequestParam("nickname") String nickname,@RequestParam("subject") String subject,@RequestParam("content") String content,@RequestParam("receivers") String receivers,@RequestParam("attachment") MultipartFile attachment){
		String result = "{\"status\":\"0\",\"msg\":\"success\"}";
		File file = new File("/opt/soft/tomcat/temp/"+attachment.getOriginalFilename());
		try {
			attachment.transferTo(file);
			est.sendEmail(nickname,subject, content, receivers,file);
		} catch (Exception e) {
			result = "{\"status\":\"-1\",\"msg\":\""+e.getMessage()+"\"}";
		}

		return result;
	}

}

4.spark作业调用邮件接口

4.1封装一个接口调用工具类

这个类提供了对https及http协议的访问,同时支持get和post请求。在本例中没有使用到get请求。
另外这个类依赖于httpclient的相关jar包。我这里使用的jarmaven依赖如下:

xml    11行

 		<dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpmime</artifactId>
            <version>4.5.2</version>
		</dependency>

注意:因为spark源码中也使用了http-core的包,你的引用可能会和spark集群中本身的包冲突导致抛找不到某个类或者没有某个方法的异常,需要根据实际情况调整。不过代码大体上都一样,下面的代码可以参考

java    174行

import com.alibaba.fastjson.JSONObject;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.NameValuePair;
import org.apache.http.StatusLine;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.HttpResponseException;
import org.apache.http.client.ResponseHandler;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.conn.socket.PlainConnectionSocketFactory;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.util.EntityUtils;

import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.List;

/**
 * 服务请求类
 * 封装了post,get请求
 * 同时支持http和https方式
 * @author liuxuecheng
 *
 */
public class HttpClientUtil { 

	private static final Log logger = LogFactory.getLog(HttpClientUtil.class);
	//设置超时(单位ms)
	private static int TIME_OUT = 3000;
	private static CloseableHttpClient client = null;

	private static TrustManager  trustManager = new X509TrustManager() {
	    @Override
	    public X509Certificate[] getAcceptedIssuers() {
	        return null;
	    }
		@Override
		public void checkClientTrusted(X509Certificate[] arg0, String arg1) throws CertificateException {
		}
		@Override
		public void checkServerTrusted(X509Certificate[] arg0, String arg1) throws CertificateException {
		}
	};

	static{

		try{
		//请求配置
		RequestConfig config = RequestConfig.custom()
				.setConnectTimeout(TIME_OUT)
				.setConnectionRequestTimeout(TIME_OUT)
				.setSocketTimeout(TIME_OUT)
				.build();

		//访问https站点相关
		SSLContext context = SSLContext.getInstance("TLS");
		context.init(null, new TrustManager[]{trustManager}, null);
		SSLConnectionSocketFactory scsf = new SSLConnectionSocketFactory(context, NoopHostnameVerifier.INSTANCE);
		//注册
		Registry<ConnectionSocketFactory> registry = RegistryBuilder
				.<ConnectionSocketFactory>create()
				.register("http", PlainConnectionSocketFactory.INSTANCE)
				.register("https", scsf)
				.build();

		//连接池
		PoolingHttpClientConnectionManager manager = new PoolingHttpClientConnectionManager(registry);

		//构造请求client
		client = HttpClients.custom()
				.setConnectionManager(manager)
				.setDefaultRequestConfig(config)
				.build();

		}catch(Exception e){
			e.printStackTrace();
		}
	}

	/**
	 * post方法
	 * post请求涉及到不同contentType,对应不同的HttpEntity
	 * 这里定义HttpEntity接口,所有实现了这个接口的实体均可传入
	 * @param token
	 * @param url
	 * @param entity
	 * @return
	 * @throws ClientProtocolException
	 * @throws IOException
	 */
	public static JSONObject post(String token,String url,HttpEntity entity) throws ClientProtocolException, IOException{
		//UrlEncodedFormEntity stringEntity = new UrlEncodedFormEntity(,"UTF-8");
		HttpPost post = new HttpPost(url);

		if(token !=null){
			post.setHeader("Authorization", "Bearer "+token);
		}
		post.setHeader("Accept-Charset", "UTF-8");
		post.setEntity(entity);

		return client.execute(post, handler);
	}

	/**
	 * get请求
	 * @param token
	 * @param url
	 * @param content_type
	 * @param params
	 * @return
	 * @throws ClientProtocolException
	 * @throws IOException
	 * @throws URISyntaxException
	 */
	public static JSONObject get(String token,String url,String content_type,List<NameValuePair> params) throws ClientProtocolException, IOException, URISyntaxException{
		UrlEncodedFormEntity entity = new UrlEncodedFormEntity(params,"UTF-8");
		entity.setContentType(content_type);

		String para = EntityUtils.toString(entity);

		HttpGet get = new HttpGet(url);
		if(token !=null){
			get.setHeader("Authorization", "Bearer "+token);
		}
		get.setHeader("Accept-Charset", "UTF-8");

		//get请求将参数拼接在参数中
		get.setURI(new URI(get.getURI().toString()+"?"+para));
		return client.execute(get, handler);
	}

	//请求返回格式
	public static ResponseHandler<JSONObject> handler = new ResponseHandler<JSONObject>(){

		@Override
		public JSONObject handleResponse(HttpResponse res) throws ClientProtocolException, IOException {
			StatusLine status = res.getStatusLine();
			HttpEntity entity = res.getEntity();

			if(status.getStatusCode()>300){
				throw new HttpResponseException(status.getStatusCode(),
						status.getReasonPhrase());
			} 

			if(entity==null){
				throw new ClientProtocolException("respones has no content");
			}

			String res_str = EntityUtils.toString(entity);

			return JSONObject.parseObject(res_str);
		}

	};
}

4.2进一步对发送邮件的接口封装

以下为scala代码

scala    51行

import java.io.File
import java.util

import org.apache.http.NameValuePair
import org.apache.http.client.entity.UrlEncodedFormEntity
import org.apache.http.entity.ContentType
import org.apache.http.entity.mime.MultipartEntityBuilder
import org.apache.http.entity.mime.content.{FileBody, StringBody}
import org.apache.http.message.BasicNameValuePair
import org.apache.http.util.CharsetUtils

/**
  * Created by liuxuecheng on 2017/1/4.
  */
object EmailSendUtil {

  def sendEmail(nickname: String,subject:String,content:String,receivers:String):Unit={
    val url = "邮件发送接口地址"
    val content_type = "application/x-www-form-urlencoded"

    val params = new util.ArrayList[NameValuePair]()
    params.add(new BasicNameValuePair ("nickname",nickname))
    params.add(new BasicNameValuePair ("subject",subject))
    params.add(new BasicNameValuePair ("content",content))
    params.add(new BasicNameValuePair ("receivers",receivers))
    try{
      val entity = new UrlEncodedFormEntity(params,"UTF-8")
      entity.setContentType(content_type)
      HttpClientUtil.post(null,url,entity)
    }catch{
      case e:Throwable=>e.printStackTrace()
    }
  }

  def sendAttachment(nickname: String,subject:String,content:String,receivers:String,file:File):Unit={
    val url = "邮件发送接口地址"
    val body = new FileBody(file)
    val entity = MultipartEntityBuilder.create()
      .setCharset(CharsetUtils.get("UTF-8"))
      .addPart("attachment",body)
      .addPart("nickname",new StringBody(nickname,ContentType.create("text/plain",CharsetUtils.get("UTF-8"))))
      .addPart("subject",new StringBody(subject,ContentType.create("text/plain",CharsetUtils.get("UTF-8"))))
      .addPart("content",new StringBody(content,ContentType.create("text/plain",CharsetUtils.get("UTF-8"))))
      .addTextBody("receivers",receivers)
      .setContentType(ContentType.MULTIPART_FORM_DATA)
      .build()

    HttpClientUtil.post(null,url,entity)
  }
}

4.3spark读取hdfs文件并创建File对象

以下截取代码片段

scala    7行

 def getHdfsFile(sc:SparkContext,path:String):File = {
 //需要hdfs的全路径,开头一般为hdfs://或者viewfs:// 并且具体到文件名
    val filePath = "viewfs://xx-cluster"+path+"part-00000"
    sc.addFile(filePath)
    new File(SparkFiles.get(new File(filePath).getName))
  }

既然都拿到文件了,发送邮件不就很简单了,调用上面封装好的接口就行。

4.4spark发送html

有时候没必要生成hdfs,计算结果适合报表展示的时候可以直接collect到内存中,然后构建一段html发送,上代码。

scala    17行

	val rdd = group_rdd.groupByKey(200)
      .map(x=>{
        val raw_uv = x._2.flatMap(e=>e._1).toSeq.distinct.size
        val raw_pv = x._2.map(e=>e._2).reduce(_+_)
        val cm_uv = x._2.flatMap(e=>e._3).toSeq.distinct.size
        val cm_pv = x._2.map(e=>e._4).reduce(_+_)
        IndexEntity(x._1._1,x._1._2,x._1._3,raw_uv,raw_pv,cm_uv,cm_pv)
      }).collect().sortBy(_.search_flag).sortBy(_.platform).sortBy(_.bd)

    //模板拼接
    val tbody:StringBuffer = new StringBuffer()
    rdd.foreach(entity=>{
      tbody.append(s"<tr><td>${entity.bd}</td><td>${entity.platform}</td><td>${entity.search_flag}</td>" +
        s"<td>${entity.raw_uv}</td><td>${entity.cm_uv}</td><td>${new DecimalFormat(".00").format((entity.cm_uv.toDouble/entity.raw_uv)*100)}%</td>" +
        s"<td>${entity.raw_pv}</td><td>${entity.cm_pv}</td><td>${new DecimalFormat(".00").format((entity.cm_pv.toDouble/entity.raw_pv)*100)}%</td></tr>")
    })
时间: 2024-10-04 02:56:25

spark数据监控实战的相关文章

Spark 2.x企业级大数据项目实战(实时统计、离线分析和实时ETL)

Spark 2.x企业级大数据项目实战(实时统计.离线分析和实时ETL)全套课程下载:https://pan.baidu.com/s/1mje6bAoLLPrxUIrM-C2VMg 提取码: 9n1x 本门课程来源于一线生产项目, 所有代码都是在现网大数据集群上稳定运行, 拒绝Demo.课程涵盖了离线分析.实时分析绝大部分的场景,通过三个实际生产项目教授如何优雅地集成Hadoop.Spark.HBase.Kafka.Redis.MySQL等相关大数据技术,并实际落地 . 本门课程全程实操,不用担

Spark大型项目实战:电商用户行为分析大数据平台

本项目主要讲解了一套应用于互联网电商企业中,使用Java.Spark等技术开发的大数据统计分析平台,对电商网站的各种用户行为(访问行为.页面跳转行为.购物行为.广告点击行为等)进行复杂的分析.用统计分析出来的数据,辅助公司中的PM(产品经理).数据分析师以及管理人员分析现有产品的情况,并根据用户行为分析结果持续改进产品的设计,以及调整公司的战略和业务.最终达到用大数据技术来帮助提升公司的业绩.营业额以及市场占有率的目标. 1.课程研发环境 开发工具: Eclipse Linux:CentOS 6

中华石杉 Spark大型项目实战:电商用户行为分析大数据平台138讲视频教程(项目实战)

38套大数据,云计算,架构,数据分析师,Hadoop,Spark,Storm,Kafka,人工智能,机器学习,深度学习,项目实战视频教程 视频课程包含: 38套大数据和人工智能精品高级课包含:大数据,云计算,架构,数据挖掘实战,实时推荐系统实战,电视收视率项目实战,实时流统计项目实战,离线电商分析项目实战,Spark大型项目实战用户分析,智能客户系统项目实战,Linux基础,Hadoop,Spark,Storm,Docker,Mapreduce,Kafka,Flume,OpenStack,Hiv

Docker 监控实战 教你如何监控 Docker 容器内部

如今,越来越多的公司开始使用 Docker 了,现在来给大家看几组数据: 2 / 3 的公司在尝试了 Docker 后最终使用了它 也就是说 Docker 的转化率达到了 67%,而转化市场也控制在 60 天内. 越大型的公司越早开始使用 Docker 研究发现主机数量越多的公司,越早开始使用 Docker.而主机数量多,在这个研究里就默认等同于是大型公司了. Docker 优势 那为什么 Docker 越来越火呢?一谈起 Docker 总是会跟着让人联想到轻量这个词,甚至会有一种通过 Dock

项目实战——企业级Zabbix监控实战(一)

项目实战--企业级Zabbix监控实战 实验一:Zabbix监控的搭建 1.实验准备 centos系统服务器3台. 一台作为监控服务器, 两台台作为被监控节点, 配置好yum源. 防火墙关闭. 各节点时钟服务同步. 各节点之间可以通过主机名互相通信. 1)所有机器关闭防火墙和selinux iptables -F && setenforing 2)根据架构图,实验基本设置如下: 2.Zabbix的安装 1)更新我们的yum仓库 我们去官网下载一个包zabbix-release-3.4-2.

百度网络监控实战:NetRadar横空出世(上)

原文:https://mp.weixin.qq.com/s/VBShicsqReDtureKAdEgDA 转自订阅号「AIOps智能运维」,已授权运维帮转发 作者简介:运小贝,百度高级研发工程师 负责百度内网质量监测平台(NetRadar)的业务端设计及开发工作.在系统和网络监控.时序指标异常检测.智能客服机器人等方向有广泛实践经验. 干货概览 百度内网连接着数十万台服务器,承载着全公司业务的网络通信,其通信质量的重要性不言而喻.而百度内网的质量监测平台NetRadar(网络雷达),通过对整个内

大数据(实战型)数据分析专家、首席分析师高级视频教程

38套大数据,云计算,架构,数据分析师,Hadoop,Spark,Storm,Kafka,人工智能,机器学习,深度学习,项目实战视频教程 视频课程包含: 38套大数据和人工智能精品高级课包含:大数据,云计算,架构,数据挖掘实战,实时推荐系统实战,电视收视率项目实战,实时流统计项目实战,离线电商分析项目实战,Spark大型项目实战用户分析,智能客户系统项目实战,Linux基础,Hadoop,Spark,Storm,Docker,Mapreduce,Kafka,Flume,OpenStack,Hiv

程序员入门必备的大数据开发实战系列丛书

想要入行大数据却不知从哪里开始?作为入行十年的码农为大家推荐一套"一站式实战型大数据应用开发学习指导"丛书,帮助读者踏上由开发入门到大数据实战的"互联网+大数据"开发之旅! 此套丛书以实用性.案例丰富见长.由国内知名的IT教育机构课工场创始人肖睿主编,人民邮电出版社出版.编撰此书时为满足企业对人才的技能需求,课工场大数据开发教研团队,通过对数百位BAT一线技术专家进行访谈.上千家企业人力资源情况进行调研.上万上企业招聘岗位进行需求分析,在此基础上,整合了大量案例说明

离线和实时大数据开发实战

离线和实时大数据开发实战 目 录 前言 第一篇 数据大图和数据平台大图 第1章 数据大图 2 1.1 数据流程 2 1.1.1 数据产生 3 1.1.2 数据采集和传输 5 1.1.3 数据存储处理 6 1.1.4 数据应用 7 1.2 数据技术 8 1.2.1 数据采集传输主要技术 9 1.2.2 数据处理主要技术 10 1.2.3 数据存储主要技术 12 1.2.4 数据应用主要技术 13 1.3 数据相关从业者和角色 14 1.3.1 数据平台开发.运维工程师 14 1.3.2 数据开发.