使用 Kafka 和 MongoDB 进行 Go 异步处理

在这个示例中,我将数据的保存和 MongoDB 分离,并创建另一个微服务去处理它。我还添加了 Kafka 为消息层服务,这样微服务就可以异步处理它自己关心的东西了。

下面是这个使用了两个微服务的简单的异步处理示例的上层架构图。

图片描述(最多50字)

微服务 1 —— 是一个 REST 式微服务,它从一个 /POST http 调用中接收数据。接收到请求之后,它从 http 请求中检索数据,并将它保存到 Kafka。保存之后,它通过 /POST 发送相同的数据去响应调用者。

微服务 2 —— 是一个订阅了 Kafka 中的一个主题的微服务,微服务 1 的数据保存在该主题。一旦消息被微服务消费之后,它接着保存数据到 MongoDB 中。

我们开始吧!

首先,启动 Kafka,在你运行 Kafka 服务器之前,你需要运行 Zookeeper。下面是示例:

$ cd /<download path>/kafka_2.11-1.1.0
$ bin/zookeeper-server-start.sh config/zookeeper.properties
接着运行 Kafka —— 我使用 9092 端口连接到 Kafka。如果你需要改变端口,只需要在 config/server.properties 中配置即可。如果你像我一样是个新手,我建议你现在还是使用默认端口。

$ bin/kafka-server-start.sh config/server.properties
Kafka 跑起来之后,我们需要 MongoDB。它很简单,只需要使用这个 docker-compose.yml 即可。

version: ‘3‘
services:
mongodb:
image: mongo
ports:

  • "27017:27017"
    volumes:
  • "mongodata:/data/db"
    networks:
  • network1
    volumes:
    mongodata:
    networks:
    network1:
    使用 Docker Compose 去运行 MongoDB docker 容器。

docker-compose up
这里是微服务 1 的相关代码。我只是修改了我前面的示例去保存到 Kafka 而不是 MongoDB:

rest-to-kafka/rest-kafka-sample.go

func jobsPostHandler(w http.ResponseWriter, r *http.Request) {
//Retrieve body from http request
b, err := ioutil.ReadAll(r.Body)
defer r.Body.Close()
if err != nil {
panic(err)
}
//Save data into Job struct
var _job Job
err = json.Unmarshal(b, &_job)
if err != nil {
http.Error(w, err.Error(), 500)
return
}
saveJobToKafka(_job)
//Convert job struct into json
jsonString, err := json.Marshal(job)
if err != nil {
http.Error(w, err.Error(), 500)
return
}
//Set content-type http header
w.Header().Set("content-type", "application/json")
//Send back data as response
w.Write(jsonString)
}
func saveJobToKafka(job Job) {
fmt.Println("save to kafka")
jsonString, err := json.Marshal(job)
jobString := string(jsonString)
fmt.Print(jobString)
p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
if err != nil {
panic(err)
}
// Produce messages to topic (asynchronously)
topic := "jobs-topic1"
for
, word := range []string{string(jobString)} {
p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Value: []byte(word),
}, nil)
}
}
这里是微服务 2 的代码。在这个代码中最重要的东西是从 Kafka 中消费数据,保存部分我已经在前面的博客文章中讨论过了。这里代码的重点部分是从 Kafka 中消费数据:

kafka-to-mongo/kafka-mongo-sample.go

func main() {
//Create MongoDB session
session := initialiseMongo()
mongoStore.session = session
receiveFromKafka()
}
func receiveFromKafka() {
fmt.Println("Start receiving from Kafka")
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": "localhost:9092",
"group.id": "group-id-1",
"auto.offset.reset": "earliest",
})
if err != nil {
panic(err)
}
c.SubscribeTopics([]string{"jobs-topic1"}, nil)
for {
msg, err := c.ReadMessage(-1)
if err == nil {
fmt.Printf("Received from Kafka %s: %s\n", msg.TopicPartition, string(msg.Value))
job := string(msg.Value)
saveJobToMongo(job)
} else {
fmt.Printf("Consumer error: %v (%v)\n", err, msg)
break
}
}
c.Close()
}
func saveJobToMongo(jobString string) {
fmt.Println("Save to MongoDB")
col := mongoStore.session.DB(database).C(collection)
//Save data into Job struct
var _job Job
b := []byte(jobString)
err := json.Unmarshal(b, &_job)
if err != nil {
panic(err)
}
//Insert job into MongoDB
errMongo := col.Insert(_job)
if errMongo != nil {
panic(errMongo)
}
fmt.Printf("Saved to MongoDB : %s", jobString)
}
我们来演示一下,运行微服务 1。确保 Kafka 已经运行了。

$ go run rest-kafka-sample.go
我使用 Postman 向微服务 1 发送数据。

图片描述(最多50字)

这里是日志,你可以在微服务 1 中看到。当你看到这些的时候,说明已经接收到了来自 Postman 发送的数据,并且已经保存到了 Kafka。

图片描述(最多50字)

因为我们尚未运行微服务 2,数据被微服务 1 只保存在了 Kafka。我们来消费它并通过运行的微服务 2 来将它保存到 MongoDB。

$ go run kafka-mongo-sample.go
现在,你将在微服务 2 上看到消费的数据,并将它保存到了 MongoDB。

图片描述(最多50字)

检查一下数据是否保存到了 MongoDB。如果有数据,我们成功了!

图片描述(最多50字)
欢迎工作一到五年的Java工程师朋友们加入Java架构开发: 855835163
群内提供免费的Java架构学习资料(里面有高可用、高并发、高性能及分布式、Jvm性能调优、Spring源码,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多个知识点的架构资料)合理利用自己每一分每一秒的时间来学习提升自己,不要再用"没有时间“来掩饰自己思想上的懒惰!趁年轻,使劲拼,给未来的自己一个交代!

