python 日志收集服务器

引因:  python 的日志收集服务是线程安全的(对同一个文件的写入,使用了锁),但是对于多进程的情况,它是无法处理的。python 官方文档推荐的做法是,使用tcp 服务器专门用于日志的收集,以确保对的文件的写入是安全的。这里提供了日志收集服务器基于twisted的实现,可供参考,程序在centos上进行了测试,并可用于生产环境

当client 和server 在一台机器上时,每秒可以处理6000条日志记录

#!/usr/bin/env python
# -*- coding:utf-8 -*-
##################################
# logserver
# 通过socket接收并存储日志信息
# 注:测试程序为test_logserver.py
# 修改时间:2013-11-22
###################################

# standard lib
import os, sys
import time
import twisted
import logging
import traceback
import struct
import cPickle                 # use cPickle for speed
from logging.handlers import TimedRotatingFileHandler
from twisted.internet.protocol import Protocol, Factory

# our own code
import settings
from log_record import initlog

# 根据logger name生成一个
def gen_log_name(name):
    name = name.replace('.', '_')
    return name + '.log'

class ReceiveProtocol(Protocol):

    HEADER_LENGTH = 4
    def __init__(self):
        self.buf = ''
        self.state = 'init'
        self.require_length = ReceiveProtocol.HEADER_LENGTH
        # 记录当前完成初始化的logger名称
        self.logger_dict = {}

    def connectionMade(self):
        logger = logging.getLogger('log_server')
        logger.info('[makeConnection]与logger_client建立socket连接。')

    def dataReceived(self, data):
        logger = logging.getLogger('log_server')
        try:
            if len(self.buf + data) >= self.require_length:
                self.buf = self.buf + data
                # 数据就绪进行相应动作
                data = self.buf[0:self.require_length]
                # 把数据从缓冲区中取走
                self.buf = self.buf[self.require_length:]
                self.solve(data)
                # 可能一次读到了多条日志记录
                if self.buf:
                    self.dataReceived('')
            else:
                self.buf += data

        except BaseException, e:
            logger.error('处理短信记录失败' + str(e) + '\n' + traceback.format_exc())

    def solve(self, data):
        logger = logging.getLogger('log_server')
        statehandler = None
        try:
            pto = 'proto_' + self.state
            statehandler = getattr(self,pto)
        except AttributeError:
            logger.error('callback',self.state,'not found')
            self.transport.loseConnection()

        statehandler(data)
        if self.state == 'done':
            self.transport.loseConnection()

    def connectionLost(self, reason):
        logger = logging.getLogger('log_server')
        logger.info('[connectionLost]与logger_client的socket连接关闭。')

    # 记录日志
    def proto_record(self, data):
        logRecord = logging.makeLogRecord(cPickle.loads(data))
        if logRecord.name not in self.logger_dict:
            logger = initlog(logRecord.name, settings.PATH, gen_log_name(logRecord.name), logLevel=logging.DEBUG)
            self.logger_dict[logRecord.name] = logger

        self.logger_dict[logRecord.name].handle(logRecord)

        # 修改下一步动作以及所需长度
        self.state = 'init'
        self.require_length = ReceiveProtocol.HEADER_LENGTH

    # 处理头部信息
    def proto_init(self, data):
        length = struct.unpack('!I', data[0:ReceiveProtocol.HEADER_LENGTH])[0]

        # 修改下一步动作以及所需长度
        self.state = 'record'
        self.require_length = length

        if len(self.buf) >= self.require_length:
            data = self.buf[0:self.require_length]
            # 把数据从缓冲区中取走
            self.buf = self.buf[self.require_length:]
            self.solve(data)

class ReceiveFactory(Factory):
    def buildProtocol(self, addr):
        return ReceiveProtocol()

def main():
    print 'logserver has started.'
    logger = logging.getLogger('log_server')
    logger.info('logserver has started.')

    from twisted.internet import epollreactor
    epollreactor.install()
    reactor = twisted.internet.reactor
    reactor.listenTCP(settings.PORT, ReceiveFactory())
    reactor.run()

if __name__ == '__main__':
    main()

完成程序:

https://github.com/vearne/log_server/tree/master

参考资料:

https://docs.python.org/2/howto/logging-cookbook.html#logging-cookbook

时间: 2024-08-05 10:49:29

python 日志收集服务器的相关文章

Logback+ELK+SpringMVC搭建日志收集服务器

(转) 1.ELK是什么? ELK是由Elasticsearch.Logstash.Kibana这3个软件的缩写. Elasticsearch是一个分布式搜索分析引擎,稳定.可水平扩展.易于管理是它的主要设计初衷 Logstash是一个灵活的数据收集.加工和传输的管道软件 Kibana是一个数据可视化平台,可以通过将数据转化为酷炫而强大的图像而实现与数据的交互将三者的收集加工,存储分析和可视转化整合在一起就形成了 ELK . 2.ELK流程 ELK的流程应该是这样的:Logback->Logst

elkb+redis建立日志收集分析系统

