【sparkStreaming】kafka作为数据源的生产和消费

1.建立生产者发送数据

(1)配置zookeeper属性信息props

(2)通过 new KafkaProducer[KeyType,ValueType](props) 建立producer

(3)通过 new ProducerRecord[KeyType,ValueType](topic,key,value) 封装消息message

(4)通过 producer.send(message) 发送消息

package SparkDemo

import java.util
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
object KafkaProducer {
    def main(args:Array[String]): Unit ={
        if(args.length<4){
            //参数
            //<metadataBrokerList> broker地址
            //<topic> topic名称
            //<messagesPerSec> 每秒产生的消息
            //<wordsPerMessage> 每条消息包括的单词数
            System.err.println("Usage:KafkaProducer <metadataBrokerList> <topic> <messagesPerSec> <wordsPerMessage>")
            System.exit(1)
        }
        val Array(brokers,topic,messagesPerSec,wordsPerMessage) = args
        //zookeeper连接属性
        val props = new util.HashMap[String,Object]();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokers)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")
        //通过zookeeper建立kafka的producer
        val producer = new KafkaProducer[String,String](props)
        //通过producer发送一些消息
        while(true){
            (1 to messagesPerSec.toInt).foreach{//遍历[1, messagesPerSec.toInt]
                messageNum =>
                    val str = (1 to wordsPerMessage.toInt).map(
                        x => scala.util.Random.nextInt(10).toString
                    ).mkString(" ")//连成字符串用空格隔开
                    println(str)
                    //注意,我们这里发送的消息都是以键值对的形式发送的
                    //需要把消息内容和topic封装到ProducerRecord中再发送
                    //我们这里的topic为外部的传参,消息的键值对为<null,str>
                    val message = new ProducerRecord[String,String](topic,null,str)
                    //发送消息
                    producer.send(message)
            }
            Thread.sleep(1000)//休眠一秒钟
        }
    }
}

我们把程序打包好,提交到spark集群中执行

最后四个为我们要传入的程序参数

我们定义在localhost:9092的名字为wordsender的topic会以每秒3条,每条5个单词往外发送数据

2.建立消费者消费数据

(1)建立sparkStream ssc

(2)配置zookeeper地址 zkQuorum

(3)设置topic所在组名 group

(4)将topic配置成 Map<topicName,numThreads> 的 topicMap<topic名称,所需线程数> 的形式

(5)通过 KafkaUtils.createStream(ssc,zkQuorum,group,topicMap) 建立sparkStream-kafka的流通道

(6)sparkStream处理

package SparkDemo

import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaConsumer {
    def main(args:Array[String]): Unit ={
        //设置日志等级
        StreamingLoggingExample.setStreamingLogLevels()
        //建立spark流
        val conf = new SparkConf().setAppName("KafkaConsumerDemo").setMaster("local")
        val ssc = new StreamingContext(conf,Seconds(10))
        //设置检查点
        ssc.checkpoint("file:///  or  hdfs:///")
        //zookeeper
        val zkQuorum = "localhost:2181" //zookeeper服务器地址
        //topic所发放的组名
        val group = "1" //topic 所在的组名,可以设置为任意名字
        //topic配置
        val topics = "wordsender" //topic 名称,可以为多个topic,多个之间用逗号隔开 “topic1,topic2”
        //建立topicMap<topicName,numThreads.toInt>  key为topic名称,value为所需要的线程数
        val topicMap = topics.split(",").map((_,1)).toMap //numThreads.toInt为所需线程数
        //建立spark流
        val lineMap = KafkaUtils.createStream(ssc,zkQuorum,group,topicMap)
        //处理spark流
        val lines = lineMap.map(_._2)//上面传过来的数据为<null,string>,我们去后边的value
        val pair = lines.flatMap(_.split(" ")).map((_,1))
        val wordCount = pair.reduceByKey(_+_)
        wordCount.print
        //启动spark流
        ssc.start()
        ssc.awaitTermination()
    }

}

然后我们将程序打包提交到集群上运行,就可以对上面我们建立的kafka生产的消息进行消费了。

原文地址:https://www.cnblogs.com/zzhangyuhang/p/9071161.html

