使用 log4js UDP 发送数据到 logstash

因为 nodejs 一般会部署在多台机器,并且每台机器会起多个进程,因此查看日志时往往要人工区分一个完整的请求包含哪些行。如果在日志中添加 服务器名称和进程id,就比较容易了。

如果在 filebeat 配置中修改正则表达式肯定是可以完成这个工作的,但今天发现 log4js(1.1.1版本) 的模块 logstashUDP 支持通过 UDP 直接发送数据到 logstash,这就更自由了,这样就不用在每台机器上跑 filebeat。

这里我们假定已经配置好了 ELK。EKL 配置方法参见本博客文章 http://www.cnblogs.com/jasonxuli/p/6397244.html

根据 logstashUDP 的代码,以 log4js.getLogger().info("message", arg1) 为例,第二个参数 arg1 会被作为 kv 结构,与 appender 配置中的 fields 对象合并,然后发送给 logstash。

也就是说,如果 arg1 = {"customField1": "", "customField2": ""},那么最终在 elasticsearch 中,数据大体结构为: {"fields.customField1": "", "fields.customField2": ""}。

appender 配置:


var logLayout = {                type:"pattern",                    pattern: "%h %x{pid} [%d] [%p] %c > %m",                    tokens: {                    pid: function () {                        return process.pid                    }                }            };
appenders     : [
        {
            type: "console",
            layout  : logLayout
        },
        {
            type    : "logstashUDP",  // 定义默认的 appender
            level   : "ALL",
            layout  : logLayout, // 自定义 layout
            host    : "192.168.1.123",
            port    : "5050",
            fields  : {  // fields 会和 第二个参数 arg1 合并,然后发送给 logstash;并且这个 fields 会作为 config.fields 用于之后每次日志事件
                machine : os.hostname(), // 指定机器名
                processId: process.pid   // 指定线程ID
            }
        }   ],
    replaceConsole: true
}

如果想要动态指定某个字段的值,需要包装一下 logger 的各个方法:

exports.logger = function getLogger (moduleName) {
    var l = log4js.getLogger(moduleName);

    l.trace = wrapper(l.trace);
    l.debug = wrapper(l.debug);
    l.info  = wrapper(l.info);
    l.warn  = wrapper(l.warn);
    l.error = wrapper(l.error);
    l.fatal = wrapper(l.fatal);

    return l;
};

function wrapper(func){
    return function(message){
        var data = {log: Array.prototype.join.call(arguments, ‘, ‘)}; // 拼接原始log参数;logger.info("attr1", "attr2"), 那么不会把 "attr2" 当做 data 发给 logstash
        return func.call(this, data.log, data);  // 这里默认传递 data 参数给 logstash,不需要在每次 info() 时手动传递第二个参数
    }
}

logstash 的配置, /etc/logstash/conf.d/first-pipeline.conf

input {
    beats {
        port => "5043"
    }
    udp {
        port => "5050"
        codec => json  // 这一行一定要指定
    }
}
filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}"}
    }
    geoip {
        source => "clientip"
    }
}
output {
    if [fields][category] != "memory" {  // 因为 logstashUDP 模块会将第二个参数合并到 config.fields 中, [][] 的结构用于指定二级字段 fields.category
        elasticsearch {
            hosts => [ "192.168.20.50:9200" ]
            index => "vrslog-%{+YYYY.MM.dd}"
        }
    }
    if [fields][category] == "memory" {  // 为 memory 日志生成单独的 elasticsearch 索引
        elasticsearch {
            hosts => [ "192.168.20.50:9200" ]
            index => "memory-%{+YYYY.MM.dd}"
        }
    }
}
时间: 2024-10-26 18:26:23

使用 log4js UDP 发送数据到 logstash的相关文章

通过 UDP 发送数据的简单范例