原文地址:http://blog.51cto.com/13952953/2298377

时间: 2024-10-21 14:48:07

使用 Kafka 和 MongoDB 进行 Go 异步处理的相关文章

【JAVA版】Storm程序整合Kafka、Mongodb示例及部署

一.环境 一台Centos6.5主机 Mongo 3.0 Kafka_2.11-0.8.2.1 Storm-0.9.5 Zookeeper-3.4.6 java 1.7 (后因在mac上打包的jar由1.8编译没法儿运行,改为java 1.8) 其余环境暂略 二.运行启动 启动zookeeper 确认配置正确,配置相关可自行搜索. [[email protected] zookeeper-3.4.6]#pwd /data0/xxx/zookeeper-3.4.6 [[email protecte

大数据架构开发 挖掘分析 Hadoop HBase Hive Storm Spark Sqoop Flume ZooKeeper Kafka Redis MongoDB 机器学习 云计算 视频教程

培训大数据架构开发.挖掘分析! 从零基础到高级,一对一培训![技术QQ:2937765541] ------------------------------------------------------------------------------------------------------------------------------------------- 课程体系: 获取视频资料和培训解答技术支持地址 课程展示(大数据技术很广,一直在线为你培训解答!):    获取视频资料和培

Kafka 异步消息也会阻塞?记一次 Dubbo 频繁超时排查过程

线上某服务 A 调用服务 B 接口完成一次交易,一次晚上的生产变更之后,系统监控发现服务 B 接口频繁超时,后续甚至返回线程池耗尽错误 Thread pool is EXHAUSTED.因为服务 B 依赖外部接口,刚开始误以为外部接口延时导致,所以临时增加服务 B dubbo 线程池线程数量.配置变更之后,重启服务,服务恢复正常.一段时间之后,服务 B 再次返回线程池耗尽错误.这次深入排查问题之后,才发现 Kafka 异步发送消息阻塞了 dubbo 线程,从而导致调用超时. 一.问题分析 Dub

Kafka设计解析:Kafka High Availability

Kafka在0.8以前的版本中,并不提供High Availablity机制,一旦一个或多个Broker宕机,则宕机期间其上所有Partition都无法继续提供服务.若该Broker永远不能再恢复,亦或磁盘故障,则其上数据将丢失.而Kafka的设计目标之一即是提供数据持久化,同时对于分布式系统来说,尤其当集群规模上升到一定程度后,一台或者多台机器宕机的可能性大大提高,对Failover要求非常高.因此,Kafka从0.8开始提供High Availability机制.本文从Data Replic

[Big Data - Kafka] Kafka设计解析(二):Kafka High Availability (上)

Kafka在0.8以前的版本中,并不提供High Availablity机制,一旦一个或多个Broker宕机,则宕机期间其上所有Partition都无法继续提供服务.若该Broker永远不能再恢复,亦或磁盘故障,则其上数据将丢失.而Kafka的设计目标之一即是提供数据持久化,同时对于分布式系统来说,尤其当集群规模上升到一定程度后,一台或者多台机器宕机的可能性大大提高,对Failover要求非常高.因此,Kafka从0.8开始提供High Availability机制.本文从Data Replic

【转载】Kafka High Availability

http://www.haokoo.com/internet/2877400.html Kafka在0.8以前的版本中,并不提供High Availablity机制,一旦一个或多个Broker宕机,则宕机期间其上所有Partition都无法继续提供服务.若该Broker永远不能再恢复,亦或磁盘故障,则其上数据将丢失.而Kafka的设计目标之一即是提供数据持久化,同时对于分布式系统来说,尤其当集群规模上升到一定程度后,一台或者多台机器宕机的可能性大大提高,对Failover要求非常高.因此,Kaf

转:Kafka设计解析(二):Kafka High Availability (上)

Kafka在0.8以前的版本中,并不提供High Availablity机制,一旦一个或多个Broker宕机,则宕机期间其上所有Partition都无法继续提供服务.若该Broker永远不能再恢 复,亦或磁盘故障,则其上数据将丢失.而Kafka的设计目标之一即是提供数据持久化,同时对于分布式系统来说,尤其当集群规模上升到一定程度后,一台或 者多台机器宕机的可能性大大提高,对Failover要求非常高.因此,Kafka从0.8开始提供High Availability机制.本文从Data Repl

Kafka的分布式架构设计与High Availability机制

作者:Wang, Josh 一.Kafka的基本概述   1.Kafka是什么? Kafka官网上对Kafka的定义叫:Adistributed publish-subscribe messaging system.publish-subscribe是发布和订阅的意思,所以准确的说Kafka是一个消息订阅和发布的系统.最初,Kafka实际上是LinkedIn用于日志处理的分布式消息队列,LinkedIn的日志数据容量大,但对可靠性要求不高,其日志数据主要包括用户行为(登录.浏览.点击.分享.喜欢

[原创]MongoDB C++ 驱动部分问题解决方案(MongoDB C++ Driver)

本文为我长时间开发以及修改MongoDB C++ Driver时的一些问题和解决方案.目前本文所介绍的相关引擎也已经发布闭源版本,请自行下载 库版本以及相关位置:http://code.google.com/p/mongodb-cpp-engine/ Q & A1.C++版本驱动我们需要另行封装么?A:并非一定要做此类行为,不过如果我们增加和使用一个连接池其稳定性将远远优于我们的想想. 2.C++版本的驱动除了官方版本外,还有什么版本么?A:目前未知,但我已经完成了一版线程安全的版本,下载地址: