Spark修炼之道(进阶篇)——Spark入门到精通:第十五节 Kafka 0.8.2.1 集群搭建

作者:周志湖

微信号:zhouzhihubeyond

本节为下一节Kafka与Spark Streaming做铺垫

主要内容

1.kafka 集群搭建

1. kafka 集群搭建

  1. kafka 安装与配置

    到下面的地址下载:Scala 2.10 - kafka_2.10-0.8.2.1.tgz

    http://kafka.apache.org/downloads.html

    下载完成后,使用命令

tar -zxvf  kafka_2.10-0.8.2.1.tgz 

解压,解压后的目录如下

进入config目录,将server.properties文件内容如下:

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

############################# Socket Server Settings #############################

# The port the socket server listens on
port=9092

# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name=sparkmaster

//中间省略,默认配置即可
############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=sparkmaster:2181,sparkslave01:2181,sparkslave02:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

将整个安装文件进行跨机器拷贝:

root@sparkmaster:/hadoopLearning# scp -r kafka_2.10-0.8.2.1/ sparkslave01:/hadoopLearning/
root@sparkmaster:/hadoopLearning# scp -r kafka_2.10-0.8.2.1/ sparkslave02:/hadoopLearning/ 

将sparkslave01机器上的server.properties文件内容如下:


############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1

############################# Socket Server Settings #############################

# The port the socket server listens on
port=9092

# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name=sparkslave01

//中间省略,默认配置即可

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=sparkmaster:2181,sparkslave01:2181,sparkslave02:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

将sparkslave02机器上的server.properties文件内容如下:

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2

############################# Socket Server Settings #############################

# The port the socket server listens on
port=9092

# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name=sparkslave02

//中间省略,默认配置即可

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=sparkmaster:2181,sparkslave01:2181,sparkslave02:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
  1. 启动Kafka集群
root@sparkslave02:/hadoopLearning/kafka_2.10-0.8.2.1# bin/kafka-server-start.sh config/server.properties
root@sparkslave01:/hadoopLearning/kafka_2.10-0.8.2.1# bin/kafka-server-start.sh config/server.properties
root@sparkmaster:/hadoopLearning/kafka_2.10-0.8.2.1# bin/kafka-server-start.sh config/server.properties 

3 创建topic

在sparkmaster机器上执行下列命令创建一个topic

root@sparkmaster:/hadoopLearning/kafka_2.10-0.8.2.1# bin/kafka-topics.sh --create --topic kafkatopictest --replication-factor 3 --partitions 2 --zookeeper sparkmaster:2181
Created topic "kafkatopictest".

4 发送消息至kafka

在sparkslave01机器上执行下列命令并向kafka发送消息

root@sparkslave01:/hadoopLearning/kafka_2.10-0.8.2.1# bin/kafka-console-producer.sh --broker-list sparkslave01:9092 --sync --topic kafkatopictest
Hello Kafka, I will test Spark Streaming on you next lesson

5 接收kafka发送来的消息

在sparkslave02机器上执行下列命令并接收kafka发送消息

root@sparkslave02:/hadoopLearning/kafka_2.10-0.8.2.1# bin/kafka-console-consumer.sh --zookeeper sparkmaster:2181 --topic kafkatopictest --from-beginning
Hello Kafka, I will test Spark Streaming on you next lesson

至此Kafka 集群搭建与测试完毕

下一节当中,我们将演示kafka如何与Spark Streaimg结合起来使用

时间: 2024-08-07 21:20:35

Spark修炼之道(进阶篇)——Spark入门到精通:第十五节 Kafka 0.8.2.1 集群搭建的相关文章

Scala入门到精通——第十五节 Case Class与模式匹配(二)

