windows下kafka+ELK的日志系统

用到的软件:zookeeper、kafka、logstash(6.3.2版本)、ES(6.3.2版本)、Kibana(6.3.2版本)。具体安装步骤不在此说明,基本都是下载解压,改一下配置文件,即可使用。(以下所述均在Windows下)
1、zookeeper:
kafka中自带zookeeper,可以不用装zookeeper,如果想自己另装,需配置环境变量,如下:
ZOOKEEPER_HOME => D:\nomalAPP\zookeeper-3.4.13
path 里面加入 %ZOOKEEPER_HOME%\bin
如果配置好以后,在cmd里运行zkserver报找不到java错误的话,可能是java环境变量的位置放置有问题,可以将path里面配置的java环境位置移到最前面。</b>

2、kafka:
kafka如果启动报找不到java的错误,原因在于kafka-run-class.bat的179行,找到里面的 %CLASSPATH% ,将其用双引号括起来。即改为:set COMMAND=%JAVA% %KAFKA_HEAP_OPTS% %KAFKA_JVM_PERFORMANCE_OPTS% %KAFKA_JMX_OPTS% %KAFKA_LOG4J_OPTS% -cp "%CLASSPATH%" %KAFKA_OPTS% %* </b>
kafka的配置文件server.properties里面要修改的:
日志所放置的目录(可以选择用默认的):log.dirs=D:/nomalAPP/kafka_2.12-2.0.0/kafka-logs,
连接zookeeper的ip和端口:zookeeper.connect=localhost:2181
其他的均可用默认配置。</b>
启动命令:cmd锁定安装目录,然后 .\bin\windows\kafka-server-start.bat .\config\server.properties
</b>
3、logstash:
在config目录下,logstash.yml里面,如果想启动多个,在里面可以配置 http.port=9600-9700,如果没有配置这一项,会使用默认的9600端口。
在config目录下,增加一个logstash.conf的配置文件,用来配置数据源,和数据过滤,数据输出的位置,如下:

input {
            #   ==>    用kafka的日志作为数据源
   kafka{
     ····#(kafka的IP和端口)
        bootstrap_servers => "10.8.22.15:9092"
        # (在有多个相同类型的数据源时,需要配置)
        client_id => "test1"
        # (消费者分组,可以通过组 ID 去指定,不同的组之间消费是相互不受影响的,相互隔离)
        group_id => "test1"
        auto_offset_reset => "latest"
        # (消费者线程个数)
        consumer_threads => 5
        # (在输出消息的时候会输出自身的信息包括:消费消息的大小, topic 来源以及 consumer 的 group 信息)
        decorate_events => true
        # (主题)
        topics => ["bas-binding-topic"]
        # (用于ES索引)
        type => "bas-binding-topic"
        # (数据格式)
        codec => "json"
        tags => ["bas-binding-topic"]
      }
      kafka{
        bootstrap_servers => "10.8.22.15:9092"
        client_id => "test2"
        group_id => "test2"
        auto_offset_reset => "latest"
        consumer_threads => 5
        #decorate_events => true
        topics => ["bas-cus-request-topic"]
        type => "bas-cus-request-topic"
        codec => "json"
        tags => ["bas-cus-request-topic"]
      }
}

filter {
    filter插件负责过滤解析input读取的数据,可以用grok插件正则解析数据,
    date插件解析日期,json插件解析json等等
}

output {
    # 输出到ES
    if "bas-binding-topic" in [tags] {
       elasticsearch{
             # ES的IP和端口
            hosts => ["localhost:9201"]
            # 索引名称:主题+时间
            index => "bas-binding-topic-%{+YYYY.MM.dd+HH:mm:ss}"
            timeout => 300
        }
        stdout {
            codec => rubydebug
         }
  }
  if "bas-cus-request-topic" in [tags] {
       elasticsearch{
            hosts => ["localhost:9201"]
            index => "bas-cus-request-topic-%{+YYYY.MM.dd+HH:mm:ss}"
            timeout => 300
        }
        stdout {
            codec => rubydebug
         }
  }
}

启动logstash的命令: .\bin\logstash.bat -f .\config\logstash.conf </b>

