sparkstreaming+kafka

生产者

import java.util.HashMap
import org.apache.kafka.clients.producer._
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object spark_kafka_wordcount_producer {
    def main(args: Array[String]) {
       val Array(brokers, topic, wordsPerMessage) = Array("localhost:9092", "sun_first", "3")
       val props = new HashMap[String, Object]()
       props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
       props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer")
       props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.StringSerializer")

      val producer = new KafkaProducer[String, String](props)

      while(true) {
          val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString)
                .mkString(" ")
          val message = new ProducerRecord[String, String](topic, null, str)
          producer.send(message)

          Thread.sleep(1000)
      }
    }
}

消费者

import java.util.Properties
import kafka.producer._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf

object spark_kafka_wordcount_customer {
   def main(args: Array[String]) {
        val Array(zkQuorum, group, topics) = Array("localhost:2181", "1", "sun_first")
        val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
        val ssc =  new StreamingContext(sparkConf, Seconds(1))
        ssc.checkpoint("checkpoint")

        val topicpMap = topics.split(",").map((_,2)).toMap
        val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
        val words = lines.flatMap(_.split(" "))
        val pairs = words.map(word => (word, 1))
        val wordCounts = pairs.reduceByKey(_ + _)
        wordCounts.print()
        ssc.start()
        ssc.awaitTermination()
    }
}
时间: 2024-12-06 10:26:43

sparkstreaming+kafka的相关文章

Spark-Streaming kafka count 案例

Streaming 统计来自 kafka 的数据,这里涉及到的比较,kafka 的数据是使用从 flume 获取到的,这里相当于一个小的案例. 1. 启动 kafka Spark-Streaming hdfs count 案例 2. 启动 flume flume-ng agent -c conf -f conf/kafka_test.conf -n a1 -Dflume.root.logger=INFO,console flume 配置文件如下 # Name the components on

SparkStreaming与Kafka整合遇到的问题及解决方案

前言 最近工作中是做日志分析的平台,采用了sparkstreaming+kafka,采用kafka主要是看中了它对大数据量处理的高性能,处理日志类应用再好不过了,采用了sparkstreaming的流处理框架 主要是考虑到它本身是基于spark核心的,以后的批处理可以一站式服务,并且可以提供准实时服务到elasticsearch中,可以实现准实时定位系统日志. 实现 Spark-Streaming获取kafka数据的两种方式-Receiver与Direct的方式. 一. 基于Receiver方式

Spark实战(一)SparkStreaming集成Kafka

Spark Streaming + Kafka集成指南 Kafka项目在版本0.8和0.10之间引入了一个新的消费者API,因此有两个独立的相应Spark Streaming包可用.请选择正确的包, 请注意,0.8集成与后来的0.9和0.10代理兼容,但0.10集成与早期的代理不兼容. 注意:从Spark 2.3.0开始,不推荐使用Kafka 0.8支持. Spark Streaming从Kafka接收数据,转换为spark streaming中的数据结构Dstream.数据接收方式有两种 :1

Kafka集成SparkStreaming

Spark Streaming + Kafka集成指南 Kafka项目在版本0.8和0.10之间引入了一个新的消费者API,因此有两个独立的相应Spark Streaming包可用.请选择正确的包, 请注意,0.8集成与后来的0.9和0.10代理兼容,但0.10集成与早期的代理不兼容. 注意:从Spark 2.3.0开始,不推荐使用Kafka 0.8支持. Spark Streaming从Kafka接收数据,转换为spark streaming中的数据结构Dstream.数据接收方式有两种 :1

spark streaming kafka

SparkStreaming+Kafka •kafka是什么,有哪些特点 •SparkStreaming+Kafka有什么好处 –解耦 –缓冲 消息列队的特点 生产者消费者模式 •可靠性保证 –自己不丢数据 –消费者不丢数据:“至少一次,严格一次” broker n. 经纪人,掮客 vt. 以中间人等身分安排... vi. 作为权力经纪人进行谈判 kafka部署 node2,3,4 基于zookeeper 启动 三台 zookeeper /opt/sxt/zookeeper-3.4.6/bin/

大数据-spark理论(3)sparkSql,sparkStreaming,spark调优

导读目录 第一节:sparksql 1:简介 2:核心 3:与hive整合 4:dataFrame 5:函数 第二节:spark Streaming 1:对比strom 2:DStream的算子 3:代码 4:driver HA 5:读取数据 第三节:spark调优 第一节:sparksql (1)简介: Shark:shark是sparksql的前身,hive是shark的前身 快的原因:不仅是内存,还有谓词下移(减少一定量的数据IO) 正常 谓词下移 (先关联表在切割) (先将表中的字段过滤

王家林 大数据Spark超经典视频链接全集[转]

压缩过的大数据Spark蘑菇云行动前置课程视频百度云分享链接 链接:http://pan.baidu.com/s/1cFqjQu SCALA专辑 Scala深入浅出经典视频 链接:http://pan.baidu.com/s/1i4Gh3Xb 密码:25jc DT大数据梦工厂大数据spark蘑菇云Scala语言全集(持续更新中) http://www.tudou.com/plcover/rd3LTMjBpZA/ 1 Spark视频王家林第1课:大数据时代的“黄金”语言Scala 2 Spark视

spark优化

优化一般考虑资源优化 一.资源优化 I 集群方面的:driver的内存,worker内存,核数 方法 1.配置文件:spark-env.sh(配置worker的信息) SPARK_WORKER_CORE 每个worker的使用总核数 SPARK_WORKER_MEMORY 每个worker所使用的内存数(shuffer阶段利用netty 传输文件还会使用到的executor堆外内存也在其中) SPARK_WORKER_INSTANCE 每台节点上启动的worker数量(standalone集群上

Spark_总结五

Spark_总结五 1.Storm 和 SparkStreaming区别 Storm                      纯实时的流式处理,来一条数据就立即进行处理 SparkStreaming 微批处理,每次处理的都是一批非常小的数据 Storm支持动态调整并行度(动态的资源分配),SparkStreaming(粗粒度, 比较消耗资源) Storm 优点 || 缺点 Storm 流式计算(扶梯) 优点:数据延迟度很低,Storm的事务机制要比SparkStreaming的事务机制要完善(