kafka + spark Streaming + Tranquility Server发送数据到druid

  花了很长时间尝试druid官网上说的Tranquility嵌入代码进行实时发送数据到druid,结果失败了,各种各样的原因造成了失败,现在还没有找到原因,在IDEA中可以跑起,放到线上就死活不行,有成功了的同仁希望贴个链接供我来学习学习;后来又尝试了从kafka实时发送到druid,还是有些错误,感觉不太靠谱;最后没办法呀,使用Tranquility Server呗 _ _!

Tranquility Server的配置和启动请移步:https://github.com/druid-io/tranquility/blob/master/docs/server.md

(一)、在启动了自己定制的server之后可以利用druid bin目录下的generate-example-metrics生成测试数据 (定制的server.json如下)

server.json的配置

{
  "dataSources" : {
    "zcx_metrics" : {
      "spec" : {
        "dataSchema" : {
          "dataSource" : "reynold",
          "parser" : {
            "type" : "string",
            "parseSpec" : {
              "timestampSpec" : {
                "column" : "timestamp",
                "format" : "auto"
              },
              "dimensionsSpec" : {
                "dimensions" : [],
                "dimensionExclusions" : [
                  "timestamp",
                  "value"
                ]
              },
              "format" : "json"
            }
          },
          "granularitySpec" : {
            "type" : "uniform",
            "segmentGranularity" : "hour",
            "queryGranularity" : "none"
          },
          "metricsSpec" : [
            {
              "type" : "count",
              "name" : "count"
            },
            {
              "name" : "value_sum",
              "type" : "doubleSum",
              "fieldName" : "value"
            },
            {
              "fieldName" : "value",
              "name" : "value_min",
              "type" : "doubleMin"
            },
            {
              "type" : "doubleMax",
              "name" : "value_max",
              "fieldName" : "value"
            }
          ]
        },
        "ioConfig" : {
          "type" : "realtime"
        },
        "tuningConfig" : {
          "type" : "realtime",
          "maxRowsInMemory" : "100000",
          "intermediatePersistPeriod" : "PT10M",
          "windowPeriod" : "PT10M"
        }
      },
      "properties" : {
        "task.partitions" : "1",
        "task.replicants" : "1"
      }
    }
  },
  "properties" : {
    "zookeeper.connect" : "tagtic-master:2181,tagtic-slave02:2181,tagtic-slave03:2181",
    "druid.discovery.curator.path" : "/druid/discovery",
    "druid.selectors.indexing.serviceName" : "druid/overlord",
    "http.port" : "8200",
    "http.threads" : "16"
  }
}

(二)、创建kafka的topic并往里面发送数据

删除topic:kafka-topics  --delete --topic reynold --zookeeper localhost:2181
创建topic:kafka-topics  --create --topic reynold --zookeeper localhost:2181 --partitions 10 --replication-factor 1
消费数据:kafka-console-consumer --topic reynold --zookeeper localhost:2181 --from-beginning
生产数据:kafka-console-producer --broker-list tagtic-master:9092 --topic reynold

{"count": 1, "value_min": 74.0, "timestamp": "2017-03-09T02:34:24.000Z", "value_max": 74.0, "metricType": "request/latency", "server": "www5.example.com", "http_method": "GET", "value_sum": 74.0, "http_code": "200", "unit": "milliseconds", "page": "/"}
{"count": 1, "value_min": 75.0, "timestamp": "2017-03-09T02:34:24.000Z", "value_max": 75.0, "metricType": "request/latency", "server": "www5.example.com", "http_method": "GET", "value_sum": 75.0, "http_code": "200", "unit": "milliseconds", "page": "/list"}
{"count": 1, "value_min": 143.0, "timestamp": "2017-03-09T02:38:06.000Z", "value_max": 143.0, "metricType": "request/latency", "server": "www2.example.com", "http_method": "GET", "value_sum": 143.0, "http_code": "200", "unit": "milliseconds", "page": "/"}

(三)、使用spark streaming消费kafka中的数据并通过http发送到druid

object SparkDruid {

  val kafkaParam = Map[String, String](
    "metadata.broker.list" -> "tagtic-master:9092,tagtic-slave01:9092,tagtic-slave02:9092,tagtic-slave03:9092",
    "auto.offset.reset" -> "smallest"
  )

  def main(args: Array[String]): Unit = {
    val sparkContext = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("SparkDruidBeam"))
    val ssc = new StreamingContext(sparkContext, Seconds(3))
    val topic: String = "reynold" //消费的 topic 名字
    val topics: Set[String] = Set(topic) //创建 stream 时使用的 topic 名字集合

    var kafkaStream: InputDStream[(String, String)] = null

    kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParam, topics)

    kafkaStream.map(msg => msg._2).foreachRDD { rdd =>
      rdd.foreach(strJson => Https.post("http://tagtic-master:8200/v1/post/zcx_metrics", strJson))
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

Https类如下:

时间: 2024-12-28 13:04:18

kafka + spark Streaming + Tranquility Server发送数据到druid的相关文章

第88课:Spark Streaming从Flume Poll数据案例实战和内幕源码解密

本节课分成二部分讲解: 一.Spark Streaming on Polling from Flume实战 二.Spark Streaming on Polling from Flume源码 第一部分: 推模式(Flume push SparkStreaming) VS 拉模式(SparkStreaming poll Flume) 采用推模式:推模式的理解就是Flume作为缓存,存有数据.监听对应端口,如果服务可以链接,就将数据push过去.(简单,耦合要低),缺点是SparkStreaming

Kafka:ZK+Kafka+Spark Streaming集群环境搭建(八)安装zookeeper-3.4.12

如何搭建配置centos虚拟机请参考<Kafka:ZK+Kafka+Spark Streaming集群环境搭建(一)VMW安装四台CentOS,并实现本机与它们能交互,虚拟机内部实现可以上网.> 如何安装hadoop2.9.0请参考<Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二)安装hadoop2.9.0> 如何安装spark2.2.1请参考<Kafka:ZK+Kafka+Spark Streaming集群环境搭建(三)安装spark2.2.1

Kafka:ZK+Kafka+Spark Streaming集群环境搭建(十三)kafka+spark streaming打包好的程序提交时提示虚拟内存不足(Container is running beyond virtual memory limits. Current usage: 119.5 MB of 1 GB physical memory used; 2.2 GB of 2.1 G)

异常问题:Container is running beyond virtual memory limits. Current usage: 119.5 MB of 1 GB physical memory used; 2.2 GB of 2.1 GB virtual memory used. Killing container. spark-submit提交脚本: [[email protected] work]$ more submit.sh #! /bin/bash jars=""

Kafka:ZK+Kafka+Spark Streaming集群环境搭建(三)安装spark2.2.1

如何配置centos虚拟机请参考<Kafka:ZK+Kafka+Spark Streaming集群环境搭建(一)VMW安装四台CentOS,并实现本机与它们能交互,虚拟机内部实现可以上网.> 如何安装hadoop2.9.0请参考<Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二)安装hadoop2.9.0> 安装spark的服务器: 192.168.0.120 master 192.168.0.121 slave1 192.168.0.122 slave

Kafka:ZK+Kafka+Spark Streaming集群环境搭建(九)安装kafka_2.11-1.1.0

如何搭建配置centos虚拟机请参考<Kafka:ZK+Kafka+Spark Streaming集群环境搭建(一)VMW安装四台CentOS,并实现本机与它们能交互,虚拟机内部实现可以上网.> 如何安装hadoop2.9.0请参考<Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二)安装hadoop2.9.0> 如何安装spark2.2.1请参考<Kafka:ZK+Kafka+Spark Streaming集群环境搭建(三)安装spark2.2.1

Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二)VMW安装四台CentOS,并实现本机与它们能交互,虚拟机内部实现可以上网。

Centos7出现异常:Failed to start LSB: Bring up/down networking. 按照<Kafka:ZK+Kafka+Spark Streaming集群环境搭建(一)VMW安装四台CentOS,并实现本机与它们能交互,虚拟机内部实现可以上网.>配置好虚拟机,正在使用中,让它强制断电后,启动起来发现ip无法访问,而且重启网络失败: 执行:systemctl restart network.service 出现异常:Failed to start LSB: Br

Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二十一)NIFI1.7.1安装

一.nifi基本配置 1. 修改各节点主机名,修改/etc/hosts文件内容. 192.168.0.120 master 192.168.0.121 slave1 192.168.0.122 slave2 具体请参考<Kafka:ZK+Kafka+Spark Streaming集群环境搭建(一)VMW安装四台CentOS,并实现本机与它们能交互,虚拟机内部实现可以上网.> 2. 安装zookeeper分布式集群具体请参考<Kafka:ZK+Kafka+Spark Streaming集群

Kafka:ZK+Kafka+Spark Streaming集群环境搭建(十一)定制一个arvo格式文件发送到kafka的topic,通过sparkstreaming读取kafka的数据

定制avro schema: { "type": "record", "name": "userlog", "fields": [ {"name": "ip","type": "string"}, {"name": "identity","type":"str

Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二十五)Structured Streaming:同一个topic中包含一组数据的多个部分,按照key它们拼接为一条记录(以及遇到的问题)。

需求: 目前kafka的topic上有一批数据,这些数据被分配到9个不同的partition中(就是发布时key:{m1,m2,m3,m4...m9},value:{records items}),mx(m1,m2...m9)这些数据的唯一键值:int_id+start_time,其中int_id和start_time是topic record中的记录.这9组数据按照唯一键值可以拼接(m1.primarykey1,m2.primarykey1,m3.primarykey1.....m9.prim