Go项目实战:打造高并发日志采集系统(六)

前情回顾

前文我们完成了日志采集系统的日志文件监控,配置文件热更新,协程异常检测和保活机制。

本节目标

本节加入kafka消息队列,kafka前文也介绍过了,可以对消息进行排队,解耦合和流量控制的作用,为什么一定要用kafka呢?主要原因就是在日志高并发读取后,如果直接将消息发给前端或者写入数据库,会造成崩溃或者卡死。kafka可以对消息进行排队和减轻压力,这样无论以后将这些消息录入数据库也好,传给前端分析也好,都能保证系统稳定性。代码我们也写过和测试了,只需要将之前写好的kafka读写消息代码整合过来即可。

主函数创建kafka生产者

在主函数中创建kafkaProducer,然后在defer中回收该资源。我们将该producer传递给每个监控日志的协程中,当日志有修改,就通过producer将修改的信息写入kafka,用kafka排队和缓存,可以提高稳定性,减少流量高峰。

func main() {
	//省略...
	kafkaProducer := &kafkaqueue.ProducerKaf{Producer: producer}
	configMgr = make(map[string]*logconfig.ConfigData)
	keyChan := make(chan string, KEYCHANSIZE)
	ConstructMgr(configPaths, keyChan, kafkaProducer)

	defer func() {
		mainOnce.Do(func() {
            //省略...
			kafkaProducer.Producer.Close()
		})
	}()

	for {
		select {
		case pathData, ok := <-pathChan:
			if !ok {
				return
			}
			 //省略...
			for conkey, conval := range pathDataNew {
				oldval, ok := configMgr[conkey]
				if !ok {
					//省略...
					go logtailf.WatchLogFile(configData.ConfigKey, configData.ConfigValue,
						ctx, keyChan, kafkaProducer)
					continue
				}

                if oldval.ConfigValue != conval.(string) {
				    //省略...
					go logtailf.WatchLogFile(conkey, conval.(string),
						ctx, keyChan, kafkaProducer)
					continue
				}

			}	

        case keystr := <-keyChan:
			val, ok := configMgr[keystr]
			if !ok {
				continue
			}
			 //省略...
			go logtailf.WatchLogFile(keystr, val.ConfigValue,
				ctxcover, keyChan, kafkaProducer)
		}
	}
}

WatchLogFile函数携带了该producer。有人会问多个协程共享producer是否会出问题?我查看了Producer发送消息的源码

红框中使用了chan传递数据,所以在多个协程调用producer的发送函数是没问题的。

监控协程写入kafka消息

当日志新增时,我们在监控日志的协程向kafka写入消息

func WatchLogFile(pathkey string, datapath string, ctx context.Context, keychan chan<- string, kafProducer *kafkaqueue.ProducerKaf) {
    //省略逻辑...
    for true {
		select {
		case msg, ok := <-tailFile.Lines:
			//省略逻辑...
			kafProducer.PutIntoKafka(pathkey, msg.Text)
		case <-ctx.Done():
			fmt.Println("receive main gouroutine exit msg")
			fmt.Println("watch log file ", pathkey, " goroutine exited")
			return
		}

	}
}

封装kafkaProducer

上述代码中调用的kafkaProducer是我自己封装的,其实就是组合了原生的kafka生产者,并且封装了发送函数

func CreateKafkaProducer() (sarama.SyncProducer, error) {
	config := sarama.NewConfig()

	// 等待服务器所有副本都保存成功后的响应
	config.Producer.RequiredAcks = sarama.WaitForAll
	// 随机的分区类型:返回一个分区器,该分区器每次选择一个随机分区
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	// 是否等待成功和失败后的响应
	config.Producer.Return.Successes = true

	// 使用给定代理地址和配置创建一个同步生产者
	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		fmt.Println("create producer failed, ", err.Error())
		return nil, err
	}
	fmt.Println("create kafka producer success")

	return producer, nil
}

  上面的函数返回了原生的kafka生产者接口,接下来我们封装这个原生接口,然后编写了写入kafka的方法

type ProducerKaf struct {
	Producer sarama.SyncProducer
}

func (p *ProducerKaf) PutIntoKafka(keystr string, valstr string) {
	//构建发送的消息,
	msg := &sarama.ProducerMessage{
		Topic: "logcatchsys",
		Key:   sarama.StringEncoder(keystr),
		Value: sarama.StringEncoder(valstr),
	}
	partition, offset, err := p.Producer.SendMessage(msg)

	if err != nil {
		fmt.Println("Send message Fail")
		fmt.Println(err.Error())
	}
	fmt.Printf("Partition = %d, offset=%d, msgvalue=%s \n", partition, offset, valstr)

}

  

启动kafka测试

我们先启动zookeeper和kafka
zookeeper进入bin文件夹点击zkServer.cmd即可启动
kafka启动使用如下命令

.\bin\windows\kafka-server-start.bat .\config\server.properties

然后我们创建主题logcatchsys

.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 16 --topic logcatchsys

这样我们为主题logcatchsys创建了16个分区。
接下来我们启动消费者

.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic logcatchsys --from-beginning

然后我们启动我们的采集系统和测死脚本,看到如下

可以看到当日志文件不断被写入时,我们的采集系统会将修改的内容实时监控并写入kafka队列,然后kafka消费者从队列中取出这些消息。

总结

目前完成了日志采集系统所有功能的开发和测试,包括配置文件的热更新,监控协程的自动关闭和启动,异常修复和自启动,日志消息的监听,
kafka消息的读写等。但这并不是终点,只是一个起点,以后会配合前端开发不断完善,目前先告一段落。
源码下载
https://github.com/secondtonone1/golang-/tree/master/logcatchsys
感谢关注公众号

