[Golang] kafka集群搭建和golang版生产者和消费者

一、kafka集群搭建  

  至于kafka是什么我都不多做介绍了,网上写的已经非常详尽了。

1. 下载zookeeper  https://zookeeper.apache.org/releases.html

2. 下载kafka http://kafka.apache.org/downloads

3. 启动zookeeper集群(我的示例是3台机器,后面的kafka也一样,这里就以1台代指3台,当然你也可以只开1台)

  1)配置zookeeper。 修改复制一份 zookeeper-3.4.13/conf/zoo_sample.cfg 改名成zoo.cfg。修改以下几个参数,改成适合自己机器的。

  dataDir=/home/test/zookeeper/data
  dataLogDir=/home/test/zookeeper/log
  server.1=10.22.1.1:2888:3888
  server.2=10.22.1.2:2888:3888
  server.3=10.22.1.3:2888:3888

  2) 创建myid文件,确定机器编号。分别在3台机器的/home/test/zookeeper/data目录执行分别执行命令 echo 1 > myid(注意ip为10.22.1.2把1改成2,见上面的配置)

  3) 启动zookeeper集群。分别进入目录zookeeper-3.4.13/bin 执行 sh zookeeper-server-start.sh

4. 启动kafka集群

  1) 配置kafka。进入kafka_2.11-2.2.0/config。复制3份,分别为server1.properties,server2.properties,server3.properties。修改以下几项(注意对应的机器id)

log.dirs和zookeeper.connect 是一样的。broker.id和listeners分别填对应的id和ip
broker.id=1
listeners=PLAINTEXT://10.22.1.1:9092
log.dirs=/home/test/kafka/log
zookeeper.connect=10.22.1.1:2181,10.22.1.2:2181,10.22.1.3:2181

  2) 启动kafka集群。分别进入kafka_2.11-2.2.0/bin目录,分别执行sh kafka-server-start.sh ../config/server1.properties (第2台用server2.properties配置文件)

 

二、Golang生产者和消费者

  目前比较流行的golang版的kafka客户端库有两个:

  1. https://github.com/Shopify/sarama

  2. https://github.com/confluentinc/confluent-kafka-go

  至于谁好谁坏自己去分辨,我用的是第1个,star比较多的。

1. kafka生产者代码

  这里有2点要说明:

  1)  config.Producer.Partitioner = sarama.NewRandomPartitioner,我分partition用的是随机,如果你想稳定分paritition的话可以自定义,还有轮询和hash方式

  2) 我的topic是走的外部配置,可以根据自己的需求修改

// Package kafka_producer kafka 生产者的包装
package kafka_producer

import (
    "github.com/Shopify/sarama"
    "strings"
    "sync"
    "time"

    "github.com/alecthomas/log4go"
)

// Config 配置
type Config struct {
    Topic      string `xml:"topic"`
    Broker     string `xml:"broker"`
    Frequency  int    `xml:"frequency"`
    MaxMessage int    `xml:"max_message"`
}

type Producer struct {
    producer sarama.AsyncProducer

    topic     string
    msgQ      chan *sarama.ProducerMessage
    wg        sync.WaitGroup
    closeChan chan struct{}
}

// NewProducer 构造KafkaProducer
func NewProducer(cfg *Config) (*Producer, error) {

    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.NoResponse                                  // Only wait for the leader to ack
    config.Producer.Compression = sarama.CompressionSnappy                            // Compress messages
    config.Producer.Flush.Frequency = time.Duration(cfg.Frequency) * time.Millisecond // Flush batches every 500ms
    config.Producer.Partitioner = sarama.NewRandomPartitioner

    p, err := sarama.NewAsyncProducer(strings.Split(cfg.Broker, ","), config)
    if err != nil {
        return nil, err
    }
    ret := &Producer{
        producer:  p,
        topic:     cfg.Topic,
        msgQ:      make(chan *sarama.ProducerMessage, cfg.MaxMessage),
        closeChan: make(chan struct{}),
    }

    return ret, nil
}

