第11天 日志收集系统kafka库实战

本节主要内容:

1. 日志收集系统设计
2. 日志客户端开发

1. 项目背景
    a. 每个系统都有日志,当系统出现问题时,需要通过日志解决问题
    b. 当系统机器比较少时,登陆到服务器上查看即可满足
    c. 当系统机器规模巨大,登陆到机器上查看几乎不现实

2. 解决方案
    a. 把机器上的日志实时收集,统一的存储到中心系统
    b. 然后再对这些日志建立索引,通过搜索即可以找到对应日志
    c. 通过提供界面友好的web界面,通过web即可以完成日志搜索

3. 面临的问题
    a. 实时日志量非常大,每天几十亿条
    b. 日志准实时收集,延迟控制在分钟级别
    c. 能够水平可扩展

4. 业界方案ELK

日志收集系统架构

该方案问题:

a. 运维成本高,每增加一个日志收集,都需要手动修改配置
    b. 监控缺失,无法准确获取logstash的状态
    c. 无法做定制化开发以及维护

6. 日志收集系统设计

各组件介绍:
    a. Log Agent,日志收集客户端,用来收集服务器上的日志
    b. Kafka,高吞吐量的分布式队列,linkin开发,apache顶级开源项目
    c. ES,elasticsearch,开源的搜索引擎,提供基于http restful的web接口
    d. Hadoop,分布式计算框架,能够对大量数据进行分布式处理的平台

7. kafka应用场景
    1. 异步处理, 把非关键流程异步化,提高系统的响应时间和健壮性

2. 应用解耦,通过消息队列

3. 流量削峰3. 流量削峰

 8. zookeeper应用场景

1. 服务注册&服务发现

2. 配置中心

3. 分布式锁

  • Zookeeper是强一致的
  • 多个客户端同时在Zookeeper上创建相同znode,只有一个创建成功

 9. 安装kafka

见博客:https://www.cnblogs.com/xuejiale/p/10505391.html

10. log agent设计

11. log agent流程

11. kafka示例代码

先导入第三方包:

github.com/Shopify/sarama

我的kafka和ZooKeeper都安装在Linux(Centos6.5,ip: 192.168.30.136)上:

 1 package main
 2
 3 import (
 4     "fmt"
 5     "time"
 6     "github.com/Shopify/sarama"
 7 )
 8
 9 func main() {
10
11     config := sarama.NewConfig()
12     config.Producer.RequiredAcks = sarama.WaitForAll
13     config.Producer.Partitioner = sarama.NewRandomPartitioner
14     config.Producer.Return.Successes = true
15
16     client, err := sarama.NewSyncProducer([]string{"192.168.30.136:9092"}, config)
17     if err != nil {
18         fmt.Println("producer close, err:", err)
19         return
20     }
21
22     defer client.Close()
23     for {
24         msg := &sarama.ProducerMessage{}
25         msg.Topic = "nginx_log"
26         msg.Value = sarama.StringEncoder("this is a good test, my message is good")
27
28         pid, offset, err := client.SendMessage(msg)
29         if err != nil {
30             fmt.Println("send message failed,", err)
31             return
32         }
33
34         fmt.Printf("pid:%v offset:%v\n", pid, offset)
35         time.Sleep(time.Second)
36     }
37 }

kafka示例

Windows启动程序往Linux上的kafka发送数据:

Linux上的kafka接收数据:

12. tailf组件使用

    先导入第三方包:

github.com/hpcloud/tail

 1 package main
 2
 3 import (
 4     "fmt"
 5     "github.com/hpcloud/tail"
 6     "time"
 7 )
 8 func main() {
 9     filename := "F:\\Go\\project\\src\\go_dev\\logCollect\\tailf\\my.log"
10     tails, err := tail.TailFile(filename, tail.Config{
11         ReOpen:    true,
12         Follow:    true,
13         //Location:  &tail.SeekInfo{Offset: 0, Whence: 2},
14         MustExist: false,
15         Poll:      true,
16     })
17     if err != nil {
18         fmt.Println("tail file err:", err)
19         return
20     }
21     var msg *tail.Line
22     var ok bool
23     for {
24         msg, ok = <-tails.Lines
25         if !ok {
26             fmt.Printf("tail file close reopen, filename:%s\n", tails.Filename)
27             time.Sleep(100 * time.Millisecond)
28             continue
29         }
30         fmt.Println("msg:", msg)
31     }
32 }

tailf示例代码

my.log文件内容(unix格式):