原文地址:https://www.cnblogs.com/secondtonone1/p/11996612.html

时间: 2024-10-12 19:19:01

Go项目实战:打造高并发日志采集系统(六)的相关文章

Go项目实战:打造高并发日志采集系统(三)

前文中已经完成了文件的监控,kafka信息读写,今天主要完成配置文件的读写以及热更新.并且规划一下系统的整体结构,然后将之前的功能串起来形成一套完整的日志采集系统. 前情提要 上一节我们完成了如下目标1 完成kafka消息读写2 借助tailf实现文件监控,并模拟测试实时写文件以及文件备份时功能无误. 本节目标 1 编写系统结构,在主函数中加载配置2 管理配置文件,实现热更新 实现文件管理,支持热更新 golang中vipper库提供了配置文件的读取和监控功能,我们可以监控配置文件从而实现热更新

java架构师大型分布式综合项目实战,高并发,集群,高可用,程序设计,性能优化,架构设计,负载均衡,大数据量

* { font-family: "Microsoft YaHei" !important } h1 { color: #FF0 } 15套java架构师.集群.高可用.高可扩 展.高性能.高并发.性能优化.Spring boot.Redis.ActiveMQ.Nginx.Mycat.Netty.Jvm大型分布 式项目实战视频教程 视频课程包含: 高级Java架构师包含:Spring boot.Spring  cloud.Dubbo.Redis.ActiveMQ.Nginx.Mycat

java架构师课程、性能调优、高并发、tomcat负载均衡、大型电商项目实战、高可用、高可扩展、数据库架构设计、Solr集群与应用、分布式实战、主从复制、高可用集群、大数据

15套Java架构师详情 * { font-family: "Microsoft YaHei" !important } h1 { background-color: #006; color: #FF0 } 15套java架构师.集群.高可用.高可扩展.高性能.高并发.性能优化.Spring boot.Redis.ActiveMQ.Nginx.Mycat.Netty.Jvm大型分布式项目实战视频教程 视频课程包含: 高级Java架构师包含:Spring boot.Spring  clo

系统架构~高并发日志系统设计

对于一个项目来说,日志是必须的,一般日志的持久化方式有文件和数据库,而在多数情况下,我们都采用文件系统来实现,而对于高并发的情况下,频繁进行I/O操作,对系统的性能肯定是有影响的,这个毋庸置疑!针对这种高并发的场合,我们采用一种缓存队列的方式来处理这个Case是比较明智的,本文主要是向各位展现一下,我所设计的<高并发日志系统设计>,如在功能上有什么需要改进的地方,欢迎各位来回复. 一 项目结构图 二 项目实现代码 /// <summary> /// 工作任务基类 /// </

【实战Java高并发程序设计 4】数组也能无锁:AtomicIntegerArray

除了提供基本数据类型外,JDK还为我们准备了数组等复合结构.当前可用的原子数组有:AtomicIntegerArray.AtomicLongArray和AtomicReferenceArray,分别表示整数数组.long型数组和普通的对象数组. 这里以AtomicIntegerArray为例,展示原子数组的使用方式. AtomicIntegerArray本质上是对int[]类型的封装.使用Unsafe类通过CAS的方式控制int[]在多线程下的安全性.它提供了以下几个核心API: //获得数组第

【实战Java高并发程序设计 3】带有时间戳的对象引用:AtomicStampedReference

[实战Java高并发程序设计 1]Java中的指针:Unsafe类 [实战Java高并发程序设计 2]无锁的对象引用:AtomicReference AtomicReference无法解决上述问题的根本是因为对象在修改过程中,丢失了状态信息.对象值本身与状态被画上了等号.因此,我们只要能够记录对象在修改过程中的状态值,就可以很好的解决对象被反复修改导致线程无法正确判断对象状态的问题. AtomicStampedReference正是这么做的.它内部不仅维护了对象值,还维护了一个时间戳(我这里把它

【实战Java高并发程序设计 5】让普通变量也享受原子操作

[实战Java高并发程序设计 1]Java中的指针:Unsafe类 [实战Java高并发程序设计 2]无锁的对象引用:AtomicReference [实战Java高并发程序设计 3]带有时间戳的对象引用:AtomicStampedReference [实战Java高并发程序设计 4]数组也能无锁:AtomicIntegerArray 有时候,由于初期考虑不周,或者后期的需求变化,一些普通变量可能也会有线程安全的需求.如果改动不大,我们可以简单地修改程序中每一个使用或者读取这个变量的地方.但显然

【实战Java高并发程序设计 7】让线程之间互相帮助--SynchronousQueue的实现

[实战Java高并发程序设计 1]Java中的指针:Unsafe类 [实战Java高并发程序设计 2]无锁的对象引用:AtomicReference [实战Java高并发程序设计 3]带有时间戳的对象引用:AtomicStampedReference [实战Java高并发程序设计 4]数组也能无锁:AtomicIntegerArray [实战Java高并发程序设计 5]让普通变量也享受原子操作 [实战Java高并发程序设计6]挑战无锁算法:无锁的Vector实现 在对线程池的介绍中,提到了一个非

我的《实战java高并发程序设计》纸质书上市了

在过去单核CPU时代,单任务在一个时间点只能执行单一程序,随着多核CPU的发展,并行程序开发就显得尤为重要. <实战Java高并发程序设计>主要介绍基于Java的并行程序设计基础.思路.方法和实战.首先,立足于并发程序基础,详细介绍Java中进行并行程序设计的基本方法.第二,进一步详细介绍JDK中对并行程序的强大支持,帮助读者快速.稳健地进行并行程序开发.第三,详细讨论有关"锁"的优化和提高并行程序性能级别的方法和思路.第四,介绍并行的基本设计模式及Java8对并行程序的支