Kafka 学习笔记之 Producer/Consumer (Scala)

既然Kafka使用Scala写的,最近也在慢慢学习Scala的语法,虽然还比较生疏,但是还是想尝试下用Scala实现Producer和Consumer,并且用HashPartitioner实现消息根据key路由到指定的partition。

Producer:

import java.util.Properties
import kafka.producer.ProducerConfig
import kafka.producer.Producer
import kafka.producer.KeyedMessage

object ProducerDemo {
  def main(args: Array[String]): Unit = {

    val brokers = "192.168.1.151:9092,192.168.1.152:9092,192.168.1.153:9092"
    val topic = "ScalaTopic";

    val props = new Properties()
    props.put("metadata.broker.list", brokers)
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    props.put("partitioner.class", classOf[HashPartitioner].getName)
    props.put("producer.type", "sync")
    props.put("batch.num.messages", "1")
    props.put("queue.buffering.max.messages", "1000000")
    props.put("queue.enqueue.timeout.ms", "20000000")

    val config = new ProducerConfig(props)
    val producer = new Producer[String, String](config);

    val sleepFlag = false;
    val message1 = new KeyedMessage[String, String](topic, "1", "test 0");
    producer.send(message1);
    if(sleepFlag) Thread.sleep(5000);
    val message2 = new KeyedMessage[String, String](topic, "1", "test 1");
    producer.send(message2);
    if(sleepFlag) Thread.sleep(5000);
    val message3 = new KeyedMessage[String, String](topic, "1", "test 2");
    producer.send(message3);
    if(sleepFlag) Thread.sleep(5000);
    val message4 = new KeyedMessage[String, String](topic, "4", "test 3");
    producer.send(message4);
    if(sleepFlag) Thread.sleep(5000);
    val message5 = new KeyedMessage[String, String](topic, "4", "test 4");
    producer.send(message5);
    if(sleepFlag) Thread.sleep(5000);
    val message6 = new KeyedMessage[String, String](topic, "4", "test 4");
    producer.send(message6);
    if(sleepFlag) Thread.sleep(5000);

  }
}

Consumer:

import java.util.Properties
import kafka.consumer.ConsumerConfig
import kafka.consumer.Consumer
import kafka.message.MessageAndMetadata

object ConsumerDemo {
  def main(args: Array[String]): Unit = {
    var groupid = ""
    var consumerid = ""
    var topic = ""

    args match {
      case Array(arg1, arg2, arg3) => topic = arg1; groupid = arg2; consumerid = arg3
    }

    val props = new Properties()
    props.put("zookeeper.connect", "192.168.1.151:2181,192.168.1.152:2181,192.168.1.153:2181")
    props.put("group.id", groupid)
    props.put("client.id", "test")
    props.put("consumer.id", consumerid)
    props.put("auto.offset.reset", "smallest")
    props.put("auto.commit.enable", "true")
    props.put("auto.commit.interval.ms", "100")

    val consumerConfig = new ConsumerConfig(props)
    val consumer = Consumer.create(consumerConfig)

    val topicCountMap = Map(topic -> 1)
    val consumerMap = consumer.createMessageStreams(topicCountMap)
    val streams = consumerMap.get(topic).get
    for (stream <- streams) {
      val it = stream.iterator()

      while (it.hasNext()) {
        val messageAndMetadata = it.next()

        val message = s"Topic:${messageAndMetadata.topic}, GroupID:$groupid, Consumer ID:$consumerid, PartitionID:${messageAndMetadata.partition}, " +
          s"Offset:${messageAndMetadata.offset}, Message Key:${new String(messageAndMetadata.key())}, Message Payload: ${new String(messageAndMetadata.message())}"

        System.out.println(message);

      }

    }

  }

}

HashPartitioner:

import kafka.producer.Partitioner
import scala.math._
import kafka.utils.VerifiableProperties

class HashPartitioner extends Partitioner {
  def this(verifiableProperties: VerifiableProperties) { this }

  override def partition(key: Any, numPartitions: Int): Int = {

    if (key.isInstanceOf[Int]) {
      abs(key.toString().toInt) % numPartitions
    }

    key.hashCode() % numPartitions
  }

}

运行结果:

所有消息都被路由到了Partition1,测试成功!

时间: 2024-10-13 01:00:20

Kafka 学习笔记之 Producer/Consumer (Scala)的相关文章

Kafka 学习笔记之 Consumer API

Kafka提供了两种Consumer API High Level Consumer API Low Level Consumer API(Kafka诡异的称之为Simple Consumer API,实际上非常复杂) 1. High Level Consumer API概述 High Level Consumer API围绕着Consumer Group这个逻辑概念展开,它屏蔽了每个Topic的每个Partition的Offset管理(自动读取zookeeper中该Consumer group

kafka学习笔记:知识点整理

一.为什么需要消息系统 1.解耦: 允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束. 2.冗余: 消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险.许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕. 3.扩展性: 因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可. 4.

[Big Data - Kafka] kafka学习笔记:知识点整理

一.为什么需要消息系统 1.解耦: 允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束. 2.冗余: 消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险.许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕. 3.扩展性: 因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可. 4.

Kafka学习笔记(一):概念介绍

Kafka是一个开源的,分布式的,高吞吐量的消息系统.随着Kafka的版本迭代,日趋成熟.大家对它的使用也逐步从日志系统衍生到其他关键业务领域.特别是其超高吞吐量的特性,在互联网领域,使用越来越广泛,生态系统也越来的完善.同时,其设计思路也是其他消息中间件重要的设计参考. Kafka原先的开发初衷是构建一个处理海量日志的框架,基于高吞吐量为第一原则,所以它对消息的可靠性以及消息的持久化机制考虑的并不是特别的完善.0.8版本后,陆续加入了一些复制.应答和故障转移等相关机制以后,才可以让我们在其他关

kafka学习笔记:知识点整理(二)

三.kafka HA 3.1 replication 如图.1所示,同一个 partition 可能会有多个 replica(对应 server.properties 配置中的 default.replication.factor=N).没有 replica 的情况下,一旦 broker 宕机,其上所有 patition 的数据都不可被消费,同时 producer 也不能再将数据存于其上的 patition.引入replication 之后,同一个 partition 可能会有多个 replic

Kafka 学习笔记之 High Level Consumer相关参数

High Level Consumer相关参数 自动管理offset auto.commit.enable = true auto.commit.interval.ms = 60*1000 手动管理offset auto.commit.enable = false ConsumerConnector.commitOffsets(); offset存储 offsets.storage = zookeeper (kafka) dual.commit.enabled = true (false)

Kafka学习笔记

Apache Kafka 一.消息队列分类 1.1 点对点 消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并消费消息 注意:   1.消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息   2.Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费 1.2 发布/订阅 消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息.和点对点方式不同,发布到topic的消息会被所有订阅者消费 二.消息队

kafka学习笔记:知识点整理(一)

一.kafka 架构 1.1 拓扑结构 如下图: 图.1 1.2 相关概念 如图.1中,kafka 相关名词解释如下: 1.producer:  消息生产者,发布消息到 kafka 集群的终端或服务. 2.broker:  kafka 集群中包含的服务器. 3.topic:  每条发布到 kafka 集群的消息属于的类别,即 kafka 是面向 topic 的. 4.partition:  partition 是物理上的概念,每个 topic 包含一个或多个 partition.kafka 分配

Kafka学习笔记-Java简单操作

Maven依赖包: [plain] view plain copy <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.8.2.1</version> </dependency> <dependency> <groupId>org.apache