一.ELKB说明 elastic提供了一套非常高级的工具ELKB来满足以上这几个需求.ELKB指的是用于日志分析或者说数据分析的四个软件,各自拥有独立的功能又可以组合在一起.先来简单介绍一下这四个软件. Elastic Search: 从名称可以看出,Elastic Search 是用来进行搜索的,提供数据以及相应的配置信息(什么字段是什么数据类型,哪些字段可以检索等),然后你就可以自由地使用API搜索你的数据. Logstash:.日志文件基本上都是每行一条,每一条里面有各种信息,这个软件的功

日志收集以及分析:Splunk

写代码的人都知道日志很重要,机器不多的时候,查看日志很简单,ssh 上去 grep + awk + perl 啥的 ad hoc 的搞几把就行,但面对上百台甚至上千台机器时,如何有效的收集和分析日志就成了个很头疼的事情.日志处理必然有如下过程: 从各个服务器读取日志 把日志存放到集中的地方 挖掘日志数据,用友好的 UI 展示出来,最好能做到实时的输入表达式做过滤.聚合 下面分三个方面聊聊,整个过程是需要多方配合的,包括写日志.读日志.转储日志.分析日志,注意聊这些的背景是互联网行业,机器多,日志

轻量级日志收集技术方案

1.     传统架构 1.1. Rsync方式 说明: 在生产环境上部署rsync传输脚本并设置定时,按天或按小时将日志传输到日志收集服务器 1) 优点 对生产服务器和日志收集服务器造成的压力较小 数据较精确,且可以比较方便的重复运行 2) 缺点 不能实时或者方便的得到想要的统计数据 不方便实施分布式 需要对每种日志正价同步脚本和设置定时,维护起来比较麻烦 Flume方式 说明: Flume是一个分布式.可靠.和高可用的海量日志聚合的系统,支持在系统中定制各类数据发送方. 采用了分层架构:分别

CentOS6.8下使用rsyslog+ldap部署日志服务器来实现日志收集

在我们的运维工作中,常常会对系统上的日志进行收集,手动管理少量的几台服务器的日志收集没有太大难度,但是企业当中批量的管理成千上万台服务器的时候,这时候想一台台的收集日志未免太浪费时间了,这时候我们需要一个批量管理日志的系统来解决这一难题,今天我给大家带来的使用 1.syslog介绍 日志服务在Centos5上位syslog,随着系统版本的升级之后,日志服务改为rsyslog,rsyslog是syslog的升级版,提供了许多高级的特性.syslog由klogd和syslogd组成,klogd记录的

用fabric部署维护kle日志收集系统

最近搞了一个logstash kafka elasticsearch kibana 整合部署的日志收集系统.部署参考lagstash + elasticsearch + kibana 3 + kafka 日志管理系统部署 02 上线过程中有一些环节,觉得还是值的大家注意的比如: 1,应用运维和研发人员要讨论一下日志格式的定义, 2,在logstash取日志和消费端logstash消费日志麻.过滤日志的时候怎么要高效,避免服务本身告成系统压力过大,如果每天要处理过亿日志量,性能不注意,哈哈,可以使

网站数据统计分析中的日志收集原理及其实现

> 网站数据统计分析工具是网站站长和运营人员经常使用的一种工具,比较常用的有谷歌分析.百度统计 和 腾讯分析等等.所有这些统计分析工具的第一步都是网站访问数据的收集.目前主流的数据收集方式基本都是基于javascript的.本文将简要分析这种数据收集的原理,并一步一步实际搭建一个实际的数据收集系统. 1.数据收集原理分析 简单来说,网站统计分析工具需要收集到用户浏览目标网站的行为(如打开某网页.点击某按钮.将商品加入购物车等)及行为附加数据(如某下单行为产生的订单金额等).早期的网站统计往往只收

一共81个,开源大数据处理工具汇总(下),包括日志收集系统/集群管理/RPC等

作者:大数据女神-诺蓝(微信公号:dashujunvshen).本文是36大数据专稿,转载必须标明来源36大数据. 接上一部分:一共81个,开源大数据处理工具汇总(上),第二部分主要收集整理的内容主要有日志收集系统.消息系统.分布式服务.集群管理.RPC.基础设施.搜索引擎.Iaas和监控管理等大数据开源工具. 日志收集系统 一.Facebook Scribe 贡献者:Facebook 简介:Scribe是Facebook开源的日志收集系统,在Facebook内部已经得到大量的应用.它能够从各种

[转载] 一共81个,开源大数据处理工具汇总(下),包括日志收集系统/集群管理/RPC等

原文: http://www.36dsj.com/archives/25042 接上一部分:一共81个,开源大数据处理工具汇总(上),第二部分主要收集整理的内容主要有日志收集系统.消息系统.分布式服务.集群管理.RPC.基础设施.搜索引擎.Iaas和监控管理等大数据开源工具. 日志收集系统 一.Facebook Scribe 贡献者:Facebook 简介:Scribe是Facebook开源的日志收集系统,在Facebook内部已经得到大量的应用.它能够从各种日志源上收集日志,存储到一个中央存储