4、ES:
在config目录下,elasticsearch.yml配置文件中,ES的默认端口为9200,可以通过http.port=9201来修改
启动命令: .\bin\elasticsearch.bat
启动后,在浏览器访问: http://localhost:9201,
如果出现了一些信息,表示启动成功。</b>

5、Kibana:
在config目录下,kibana.yml配置文件中,通过elasticsearch.url: "http://localhost:9201" 来配置地址
启动命令: .\bin\kibana.bat
访问 "http://localhost:9201" ,如果出现需要输入用户名密码的界面,表示ES启动失败,如果直接显示Kibana界面,则启动成功。</b>

logstash数据输出到ES时,会选择ES默认的映射来解析数据。如果觉得默认映射不满足使用条件,可以自定模板:
借助postman工具创建ES模板,如下:

收到此响应表示创建成功。然后可以通过GET请求查询你刚才创建的模板,通过DELETE请求删除刚才创建的模板。</b>

向kafka主题中发送日志信息,发送的信息会在cmd窗口显示,如下:

同时,在Kibana界面会看到相应的索引,索引名称就是在logstash.conf的输出中配置的,如下:

Visualize:可以选择自己想要的索引去进行图形分析,效果如下:

Dashboard:将做过的图整合到仪表盘,效果如下:

另外,可以设置自动刷新,即当有新的数据发送到kafka时,所做的图会自动根据主题来进行刷新。
项目中,可以通过logback来讲日志输出到kafka,具体配置如下:

<appender name="bingdings-log-to-kafka" class="com.github.danielwegener.logback.kafka.KafkaAppender">
      <encoder>
        <pattern>%msg%n</pattern>
      </encoder>
            <!-- 指定kafka主题,消费时要用到-->
      <topic>bas-binding-topic</topic>
      <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy" />
      <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy" />
      <!-- 配置输出到指定ip、端口的kafka -->
      <producerConfig>bootstrap.servers=localhost:9092,10.8.22.13:9092</producerConfig>
      <!-- this is the fallback appender if kafka is not available. -->
    </appender>
    <logger name = "bingdings-log-to-kafka" level="INFO" additivity = "false">
       <appender-ref ref="bingdings-log-to-kafka"/>
    </logger>

然后在记日志的时候,记到对应的主题,日志就会写到kafka相应的位置。可以通过消费者来消费主题,查看日志是否成功写入,如下:

@SuppressWarnings("resource")
  public static void main(String[] args) {

    Properties properties = new Properties();
    properties.put("bootstrap.servers", "127.0.0.1:9092");
    properties.put("group.id", "group-1");
    properties.put("enable.auto.commit", "false");
    properties.put("auto.commit.interval.ms", "1000");
    properties.put("auto.offset.reset", "earliest");
    properties.put("session.timeout.ms", "30000");
    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
    kafkaConsumer.subscribe(Arrays.asList("bas-cus-request-topic", "bas-binding-topic"));
    while (true) {
      ConsumerRecords<String, String> records = kafkaConsumer.poll(Long.MAX_VALUE);
      System.err.println("+++++++++++++++++++++++++++++++++++++++++");
      for (ConsumerRecord<String, String> record : records) {
        System.err.println(record.offset() + ">>>>>>" + record.value());
      }
      System.err.println("+++++++++++++++++++++++++++++++++++++++++++++");
      break;
    }
  }

暂时想到的就这些,有建议的,欢迎评论提醒我进行补充,谢谢!

原文地址:http://blog.51cto.com/12181171/2323533

时间: 2024-10-09 10:12:41

windows下kafka+ELK的日志系统的相关文章

scribe、chukwa、kafka、flume日志系统对比

scribe.chukwa.kafka.flume日志系统对比 1. 背景介绍许多公司的平台每天会产生大量的日志(一般为流式数据,如,搜索引擎的pv,查询等),处理 这些日志需要特定的日志系统,一般而言,这些系统需要具有以下特征:(1) 构建应用系统和分析系统的桥梁,并将它们之间的关联解耦:(2) 支持近实时的在线分析系统和类似于Hadoop之类的离线分析系统:(3) 具有高可扩展性.即:当数据量增加时,可以通过增加节点进行水平扩展. 本文从设计架构,负载均衡,可扩展性和容错性等方面对比了当今开

Linux下一个简单的日志系统的设计及其C代码实现

