HttpServer Sending Data to Kafka

Contents

1. Requirements

2. Framework Architecture and Workflow Diagrams

3. Code Structure

4. Code Listings

———————————————————————-

1. Requirements

1.1 Path parsing: take the last segment of the request path as the appkey (a small sketch follows this list);

1.2 Data caching: when Kafka cannot be reached, buffer the data in a local cache directory;

1.3 Security check: validate that the appkey in the request is legitimate (signature verification TBD);

1.4 Automatic appkey refresh: fetch the latest appkey list at a fixed interval;

1.5 IP enrichment: add an ip field to every record;

1.6 Logging: record basic statistics as well as exceptions and errors.
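
To make requirement 1.1 concrete, here is a minimal sketch (not part of the project code) of pulling the appkey out of a request path; the sample path is made up, and the real server does the same substring call inside a Vert.x handler shown later.

public class AppkeyFromPath {
    // The service treats the last path segment as the appkey/topic.
    static String appkeyOf(String path) {
        return path.substring(path.lastIndexOf("/") + 1);
    }

    public static void main(String[] args) {
        // Hypothetical request path; the real one arrives via routingContext.request().path().
        String path = "/mininfo/v1/logs/appstatistic_production";
        System.out.println(appkeyOf(path)); // prints: appstatistic_production
    }
}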

2. Framework Architecture and Workflow Diagrams

3. Code Structure

4. Code Listings

Configuration.java

package com.donews.data;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

/**
 * Created by yuhui on 16-6-23.
 */
public class Configuration {
    public static final Config conf = ConfigFactory.load();
}
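
Configuration is just a thin holder around Typesafe Config: ConfigFactory.load() reads application.conf from the classpath, so any class can look keys up through Configuration.conf. A minimal usage sketch, assuming the key names from the application.conf shown later in this post:

import java.util.concurrent.TimeUnit;

public class ConfigUsageExample {
    public static void main(String[] args) {
        // Values resolved from application.conf on the classpath.
        int port = Configuration.conf.getInt("server.port");
        long delayMs = Configuration.conf.getDuration("server.counter.delay", TimeUnit.MILLISECONDS);
        String brokers = Configuration.conf.getString("kafka.bootstrap.servers");
        System.out.println("port=" + port + " counterDelayMs=" + delayMs + " brokers=" + brokers);
    }
}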

Counter.java

package com.donews.data;

import io.vertx.core.Vertx;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Created by yuhui on 16-6-22.
 */
public class Counter {
    private Logger LOG = LoggerFactory.getLogger(Counter.class);
    AtomicLong messages = new AtomicLong(0L);
    AtomicLong bytes = new AtomicLong(0L);
    private long start = System.currentTimeMillis();

    private void reset() {
        messages.set(0L);
        bytes.set(0L);
        start = System.currentTimeMillis();
    }

    /**
     * Starts a periodic task that logs throughput (messages/s, MB/s) and JVM memory usage,
     * then resets the counters. Sample log output:
     *   Feb 14, 2017 3:49:53 PM com.donews.data.Counter
     *   INFO: start Counter
     *   Feb 14, 2017 3:49:54 PM com.donews.data.Counter
     *   INFO: start Counter
     *
     * @param vertx the Vert.x instance used to schedule the periodic timer
     */
    public void start(Vertx vertx) {
        LOG.info("start Counter");
        long delay = Configuration.conf.getDuration("server.counter.delay", TimeUnit.MILLISECONDS);
        vertx.setPeriodic(delay, h -> {
            long time = System.currentTimeMillis() - start;
            double rps = messages.get() * 1000.0 / time;
            double mbps = (bytes.get() * 1000.0 / 1024.0 / 1024.0) / time;
            Runtime runtime = Runtime.getRuntime();
            double totalMem = runtime.totalMemory() * 1.0 / 1024 / 1024;
            double maxMem = runtime.maxMemory() * 1.0 / 1024 / 1024;
            double freeMem = runtime.freeMemory() * 1.0 / 1024 / 1024;
            LOG.info("{0}:Message/S, {1}:MBytes/S", rps, mbps);
            LOG.info("totalMem:{0}MB maxMem:{1}MB freeMem:{2}MB", totalMem, maxMem, freeMem);
            reset();
        });
    }

}

KafkaHttpServer.java

package com.donews.data;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;

import java.io.*;
import java.sql.*;
import java.time.Instant;
import java.util.HashSet;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicInteger;

public class KafkaHttpServer {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaHttpServer.class);
    private final Counter statistic = new Counter();
    private static final String DBDRIVER = "com.mysql.jdbc.Driver";
    private static final String URL = Configuration.conf.getString("mysql.url");
    private static final String USER = Configuration.conf.getString("mysql.user");
    private static final String PASSWORD = Configuration.conf.getString("mysql.password");
    private static HashSet<String> appkeys = new HashSet<>();
    private static boolean deleteFile = true;

    private void error(HttpServerResponse response, String message) {
        response.setStatusCode(500).end(new JsonObject()
                .put("code", 3)
                .put("msg", message)
                .encode());
    }

    private void ok(HttpServerResponse response, String message) {
        response.putHeader("Access-Control-Allow-Origin", "*");
        response.setStatusCode(200).end(new JsonObject()
                .put("code", 0)
                .put("msg", message)
                .encode());
    }

    private void startService(int port) {
        KafkaProducerWrapper sender = new KafkaProducerWrapper();
        Vertx vertx = Vertx.vertx();
        HttpServer server = vertx.createHttpServer();
        Router router = Router.router(vertx);
        router.route().handler(BodyHandler.create());
        // POST request example: http://192.168.1.10:10002/mininfo/logs
        // router.route registers a route; /mininfo/logs is the path this handler serves
        router.route("/mininfo/logs").handler(ctx -> {
            try {
                JsonArray array = ctx.getBodyAsJsonArray();
                String[] messages = new String[array.size()];
                for (int i = 0; i < array.size(); i++) {
                    JsonObject message = array.getJsonObject(i);
                    message.put("ip", ctx.request().remoteAddress().host());
                    if (!message.containsKey("timestamp")) {
                        message.put("timestamp", Instant.now().toString());
                    }
                    messages[i] = array.getJsonObject(i).encode();
                }
                sendMessages(sender, ctx, "appstatistic_production", messages);
            } catch (Exception e) {
                error(ctx.response(), e.getMessage());
            }
        });
        router.routeWithRegex("/mininfo/v1/logs/[^/]+").handler(routingContext -> {
            String path = routingContext.request().path();
            String topic = path.substring(path.lastIndexOf("/") + 1);
            LOG.info("现在处理的topic(appkey)为:" + topic);
            if (appkeys.contains(topic)) {
                LOG.info("经过验证,该topic(appkey)有效");
                String[] messages = routingContext.getBodyAsString().split("\n");
                // executeBlocking runs blocking tasks off the event loop; by default submitted tasks execute in order
                vertx.executeBlocking(future -> {
                    sendMessages(sender, routingContext, topic, messages);
                    future.complete();
                }, result -> {
                });
            } else {
                LOG.info("您的topic(appkey)还没有配置,请在mysql中配置先");
                error(routingContext.response(), "please configurate " + topic + "(appkey) in Mysql first! After 10mins it`ll take action");
            }
        });
        router.route("/mininfo/v1/ip").handler(ctx -> {
            LOG.info("x-real-for" + ctx.request().getHeader("x-real-for"));
            LOG.info("x-forwarded-for" + ctx.request().getHeader("x-forwarded-for"));
            ok(ctx.response(), ctx.request().getHeader("x-forwarded-for"));
        });
        router.route("/*").handler(ctx -> error(ctx.response(), "wrong! check your path..."));
        server.requestHandler(router::accept).listen(port, result -> {
            if (result.succeeded()) {
                LOG.info("listen on port:{0}", String.valueOf(port));
                this.statistic.start(vertx);
            } else {
                LOG.error(result.cause());
                vertx.close();
            }
        });
        // A shutdown hook is useful when you need to run cleanup before the program exits, similar to a finally block
        Runtime.getRuntime().addShutdownHook(new Thread(sender::close));
    }

    private void sendMessages(KafkaProducerWrapper sender, RoutingContext ctx, String topic, String[] messages) {
        AtomicInteger counter = new AtomicInteger(0);
        for (String message : messages) {
            if (message == null || "".equals(message)) {
                ok(ctx.response(), "Success");
                continue;
            }
            // add the client ip to the record's ip field
            JSONObject jsonObject = JSON.parseObject(message);
            if (jsonObject.get("ip") == null) {
                LOG.info("正在增加ip字段");
                String ip;
                String header = ctx.request().getHeader("x-forwarded-for");
                if (!(header == null || header.trim().length() == 0 || header.trim().equals("null"))) {
                    ip = header.split(",")[0];
                } else {
                    ip = ctx.request().remoteAddress().host();
                }
                jsonObject.put("ip", ip);
                LOG.info("ip增加成功");
            }
            // topic, message, callback; the lambda implements the Callback interface's onCompletion method
            sender.send(topic, jsonObject.toString(), (metadata, exception) -> {
                if (exception != null) {
                    LOG.warn(exception);
                    String msg = new JsonObject()
                            .put("error", exception.getMessage())
                            .put("commit", counter.get())
                            .encode();
                    error(ctx.response(), msg);
                    cacheLocal(jsonObject.toString(), "/home/lihui/httpkafka/data_bak/" + topic + ".txt");
                    LOG.info("连接kafka失败,写入cache缓存目录以备份数据");
                } else {
                    statistic.messages.incrementAndGet();  // Counter
                    statistic.bytes.addAndGet(message.length());
                    if (counter.incrementAndGet() == messages.length) {
                        ok(ctx.response(), "Success");
                    }
                }
            });
        }
    }

    /**
     * Caches a message locally when sending it to Kafka fails.
     *
     * @param message   the message to cache
     * @param cachePath path of the local cache file
     */
    private void cacheLocal(String message, String cachePath) {
        try (BufferedWriter bw = new BufferedWriter(new FileWriter(cachePath, true))) {
            bw.write(message);
            bw.newLine();
            bw.flush();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Sends cached data to Kafka. On success the cached files are deleted;
     * on failure the next scheduled run (10 minutes later) retries.
     *
     * @param path the [directory] holding the cached data files
     */
    private static void sendToKafka(String path) {
        String message;
        KafkaProducerWrapper sender = new KafkaProducerWrapper();
        File file = new File(path);
        if (file.isDirectory()) {
            String[] fileList = file.list();
            if (fileList != null && fileList.length != 0) {
                LOG.info("正在将缓存目录中的备份数据发送到kafka中...");
                for (String str : fileList) {
                    String topic = str.split("\\.")[0];
                    try {
                        BufferedReader reader = new BufferedReader(new FileReader(path + str));
                        while ((message = reader.readLine()) != null) {
                            sender.send(topic, message, (metadata, exception) -> {
                                if (metadata != null) {
                                    LOG.info("缓存的备份数据正在一条一条的插入kafka中");
                                } else {
                                    // on failure, keep the cached file so the next scheduled run retries
                                    LOG.error("Kafka connection error ===> will retry automatically in 10 minutes: " + exception.getMessage(), exception);
                                    deleteFile = false;
                                }
                            });
                        }
                        if (deleteFile) {
                            LOG.info("开始删除已经插入到kafka中的缓存备份数据");
                            deleteFile(path, topic);
                            LOG.info("删除完毕!");
                        }
                        reader.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            } else {
                LOG.info("缓存目录中没有备份文件");
            }
        }
    }

    private static void deleteFile(String path, String appkey) {
        String appkeyPath = path + "/" + appkey + ".txt";
        File file = new File(appkeyPath);
        file.delete();
        LOG.info("成功删除appkey为" + appkey + "的缓存数据");
    }

    private static Set<String> getAppkeys() {
        Set<String> appkeys = new HashSet<>();
        String sql = "select appkey from service_config_yarn_properties_table";
        try {
            Class.forName(DBDRIVER);
            Connection conn = DriverManager.getConnection(URL, USER, PASSWORD);
            PreparedStatement ps = conn.prepareStatement(sql);
            ResultSet rs = ps.executeQuery();
            while (rs.next()) {
                appkeys.add(rs.getString(1));
            }
            rs.close();
            conn.close();
        } catch (ClassNotFoundException | SQLException e) {
            e.printStackTrace();
        }
        return appkeys;
    }

    public static void main(String[] args) throws Exception {
        Timer timer = new Timer();
        // Every 10 minutes: 1) check the cache directory for backed-up data and resend it, 2) sync appkeys from the database for security validation
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                appkeys.addAll(getAppkeys());
                LOG.info("同步完数据库中的appkey(每隔十分钟)");
                sendToKafka("/home/lihui/httpkafka/data_bak/");
//              sendToKafka("C:\\Dell\\UpdatePackage\\log");
            }
        }, 0L, 10 * 60 * 1000L);

        // main thread
        try {
            int port = Configuration.conf.getInt("server.port");
            KafkaHttpServer front = new KafkaHttpServer();
            front.startService(port);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
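
For completeness, a minimal client sketch that posts newline-delimited JSON to the /mininfo/v1/logs/{appkey} route using plain java.net.HttpURLConnection. The host, port, and appkey below are placeholders; they must match your deployment (server.port in application.conf) and an appkey configured in MySQL.

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class LogPostExample {
    public static void main(String[] args) throws Exception {
        // Placeholder endpoint: adjust host, port and appkey to your environment.
        URL url = new URL("http://192.168.1.10:20000/mininfo/v1/logs/appstatistic_production");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);

        // The route splits the request body on '\n', one JSON object per line.
        String body = "{\"event\":\"login\",\"uid\":\"u-1\"}\n"
                + "{\"event\":\"click\",\"uid\":\"u-2\"}";
        try (OutputStream out = conn.getOutputStream()) {
            out.write(body.getBytes(StandardCharsets.UTF_8));
        }
        System.out.println("HTTP " + conn.getResponseCode());
        conn.disconnect();
    }
}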

KafkaProducerWrapper.java

package com.donews.data;

import com.typesafe.config.Config;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Created by yuhui on 16-6-22.
 *
 * Kafka producer wrapper; messages are sent via the send() method.
 */
public class KafkaProducerWrapper {
    private Logger LOG = LoggerFactory.getLogger(KafkaProducerWrapper.class);
    private KafkaProducer<String, String> producer = init();

    private KafkaProducer<String, String> init() {
        Config conf = Configuration.conf.getConfig("kafka");
        Properties props = new Properties();
        props.put("bootstrap.servers", conf.getString("bootstrap.servers"));
        props.put("acks", conf.getString("acks"));
        props.put("retries", conf.getInt("retries"));
        props.put("batch.size", conf.getInt("batch.size"));
        props.put("linger.ms", conf.getInt("linger.ms"));
        props.put("buffer.memory", conf.getLong("buffer.memory"));
        props.put("key.serializer", conf.getString("key.serializer"));
        props.put("value.serializer", conf.getString("value.serializer"));
        LOG.info("KafkaProducer Properties: {0}", props.toString());
        return new KafkaProducer<>(props);
    }

    public void send(String topic, String message, Callback callback) {
        producer.send(new ProducerRecord<>(topic, message), callback);
    }

    public void close() {
        producer.close();
        LOG.info("Kafka Producer Closed");
    }

    public static void main(String[] args) {
        //KafkaProducerWrapper sender=new KafkaProducerWrapper();
        //sender.producer.partitionsFor("xxxxx").forEach(System.out::println);
    }
}
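
A brief usage sketch for the wrapper; the topic and payload here are made up, and the callback mirrors the one used in KafkaHttpServer.sendMessages:

public class ProducerUsageExample {
    public static void main(String[] args) {
        KafkaProducerWrapper sender = new KafkaProducerWrapper();
        // Hypothetical topic and payload; real topics come from the appkey list in MySQL.
        sender.send("appstatistic_production", "{\"event\":\"demo\",\"ip\":\"127.0.0.1\"}",
                (metadata, exception) -> {
                    if (exception != null) {
                        exception.printStackTrace();
                    } else {
                        System.out.println("sent to partition " + metadata.partition()
                                + " at offset " + metadata.offset());
                    }
                });
        sender.close(); // flushes pending records before exit
    }
}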

application.conf

server {
  port = 20000
  counter.delay = 30s
}
kafka {
  bootstrap.servers = "XXX"
  acks = all
  retries = 1
  batch.size = 1048576
  linger.ms = 1
  buffer.memory = 33554432
  key.serializer = "org.apache.kafka.common.serialization.StringSerializer"
  value.serializer = "org.apache.kafka.common.serialization.StringSerializer"
}
mysql {
  url = "jdbc:mysql://XXX/user_privileges"
  user = "XXX"
  password = "XXX"
}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.donews.data</groupId>
    <artifactId>kafkahttp</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>com.typesafe</groupId>
            <artifactId>config</artifactId>
            <version>1.3.0</version>
        </dependency>
        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-web</artifactId>
            <version>3.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.9.0.1</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>6.0.2</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.11</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.3.3</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>