Golang之发送消息至kafka

windows下安装zookeeper

1、安装JAVA-JDK,从oracle下载最新的SDK安装(我用的是1.8的) 
2、安装zookeeper3.3.6,下载地址:http://apache.fayea.com/zookeeper/ 
3、重命名conf/zoo_sample.cfg 为conf/zoo.cfg 
4、编辑 conf/zoo.cfg,修改dataDir=D:\zookeeper-3.3.6\data\ 
4、运行bin/zkServer.cmd

启动结果如下:

安装kafka

1、打开链接:http://kafka.apache.org/downloads.html下载kafka2.1.2

2、打开config目录下的server.properties, 修改log.dirs为D:\kafka_logs,

3、修改advertised.host.name=服务器ip

4、启动kafka ./bin/windows/kafka-server-start.bat ./config/server.preperties

kafka链接zookeeper

kafka也提供了一个命令行消费者,接受消息并打印到标准输出。

bin/kafka-console-consumer.bat --zookeeper 127.0.0.1:2181 --topic nginx_log

golang写入kafka


package main

import (   "fmt"

   "github.com/Shopify/sarama"   "time")

//消息写入kafkafunc main() {   //初始化配置   config := sarama.NewConfig()   config.Producer.RequiredAcks = sarama.WaitForAll   config.Producer.Partitioner = sarama.NewRandomPartitioner   config.Producer.Return.Successes = true   //生产者   client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)   if err != nil {      fmt.Println("producer close,err:", err)      return   }

   defer client.Close()   var n int=0

   for n<20{      n++      //创建消息      msg := &sarama.ProducerMessage{}      msg.Topic = "nginx_log"      msg.Value = sarama.StringEncoder("this is a good test,hello chaoge!!")      //发送消息      pid, offset, err := client.SendMessage(msg)      if err != nil {         fmt.Println("send message failed,", err)         return      }      fmt.Printf("pid:%v offset:%v\n,", pid, offset)      time.Sleep(10 * time.Millisecond)

   }

}
 

goland运行结果:

kafka收到的数据:

原文地址:https://www.cnblogs.com/pyyu/p/8371649.html

时间: 2024-10-03 16:45:04

Golang之发送消息至kafka的相关文章

log4j2发送消息至Kafka

title: 自定义log4j2发送日志到Kafka 图片描述(最多50字) tags: log4j2,kafka 为了给公司的大数据平台提供各项目组的日志,而又使各项目组在改动上无感知.做了一番调研后才发现log4j2默认有支持将日志发送到kafka的功能,惊喜之下赶紧看了下log4j对其的实现源码!发现默认的实现是同步阻塞的,如果kafka服务一旦挂掉会阻塞正常服务的日志打印,为此本人在参考源码的基础上做了一些修改. log4j日志工作流程 log4j2对于log4j在性能上有着显著的提升,

kafka无法发送消息问题处理

背景 在服务器上搭建了一个单机环境的kafka broker,在服务器上使用命令生产消息时,一切正常.当在本地使用JAVA程序发送消息时,一直出错. 抛出的错误为: Exception in thread "main" Failed to send requests for topics test with correlation ids in [0,12] kafka.common.FailedToSendMessageException: Failed to send messag

【原创】Kafka接受发送消息对象Object基础版

首先感谢 kafka 中国社区 王扬庭例子的帮助和指导~~~~~(kafka_2.9.2-0.8.1.1) kafka常用的发送消息的方法如下: Properties props = new Properties(); props.put("zookeeper.connect", "slaves2:2181,slaves3:2181,slaves4:2181"); props.put("serializer.class", "kafka

kafka producer batch 发送消息

1. 使用 KafkaProducer 发送消息,是按 batch 发送的,producer 首先把消息放入 ProducerBatch 中: org.apache.kafka.clients.producer.internals.ProducerBatch 2. KafkaProduer 类中有一个 Thread 属性,负责 IO,发送和接收数据: this.sender = new Sender(logContext, client, this.metadata, this.accumula

java客户端向单机版kafka发送消息没有接收到

kafka版本:kafka_2.11-0.10.0.0 在kafka服务器命令发送消息,消费者可以接受到, 但是在java客户端向kafka发送消息时消费者接受不到, 在kafka/config/sever.properties把这个注解打开 advertised.listeners=PLAINTEXT://ip.137:9092        #本机服务器ip 意思就是说:hostname.port都会广播给producer.consumer.如果你没有配置了这个属性的话,则使用listene

分布式消息队列kafka

下载地址:http://kafka.apache.org/downloads.html 先启动zookeeper服务器 bin/zookeeper-server-start.sh config/zookeeper.properties & 再启动kafka服务器 bin/kafka-server-start.sh -daemon config/server.properties & 创建topic bin/kafka-topics.sh --create --zookeeper local

分布式公布订阅消息系统 Kafka 架构设计

我们为什么要搭建该系统 Kafka是一个消息系统,原本开发自LinkedIn,用作LinkedIn的活动流(activity stream)和运营数据处理管道(pipeline)的基础. 如今它已为多家不同类型的公司 作为多种类型的数据管道(data pipeline)和消息系统使用. 活动流数据是全部站点在对其站点使用情况做报表时要用到的数据中最常规的部分.活动数据包含页面訪问量(page view).被查看内容方面的信息以及搜索情况等内容.这样的数据通常的处理方式是先把各种活动以日志的形式写

分布式发布订阅消息系统 Kafka 架构设计[转]

分布式发布订阅消息系统 Kafka 架构设计 转自:http://www.oschina.net/translate/kafka-design 我们为什么要搭建该系统 Kafka是一个消息系统,原本开发自LinkedIn,用作LinkedIn的活动流(activity stream)和运营数据处理管道(pipeline)的基础.现在它已为多家不同类型的公司 作为多种类型的数据管道(data pipeline)和消息系统使用. 活动流数据是所有站点在对其网站使用情况做报表时要用到的数据中最常规的部

分布式消息系统-kafka

消息中间件MessageQuene 解耦且可扩展:业务复杂度的提升带来的也是耦合度的提高,消息队列在处理过程中间插入了一个隐含的.基于数据的接口层,两边的处理过程都要实现这一接口.这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束. 冗余:有些业务在处理过程中如果失败了,数据在未进行持久化的时候就已经消失,消息队列把数据持久化直到他们被处理,避免了数据的丢失 处理并发:大数据量访问的时候我们可以将消息放入队列中,然后在队列里面按照系统的吞吐能力来进行稳定的抽取数据并进行业务处