Go项目实战:打造高并发日志采集系统(三)

前文中已经完成了文件的监控,kafka信息读写,今天主要完成配置文件的读写以及热更新。并且规划一下系统的整体结构,然后将之前的功能串起来形成一套完整的日志采集系统。

前情提要

上一节我们完成了如下目标
1 完成kafka消息读写
2 借助tailf实现文件监控,并模拟测试实时写文件以及文件备份时功能无误。

本节目标

1 编写系统结构,在主函数中加载配置
2 管理配置文件,实现热更新

实现文件管理,支持热更新

golang中vipper库提供了配置文件的读取和监控功能,我们可以监控配置文件从而实现热更新。
先规划下日志采集系统的目录结构

logcatchsys为项目根目录,其下logcatchsys文件夹中main.go为系统启动的主函数,该文件加载配置,根据配置启动协程,监控指定目录的日志文件,当配置更新时,main做热更新,如果路径从配置中删除,则中止对应的监控协程。如果有新的路径添加到配置文件,则启动协程监控,如果路径有修改,则中止原路径协程,启动新的协程监听修改后的路径。
logconfig为配置存放的路径,logconfig.go主要负责配置的管理,包括监控。

var onceLogConf sync.Once

type ConfigData struct {
	ConfigKey    string
	ConfigValue  string
	ConfigCancel context.CancelFunc
}

在logconfig.go中定义了once操作的变量onceLogConf,该变量保证监控配置的协程退出后只执行一次析构。

ConfigData结构体存储了配置文件中路径的信息,ConfigKey表示路径名,ConfigValue表示路径值,ConfigCancel存储上下文的CancelFunc,因为一个路径对应一个日志文件,监控日志文件就要开启协程,我是通过context管理监控日志的协程的。
在config.yaml中记录的路径信息如下:

configpath:
  logdir1: "../logdir1/log.txt"
  logdir2: "../logdir2/log.txt"
  logdir3: "../logdir3/log.txt"
  logdir5: "../logdir3/log.txt"

logdir1对应ConfigKey

../logdir1/log.txt对应ConfigValue
接下来在logconfig.go中我实现了配置文件的加载

func ReadConfig(v *viper.Viper) (interface{}, bool) {
	//设置读取的配置文件
	v.SetConfigName("config")
	//添加读取的配置文件路径
	_, filename, _, _ := runtime.Caller(0)
	fmt.Println(filename)
	fmt.Println(path.Dir(filename))
	v.AddConfigPath(path.Dir(filename))
	//设置配置文件类型
	v.SetConfigType("yaml")
	if err := v.ReadInConfig(); err != nil {
		fmt.Printf("err:%s\n", err)
		return nil, false
	}

	configPaths := v.Get("configpath")
	if configPaths == nil {
		return nil, false
	}

	return configPaths, true
}

以及配置文件的监听

func WatchConfig(ctx context.Context, v *viper.Viper, pathChan chan interface{}) {

	defer func() {
		onceLogConf.Do(func() {
			fmt.Println("watch config goroutine exit")
			if err := recover(); err != nil {
				fmt.Println("watch config goroutine panic ", err)
			}
			close(pathChan)
		})
	}()

	//设置监听回调函数
	v.OnConfigChange(func(e fsnotify.Event) {
		//fmt.Printf("config is change :%s \n", e.String())
		configPaths := v.Get("configpath")
		if configPaths == nil {
			return
		}
		pathChan <- configPaths
	})
	//开始监听
	v.WatchConfig()
	//信道不会主动关闭,可以主动调用cancel关闭
	<-ctx.Done()
}

当配置文件config.yaml有变动时,OnConfigChange传入的匿名函数会触发,从而将configpath节点的value传入chan中,这样main函数可以从外部获取最新的配置文件。

ctx为上下文,当main函数执行上下文中止时,监控配置的协程会自动退出。

根据配置变动,实现热更新

在main.go中,定义了mainOnce控制主协程资源析构,并且通过ConstructMgr全局函数构造configMgr这样的map记录最新的配置信息。

var mainOnce sync.Once
var configMgr map[string]*logconfig.ConfigData

func ConstructMgr(configPaths interface{}) {
	configDatas := configPaths.(map[string]interface{})
	for conkey, confval := range configDatas {
		configData := new(logconfig.ConfigData)
		configData.ConfigKey = conkey
		configData.ConfigValue = confval.(string)
		_, cancel := context.WithCancel(context.Background())
		configData.ConfigCancel = cancel
		configMgr[conkey] = configData
	}
}

