写在前面
- 因为本地电脑没装flume,nginx各种。所以之前写Streaming程序的时候,都是打包了放到集群上跑。就算我在程序代码里不停地logger,调试起来也hin不方便。
- 于是本地写了两个程序,在intellj调试。
- 主要就是包括两个程序:
- 一个是GenerateChar.scala用来向某个指定端口,使用socket发消息;
- 另一个就是要测试的Streaming程序了。
GenerateChar
package com.wttttt.spark import java.io.PrintWriter import java.net.ServerSocket /** * Created with IntelliJ IDEA. * Description: * Author: wttttt * Github: https://github.com/wttttt-wang/hadoop_inaction * Date: 2017-05-19 * Time: 10:19 */ object GenerateChar { def main(args: Array[String]) { val listener = new ServerSocket(9998) while(true){ val socket = listener.accept() new Thread(){ override def run() = { println("Got client connected from :"+ socket.getInetAddress) val out = new PrintWriter(socket.getOutputStream,true) while(true){ Thread.sleep(3000) val context1 = "GET /result.html?Input=test1 HTTP/1.1" println(context1) val context2 = "GET /result.html?Input=test2 HTTP/1.1" println(context2) val context3 = "GET /result.html?Input=test3 HTTP/1.1" println(context3) out.write(context1 + ‘\n‘ + context2 + "\n" + context2 + "\n" + context3 + "\n" + context3 + "\n" + context3 + "\n" + context3 + "\n") out.flush() } socket.close() } }.start() } } }
- 要发送的数据就根据需要自定义。
streaming
- streaming这边就是要调试的程序啦。
- 一方面是,Mater设置成local[x],x > 1,因为这里需要receivers来接收数据。
- 另一方面,设置一个本地checkpoint目录
val conf = new SparkConf() .setMaster("local[2]") .setAppName("LocalTest") // WARN StreamingContext: spark.master should be set as local[n], n > 1 in local mode if you have receivers to get data, // otherwise Spark jobs will not get resources to process the received data. val sc = new StreamingContext(conf, Milliseconds(5000)) sc.checkpoint("flumeCheckpoint/")
- 测试的时候就各种打log,做输出啦,hin方便哒
时间: 2024-10-12 09:01:18