程序实现kafka 生产和消费

生产端程序

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import scala.util.Random
import java.util
object KafkaProducer {
  def main(args: Array[String]): Unit = {
    //kafka-console-producer.sh --broker-list master:9092,master:9093 -topic mykafka2
    val brokers="master:9092,master:9093"
    val topic = "mykafka2"

    val props = new util.HashMap[String,Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokers)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")

    val msgPerSec = 2
    val wordgPerMsg = 3
    val producer = new KafkaProducer[String,String](props)
    while(true){
      for(i<- 1 to msgPerSec){
        val str = (1 to wordgPerMsg).map(x=>Random.nextInt(100).toString).mkString(" ")
        println(str)
        val msg = new ProducerRecord[String,String](topic,null,str)
        producer.send(msg)
      }
      Thread.sleep(1000)
    }

  }
}

生产端程序运行结果

消费端程序

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka._

object kafkaWorldCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("kafkaWorldCount")
    val ssc = new StreamingContext(conf,Seconds(10))
    ssc.sparkContext.setLogLevel("warn")

    val zkQurom = "master:12181/kafka0.11"
    val group = "888"
    val topics ="mykafka2"
    val numThreads = 3
    val topMap = topics.split(" ").map((_,numThreads)).toMap

    val lines = KafkaUtils.createStream(ssc,zkQurom,group,topMap)
    val words = lines.map(_._2).flatMap(_.split(" "))
    val pairs = words.map((_,1))
    val wordcounts = pairs.reduceByKey(_+_)
    wordcounts.print()
    ssc.start()
    ssc.awaitTermination()

  }
}

消费端程序运行结果

原文地址:https://www.cnblogs.com/BrentBoys/p/10802730.html

时间: 2024-08-30 11:49:54

程序实现kafka 生产和消费的相关文章

Python 基于Python结合pykafka实现kafka生产及消费速率&amp;主题分区偏移实时监控

基于Python结合pykafka实现kafka生产及消费速率&主题分区偏移实时监控   By: 授客 QQ:1033553122   1.测试环境 python 3.4 zookeeper-3.4.13.tar.gz 下载地址1: http://zookeeper.apache.org/releases.html#download https://www.apache.org/dyn/closer.cgi/zookeeper/ https://mirrors.tuna.tsinghua.edu

Kafka 使用Java实现数据的生产和消费demo

前言 在上一篇中讲述如何搭建kafka集群,本篇则讲述如何简单的使用 kafka .不过在使用kafka的时候,还是应该简单的了解下kafka. Kafka的介绍 Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据. Kafka 有如下特性: 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间复杂度的访问性能. 高吞吐率.即使在非常廉价的商用机器上也能做到单机支持每秒100K条以上消息的传输. 支持Kafka Serv

【sparkStreaming】kafka作为数据源的生产和消费

1.建立生产者发送数据 (1)配置zookeeper属性信息props (2)通过 new KafkaProducer[KeyType,ValueType](props) 建立producer (3)通过 new ProducerRecord[KeyType,ValueType](topic,key,value) 封装消息message (4)通过 producer.send(message) 发送消息 package SparkDemo import java.util import org.

JAVA代码之RocketMQ生产和消费数据

一.启动RocketMQ [[email protected] ~]# cat /etc/hosts # Do not remove the following line, or various programs # that require network functionality will fail. 127.0.0.1               localhost.localdomain localhost ::1             localhost6.localdomain6

将 Django 应用程序部署到生产服务器

原文出自: http://www.ibm.com/developerworks/cn/opensource/os-django/ 比较有启发性质的一篇文章,会避免很多弯路 Django 是一个基于 Python 的开源 Web 应用程序框架,其目的是使创建数据库驱动的 Web 站点和 Web 应用程序更加容易.开发 Django 应用程序很简单,因为该框架包含了一个开发 Web 服务器.但是这个框架不适合在生产环境中使用,因此需要进一步将 Django 应用程序部署到 Web.在本文中,您将了解

程序重启RocketMQ消息重复消费

最近在调试RocketMQ消息发送与消费的Demo时,发现一个问题:只要重启程序,RocketMQ消息就会重复消费. 那么这是什么原因导致的,又该如何解决呢? 经过一番排查,发现程序使用的RocketMQ客户端版本是3.6.2,而测试环境安装的RocketMQ环境的版本是4.1.0.原来是客户端和服务器端版本不一样导致的,消息并没有最终被消费,即没有ACK消息确认,只要程序重启就会重复消费. 解决方案:RocketMQ客户端版本使用与服务器端的同一版本,即4.1.0版本. 划重点:使用Rocke

HyperLedger Fabric 1.2 kafka生产环境部署(11.1)

11.1 Kafka模式简介       上一章介绍的Solo模式只存在一个排序(orderer)服务,是一种中心化结构,一旦排序(orderer)服务出现了问题,整个区块链网络将会崩溃,为了能在正式环境中稳定运行,需要对排序(orderer)服务采用集群方式,Hyperledger Fabric采用kafka方式实现排序(orderer)服务的集群,kafka模块被认为是半中心化结构.       顺便提一下,去中心化的BFT(拜占庭容错)排序(orderer)服务集群方式目前还在开发,还没有

kafka生产实例安装

2019/3/14 星期四 Linux 初始化脚本 (centos6 centos7 通用)Linux 初始化脚本 (centos6 centos7 通用)zookeeper生产环境搭建 zookeeper生产环境搭建 在安装前请务必安装好zookeeper 查看上面2个链接地址! kafka 生产环境搭建 [root@emm-kafka01-10--174 ~]# cd /opt/ins/ [root@emm-kafka01-10--174 ins]# ll total 233044 -rwx

spring整合kafka项目生产和消费测试结果记录(一)

使用spring+springMVC+mybatis+kafka做了两个web项目,一个是生产者,一个是消费者. 通过JMeter测试工具模拟100个用户并发访问生产者项目,发送json数据给生产者的接口,生产者将json数据发送到kafka集群, 消费者监听到kafka集群中的消息就开始消费,并将json解析成对象存到MySQL数据库. 下面是使用JMeter测试工具模拟100个并发的线程设置截图: 请求所发送的数据: 下面是100个用户10000个请求的聚合报告: 下面是生产者截图生产完10