接下来实现主函数

func main() {
	v := viper.New()
	configPaths, confres := logconfig.ReadConfig(v)
	if configPaths == nil || !confres {
		fmt.Println("read config failed")
		return
	}
	configMgr = make(map[string]*logconfig.ConfigData)
	ConstructMgr(configPaths)
	ctx, cancel := context.WithCancel(context.Background())
	pathChan := make(chan interface{})
	go logconfig.WatchConfig(ctx, v, pathChan)
	defer func() {
		mainOnce.Do(func() {
			if err := recover(); err != nil {
				fmt.Println("main goroutine panic ", err) // 这里的err其实就是panic传入的内容
			}
			cancel()
		})
	}()

  

在主函数中读取配置,并且将配置的路径信息存储在configMgr中。接着启动了一个协程用来监控配置文件,并且我实现了主协程的资源回收。

我们在main中继续添加接受监控协程的数据逻辑。

for {
		select {
		case pathData, ok := <-pathChan:
			if !ok {
				return
			}
			fmt.Println("main goroutine receive pathData")
			fmt.Println(pathData)
			pathDataNew := pathData.(map[string]interface{})

			for oldkey, oldval := range configMgr {
				_, ok := pathDataNew[oldkey]
				if ok {
					continue
				}
				oldval.ConfigCancel()
				delete(configMgr, oldkey)
			}

			for conkey, conval := range pathDataNew {
				oldval, ok := configMgr[conkey]
				if !ok {
					configData := new(logconfig.ConfigData)
					configData.ConfigKey = conkey
					configData.ConfigValue = conval.(string)
					_, cancel := context.WithCancel(context.Background())
					configData.ConfigCancel = cancel
					configMgr[conkey] = configData
					continue
				}

				if oldval.ConfigValue != conval.(string) {
					oldval.ConfigValue = conval.(string)
					oldval.ConfigCancel()
					_, cancel := context.WithCancel(context.Background())
					oldval.ConfigCancel = cancel
					continue
				}

			}

			for mgrkey, mgrval := range configMgr {
				fmt.Println(mgrkey)
				fmt.Println(mgrval)
			}
		}
	}
}

  

主协程接受数据后对比新旧数据,将旧的配置中被删除的路径剔除,增加和修改新的路径。

日志监控留给之后处理,这里打印下更新后的配置信息。
整体运行下main函数,然后我们手动修改config.yaml,将logdir4修改为logdir5,可以看到如下信息

证明我们的热更新处理逻辑没有问题。下一节基于现有的逻辑,添加日志文件的监控处理,启动多个协程管理日志文件。
源码下载
https://github.com/secondtonone1/golang-/tree/master/logcatchsys
我的公众号

原文地址:https://www.cnblogs.com/secondtonone1/p/11971309.html

时间: 2024-10-10 19:09:52

Go项目实战:打造高并发日志采集系统(三)的相关文章

Go项目实战:打造高并发日志采集系统(六)

前情回顾 前文我们完成了日志采集系统的日志文件监控,配置文件热更新,协程异常检测和保活机制. 本节目标 本节加入kafka消息队列,kafka前文也介绍过了,可以对消息进行排队,解耦合和流量控制的作用,为什么一定要用kafka呢?主要原因就是在日志高并发读取后,如果直接将消息发给前端或者写入数据库,会造成崩溃或者卡死.kafka可以对消息进行排队和减轻压力,这样无论以后将这些消息录入数据库也好,传给前端分析也好,都能保证系统稳定性.代码我们也写过和测试了,只需要将之前写好的kafka读写消息代码

java架构师大型分布式综合项目实战,高并发,集群,高可用,程序设计,性能优化,架构设计,负载均衡,大数据量

