日志实时收集和计算的简单方案

作为互联网公司,网站监测日志当然是数据的最大来源。我们目前的规模也不大,每天的日志量大约1TB。后续90%以上的业务都是需要基于日志来完 成,之前,业务中对实时的要求并不高,最多也就是准实时(延迟半小时以上),因此,我们使用Flume将数据收集到HDFS,然后进行清洗和分析。

后来,根据业务需要,我们有了两个Hadoop集群,并且部署在不同的地方(北京和西安),而所有的日志收集服务器在北京,因此需要将日志数据通过外网传输到西安,于是有了这样的部署:

很快,通过Flume流到西安Hadoop集群的数据就遇到了问题,比原始数据多或者少一些,造成这个问题的主要原因是在网络不稳定的情况下,北京 Flume Agent发送到西安Flume Collector的过程中,会发送失败,或者响应失败。另外,之前的数据准实时也不能满足业务的需求。

为了解决数据实时跨外网传输以及实时业务的问题,于是有了现在的架构:

  1. 引入Kafka,并且和日志收集服务器部署在北京同机房;
  2. 每台日志收集服务器上的Flume Agent,通过内网将数据发送至Kafka;
  3. Kafka的第一个消费者,北京网关机上的Flume,负责从Kafka中消费数据,然后流到北京Hadoop集群;
  4. Kafka的第二个消费者,西安网关机上的Flume,负责从Kafka中消费数据,然后流到西安Hadoop集群;这里是西安的Flume通过 外网连接北京Kafka,主动拉取数据,如果网络不稳定,那么当前批次拉取失败,最多重新拉一次,数据不会进Flume channel,更不会流到HDFS上,因此,这种方式在网络不稳定的情况下,不会造成数据缺失或重复;
  5. Kafka的第三个消费者,北京网关机上的实时计算模块,后面再说;
  6. Kafka的第N个消费者,其他;

Kafka中的数据分区及副本

这种架构下,Kafka成为了统一的日志数据提供者,至关重要。我们目前有4台Broker节点,每个Topic在创建时候都指定了4个分区,副本数为2;

数据在进入Kafka分区的时候,使用了Flume的拦截器,从日志中提取用户ID,然后通过HASH取模,将数据流到Kafka相应的分区中。这 种方式,一方面,完成了简单的负载均衡,另一方面,确保相同的用户数据都处于同一个分区中,为后面实时计算模块的统计提供了极大的便利。

Flume拦截器的使用

在整个流程中,有两个地方用到了同一个Flume拦截器(Regex Extractor Interceptor),就是在Flume Source中从消息中提取数据,并加入到Header,供Sink使用;

  • 一处是在LogServer上部署的Flume Source,它从原始日志中提取出用户ID,然后加入到Header中,Flume Sink(Kafka Sink)再入Kafka之前,从Header中拿出该用户ID,然后通过应用分区规则,将该条消息写入Kafka对应的分区中;
  • 另外一处是部署在西安的Flume Source,它从Kafka中读取消息之后,从消息中抽取出时间字段,并加入到Header中,后面的Flume Sink(HDFS Sink)通过读取Header中时间,根据消息中的时间,将数据写入HDFS相应的目录和文件中。如果在HDFS Sink中仅仅使用当前时间来确定HDFS目录和文件名称,这样会造成一小部分数据没有写入到正确的目录和文件中,比如:日志中8点59分59秒的数据可 能会被写进HDFS上9点的目录和文件中,因为原始数据经过Kafka,通过外网传输到西安的Flume,有个几秒的延时,那是很正常的。

Flume消费者的负载均衡和容错

在北京部署的Flume,使用Kafka Source从Kafka中读取数据流向北京Hadoop集群,西安的也一样,在消费同一Topic的消息时候,我们都是在两台机器上启动了两个 Flume Agent,并且设置的统一消费组(group.id),根据Kafka相同的Topic,一条消息只能被同一消费组内的一个消费者消费,因 此,Kafka中的一条消息,只会被这两个Flume Agent其中的一个消费掉,如果一个Flume Agent挂掉,那么另外一个将会消费所有消息;

