golang基础-WaitGroup、kafka消费者

kafka消费者

以下博客是通过生产者创建、发送消息至kafka 
博客链接

现在我们站在消费者的角度,来进行收取消息

package main

import (
    "fmt"
    "strings"
    "sync"

    "github.com/Shopify/sarama"
)

var (
    wg sync.WaitGroup
)

func main() {
    //创建消费者
    consumer, err := sarama.NewConsumer(strings.Split("192.168.11.48:9092", ","), nil)
    if err != nil {
        fmt.Println("Failed to start consumer: %s", err)
        return
    }
    //设置分区
    partitionList, err := consumer.Partitions("nginx_log")
    if err != nil {
        fmt.Println("Failed to get the list of partitions: ", err)
        return
    }
    fmt.Println(partitionList)
    //循环分区
    for partition := range partitionList {
        pc, err := consumer.ConsumePartition("nginx_log", int32(partition), sarama.OffsetNewest)
        if err != nil {
            fmt.Printf("Failed to start consumer for partition %d: %s\n", partition, err)
            return
        }
        defer pc.AsyncClose()
        go func(pc sarama.PartitionConsumer) {
            wg.Add(1)
            for msg := range pc.Messages() {
                fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
                fmt.Println()
            }
            wg.Done()
        }(pc)
    }
    //time.Sleep(time.Hour)
    wg.Wait()
    consumer.Close()
}

接下来我们测试上面的消费者示例代码,在进行测试前我们需要如下的准备工作 
1、启动zookeeper 
2、启动kafka 
3、创立生产者topic

.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic linlin

4、执行生产者发送消息至kafka代码 
5、执行消费者代码程序

第4步的代码如下:

package main

import (
    "fmt"
    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Partitioner = sarama.NewRandomPartitioner
    config.Producer.Return.Successes = true

    msg := &sarama.ProducerMessage{}
    msg.Topic = "nginx_log"
    msg.Value = sarama.StringEncoder("this is a good test, my message is good")

    client, err := sarama.NewSyncProducer([]string{"192.168.11.28:9092"}, config)
    if err != nil {
        fmt.Println("producer close, err:", err)
        return
    }

    defer client.Close()

    pid, offset, err := client.SendMessage(msg)
    if err != nil {
        fmt.Println("send message failed,", err)
        return
    }

    fmt.Printf("pid:%v offset:%v\n", pid, offset)
}
  • 然后最后看效果图如下:

原文地址:https://www.cnblogs.com/yaowen/p/8320842.html

时间: 2024-08-02 08:09:52

golang基础-WaitGroup、kafka消费者的相关文章

[Golang] kafka集群搭建和golang版生产者和消费者

一.kafka集群搭建 至于kafka是什么我都不多做介绍了,网上写的已经非常详尽了. 1. 下载zookeeper  https://zookeeper.apache.org/releases.html 2. 下载kafka http://kafka.apache.org/downloads 3. 启动zookeeper集群(我的示例是3台机器,后面的kafka也一样,这里就以1台代指3台,当然你也可以只开1台) 1)配置zookeeper. 修改复制一份 zookeeper-3.4.13/c

二、Kafka基础实战:消费者和生产者实例

一.Kafka消费者编程模型 1.分区消费模型 分区消费伪代码描述 main() 获取分区的size for index =0 to size create thread(or process) consumer(Index) 第index个线程(进程) consumer(index) 创建到kafka broker的连接: KafkaClient(host,port) 指定消费参数构建consumer: SimpleConsumer(topic, partitions) 设置消费offset

kafka 消费者offset记录位置和方式

我们大家都知道,kafka消费者在会保存其消费的进度,也就是offset,存储的位置根据选用的kafka api不同而不同. 首先来说说消费者如果是根据javaapi来消费,也就是[kafka.javaapi.consumer.ConsumerConnector],我们会配置参数[zookeeper.connect]来消费.这种情况下,消费者的offset会更新到zookeeper的[consumers/{group}/offsets/{topic}/{partition}]目录下,例如: [z

Golang基础入门

Go语言很容易上手 第一步,在你的计算机上安装Go语言环境 首先下载对应操作系统的安装包或者源文件 Windows和Mac OSX 都有安装包,可以选择直接双击安装,很简单 Ubuntu系统可以使用 apt-get 安装 sudo apt-get install golang 当然,你也可以选择使用源码包安装 获取源码: $ hg clone -u release https://code.google.com/p/go 进入到源码目录,运行安装脚本 $ cd go/src $ ./all.ba

一文精通kafka 消费者的三种语义

本文主要是以kafka 09的client为例子,详解kafka client的使用,包括kafka消费者的三种消费语义at-most-once, at-least-once, 和 exactly-once message ,生产者的使用等. (一) 创建topic bin/kafka-topics --zookeeper localhost:2181 --create --topic normal-topic --partitions 2 --replication-factor 1 (二)

Kafka 系列(四)—— Kafka 消费者详解

一.消费者和消费者群组 在 Kafka 中,消费者通常是消费者群组的一部分,多个消费者群组共同读取同一个主题时,彼此之间互不影响.Kafka 之所以要引入消费者群组这个概念是因为 Kafka 消费者经常会做一些高延迟的操作,比如把数据写到数据库或 HDFS ,或者进行耗时的计算,在这些情况下,单个消费者无法跟上数据生成的速度.此时可以增加更多的消费者,让它们分担负载,分别处理部分分区的消息,这就是 Kafka 实现横向伸缩的主要手段. 需要注意的是:同一个分区只能被同一个消费者群组里面的一个消费

Kafka消费者——结合spring开发

Kafka消费者端 可靠性保证 作为消费端,消费数据需要考虑的是: 1.不重复消费消息 2.不缺失消费消息 自动提交 offset 的相关参数: enable.auto.commit: 是否开启自动提交 offset 功能(true) auto.commit.interval.ms: 自动提交 offset 的时间间隔 (1000ms = 1s) 手动提交offset 的相关参数: enable.auto.commit: 是否开启自动提交 offset 功能(false) 异步提交也个缺点,那就

Spring整合kafka消费者和生产者&redis的步骤

==================================================================================一.整合kafka(生产者)步骤1.导入依赖(pom.xml)2.编写配置文件,修改配置文件的ip和端口号,修改主题(producer.xml)3.如果再ssm项目中可以让spring.xml来加载这个配置文件 <import resource="classpath:XXX.xml" /> 如果是再测试类中如何加

java基础之----kafka

概述 听到这个名字是不是很熟悉,没错这个名字就是文学家卡夫卡的英文,传说中国的王小波也被誉为东方的乔伊斯+卡夫卡,哈哈哈,当然这篇文章不是谈论文学家卡夫卡的,那为什么一个消息中间件叫kafka呢?很简单就是这个中间件的作者喜欢卡夫卡,所以就这么命名了,如果有一天你也写出来一个牛逼的软件,而且你也很喜欢王小波,那你可以命名为xiaobo,没人可以拦得住你. kafka架构 先上图(开篇一张图,内容全靠编) kafka broker: 从图中可以看出,这家伙是喜欢搞黄色的^_^,其实broker是k