1.环境
jdk : 1.8
scala : 2.11.7
hadoop:2.7
spark : 2.2.0
2. 开发工具
idea 2017.2
3.maven的pom文件
<dependencies>
<!-- Dependencies for the Spark Streaming log-parsing demo shown in section 4. -->
<!-- NOTE(review): ${scala.version} and ${spark.version} are not defined in this fragment; they must be declared in a <properties> section elsewhere in the pom. -->
<!-- NOTE(review): Spark artifact IDs are suffixed with the Scala binary version (for example spark-core_2.11), so scala.version presumably has to be 2.11 rather than the full 2.11.7 listed in section 1. TODO confirm against the properties block. -->
<!-- https://mvnrepository.com/artifact/com.sun/tools -->
<!-- https://mvnrepository.com/artifact/org.apache.maven/maven-core -->
<dependency>
<groupId>org.apache.maven</groupId>
<artifactId>maven-core</artifactId>
<version>3.5.0</version>
</dependency>
<!-- MySQL JDBC driver. NOTE(review): not referenced by the demo code in section 4; presumably used elsewhere. -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.39</version>
</dependency>
<!-- Spark core: provides SparkConf, which the demo imports. -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Spark Streaming: provides StreamingContext and Seconds, which the demo imports. -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Spark SQL. NOTE(review): not referenced by the demo code in section 4. -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Spark Hive integration. NOTE(review): not referenced by the demo code in section 4. -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Spark MLlib. NOTE(review): not referenced by the demo code in section 4. -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
4. Spark Streaming 通过本地 socket 端口解析日志
package test02

import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext

/**
 * Spark Streaming demo that reads log lines from a local socket
 * (localhost:9999) in 10-second batches and extracts three kinds of records
 * with regular expressions: user logins, user disconnects and client events.
 *
 * Each record kind is printed per batch; disconnect records are additionally
 * written out with `saveAsTextFiles`.
 *
 * The parsing helpers are pure functions that return `None` for lines that do
 * not match, so non-matching lines are dropped without empty-string sentinels.
 */
object demo5 {

  // The patterns are compiled once here instead of once per record inside the
  // streaming closures.

  /**
   * Login line. Expected shape (the sample in the original note was partly
   * redacted):
   *   2017-07-18 11:02:52.032 INFO 172.31.20.232:56965 <passport_id>@<session_id>@<ip> PUBLIC_USER_LOGIN(0202)-0 create session for ...
   *
   * Groups: 1 = timestamp, 2 = client IP, 3 = passport id, 4 = session id.
   * Every dot in the IP capture is escaped so only dotted quads are accepted.
   */
  private val LoginPattern =
    """^(\S+\s\d+\:\d+\:\d+\.\d{3})\s+\S+\s+(\d+\.\d+\.\d+\.\d+):\d+\S+\s+(\d+)@(\S+)@\S+\s+PUBLIC_USER_LOGIN\(0202\)-0\s+create session for.*""".r

  /**
   * Disconnect line. Expected shape:
   *   2017-07-18 11:03:44.312 INFO 0.0.0.0:18402 <passport_id>@<session_id>@<ip> ADMIN_SYSTEM_SUCCESS(00FE)-84581 -> USER_DISCONNECTED Time cost 9.09ms
   *
   * Groups: 1 = timestamp, 2 = client IP, 3 = passport id, 4 = session id,
   * 5 = the matched keyword (not used).
   */
  private val LogoutPattern =
    """^(\S+\s\d+\:\d+\:\d+\.\d{3})\s+\S+\s+(\d+\.\d+\.\d+\.\d+):\d+\S+\s+(\d+)@(\S+)@.*(disconnected|DISCONNECTED).*""".r

  /**
   * Client event line. Sample:
   *   2017-08-27 06:04:38.420 [info] <0.3471.83> 172.31.2.201:59154 70281275 PUBLIC_SERVER_CLIENT_LOG(258)-0 LovelyStreet:1228
   *
   * Groups: 1 = timestamp, 2 = client IP, 3 = passport id (8 digits),
   * 4 = optional "@<session>@<ip>" suffix, 5 = SERVER or SYSTEM,
   * 6 = event name, 7 = numeric value after the colon.
   *
   * NOTE(review): in the original text the optional group 4 was corrupted by
   * e-mail obfuscation ("\[email protected]"). It is reconstructed here as
   * "@\S+@<dotted quad>" by analogy with the login/logout formats. Confirm
   * against real log data.
   */
  private val EventPattern =
    """^(\S+\s\d+\:\d+\:\d+\.\d{3})\s+\[info\]\s<[\d\.]*>\s?(\d+\.\d+\.\d+\.\d+):\d+\S+\s+\S?([1-9]\d{7})(@\S+@\d+\.\d+\.\d+\.\d+)?\S?\s+PUBLIC_(SERVER|SYSTEM)_CLIENT\S+\s(\S+):(\d+)""".r

  /**
   * Parses a login line.
   *
   * @param line raw log line
   * @return `(loginTime, ipAddress, passportId, sessionId, "login")`, or
   *         `None` when the line is not a login line
   */
  def parseLogin(line: String): Option[(String, String, String, String, String)] =
    LoginPattern.findFirstMatchIn(line).map { m =>
      (m.group(1), m.group(2), m.group(3), m.group(4), "login")
    }

  /**
   * Parses a disconnect line.
   *
   * @param line raw log line
   * @return `(time, ip, passportId, sessionId)`, or `None` when the line is
   *         not a disconnect line
   */
  def parseLogout(line: String): Option[(String, String, String, String)] =
    LogoutPattern.findFirstMatchIn(line).map { m =>
      (m.group(1), m.group(2), m.group(3), m.group(4))
    }

  /**
   * Parses a client event line.
   *
   * @param line raw log line
   * @return `(time, ip, passportId, sessionSuffix, source, eventName, value)`,
   *         or `None` when the line is not an event line; `sessionSuffix` is
   *         the empty string when the optional suffix is absent
   */
  def parseEvent(
      line: String): Option[(String, String, String, String, String, String, String)] =
    EventPattern.findFirstMatchIn(line).map { m =>
      // Group 4 is optional; the regex engine reports an absent group as null.
      val sessionSuffix = Option(m.group(4)).getOrElse("")
      (m.group(1), m.group(2), m.group(3), sessionSuffix, m.group(5), m.group(6), m.group(7))
    }

  /**
   * Entry point: starts the streaming job and blocks until it terminates.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[4]").setAppName("SaprkApp")
    val ssc = new StreamingContext(conf, Seconds(10))
    val lines = ssc.socketTextStream("localhost", 9999)

    // flatMap over the Option result keeps only the lines that matched.
    val loginRes = lines.flatMap(line => parseLogin(line).toList)
    loginRes.print()

    val logoutRes = lines.flatMap(line => parseLogout(line).toList)
    logoutRes.print()
    logoutRes.saveAsTextFiles("/Users/huiliyang/streaming/aa")

    val eventRes = lines.flatMap(line => parseEvent(line).toList)
    eventRes.print()

    ssc.start()
    ssc.awaitTermination()
  }
}