// Run 运行
func (p *Producer) Run() {

    p.wg.Add(1)
    go func() {
        defer p.wg.Done()

    LOOP:
        for {
            select {
            case m := <-p.msgQ:
                p.producer.Input() <- m
            case err := <-p.producer.Errors():
                if nil != err && nil != err.Msg {
                    l4g.Error("[producer] err=[%s] topic=[%s] key=[%s] val=[%s]", err.Error(), err.Msg.Topic, err.Msg.Key, err.Msg.Value)
                }
            case <-p.closeChan:
                break LOOP
            }

        }
    }()

    for hasTask := true; hasTask; {
        select {
        case m := <-p.msgQ:
            p.producer.Input() <- m
        default:
            hasTask = false
        }
    }

}

// Close 关闭
func (p *Producer) Close() error {
    close(p.closeChan)
    l4g.Warn("[producer] is quiting")
    p.wg.Wait()
    l4g.Warn("[producer] quit over")

    return p.producer.Close()
}

// Log 发送log
func (p *Producer) Log(key string, val string) {
    msg := &sarama.ProducerMessage{
        Topic: p.topic,
        Key:   sarama.StringEncoder(key),
        Value: sarama.StringEncoder(val),
    }

    select {
    case p.msgQ <- msg:
        return
    default:
        l4g.Error("[producer] err=[msgQ is full] key=[%s] val=[%s]", msg.Key, msg.Value)
    }
}

2. kafka消费者

  几点说明:

  1) kafka一定要选用支持集群的版本

  2) 里面带了创建topic,删除topic,打印topic的工具

  3) replication是外面配置的

  4) 开多个consumer需要在创建topic时设置多个partition。官方的示例当开多个consumer的时候会崩溃,我这个版本不会,我给官方提交了一个PR,还不知道有没有采用

// Package main Kafka消费者
package main

import (
    "context"
    "encoding/xml"
    "flag"
    "fmt"
    "io/ioutil"
    "log"
    "os"
    "os/signal"
    "runtime"
    "strings"
    "syscall"
    "time"

    "github.com/Shopify/sarama"
    "github.com/alecthomas/log4go"
)

// Consumer Consumer配置
type ConsumerConfig struct {
    Topic       []string `xml:"topic"`
    Broker      string   `xml:"broker"`
    Partition   int32    `xml:"partition"`
    Replication int16    `xml:"replication"`
    Group       string   `xml:"group"`
    Version     string   `xml:"version"`
}

var (
    configFile = "" // 配置路径
    initTopic  = false
    listTopic  = false
    delTopic   = ""
    cfg        = &Config{}
)

// Config 配置
type Config struct {
    Consumer ConsumerConfig `xml:"consumer"`
}

func init() {
    flag.StringVar(&configFile, "config", "../config/consumer.xml", "config file ")
    flag.BoolVar(&initTopic, "init", initTopic, "create topic")
    flag.BoolVar(&listTopic, "list", listTopic, "list topic")
    flag.StringVar(&delTopic, "del", delTopic, "delete topic")

}

func main() {

    runtime.GOMAXPROCS(runtime.NumCPU())

    defer func() {
        time.Sleep(time.Second)
        log4go.Warn("[main] consumer quit over!")
        log4go.Global.Close()
    }()

    contents, _ := ioutil.ReadFile(configFile)
    xml.Unmarshal(contents, cfg)

    // sarama的logger
    sarama.Logger = log.New(os.Stdout, fmt.Sprintf("[%s]", "consumer"), log.LstdFlags)

    // 指定kafka版本,一定要支持kafka集群
    version, err := sarama.ParseKafkaVersion(cfg.Consumer.Version)
    if err != nil {
        panic(err)
    }
    config := sarama.NewConfig()
    config.Version = version
    config.Consumer.Offsets.Initial = sarama.OffsetOldest

    // 工具
    if tool(cfg, config) {
        return
    }

    // kafka consumer client
    ctx, cancel := context.WithCancel(context.Background())
    client, err := sarama.NewConsumerGroup(strings.Split(cfg.Consumer.Broker, ","), cfg.Consumer.Group, config)
    if err != nil {
        panic(err)
    }

    consumer := Consumer{}
    go func() {
        for {
            err := client.Consume(ctx, cfg.Consumer.Topic, &consumer)
            if err != nil {
                log4go.Error("[main] client.Consume error=[%s]", err.Error())
                // 5秒后重试
                time.Sleep(time.Second * 5)
            }
        }
    }()

    // os signal
    sigterm := make(chan os.Signal, 1)
    signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)

    <-sigterm

    cancel()
    err = client.Close()
    if err != nil {
        panic(err)
    }

    log4go.Info("[main] consumer is quiting")
}