本节主要内容 模式匹配的类型 for控制结构中的模式匹配 option类型模式匹配 1. 模式的类型 1 常量模式 object ConstantPattern{ def main(args: Array[String]): Unit = { //注意,下面定义的是一个函数 //函数的返回值利用的是模式匹配后的结果作为其返回值 //还需要注意的是函数定义在main方法中 //也即scala语言可以在一个函数中定义另外一个函数 def patternShow(x:Any)=x match { ca

Scala入门到精通——第二十五节 提取器(Extractor)

作者:摇摆少年梦 视频地址:http://www.xuetuwuyou.com/course/12 本节主要内容 apply与unapply方法 零变量或变量的模式匹配 提取器与序列模式 scala中的占位符使用总结 1. apply与unapply方法 apply方法我们已经非常熟悉了,它帮助我们无需new操作就可以创建对象,而unapply方法则用于析构出对象,在模式匹配中特别提到,如果一个类要能够应用于模式匹配当中,必须将类声明为case class,因为一旦被定义为case class,

Scala入门到精通——第十四节 Case Class与模式匹配(一)

本节主要内容 模式匹配入门 Case Class简介 Case Class进阶 1. 模式匹配入门 在java语言中存在switch语句,例如: //下面的代码演示了java中switch语句的使用 public class SwitchDemo { public static void main(String[] args) { for(int i = 0; i < 100; i++) { switch (i) { case 10:System.out.println("10"

Scala入门到精通——第二十四节 高级类型 (三)

作者:摆摆少年梦 视频地址:http://blog.csdn.net/wsscy2004/article/details/38440247 本节主要内容 Type Specialization Manifest.TypeTag.ClassTag Scala类型系统总结 在scala中,类(class)与类型(type)是两个不一样的概念.我们知道类是对同一类型数据的抽象,而类型则更详细. 比方定义class List[T] {}, 能够有List[Int] 和 List[String]等详细类型

Scala入门到精通——第二十九节 Scala数据库编程

本节主要内容 Scala Mavenproject的创建 Scala JDBC方式訪问MySQL Slick简单介绍 Slick数据库编程实战 SQL与Slick相互转换 本课程在多数内容是在官方教程上改动而来的,官方给的样例是H2数据库上的.经过本人改造,用在MySQL数据库上,官方教程地址:http://slick.typesafe.com/doc/2.1.0/sql-to-slick.html 1. Scala Mavenproject的创建 本节的project项目採用的是Maven P

Scala入门到精通——第十六节 泛型与注解

本节主要内容 泛型(Generic Type)简介 注解(Annotation)简介 注解常用场景 1. 泛型(Generic Type)简介 泛型用于指定方法或类可以接受任意类型参数,参数在实际使用时才被确定,泛型可以有效地增强程序的适用性,使用泛型可以使得类或方法具有更强的通用性.泛型的典型应用场景是集合及集中中的方法参数,可以说同java一样,scala中泛型无处不在,具体可以查看scala的api //单个泛型参数的使用情况 class Person[T](var name:T) cla

Scala入门到精通——第二十六节 Scala并发编程基础

作者:摇摆少年梦 视频地址:http://www.xuetuwuyou.com/course/12 本节主要内容 Scala并发编程简介 Scala Actor并发编程模型 react模型 Actor的几种状态 Actor深入使用解析 本节主要介绍的scala并发编程的基本思想,由于scala在2.10版本之后宣布使用akka作为其并发编程库,因此本节只进行基础性的内容介绍,后面将把重点放在akka框架的讲解上. 1. Scala并发编程简介 2003 年,Herb Sutter 在他的文章 "

零基础入门学习java第十五节:对象的比较

这一篇主要讲讲对象的比较,什么是对象的比较,我们知道两个数值类型只需要用“==”符号即可进行相等判断,但如果是两个Goods对象呢?如何进行比较?这时候,我们的equals方法就派上用场了.equals方法是类的祖先Object类的另一个protected方法,既然是protected方法(能被同一个包里的所有类所访问, 能被该类的子类所访问,子类可以和父类不在一个包中),子类是可以直接访问的,但如果没有覆盖该方法,那么使用的只是Object的原始比较方法,return(this==obj) ,

Scala入门到精通——第十九节 隐式转换与隐式参数(二)

作者:摇摆少年梦 配套视频地址:http://www.xuetuwuyou.com/course/12 本节主要内容 隐式参数中的隐式转换 函数中隐式参数使用概要 隐式转换问题梳理 1. 隐式参数中的隐式转换 前一讲中,我们提到函数中如果存在隐式参数,在使用该函数的时候如果不给定对应的参数,则编译器会自动帮我们搜索相应的隐式值,并将该隐式值作为函数的参数,这里面其实没有涉及到隐式转换,本节将演示如何利用隐式参数进行隐式转换,下面的代码给定的是一个普通的比较函数: object ImplicitP