package j2se.core.net.udp; import java.io.IOException;import java.net.DatagramPacket;import java.net.DatagramSocket;import java.net.InetSocketAddress;import java.util.Scanner; /** * 通过 UDP 发送数据的简单范例 */public class Sender { public static void main(Str

Java使用UDP发送数据到InfluxDB

最近在做压测引擎相关的开发,需要将聚合数据发送到InfluxDB保存以便实时分析和控制QPS. 下面介绍对InfluxDB的使用. 什么是InfluxDB InfluxDB是一款用Go语言编写的开源分布式时序.事件和指标数据库,无需外部依赖.该数据库现在主要用于存储涉及大量的时间戳数据,如DevOps监控数据,APP metrics, loT传感器数据和实时分析数据. InfluxDB特征: 无结构(无模式):可以是任意数量的列(tags). 可以设置metric的保存时间. 支持与时间有关的相

TCP和UDP发送数据包的大小问题

用UDP协议发送时,用sendto函数最大能发送数据的长度为:65535-20-8=65507字节,其中20字节为IP包头长度,8字节为UDP包头长度.用sendto函数发送数据时,如果指的的数据长度大于该值,则函数会返回错误. 用TCP协议发送时,由于TCP是数据流协议,因此不存在包大小的限制(暂不考虑缓冲区的大小),这是指在 用send函数时,数据长度参数不受限制.而实际上,所指定的这段数据并不一定会一次性发送出去,如果这段数据比较长,可能会被分段发送,如果比较短,可能会等待和下一次数据一起

uip UDP 服务器广播模式(客户端可以任意端口,并且主动向客户端发送数据)

目前移植uip,发现UDP 服务器模式下,必须指定本地端口以及客户端端口,否则只能讲客户端端口设置为0,才能接收任意端口的数据,但是无法发送数据,因为此时客户端端口设置为0了,我通过将原始数据包中的客户端端口保存下来,并且在发送的时候将客户端端口替换为指定的端口,发送完成之后又设置为0,这样就实现了向任意客户端端口发送数据. uip.c if(uip_udp_conn->lport != 0 && UDPBUF->destport == uip_udp_conn->lpo

Android(java)学习笔记80:UDP协议发送数据

UDP协议发送数据:我们总是先运行接收端,再运行发送端发送端: 1 package cn.itcast_02; 2 3 import java.io.IOException; 4 import java.net.DatagramPacket; 5 import java.net.DatagramSocket; 6 import java.net.InetAddress; 7 /* 8 * UDP协议发送数据: 9 * A:创建发送端Socket对象 10 * B:创建数据,并把数据打包 11 *

uip UDP server广播模式(client能够随意port,而且主动向client发送数据)

眼下移植uip,发现UDP server模式下,必须指定本地port以及clientport,否则仅仅能讲clientport设置为0,才干接收随意port的数据,可是无法发送数据,由于此时clientport设置为0了,我通过将原始数据包中的clientport保存下来,而且在发送的时候将clientport替换为指定的port,发送完毕之后又设置为0,这样就实现了向随意clientport发送数据. uip.c if(uip_udp_conn->lport != 0 && UDP

UDP发送数据测试

一个合作伙伴说UDP发送数据,A(IP:192.168.1.100 子网掩码255.255.255.0)网段能发数据到B网段,但B(IP:192.168.2.100 子网掩码255.255.255.0)网段不能发数据到A网段,说法是跨路由的情况下,数据只能从下层住上层发,而不能由上层住下层发.我觉得两个网段的地位应该是相等的,即使跨路由的情况下,也应该有路由映射可以让这两个网段相互可以ping通,而只要两个网段可以ping通,就可以用upd发送数据 (当然,我们说的前提都是在一个公司的局域网内)

Java基础知识强化之网络编程笔记03:UDP之UDP协议发送数据 和 接收数据

1. UDP协议发送数据 和 接收数据 UDP协议发送数据: • 创建发送端的Socket对象 • 创建数据,并把数据打包 • 调用Socket对象的发送方法,发送数据包 • 释放资源  UDP协议接收数据:       • 创建接收端的Socket对象      • 创建数据包,接收数据(接收容器)      • 调用Socket对象的接收方法,接收数据包      • 解析数据包,并显示在控制台      • 释放资源 2. 代码实现 (1)首先我们先写发送端的程序,如下: 1 packag

UDP 发送与接收数据

//UDP 发送端 package liu.net.udp; import java.io.IOException; import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.InetAddress; import java.net.SocketException; import java.net.UnknownHostException; public class Send2 { public