1.概述 在大型软件系统中,为了监测软件运行状况及排查软件故障,一般都会要求软件程序在运行的过程中产生日志文件.在日志文件中存放程序流程中的一些重要信息, 包括:变量名称及其值.消息结构定义.函数返回值及其执行情况.脚本执行及调用情况等.通过阅读日志文件,我们能够较快地跟踪程序流程,并发现程序问题. 因此,熟练掌握日志系统的编写方法并快速地阅读日志文件,是对一个软件开发工程师的基本要求. 本文详细地介绍了Linux下一个简单的日志系统的设计方法,并给出了其C代码实现.本文为相关开发项目Linux

在Windows下Kafka的基本配置及简单使用

首先下载zookeeper(版本zookeeper-3.4.13)和kafka(版本kafka_2.12-2.0.0),解压就可以使用了. 1.zookeeper:在config目录下,有zoo.conf这个配置文件,可以配置端口和日志存储的位置,可以根据自己习惯更改.首先启动zookeeper,如果环境变量中配置过了zookeeper,则可以直接打开cmd窗口,用zkserver来启动,如下:如果打印的日志都是INFO 或者WARN,并且进入守护线程状态,表示启动成功. 2.Kafka:在co

windows下kafka源码阅读环境搭建

工具准备:jdk1.8,scala-2.11.11,gradle-3.1,zookeeper-3.4.5,kafka-0.10.0.1-src.tgz, kafka_2.11-0.10.0.1.tgz 安装jdk 安装scala 搭建zookeeper kafka源码构建 解压kafka-0.10.0.1-src.tgz,命令行进行kafka-0.10.0.1-src,执行gradle idea 注: 1)gradle idea,用国外的库构建的下载速度很慢,在进入项目里面,找到build.gr

Kafka+ELK完成日志采集处理

此文档为了做一次记录,按回忆粗略补写. 环境信息 Centos    V7.6.1810 JDK  V1.8.0_171 Rsyslog  V8.24.0-34.el7 Kafka  V2.12-0.10.2.1 zookeeper  V3.4.10 ELK V6.2.3 服务器分配 配置尽量高点,此次部署kafka+zookeeper和ES皆为集群模式. 服务器名 IP地址 配置 备注 node1 192.168.101.55 CPU:2C 内存:4G 磁盘:100G   node2 192.

ping通windows下虚拟机上的linux系统

直接ping  linux的ip 直接就失败了. 现在我的windows有两个虚拟网卡 接下来让linux使用VMnet8网卡 修改我的linux系统下的lo网卡的ip地址为VMnet8的ip地址 现在ping就可以了 没有什么路由包的缓慢问题,时间非常短. 版权声明:本文为博主原创文章,未经博主允许不得转载.

windows下启动tomcat,日志乱码问题,日志架构springboot+logback

最近在做架构迁移工作,将原有springmvc项目调整至springboot架构上,迁移完后,发现用springboot以jar包形式启动正常,用tomcat去启动时日志乱码,但是项目部署至Linux环境启动也正常,以下是日志乱码代码: <?xml version="1.0" encoding="UTF-8"?><configuration><include resource="org/springframework/boot

在windows下安装elk

一.下载elasticsearch-5.1.1 cd D:\bigdata\elasticsearch-5.1.1\bin elasticsearch-service.bat cmd 运行 service install,会提示安装成功 service manager 会弹出服务管理界面,可以设置自动启动,并启动之 浏览器访问 127.0.0.1:9200 ,出现成功的json 程序端口为9300 二.安装elasticsearch-head 三.安装logstash-5.1.1 cd到logs

windows下多用户远程链接ubuntu系统

这个问题真的是困扰我很久很久了,花了我接近一周的时间,查阅了几乎网上的所有博客,google百度都搜遍了,大神也请了好几回,一直没有搞定.今天终于自己摸索着完美解决了. 不吹牛的说,本篇博客绝对是中文网上上解决此问题不多的文章之一. 1. 首先ubuntu系统,建议安装英文语言的,我装的是14.04英文语言的. 2. 然后来安装xubuntu桌面(xfce4): $ sudo apt-get install xubuntu-desktop 3. ubuntu系统默认的是gnome桌面,我们需要先