在Windows上,当我的上面代码里日志文件(my.log)为Windows格式,代码执行结果如下:

当时用notepade++将文件格式转换为Unix格式,执行代码结果如下:

注意:最后一行必须有换行符,否则该行无法读取。

13. 配置文件库使用

先导入第三方包:

github.com/astaxie/beego/config

1) 初始化配置库

iniconf, err := NewConfig("ini", "testini.conf")
if err != nil {
    log.Fatal(err)
}

2) 读取配置项

String(key string) string
Int(key string) (int, error)
Int64(key string) (int64, error)
Bool(key string) (bool, error)
Float(key string) (float64, error)

例如:

iniconf.String("server::listen_ip")
iniconf.Int("server::listen_port")

[server]
listen_ip = "0.0.0.0"
listen_port = 8080

[logs]
log_level=debug
log_path=./logs/logagent.log

[collect]
log_path=/home/work/logs/nginx/access.log
topic=nginx_log

 1 package main
 2
 3 import (
 4     "fmt"
 5     "github.com/astaxie/beego/config"
 6 )
 7
 8 func main() {
 9     conf, err := config.NewConfig("ini", "./logcollect.conf")
10     if err != nil {
11         fmt.Println("new config failed, err:", err)
12         return
13     }
14
15     port, err := conf.Int("server::listen_port")
16     if err != nil {
17         fmt.Println("read server:port failed, err:", err)
18         return
19     }
20
21     fmt.Println("Port:", port)
22     log_level := conf.String("log::log_level")
23     if err != nil {
24         fmt.Println("read log_level failed, ", err)
25         return
26     }
27     fmt.Println("log_level:", log_level)
28
29     log_path := conf.String("log::log_path")
30     fmt.Println("log_path:", log_path)
31 }

config示例代码

配置文件内容:

[server]
listen_ip = "0.0.0.0"
listen_port = 8080

[log]
log_level=debug
log_path=./logs/logagent.log

[collect]
log_path=/home/work/logs/nginx/access.log
topic=nginx_log

执行结果:

14. 日志库的使用

先导入第三方包:

github.com/astaxie/beego/logs

1) 配置log组件

config := make(map[string]interface{})
config["filename"] = "./logs/logcollect.log"
config["level"] = logs.LevelDebug

configStr, err := json.Marshal(config)
if err != nil {
    fmt.Println("marshal failed, err:", err)
    return
}

2) 初始化日志组件

logs.SetLogger(“file”, string(configStr))

 1 package main
 2
 3 import (
 4     "encoding/json"
 5     "fmt"
 6     "github.com/astaxie/beego/logs"
 7 )
 8
 9 func main() {
10     config := make(map[string]interface{})
11     config["filename"] = "./logcollect.log"
12     config["level"] = logs.LevelDebug
13
14     configStr, err := json.Marshal(config)
15     if err != nil {
16         fmt.Println("marshal failed, err:", err)
17         return
18     }
19
20     logs.SetLogger(logs.AdapterFile, string(configStr))
21
22     logs.Debug("this is a test, my name is %s", "stu01")
23     logs.Trace("this is a trace, my name is %s", "stu02")
24     logs.Warn("this is a warn, my name is %s", "stu03")
25 }

logs示例

15. 日志收集项目整体实现

原文地址:https://www.cnblogs.com/xuejiale/p/10657989.html

时间: 2024-08-01 10:40:59

第11天 日志收集系统kafka库实战的相关文章

ELK+kafka构建日志收集系统

ELK+kafka构建日志收集系统 原文  http://lx.wxqrcode.com/index.php/post/101.html 背景: 最近线上上了ELK,但是只用了一台Redis在中间作为消息队列,以减轻前端es集群的压力,Redis的集群解决方案暂时没有接触过,并且Redis作为消息队列并不是它的强项:所以最近将Redis换成了专业的消息信息发布订阅系统Kafka, Kafka的更多介绍大家可以看这里: 传送门 ,关于ELK的知识网上有很多的哦, 此篇博客主要是总结一下目前线上这个

一共81个,开源大数据处理工具汇总(下),包括日志收集系统/集群管理/RPC等

作者:大数据女神-诺蓝(微信公号:dashujunvshen).本文是36大数据专稿,转载必须标明来源36大数据. 接上一部分:一共81个,开源大数据处理工具汇总(上),第二部分主要收集整理的内容主要有日志收集系统.消息系统.分布式服务.集群管理.RPC.基础设施.搜索引擎.Iaas和监控管理等大数据开源工具. 日志收集系统 一.Facebook Scribe 贡献者:Facebook 简介:Scribe是Facebook开源的日志收集系统,在Facebook内部已经得到大量的应用.它能够从各种