这种方式,也是在流向HDFS的消费者端做了负载均衡和容错。

实时计算模块

目前我们实时计算的业务比较简单,就是类似于根据不同维度统计PV和UV。比如:实时统计一个网站当天累计PV、UV、IP数等,目前我们直接开发的JAVA程序,使用streamlib统计这些指标,UV和IP数这种需要去重的指标有2%以内的误差,业务可以接受。

实时计算模块使用Kafka low-level API,针对每一个Topic,都使用和分区数相等的线程去处理,每个线程消费一个分区的数据,由于数据在进入Kafka分区的时候,都是经过相应规则的分区,因此相同用户的数据会在同一个分区中;

另外,每个线程会在Redis中维护自己当前的Offsets,比如:在实时计算当天累计指标的业务场景中,每天0天在Redis中记录当前的 Offsets,这样,如果实时计算程序挂掉,下次启动时候,从Redis中读取当天的Offsets,重新读取和计算当天的所有消息。

由于我们的需求是实时统计当天累计的指标,而且能接受一定的误差,因此采用这种方式。如果需要精确统计累计去重指标,那么可能需要采用其它方式,比如:精确统计当天实时累计用户数,一种简单的办法是在HBase中使用计数器来配合完成。

其它实时数据消费者

如果需要实时统计一小段时间(比如十分钟、一小时)之内的PV、UV等指标,那么可以使用SparkStreaming来完成,比较简单。如果单独使用Spark Streaming来完成一天内海量数据的累计去重统计,我还不太清楚有什么好的解决办法。

另外,实时OLAP也可能作为Kafka的实时消费者应用,比如:Druid。

时间: 2025-01-05 18:27:56

日志实时收集和计算的简单方案的相关文章

从Apache的日志文件收集和提供统计数据(一个Python插件架构的简单实现)

从Apache的日志文件收集和提供统计数据 这一章我们将介绍基于插件程序的架构和实现.作为例子,我们将构建一个分析Apache服务器log文件的框架.这一次我们不再使用单片机的方式来创建,而是改为采用模块化的方式.一旦我们有了一个基本框架,我们就可以为它创建一个插件.这个插件可以基于请求者的地理位置执行分析. 程序的结构和功能 在数据维护和统计收集领域,很难有一个单一的应用程序可以适合多个用户的需求.让我们以分析Apache的web服务器日志文件为例.web服务器接受到的每一个请求都被记录在日志

一个轻客户端,多语言支持,去中心化,自动负载,可扩展的实时数据写服务的实现方案讨论

背景 背景是设计一个实时数据接入的模块,负责接收客户端的实时数据写入(如日志流,点击流),数据支持直接下沉到HBase上(后续提供HBase上的查询),或先持久化到Kafka里,方便后续进行一些计算和处理,再下沉到文件系统或做别的输出. 在设计中,对于客户端和服务端有这么些目标. 客户端需要支持多语言(Java,C++),做得尽量轻量级,只要连上服务端的ip:port,以RPC的形式调用简单的write就可以把数据写出去.客户端不承担任何逻辑的处理,服务端的负载均衡对客户端是透明的. 服务端想要

使用kibana和elasticsearch日志实时绘制图表

前言: 此文接的是上篇,上次的内容是,用python操作elasticsearch存储,实现数据的插入和查询.  估计有些人一看我的标题,以为肯定是 logstash kibana elasticsearch的组合.这三个家伙也确实总是勾搭在一块. 其实logstash是可以被别的替换,比如常见的fluented .剩下的那两个,kibana和elasticsearch是一伙的,不好做分离. 这次用的不是那种开源的.cs模式日志收集应用,还真不是.而是自己直接从程序里面把日志打到elastics