* { font-family: "Microsoft YaHei" !important } h1 { color: #FF0 } 15套java架构师.集群.高可用.高可扩 展.高性能.高并发.性能优化.Spring boot.Redis.ActiveMQ.Nginx.Mycat.Netty.Jvm大型分布 式项目实战视频教程 视频课程包含: 高级Java架构师包含:Spring boot.Spring  cloud.Dubbo.Redis.ActiveMQ.Nginx.Mycat

java架构师课程、性能调优、高并发、tomcat负载均衡、大型电商项目实战、高可用、高可扩展、数据库架构设计、Solr集群与应用、分布式实战、主从复制、高可用集群、大数据

15套Java架构师详情 * { font-family: "Microsoft YaHei" !important } h1 { background-color: #006; color: #FF0 } 15套java架构师.集群.高可用.高可扩展.高性能.高并发.性能优化.Spring boot.Redis.ActiveMQ.Nginx.Mycat.Netty.Jvm大型分布式项目实战视频教程 视频课程包含: 高级Java架构师包含:Spring boot.Spring  clo

系统架构~高并发日志系统设计

对于一个项目来说,日志是必须的,一般日志的持久化方式有文件和数据库,而在多数情况下,我们都采用文件系统来实现,而对于高并发的情况下,频繁进行I/O操作,对系统的性能肯定是有影响的,这个毋庸置疑!针对这种高并发的场合,我们采用一种缓存队列的方式来处理这个Case是比较明智的,本文主要是向各位展现一下,我所设计的<高并发日志系统设计>,如在功能上有什么需要改进的地方,欢迎各位来回复. 一 项目结构图 二 项目实现代码 /// <summary> /// 工作任务基类 /// </

【实战Java高并发程序设计 4】数组也能无锁:AtomicIntegerArray

除了提供基本数据类型外,JDK还为我们准备了数组等复合结构.当前可用的原子数组有:AtomicIntegerArray.AtomicLongArray和AtomicReferenceArray,分别表示整数数组.long型数组和普通的对象数组. 这里以AtomicIntegerArray为例,展示原子数组的使用方式. AtomicIntegerArray本质上是对int[]类型的封装.使用Unsafe类通过CAS的方式控制int[]在多线程下的安全性.它提供了以下几个核心API: //获得数组第

【实战Java高并发程序设计 3】带有时间戳的对象引用:AtomicStampedReference

[实战Java高并发程序设计 1]Java中的指针:Unsafe类 [实战Java高并发程序设计 2]无锁的对象引用:AtomicReference AtomicReference无法解决上述问题的根本是因为对象在修改过程中,丢失了状态信息.对象值本身与状态被画上了等号.因此,我们只要能够记录对象在修改过程中的状态值,就可以很好的解决对象被反复修改导致线程无法正确判断对象状态的问题. AtomicStampedReference正是这么做的.它内部不仅维护了对象值,还维护了一个时间戳(我这里把它

【实战Java高并发程序设计 5】让普通变量也享受原子操作

[实战Java高并发程序设计 1]Java中的指针:Unsafe类 [实战Java高并发程序设计 2]无锁的对象引用:AtomicReference [实战Java高并发程序设计 3]带有时间戳的对象引用:AtomicStampedReference [实战Java高并发程序设计 4]数组也能无锁:AtomicIntegerArray 有时候,由于初期考虑不周,或者后期的需求变化,一些普通变量可能也会有线程安全的需求.如果改动不大,我们可以简单地修改程序中每一个使用或者读取这个变量的地方.但显然

【实战Java高并发程序设计 7】让线程之间互相帮助--SynchronousQueue的实现

[实战Java高并发程序设计 1]Java中的指针:Unsafe类 [实战Java高并发程序设计 2]无锁的对象引用:AtomicReference [实战Java高并发程序设计 3]带有时间戳的对象引用:AtomicStampedReference [实战Java高并发程序设计 4]数组也能无锁:AtomicIntegerArray [实战Java高并发程序设计 5]让普通变量也享受原子操作 [实战Java高并发程序设计6]挑战无锁算法:无锁的Vector实现 在对线程池的介绍中,提到了一个非

我的《实战java高并发程序设计》纸质书上市了

在过去单核CPU时代,单任务在一个时间点只能执行单一程序,随着多核CPU的发展,并行程序开发就显得尤为重要. <实战Java高并发程序设计>主要介绍基于Java的并行程序设计基础.思路.方法和实战.首先,立足于并发程序基础,详细介绍Java中进行并行程序设计的基本方法.第二,进一步详细介绍JDK中对并行程序的强大支持,帮助读者快速.稳健地进行并行程序开发.第三,详细讨论有关"锁"的优化和提高并行程序性能级别的方法和思路.第四,介绍并行的基本设计模式及Java8对并行程序的支