func tool(cfg *Config, config *sarama.Config) bool {
    if initTopic || listTopic || len(delTopic) > 0 {
        ca, err := sarama.NewClusterAdmin(strings.Split(cfg.Consumer.Broker, ","), config)
        if nil != err {
            panic(err)
        }

        if len(delTopic) > 0 { // 删除Topic
            if err := ca.DeleteTopic(delTopic); nil != err {
                panic(err)
            }
            log4go.Info("delete ok topic=[%s]\n", delTopic)
        } else if initTopic { // 初始化Topic
            if detail, err := ca.ListTopics(); nil != err {
                panic(err)
            } else {
                for _, v := range cfg.Consumer.Topic {
                    if d, ok := detail[v]; ok {
                        if cfg.Consumer.Partition > d.NumPartitions {
                            if err := ca.CreatePartitions(v, cfg.Consumer.Partition, nil, false); nil != err {
                                panic(err)
                            }
                            log4go.Info("alter topic ok", v, cfg.Consumer.Partition)
                        }

                    } else {
                        if err := ca.CreateTopic(v, &sarama.TopicDetail{NumPartitions: cfg.Consumer.Partition, ReplicationFactor: cfg.Consumer.Replication}, false); nil != err {
                            panic(err)
                        }
                        log4go.Info("create topic ok", v)
                    }
                }
            }
        }

        // 显示Topic列表
        if detail, err := ca.ListTopics(); nil != err {
            log4go.Info("ListTopics error", err)
        } else {
            for k := range detail {
                log4go.Info("[%s] %+v", k, detail[k])
            }
        }

        if err := ca.Close(); nil != err {
            panic(err)
        }

        return true
    }
    return false
}

type Consumer struct {
}

func (consumer *Consumer) Setup(s sarama.ConsumerGroupSession) error {
    return nil
}

func (consumer *Consumer) Cleanup(s sarama.ConsumerGroupSession) error {
    return nil
}

func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for message := range claim.Messages() {
        key := string(message.Key)
        val := string(message.Value)
        log4go.Info("%s-%s", key, val)
        session.MarkMessage(message, "")
    }

    return nil
}

原文地址:https://www.cnblogs.com/mrblue/p/10770651.html

时间: 2024-08-03 02:14:51

[Golang] kafka集群搭建和golang版生产者和消费者的相关文章

KAFKA集群搭建

一.简介 Kafka是Apache下的一个子项目,是一个高性能跨语言分布式发布/订阅消息队列系统,使用Scala编写,它以可水平扩展和高吞吐率而被广泛使用.目前越来越多的开源分布式处理系统如Cloudera.Apache Storm.Spark都支持与Kafka集成.   Kafka适合做什么? 官方文档介绍,它通常被使用在两大类应用中: 搭建实时数据流管道,在系统或应用之间可靠的获取数据 搭建对数据流进行转换或相应的实时流应用程序.   为了了解Kafka具体如何实现这些功能, 首先理解几个概

kafka集群搭建和使用Java写kafka生产者消费者

 kafka集群搭建 Java代码   1.zookeeper集群  搭建在110, 111,112 2.kafka使用3个节点110, 111,112 修改配置文件config/server.properties broker.id=110 host.name=192.168.1.110 log.dirs=/usr/local/kafka_2.10-0.8.2.0/logs 复制到其他两个节点,然后修改对应节点上的config/server.pro 3.启动,在三个节点分别执行 bin/kaf