一款全面高效的日志分析工具,操作更简单

一款全面高效的日志分析工具,操作更简单 Eventlog Analyzer是用来分析和审计系统及事件日志的管理软件,能够对全网范围内的主机.服务器.网络设备.数据库以及各种应用服务系统等产生的日志,进行全面收集和细致分析,通过统一的控制台进行实时可视化的呈现.通过定义日志筛选规则和策略,帮助IT管理员从海量日志数据中精确查找关键有用的事件数据,准确定位网络故障并提前识别安全威胁,从而降低系统宕机时间.提升网络性能.保障企业网络安全. 事件日志监控.分析.报表和归档软件监控和报表网络范围内的Win

2019年优选大数据计算平台搭建方案之BR-odp数据安全、管理模块,数道云大数据

[前言]大数据计算平台,使用了Hadoop.Spark.Storm.Flink等这些分布式的实时或者离线计算框架,建立计算集群,并在上面运行各种计算任务. 21世纪的现在,大数据这个名词对我们来说并不陌生,大数据受到了不同行业,不同领域的各界人士的关注,就在今年已经过去的两会中,大数据的发展及使用也成为两会的热门话题. 大数据行业火热的发展,大数据技术将海量数据的价值化来赋予传统行业不一样的发展前景,大数据不仅助力企业的发展,同时也在政府等关于民意收集等等多个领域得到广泛应用,因此,大数据技术在

ELK日志分析收集

日志的作用:系统运维和开发人员可以通过日志了解服务器软硬件,检查配置过程当中发生的错误和错误原因.以便了解服务器的负荷,性能的安全性,从而及时的采取措施纠正错误. 日志的功能:解决系统故障和发现问题的主要手段. 日志主要包括:系统日志,应用程序日志,安全日志. 集中化管理日志.对于日志的统计和检索是一件比较麻烦的事情.一般使用grep ,awk和wc等Linux命令去实现检索和统计.但是对于查询,排序和统计,就显得有些力不存心.而ELK正好能解决上述问题 ELK是开源分布式的搜索的引擎.它是由E

使用Flume进行数据的实时收集处理

在已经成功安装Flume的基础上,本文将总结使用Flume进行数据的实时收集处理,具体步骤如下: 第一步,在$FLUME_HOME/conf目录下,编写Flume的配置文件,命名为flume_first_conf,具体内容如下: #agent1表示代理名称 agent1.sources=source1 agent1.sinks=sink1 agent1.channels=channel1 #Spooling Directory是监控指定文件夹中新文件的变化,一旦新文件出现,就解析该文件内容,然后

开源分布式搜索平台ELK+Redis+Syslog-ng实现日志实时搜索

logstash + elasticsearch + Kibana+Redis+Syslog-ng ElasticSearch是一个基于Lucene构建的开源,分布式,RESTful搜索引擎.设计用于云计算中,能够达到实时搜索,稳定,可靠,快速,安装使用方便.支持通过HTTP使用JSON进行数据索引. logstash是一个应用程序日志.事件的传输.处理.管理和搜索的平台.你可以用它来统一对应用程序日志进行收集管理,提供 Web 接口用于查询和统计.其实logstash是可以被别的替换,比如常见

CentOS7搭建开源分布式搜索平台ELK实现日志实时搜索并展示图表

    一.简介  Elasticsearch是个基于Lucene实现的开源.分布式.restful的全文本搜索引擎,此外他还是一个分布式实时文档存储,其中每个文档的每个filed均是可被索引的数据,且可被搜索,也是一个带实时分析功能的搜索引擎,能够扩展至数以百计的节点实时处理PB级别的数据.它的特点有:分布式,零配置,自动发现,索引自动分片,索引副本机制,restful风格接口,多数据源,自动搜索负载等.日志主要包括系统日志.应用程序日志和安全日志.系统运维和开发人员可以通过日志了解服务器软硬