Java使用UDP发送数据到InfluxDB

最近在做压测引擎相关的开发,需要将聚合数据发送到InfluxDB保存以便实时分析和控制QPS。

下面介绍对InfluxDB的使用。

什么是InfluxDB

InfluxDB是一款用Go语言编写的开源分布式时序、事件和指标数据库,无需外部依赖。该数据库现在主要用于存储涉及大量的时间戳数据,如DevOps监控数据,APP metrics, loT传感器数据和实时分析数据。

InfluxDB特征:

  • 无结构(无模式):可以是任意数量的列(tags)。
  • 可以设置metric的保存时间。
  • 支持与时间有关的相关函数(如min、max、sum、count、mean、median等),方便统计。
  • 支持存储策略:可以用于数据的删改(influxDB没有提供数据的删除与修改方法)。
  • 支持连续查询:是数据库中自动定时启动的一组语句,和存储策略搭配可以降低InfluxDB的系统占用量。
  • 原生的HTTP支持,内置HTTP API。
  • 支持类似SQL语法。
  • 支持设置数据在集群中的副本数。
  • 支持定期采样数据,写入另外的measurement,方便分粒度存储数据。
  • 自带web管理界面,方便使用(登入方式:http://< InfluxDB-IP >:8083)。
  • 支持Grafana画图展示。

PS:有了InfluxDB+Grafana后,你就可以写一些简单的程序了,可以只负责写后端逻辑部分,数据都可以存入InfluxDB,然后通过Grafana展示出来。

Mac安装InfluxDB

# 安装
brew install influxdb
# 启动
influxd -config /usr/local/etc/influxdb.conf
# 查看influxdb运行配置
influxd config
# 启动客户端
influx -precision rfc3339

InfluxDB开启UDP配置

vim /usr/local/etc/influxdb.conf

开启udp配置,其他为默认值

[[udp]]
  enabled = true

udp配置含义:

[[udp]] – udp配置

    enabled:是否启用该模块,默认值:false。

    bind-address:绑定地址,默认值:”:8089″。

    database:数据库名称,默认值:”udp”。

    retention-policy:存储策略,无默认值。

    batch-size:默认值:5000。

    batch-pending:默认值:10。

    read-buffer:udp读取buffer的大小,0表示使用操作系统提供的值,如果超过操作系统的默认配置则会出错。 该配置的默认值:0。

    batch-timeout:超时时间,默认值:”1s”。

    precision:时间精度,无默认值。

Java发送UDP数据报

我们知道InfluxDB是支持Http的,为什么我们还要采用UDP方式发送数据呢?

基于下列原因:

  1. TCP数据传输慢,UDP数据传输快。
  2. 网络带宽需求较小,而实时性要求高。
  3. InfluxDB和服务器在同机房,发生数据丢包的可能性较小,即使真的发生丢包,对整个请求流量的收集影响也较小。

我们采用了worker线程调用addMetric方法将数据存储到缓存 map 中,send线程池来进行每个指定时间发送数据到Influxdb。

代码如下(也可参考JmeterUdpMetricsSender类):

@Slf4j
public class InfluxDBClient implements Runnable {
    private String measurement = "example";

    private final Object lock = new Object();

    private InetAddress hostAddress;

    private int udpPort;

    private volatile Map<String, List<Response>> metrics = new HashMap<>();

    private long time;

    private String transaction;

    public InfluxDBClient(String influxdbUrl, String transaction) {
        this.transaction = transaction;
        try {
            log.debug("Setting up with url:{}", influxdbUrl);
            String[] urlComponents = influxdbUrl.split(":");
            if (urlComponents.length == 2) {
                hostAddress = InetAddress.getByName(urlComponents[0]);
                udpPort = Integer.parseInt(urlComponents[1]);
            } else {
                throw new IllegalArgumentException("InfluxDBClient url '" + influxdbUrl + "'?is wrong. The format shoule be <host/ip>:<port>");
            }
        } catch (Exception e) {
            throw new IllegalArgumentException("InfluxDBClient url '" + influxdbUrl + "'?is wrong. The format shoule be <host/ip>:<port>", e);
        }
    }

    public void addMetric(Response response) {
        synchronized (lock) {
            if (metrics.containsKey(response.getLabel())) {
                metrics.get(response.getLabel()).add(response);
            } else {
                metrics.put(response.getLabel(), new ArrayList<>(Collections.singletonList(response)));
            }
        }
    }

    @Override
    public void run() {
        sendMetrics();
    }

    private void sendMetrics() {
        Map<String, List<Response>> tempMetrics;
        //复制数据到tempMetrics,清空原来metrics并初始化上次的大小
        synchronized (lock) {
            if (isEmpty(metrics)) {
                return;
            }
            time = System.currentTimeMillis();
            tempMetrics = metrics;
            metrics = new HashMap<>();
            for (Map.Entry<String, List<Response>> entry : tempMetrics.entrySet()) {
                metrics.put(entry.getKey(), new ArrayList<>(entry.getValue().size()));
            }
        }
        final Map<String, List<Response>> copyMetrics = tempMetrics;
        final List<MetricTuple> aggregateMetrics = aggregate(copyMetrics);
        StringBuilder sb = new StringBuilder(aggregateMetrics.size() * 200);
        //发送tempMetrics,生成一行数据,然后换行
        for (MetricTuple metric : aggregateMetrics) {
            sb.append(metric.getMeasurement()).append(metric.getTag()).append(" ")
                    .append(metric.getField()).append(" ").append(metric.getTimestamp() + "000000").append("\n");
        }
        //udp发送数据到Influxdb
        try (DatagramSocket ds = new DatagramSocket()) {
            byte[] buf = sb.toString().getBytes();
            DatagramPacket dp = new DatagramPacket(buf, buf.length, this.hostAddress, this.udpPort);
            ds.send(dp);
            log.debug("send {} to influxdb", sb.toString());
        } catch (SocketException e) {
            log.error("Cannot open udp port!", e);
        } catch (IOException e) {
            log.error("Error in transferring udp package", e);
        }
    }

    /**
     * 得到聚合数据
     *
     * @param metrics
     * @return
     */
    private List<MetricTuple> aggregate(Map<String, List<Response>> metrics) {

    }

    public boolean isEmpty(Map<String, List<Response>> map) {
        for (Map.Entry<String, List<Response>> entry : map.entrySet()) {
            if (!entry.getValue().isEmpty()) {
                return false;
            }
        }
        return true;
    }
}

参考文档

  1. InfluxDB中文文档
  2. 玩转时序数据库InfluxDB

原文地址:https://www.cnblogs.com/morethink/p/9693561.html

时间: 2024-08-27 06:20:27

Java使用UDP发送数据到InfluxDB的相关文章

通过 UDP 发送数据的简单范例

package j2se.core.net.udp; import java.io.IOException;import java.net.DatagramPacket;import java.net.DatagramSocket;import java.net.InetSocketAddress;import java.util.Scanner; /** * 通过 UDP 发送数据的简单范例 */public class Sender { public static void main(Str

使用 log4js UDP 发送数据到 logstash

因为 nodejs 一般会部署在多台机器,并且每台机器会起多个进程,因此查看日志时往往要人工区分一个完整的请求包含哪些行.如果在日志中添加 服务器名称和进程id,就比较容易了. 如果在 filebeat 配置中修改正则表达式肯定是可以完成这个工作的,但今天发现 log4js(1.1.1版本) 的模块 logstashUDP 支持通过 UDP 直接发送数据到 logstash,这就更自由了,这样就不用在每台机器上跑 filebeat. 这里我们假定已经配置好了 ELK.EKL 配置方法参见本博客文

TCP和UDP发送数据包的大小问题

用UDP协议发送时,用sendto函数最大能发送数据的长度为:65535-20-8=65507字节,其中20字节为IP包头长度,8字节为UDP包头长度.用sendto函数发送数据时,如果指的的数据长度大于该值,则函数会返回错误. 用TCP协议发送时,由于TCP是数据流协议,因此不存在包大小的限制(暂不考虑缓冲区的大小),这是指在 用send函数时,数据长度参数不受限制.而实际上,所指定的这段数据并不一定会一次性发送出去,如果这段数据比较长,可能会被分段发送,如果比较短,可能会等待和下一次数据一起

Android(java)学习笔记80:UDP协议发送数据

UDP协议发送数据:我们总是先运行接收端,再运行发送端发送端: 1 package cn.itcast_02; 2 3 import java.io.IOException; 4 import java.net.DatagramPacket; 5 import java.net.DatagramSocket; 6 import java.net.InetAddress; 7 /* 8 * UDP协议发送数据: 9 * A:创建发送端Socket对象 10 * B:创建数据,并把数据打包 11 *

uip UDP 服务器广播模式(客户端可以任意端口,并且主动向客户端发送数据)

目前移植uip,发现UDP 服务器模式下,必须指定本地端口以及客户端端口,否则只能讲客户端端口设置为0,才能接收任意端口的数据,但是无法发送数据,因为此时客户端端口设置为0了,我通过将原始数据包中的客户端端口保存下来,并且在发送的时候将客户端端口替换为指定的端口,发送完成之后又设置为0,这样就实现了向任意客户端端口发送数据. uip.c if(uip_udp_conn->lport != 0 && UDPBUF->destport == uip_udp_conn->lpo

uip UDP server广播模式(client能够随意port,而且主动向client发送数据)

眼下移植uip,发现UDP server模式下,必须指定本地port以及clientport,否则仅仅能讲clientport设置为0,才干接收随意port的数据,可是无法发送数据,由于此时clientport设置为0了,我通过将原始数据包中的clientport保存下来,而且在发送的时候将clientport替换为指定的port,发送完毕之后又设置为0,这样就实现了向随意clientport发送数据. uip.c if(uip_udp_conn->lport != 0 && UDP

UDP发送数据测试

一个合作伙伴说UDP发送数据,A(IP:192.168.1.100 子网掩码255.255.255.0)网段能发数据到B网段,但B(IP:192.168.2.100 子网掩码255.255.255.0)网段不能发数据到A网段,说法是跨路由的情况下,数据只能从下层住上层发,而不能由上层住下层发.我觉得两个网段的地位应该是相等的,即使跨路由的情况下,也应该有路由映射可以让这两个网段相互可以ping通,而只要两个网段可以ping通,就可以用upd发送数据 (当然,我们说的前提都是在一个公司的局域网内)

Java基础知识强化之网络编程笔记03:UDP之UDP协议发送数据 和 接收数据

1. UDP协议发送数据 和 接收数据 UDP协议发送数据: • 创建发送端的Socket对象 • 创建数据,并把数据打包 • 调用Socket对象的发送方法,发送数据包 • 释放资源  UDP协议接收数据:       • 创建接收端的Socket对象      • 创建数据包,接收数据(接收容器)      • 调用Socket对象的接收方法,接收数据包      • 解析数据包,并显示在控制台      • 释放资源 2. 代码实现 (1)首先我们先写发送端的程序,如下: 1 packag

java下udp的DatagramSocket 发送与接收

发送 package cn.stat.p4.ipdemo; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.InetAddress; import java.net.InetSocketAddress;