[转载] 一共81个,开源大数据处理工具汇总(下),包括日志收集系统/集群管理/RPC等

原文: http://www.36dsj.com/archives/25042 接上一部分:一共81个,开源大数据处理工具汇总(上),第二部分主要收集整理的内容主要有日志收集系统.消息系统.分布式服务.集群管理.RPC.基础设施.搜索引擎.Iaas和监控管理等大数据开源工具. 日志收集系统 一.Facebook Scribe 贡献者:Facebook 简介:Scribe是Facebook开源的日志收集系统,在Facebook内部已经得到大量的应用.它能够从各种日志源上收集日志,存储到一个中央存储

日志收集之kafka

日志收集之kafka http://www.jianshu.com/p/f78b773ddde5 一.介绍 Kafka是一种分布式的,基于发布/订阅的消息系统.主要设计目标如下: 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间复杂度的访问性能 高吞吐率.即使在非常廉价的商用机器上也能做到单机支持每秒100K条以上消息的传输 支持Kafka Server间的消息分区,及分布式消费,同时保证每个Partition内的消息顺序传输 同时支持离线数据处理和实时数据处

用fabric部署维护kle日志收集系统

最近搞了一个logstash kafka elasticsearch kibana 整合部署的日志收集系统.部署参考lagstash + elasticsearch + kibana 3 + kafka 日志管理系统部署 02 上线过程中有一些环节,觉得还是值的大家注意的比如: 1,应用运维和研发人员要讨论一下日志格式的定义, 2,在logstash取日志和消费端logstash消费日志麻.过滤日志的时候怎么要高效,避免服务本身告成系统压力过大,如果每天要处理过亿日志量,性能不注意,哈哈,可以使

分布式日志收集系统Apache Flume的设计详细介绍

问题导读: 1.Flume传输的数据的基本单位是是什么? 2.Event是什么,流向是怎么样的? 3.Source:完成对日志数据的收集,分成什么打入Channel中? 4.Channel的作用是什么? 5.取出Channel中的数据,进行相应的存储文件系统,数据库,或者提交到远程服务器,由谁来完成? 6.Flume支那些数据格式? 7.对于直接读取文件Source,有两种方式,分别是什么? 8.Channel有多种方式有哪些方式? 概述Flume是Cloudera公司的一款高性能.高可能的分布

基于Flume的美团日志收集系统(一)架构和设计【转】

美团的日志收集系统负责美团的所有业务日志的收集,并分别给Hadoop平台提供离线数据和Storm平台提供实时数据流.美团的日志收集系统基于Flume设计和搭建而成. <基于Flume的美团日志收集系统>将分两部分给读者呈现美团日志收集系统的架构设计和实战经验. 第一部分架构和设计,将主要着眼于日志收集系统整体的架构设计,以及为什么要做这样的设计. 第二部分改进和优化,将主要着眼于实际部署和使用过程中遇到的问题,对Flume做的功能修改和优化等. 1 日志收集系统简介 日志收集是大数据的基石.

Flume日志收集系统架构详解--转

2017-09-06 朱洁 大数据和云计算技术 任何一个生产系统在运行过程中都会产生大量的日志,日志往往隐藏了很多有价值的信息.在没有分析方法之前,这些日志存储一段时间后就会被清理.随着技术的发展和分析能力的提高,日志的价值被重新重视起来.在分析这些日志之前,需要将分散在各个生产系统中的日志收集起来.本节介绍广泛应用的Flume日志收集系统. 一.概述 Flume是Cloudera公司的一款高性能.高可用的分布式日志收集系统,现在已经是Apache的顶级项目.同Flume相似的日志收集系统还有F

基于Flume的美团日志收集系统(一)架构和设计

来自:美团技术博客 http://tech.meituan.com/mt-log-system-arch.html 美团的日志收集系统负责美团的所有业务日志的收集,并分别给Hadoop平台提供离线数据和Storm平台提供实时数据流.美团的日志收集系统基于Flume设计和搭建而成. <基于Flume的美团日志收集系统>将分两部分给读者呈现美团日志收集系统的架构设计和实战经验. 第一部分架构和设计,将主要着眼于日志收集系统整体的架构设计,以及为什么要做这样的设计. 第二部分改进和优化,将主要着眼于