时间: 2024-11-05 23:36:29

【sparkStreaming】kafka作为数据源的生产和消费的相关文章

Python 基于Python结合pykafka实现kafka生产及消费速率&amp;主题分区偏移实时监控

基于Python结合pykafka实现kafka生产及消费速率&主题分区偏移实时监控   By: 授客 QQ:1033553122   1.测试环境 python 3.4 zookeeper-3.4.13.tar.gz 下载地址1: http://zookeeper.apache.org/releases.html#download https://www.apache.org/dyn/closer.cgi/zookeeper/ https://mirrors.tuna.tsinghua.edu

Kafka 使用Java实现数据的生产和消费demo

前言 在上一篇中讲述如何搭建kafka集群,本篇则讲述如何简单的使用 kafka .不过在使用kafka的时候,还是应该简单的了解下kafka. Kafka的介绍 Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据. Kafka 有如下特性: 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间复杂度的访问性能. 高吞吐率.即使在非常廉价的商用机器上也能做到单机支持每秒100K条以上消息的传输. 支持Kafka Serv

JAVA代码之RocketMQ生产和消费数据

一.启动RocketMQ [[email protected] ~]# cat /etc/hosts # Do not remove the following line, or various programs # that require network functionality will fail. 127.0.0.1               localhost.localdomain localhost ::1             localhost6.localdomain6

spring整合kafka项目生产和消费测试结果记录(一)

使用spring+springMVC+mybatis+kafka做了两个web项目,一个是生产者,一个是消费者. 通过JMeter测试工具模拟100个用户并发访问生产者项目,发送json数据给生产者的接口,生产者将json数据发送到kafka集群, 消费者监听到kafka集群中的消息就开始消费,并将json解析成对象存到MySQL数据库. 下面是使用JMeter测试工具模拟100个并发的线程设置截图: 请求所发送的数据: 下面是100个用户10000个请求的聚合报告: 下面是生产者截图生产完10

程序实现kafka 生产和消费

生产端程序 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}import scala.util.Randomimport java.utilobject KafkaProducer { def main(args: Array[String]): Unit = { //kafka-console-producer.sh --broker-list master:9092

Kafka使用总结与生产消费Demo实现

什么是kafka Kafka官网自己的介绍是:一个可支持分布式的流平台.kafka官网介绍 kafka三个关键能力: 1.发布订阅记录流,类似于消息队列与企业信息系统 2.以容错的持久方式存储记录流 3.对流进行处理 kafka通常应用再两大类应用中: 1.构建实时流数据管道,在系统或应用程序之间可靠地获取数据 2.构建转换或响应数据流的实时流应用程序 kafka的一些基本概念: 1.Kafka作为一个集群运行在一个或多个服务器上,这些服务器可以跨越多个数据中心. 2.Kafka集群将记录流存储

SparkStreaming之基本数据源输入

输入DStreams表示从数据源获取的原始数据流.Spark Streaming拥有两类数据源 (1)基本源(Basic sources):这些源在StreamingContext API中直接可用.例如文件系统.套接字连接. Akka的actor等. (2)高级源(Advanced sources):这些源包括Kafka,Flume,Kinesis,Twitter等等. 1.基本数据源输入源码 SparkStream 对于外部的数据输入源,一共有下面几种: (1)用户自定义的数据源:recei

Spark-Streaming kafka count 案例

Streaming 统计来自 kafka 的数据,这里涉及到的比较,kafka 的数据是使用从 flume 获取到的,这里相当于一个小的案例. 1. 启动 kafka Spark-Streaming hdfs count 案例 2. 启动 flume flume-ng agent -c conf -f conf/kafka_test.conf -n a1 -Dflume.root.logger=INFO,console flume 配置文件如下 # Name the components on

多线程,生产者消费者模型(生产馒头,消费馒头)

先建立一个容器 /** * 容器 * 共享资源 * @author Administrator * */ public class SynStack { int index = 0; //容器 SteamBread[] stb = new SteamBread[6]; /** * 往容器中放产品 */ public synchronized void push(SteamBread st){ while(index == stb.length){ try { this.wait(); } cat