kafka学习(三)-kafka集群搭建

kafka集群搭建 下面简单的介绍一下kafka的集群搭建,单个kafka的安装更简单,下面以集群搭建为例子. 我们设置并部署有三个节点的 kafka 集合体,必须在每个节点上遵循下面的步骤来启动 kafka 服务器,kafka集群需要依赖zookeeper集群,上一篇已经说道了zookeeper的搭建,方法请参考:http://www.cnblogs.com/chushiyaoyue/p/5615267.html 1.环境准备 测试服务器(2n+1)奇数台 192.168.181.128 ce

Kafka【第一篇】Kafka集群搭建

Kafka初识 1.Kafka使用背景 在我们大量使用分布式数据库.分布式计算集群的时候,是否会遇到这样的一些问题: 我们想分析下用户行为(pageviews),以便我们设计出更好的广告位 我想对用户的搜索关键词进行统计,分析出当前的流行趋势 有些数据,存储数据库浪费,直接存储硬盘效率又低 这些场景都有一个共同点: 数据是又上游模块产生,上游模块,使用上游模块的数据计算.统计.分析,这个时候就可以使用消息系统,尤其是分布式消息系统! 2.Kafka的定义 What is Kafka:它是一个分布

消息队列之kafka(集群搭建)

1.kafka集群搭建   kafka安装包下载地址: 官网网址:http://kafka.apache.org/quickstart 中文官网:http://kafka.apachecn.org/quickstart.html 在 windows 平台,从官网下载:http://mirrors.hust.edu.cn/apache/kafka/1.1.0/ 在 centos 平台:wgethttp://mirrors.hust.edu.cn/apache/kafka/1.1.0/kafka_2

Linux下kafka集群搭建过程记录

环境准备 zookeeper集群环境kafka是依赖于zookeeper注册中心的一款分布式消息对列,所以需要有zookeeper单机或者集群环境. 三台服务器: 172.16.18.198 k8s-n1 172.16.18.199 k8s-n2 172.16.18.200 k8s-n3 下载kafka安装包 http://kafka.apache.org/downloads 中下载,目前最新版本的kafka已经到2.2.0,我这里之前下载的是kafka_2.11-2.2.0.tgz. 安装ka

kafka集群搭建与apiclient创建

曾经的消息队列(activeMQ)对于大数据吞吐率不行,但kafka非常好的攻克了此类问题.而且以集群的方式进行扩展.可谓相当强大: 集群搭建方式很轻量级.仅仅需将tar包复制到server,解压,改动配置文件就可以: 1.tar -xzf kafka_2.9.2-0.8.1.1.tgz 2.改动配置文件: 节点1: vim config/server.properties broker.id=1 //全部集群节点中必须唯一 port=9092 //端口号(发送消息) log.dir=/tmp/

kafka集群搭建与api客户端创建

以前的消息队列(activeMQ)对于大数据吞吐率不行,但kafka很好的解决了此类问题,并且以集群的方式进行扩展,可谓相当强大: 集群搭建方式非常轻量级,只需将tar包拷贝到服务器,解压,修改配置文件即可: 1.tar -xzf kafka_2.9.2-0.8.1.1.tgz 2.修改配置文件: 节点1: vim config/server.properties broker.id=1 //所有集群节点中必须唯一 port=9092 //端口号(发送消息) log.dir=/tmp/kafka

kafka 集群搭建

环境:ubuntu14.04 版本:jdk1.8,zookeeper 3.4.10,kafka 2.11 搭建步骤: 1. 搭建zookeeper集群 参考链接:zookeeper集群搭建 2. 下载kafka源码包,并上传到服务器 下载链接:http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/1.0.0/kafka_2.11-1.0.0.tgz 3. 解压kafka源码包 tar -zxvf kafka_2.11-1.0.0.tgz 4. 创建ka