大数据-SparkStreaming

SparkStreaming

SparkStreaming是一种微批处理,准实时的流式框架。数据来源包括:Kafka, Flume,TCP sockets,Twitter,ZeroMQ等

  • SparkStreaming与storm的区别:

    • SparkStreaming微批处理数据,storm按条处理数据
    • SparkStreaming支持稍复杂的逻辑
    • SparkStreaming与storm都支持资源动态调整和事务机制

SparkStreaming的处理架构:采用recevier task持续拉取数据,拉取时间间隔为batch Interval,每次来去的数据封装为batch,batch被封装到RDD中,RDD被封装进DStream中。SparkStreaming对DStream进程处理。

数据处理与数据拉取同时进行,数据处理的速度需要与数据拉取量均衡,数据存储方式为memory_only,若数据处理速度慢于拉取速度会产生数据堆积,进而导致OOM。若数据存储方式包含disk,会加大延迟

代码实现

使用TCP sockets实现测试,liunx中命令:nc -lk 9999 实现模拟向9999端口发数据。

  • 数据拉取的间隔时长 + sparkconf/sparkcontext => JavaStreamingContext (stream上下文)
  • 数据源配置 + stream上下文 => JavaStreamingContext(首个DStream)
SparkConf sparkConf = new SparkConf();//配置参数中需要至少2个线程,一条接收数据,一条执行job任务,否则无法打印数据,格式为:主机名[2]sparkConf.setMaster("local[2]").setAppName("s01");JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);?//通过sc上下文和指定间隔获取stream的上下文JavaStreamingContext jsc = new JavaStreamingContext(sparkContext, Durations.seconds(5));//通过conf 和指定间隔获取stream的上下文//JavaStreamingContext jsc2 = new JavaStreamingContext(sparkConf,Durations.seconds(5));?//设置checkpoint路径//jsc.checkpoint("hdfs://node1:9000/spark/checkpoint");jsc.checkpoint("/checkpoint");?//使用socket监听作为数据源,获取DStreamJavaReceiverInputDStream<String> data = jsc.socketTextStream("node1", 9999);?/* 此处写入 DStream 逻辑*/?//使用 输出算子,触发DStream逻辑代码data.print();?//通过JavaStreamingContext触发代码执行jsc.start();?//阻塞线程,不断执行任务try {    streamingContext.awaitTermination();} catch (InterruptedException e) {    e.printStackTrace();}?//结束任务并关闭sparkContext//若不需要关闭sparkContext加参数false//stop之后无法再通过start启动任务jsc.stop();//jsc.stip(false);

算子

DStream能够使用RDD的算子,以下列举DStream的专属算子

对于DStream可以转为RDD的执行的操作,RDD算子内的代码在exector进程执行,RDD外代码在driver进程执行

转换算子

transform

将DStream在算子内部转为RDD运算,最后还是返回DStream。实现对DStream执行任意RDD操作

JavaDStream<String> resultDStream = DStream.transform(new Function<JavaRDD<String>, JavaRDD<String>>() {    @Override    public JavaRDD<String> call(JavaRDD<String> value) throws Exception {        //转换算子        JavaRDD<String> map = value.map(x->{return x+"1";});        //触发算子        map.foreach(x-> System.out.println(x));        return map;    }});

updateStateByKey(状态)

  • 实现修改DStream的key的状态值。

    • sparkstream中为每个key维护一个static值,static可以为任意类型
    • 每有一个新的batch数据计算,若数据中的key值对应的static执行更新
  • API
    • 指定checkpoint目录,实现static存储
    • 对于数据拉取间隔小于10s的操作,使用10秒一次的static更新,避免反复写磁盘
    • 数据由Optional对象封装
    //指定checkpoint路径,若加载了hdfs配置则为hdfs中的路径jsc.checkpoint("/checkpoint");JavaReceiverInputDStream<String> DStream = jsc.socketTextStream("node1", 9999);//数据转为KV结构JavaPairDStream<String, Integer> pair = data.mapToPair(new PairFunction<String, String, Integer>() {    @Override    public Tuple2<String, Integer> call(String s) throws Exception {        return new Tuple2(s, 1);    }});//执行updateStateByKeyJavaPairDStream<String, Integer> result = pair.updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {    //内部call方法入参:本批次相同key的value集合,key对应static的值;    //出参为新的static的值,为了数据安全使用Optional进行封装    @Override    public Optional<Integer> call(List<Integer> v1, Optional<Integer> v2) throws Exception {        Integer value = 0;        //对本次的key的value进行处理        for (Integer i : v1) { value += i; }        //对static非空判断,并逻辑处理        if(v2.isPresent()){ value += v2.get(); }?        return Optional.of(value);    }});result.print();

reduceByKeyAndWindow(窗口)

  • 以若干时间间隔,一次性处理一段时间的数据

    • 滑动间隔:每隔多少时间处理一次数据(取值间隔时间的倍数)
    • 窗口:一次处理多少时间的数据(取值间隔时间的倍数)
//DStream为KV格式的数据JavaPairDStream<String, Integer> result = DStream.reduceByKeyAndWindow(    //对当前DStream中相同key的value进行reduce操作    new Function2<Integer, Integer, Integer>() {        @Override        public Integer call(Integer v1, Integer v2) throws Exception {            return v1 + v2;        }    },    //指定窗口时间长度    Durations.seconds(15),    //指定滑动间隔的时间长度    Durations.seconds(20));

优化

由于窗口函数需要对一段时间的数据进行计算,可能与前后一次的计算存在重复计算

设置checkpoint存储已经计算好DStream数据,在窗口调用数据减去旧的数据,加上新的数据。

//预先设置checkpoint路径
jsc.checkpoint("/checkpoint");
?
//DStream已经处理为kV结构
JavaPairDStream<String, Integer> result = DStream.reduceByKeyAndWindow(
    //以下逻辑用于对当前分区内,DStream内,窗口内,相同的key的value执行的操作
    对当前DStream中相同key的value进行reduce操作
    new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer v1, Integer v2) throws Exception {
        //v1为已累计的数据值,v2为新加入的数据值
            return v1 + v2;
        }
    },
    //对前一窗口中相同的key的value执行数据剔除的操作逻辑
    new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer v1, Integer v2) throws Exception {
            //v1为窗口中的数据值,v2为需要去除的数据值
            return v1 - v2;
        }
    },
    //指定窗口时间长度
    Durations.seconds(15),
    //指定滑动间隔的时间长度
    Durations.seconds(20));

输出算子

output operator:业务逻辑完成后需要至少一个output operator触发代码执行

foreachRDD

将DStream转为RDD进行算子运算,注意:内部RDD必须使用触发算子,否则代码不执行

//RStream使用foreachRDD转换为RDD,通过RDD执行逻辑DStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {    @Override    public void call(JavaRDD<String> stringJavaRDD) throws Exception {        //RDD转换算子        JavaRDD<String> result1 = stringJavaRDD.map(x->x);        //RDD触发算子        result1.foreach(x->{ System.out.println(x); });    }});

print

DStream.print() 实现对内部的数据的打印

Driver高可用

由于Driver需要不间断获取数据,Driver宕机重启需要借助checkpoint恢复原先的是数据,

  • 启动层面,设置Driver宕机重启

    • standalone模式与Mesos 在启动任务时加入参数: --supervise
    • yarn模式具备自动重启能力
  • 代码层面,设置基于checkpoint恢复
    • 设置checkpoint路径
    • 设置启动方式,使用JavaStreamingContext的getOrCreate方法创建DStream上下文
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("HA");
//指定逻辑恢复的路径
JavaStreamingContext jsc0 = JavaStreamingContext.getOrCreate(
    "/checkpoint",
    new Function0<JavaStreamingContext>() {
        @Override
        public JavaStreamingContext call() throws Exception {
            JavaStreamingContext ssc = new JavaStreamingContext(
                sparkConf, Durations.seconds(5));
            //设置checkpoint路径
            ssc.checkpoint("/checkpoint");
            return ssc;
        }
    });

整合kafka

Spark2.2+kafka0.8

receiver模式

  1. executor进程中的receiver task线程定时从kafka上拉取数据
  2. receiver task将数据备份到各executor进程中,默认持久化级别MEMORY_AND_DISK_SER_2
  3. receiver task将偏移量写入zk,并将备份情况汇报给driver进程
  4. driver根据备份所在节点向executor进程分发task

    若driver宕机重启后从zk读取偏移量,重启receiver task进程。

    使用High Level Consumer API ,由zookeeper维护偏移量

    zk是投票机制更新数据消耗大,不利于大量数据吞吐

并行度修改

问题:对于数据量较大的场景,由于处理速度低于数据拉取速度,存在数据堆积的问题。

方案:修改并行度,增加task的数量,但该措施需控制在机器性能允许的范围内。

原理及实现:job的并行度由spark.streaming.blockInterval参数控制,该参数默认200ms对一批数据切分为若干数据块,每个数据块就是一个分区,分区数也就对应了并行度。该参数可修改的最小值为50ms

数据丢失问题

  • 原因:dirver宕机,任务未执行完毕,但偏移量在zk中已经更新,dirver重启后丢失偏移量之前的任务数据
  • 解决:WAL机制(日志):在偏移量提交zk前hdfs保存一份数据,driver先从hdfs上读取备份数据,再从zk中取偏移量。
    • 问题1:性能低,优化:可将持久化级别修改为MEMORY_AND_DISK_SER
    • 问题2:导致数据重复消费问题
SparkConf conf = new SparkConf().setAppName("ccc").setMaster("local[2]");
//设置启动WAL机制
conf.set("spark.streaming.receiver.writeAheadLog.enable","true");
JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));
//设置checkpoint的路径
jsc.checkpoint("/rec");
?
//通过map设置读取的topic及线程数
HashMap<String, Integer> topics = new HashMap<>();
topics.put("topic1",1);
topics.put("topic2",1);
?
//通过KafkaUtils的createStream方法创建DStream
//参数:上下文,zk,所属消费者组,读取的topic,持久化级别
JavaPairReceiverInputDStream<String,String> lines = KafkaUtils.createStream(
        jsc,
        "node03:2181,node02:2181,node01:2181",
        "ConsumerGroup",
        topics,
        StorageLevel.MEMORY_AND_DISK());

direct模式

  • task直接从kafka的分区上拉取数据,task并行度与kafka的分区数一致
  • 偏移量
    1. 默认使用内存维护偏移量(会有数据丢失),底层使用Simple Consumer API
    2. 可以添加checkpoint来维护偏移量,checkpoint也用于实现dirver宕机恢复
      • direct模式下的checkpoint中存储了代码逻辑+偏移量
      • 存在问题:在HA机制中,无法在运行过程中改变逻辑。若将新逻辑存入checkpoint,则会导致原有偏移量读取异常。同时存在driver宕机恢复时数据重复输出
    3. 可以自定义维护偏移量
      • 使用外部数据库存取更新偏移量,并实现手动提交偏移量,获取偏移量
      • 流程:构建DStream时手动获取并传入原有偏移量,从首个RDD中获取新的偏移量,在数据输出时将新的偏移量存入外部数据库。
      • 存在问题:只能保证转换1次,无法保证输出只有一次,需要通过输出幂等性或事务处理来解决

默认/checkpoint方式自动维护偏移量

//设置上下文
SparkConf conf = new SparkConf().setAppName("direct").setMaster("local[2]");
JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(10));

//设置checkpoint路径
jsc.checkpoint("./ck");

//设置kafka各节点路径
HashMap<String, String> parms = new HashMap<>();
parms.put("metadata.broker.list", "node01:9092,node02:9092,node03:9092");
//设置读取的topic
HashSet<String> topics = new HashSet<>();
topics.add("topic1");topics.add("topic2");

/*
通过KafkaUtils创建DStream.参数为:
    上下文,kafka中key类型,value类型,key解码方式,vlaue解码方式,kafka参数,topic
*/
JavaPairInputDStream<String, String> ds = KafkaUtils.createDirectStream(
        jsc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        parms,
        topics
);

自定义维护offsets的代码示例:

//设置上下文与传统方式一致,不再设置checkpoint路径   jsc.checkpoint("./ck");
SparkConf conf = new SparkConf().setAppName("direct").setMaster("local[2]");
JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(10));
HashMap<String, String> parms = new HashMap<>();
parms.put("metadata.broker.list", "node01:9092,node02:9092,node03:9092");

//读取偏移量数据,本示例中直接指定了偏移量,生产环境中从数据度读取偏移量
//使用Map容器临时存储,key为TopicAndPartition对象,value为偏移量

Map<TopicAndPartition, Long> offsets = new HashMap<>();
TopicAndPartition tp1 = new TopicAndPartition("topic1", 0);
TopicAndPartition tp2 = new TopicAndPartition("topic1", 1);
offsets.put(tp1, 123L);
offsets.put(tp2, 456L);

//创建DStream,参数为:
//    上下文,kafka中key类型,value类型,key解码方式,vlaue解码方式,DStream的数据类型,kafka参数,存储偏移量的map容器,获取消息的value作为DStream的数据
JavaInputDStream<String> ds = KafkaUtils.createDirectStream(
        jsc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        String.class,
        parms,
        offsets,
        new Function<MessageAndMetadata<String, String>, String>() {
            @Override
            public String call(MessageAndMetadata<String, String> v1) throws Exception {
                return v1.message();
            }
        }
);

//设置新的偏移量封装容器
AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference();

//必须在第一个DStream中通过RDD,获取当前的偏移量
JavaDStream<String> ds1 = ds.transform(new Function<JavaRDD<String>, JavaRDD<String>>() {
    @Override
    public JavaRDD<String> call(JavaRDD<String> rdd) throws Exception {
        //通过RDD获取当前偏移量
        OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
        //将偏移量存入偏移量封装容器中,返回出去
        offsetRanges.set(offsets);
        return rdd;
    }
});

//通过output算子触发逻辑,在执行完成时,将偏移量提交到数据库
//这一过程中为了避免数据重复消费,输出操作应当是幂等输出,或者使用事务,目的都是保证偏移量提交与输出操作的一致性

ds1.foreachRDD(new VoidFunction<JavaRDD<String>>() {
    @Override
    public void call(JavaRDD<String> RDD) throws Exception {
        //以下触发算子的输出操作和偏移量向外存储,写在一个事务中(建议使用异常机制)

        //触发算子
        RDD.foreach(x -> System.out.println(x));

        //将偏移量封装容器转换为数组,每个元素就是一个OffsetRange对象。
        //OffsetRange对象存储了一个topic分区的偏移量信息,可以对OffsetRange解析后存入数据库
        OffsetRange[] offsets = offsetRanges.get();
        for (OffsetRange x : offsets) {
            //解析
            String topic = x.topic();
            int partition = x.partition();
            long offset = x.untilOffset();
        }
    }
});

Spark2.3+kafka0.10+

  • 只有direct模式
  • 偏移量维护策略:
    1. 使用kafka维护,kafka中使用特殊的topic(_consumer_offsets)实现
    2. 若设置了checkpoint,则偏移量会存在checkpoint中维护。
    3. 使用自定义方式维护偏移量
  • 任务分配策略
    • LocationStrategies.PreferBrokers:如Executor在kafka 集群中的节点上,该executor 读取当前broker节点的数据(一般用这个)
    • LocationStrategies.PreferFixed:节点之间的分区有明显的分布不均,通过一个map 指定将topic分区分布在哪些节点中
  • 缓存策略
    • kafka的数据预先读取到Executor的缓存中再处理,该缓存默认64K,能够加快数据的处理速度
    • spark.streaming.kafka.consumer.cache.maxCapacity 控制缓存大小
    • spark.streaming.kafka.consumer.cache.enabled 控制缓存机制开关

kafka维护偏移量

  • 提交时间

    • 自动提交默认5s提交一次偏移量,参数auto.commit.interval.ms控制
    • 手动提交偏移量,底层api也是异步的
  • 问题
    • 若一天不访问kafka偏移量数据,kafka会自动清空
    • 使用手动提交偏移量的方式,偏移量提交与结果输出是异步的,也重复输出的问题

本示例中:使用手动向kafka提交偏移量

//创建JavaStreamingContext
SparkConf conf = new SparkConf().setAppName("direct").setMaster("local[2]");
JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(10));

//创建kafka参数对象
HashMap<String, Object> kafkaParams = new HashMap<>();
//kafka节点
kafkaParams.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
//第一次读取的偏移量位置
kafkaParams.put("auto.offset.reset", "earliest");
//消费者组
kafkaParams.put("group.id", "myGroup");
//key序列化方式
kafkaParams.put("key.deserializer", StringDeserializer.class);
//key序列化方式
kafkaParams.put("value.deserializer", StringDeserializer.class);
//设置偏移量手动提交
kafkaParams.put("enable.auto.commit", false);

//设置读取的topics
Collection<String> topics = Arrays.asList("sm3");

//构建初始DStream,其数据元素为一条kafka的消息具备value值与kafuka的相关属性
//传入 上下文,task分配策略 ,kafkfa参数(topic+参数map)
JavaInputDStream<ConsumerRecord<String, String>> stream =
        KafkaUtils.createDirectStream(
                jsc,
                LocationStrategies.PreferBrokers(),
                ConsumerStrategies.Subscribe(topics, kafkaParams)
        );

//偏移量封装类具备原子更新属性
AtomicReference<OffsetRange[]> newoffset = new AtomicReference();

JavaDStream<String> stream1 = stream.transform(new Function<JavaRDD<ConsumerRecord<String, String>>, JavaRDD<String>>() {
    @Override
    public JavaRDD<String> call(JavaRDD<ConsumerRecord<String, String>> RDD) throws Exception {
        //获取偏移量,新的偏移量必须从初始DStream的第一个RDD中获取,其他的RDD弃置该信息
        OffsetRange[] offsetRanges = ((HasOffsetRanges) RDD.rdd()).offsetRanges();
        //将偏移量设置在封装对象中
        newoffset.set(offsetRanges);

        //将初始DStream解析为普通的RDD,即取出value值
        return RDD.map(new Function<ConsumerRecord<String, String>, String>() {
            @Override
            public String call(ConsumerRecord<String, String> record) throws Exception {
                //可以从每条记录中取出消息值及其kafka相关信息
                record.key(); record.topic(); record.partition();
                return record.value();
            }
        });
    }
});

//在执行触发算子时提交偏移量
stream1.foreachRDD(new VoidFunction<JavaRDD<String>>() {
    @Override
    public void call(JavaRDD<String> RDD) throws Exception {

        /*为了避免数据重复输出,采用以下两种方式解决:
        1.实现输出代码的幂等性
        2.将输出代码与偏移量提交代码写在一个事务中
        */

        //触发算子
        RDD.foreach(s->System.out.println(s));

        //通过偏移量封装对象获取偏移量
        OffsetRange[] offsetRanges = newoffset.get();
        //通过初始DStream转换,使用自动提交偏移量
        ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
    }
});

自定义维护偏移量

  • 实现从外部数据库读取偏移量,将新的偏移量存入外部数据库
  • 这种方式也存在重复输出的问题,需要实现输出幂等或(输出+offset)事务
//kafka参数
HashMap<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
kafkaParams.put("auto.offset.reset", "earliest");
kafkaParams.put("group.id", "myGroup");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("enable.auto.commit", false);

//本例子中直接给定偏移量,生产环境中偏移量从外部外部数据库读入
//偏移量通过map封装
Map<TopicPartition, Long> offsetdata = new HashMap<>();
offsetdata.put(new TopicPartition("topic1", 0), 123L);
offsetdata.put(new TopicPartition("topic1", 1), 444L);

//构建初始DStream。参数:
// 上下文,task分配策略 ,kafkfa参数(topic+参数map+偏移量map)
JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
        jsc,
        LocationStrategies.PreferBrokers(),
        ConsumerStrategies.Assign(offsetdata.keySet(), kafkaParams, offsetdata));

//获取偏移量方式与上述方式相同,区别在于提交偏移量的方式不同
AtomicReference<OffsetRange[]> newoffset = new AtomicReference();
JavaDStream<String> stream1 = stream.transform(new Function<JavaRDD<ConsumerRecord<String, String>>, JavaRDD<String>>() {
    @Override
    public JavaRDD<String> call(JavaRDD<ConsumerRecord<String, String>> RDD) throws Exception {
        //获取偏移量
        newoffset.set(((HasOffsetRanges) RDD.rdd()).offsetRanges());
        return RDD.map(x->x.value());
    }
});

stream1.foreachRDD(new VoidFunction<JavaRDD<String>>() {
    @Override
    public void call(JavaRDD<String> RDD) throws Exception {
        /*为了避免数据重复输出,采用以下两种方式解决:
        1.实现输出代码的幂等性
        2.将输出代码与偏移量提交代码写在一个事务中
        */
        //触发算子
        RDD.foreach(s->System.out.println(s));

        //通过偏移量封装对象获取偏移量,遍历偏移量存入数据库
        OffsetRange[] offsetRanges = newoffset.get();
       for(OffsetRange x : offsetRanges ){
           System.out.println(x);
       }

/**
* 开启事务
*/
//1.提交偏移量到redis
//2.提交计算结果到mysql中
//3.做好事务原子性的维护
    }
});

checkpoint维护偏移量

  • driver宕机恢复容易产生数据重复消费
  • 修改代码逻辑,会导致原checkpoint偏移量丢失

代码实现kafka维护基本一致,只需要指定checkpoint的路径

整合参数

  • receiver模式的参数

    • spark.streaming.receiver.writeAheadLog.enable 默认false 是否开启预写日志
    • spark.streaming.blockInterval 默认200ms 对一批数据的切分间隔,用于控制receiver模式下的分区数
    • spark.streaming.receiver.maxRate 无默认值 修改receiver task的最大拉取速率
  • Direct模式
    • spark.streaming.kafka.maxRatePerPartition 修改每个分区拉取数据的最大速率
  • 反压机制
    • spark.streaming.backpressure.enabled 默认false 微调数据拉取速率,提高运行效率
  • 关停任务
    • spark.streaming.stopGracefullyOnShutdown 设置 true
    • kill -15/sigterm driverpid

注:以上参数均可以在conf中设置

遗留问题:HA的ck,对direct模式的影响

原文地址:https://www.cnblogs.com/javaxiaobu/p/11775076.html

时间: 2024-10-08 05:05:23

大数据-SparkStreaming的相关文章

王家林 大数据Spark超经典视频链接全集[转]

压缩过的大数据Spark蘑菇云行动前置课程视频百度云分享链接 链接:http://pan.baidu.com/s/1cFqjQu SCALA专辑 Scala深入浅出经典视频 链接:http://pan.baidu.com/s/1i4Gh3Xb 密码:25jc DT大数据梦工厂大数据spark蘑菇云Scala语言全集(持续更新中) http://www.tudou.com/plcover/rd3LTMjBpZA/ 1 Spark视频王家林第1课:大数据时代的“黄金”语言Scala 2 Spark视

2016年大数据Spark“蘑菇云”行动之spark streaming消费flume采集的kafka数据Directf方式

王家林老师的课程:2016年大数据Spark"蘑菇云"行动之spark streaming消费flume采集的kafka数据Directf方式作业.     一.基本背景 Spark-Streaming获取kafka数据的两种方式Receiver与Direct的方式,本文介绍Direct的方式.具体的流程是这样的: 1.Direct方式是直接连接到kafka的节点上获取数据了. 2.基于Direct的方式:周期性地查询Kafka,来获得每个topic+partition的最新的offs

Spark 大数据中文分词统计(三) Scala语言实现分词统计

Java版的Spark大数据中文分词统计程序完成之后,又经过一周的努力,把Scala版的Spark 大数据中文分词统计程序也搞出来了,在此分享给各位想学习Spark的朋友. 如下是程序最终运行的界面截图,和Java版差别不大: 如下是Scala工程结构: 当你在工程主类文件WordCounter.scala上单击右键,选择Run As Scala Application: 然后选择唐诗宋词进行分词统计,就会出现前面显示的分词结果. 工程代码已经上传CSDN:http://download.csd

大数据与机器学习的一些博文整理

Spark VS MapReduce 时间节约66%,计算节约40% http://mp.weixin.qq.com/s?__biz=MzA3MjY1MTQwNQ==&mid=200820787&idx=1&sn=638a4b16445a5ee7a184b7a9becf4d5d&scene=2&from=timeline&isappinstalled=0#rd 数据挖掘十大算法总结--核心思想,算法优缺点,应用领域 http://mp.weixin.qq.c

一文读懂大数据计算框架与平台

1.前言 计算机的基本工作就是处理数据,包括磁盘文件中的数据,通过网络传输的数据流或数据包,数据库中的结构化数据等.随着互联网.物联网等技术得到越来越广泛的应用,数据规模不断增加,TB.PB量级成为常态,对数据的处理已无法由单台计算机完成,而只能由多台机器共同承担计算任务.而在分布式环境中进行大数据处理,除了与存储系统打交道外,还涉及计算任务的分工,计算负荷的分配,计算机之间的数据迁移等工作,并且要考虑计算机或网络发生故障时的数据安全,情况要复杂得多. 举一个简单的例子,假设我们要从销售记录中统

如何成为大数据Spark高手

原文连接:http://blog.csdn.net/rlnLo2pNEfx9c/article/details/78778959 Spark是发源于美国加州大学伯克利分校AMPLab的集群计算平台,它立足于内存计算,性能超过Hadoop百倍,从多迭代批量处理出发,兼收并蓄数据仓库.流处理和图计算等多种计算范式,是罕见的全能选手.Spark采用一个统一的技术堆栈解决了云计算大数据的如流处理.图技术.机器学习.NoSQL查询等方面的所有核心问题,具有完善的生态系统,这直接奠定了其一统云计算大数据领域

大数据最佳学习路线总结

一,题记 要说当下IT行业什么最火?ABC无出其右.所谓ABC者,AI + Big Data + Cloud也,即人工智能.大数据和云计算(云平台).每个领域目前都有行业领袖在引领前行,今天我们来讨论下大数据Big Data这个方向. 二,大数据里面的角色 角色一:大数据工程 大数据工程需要解决数据的定义.收集.计算与保存的工作,因此大数据工程师们在设计和部署这样的系统时首要考虑的是数据高可用的问题,即大数据工程系统需要实时地为下游业务系统或分析系统提供数据服务: 角色二:大数据分析 大数据分析

大数据?这些你了解吗?(新手)

一.学习大数据需要的基础 java SE,EE(SSM)        90%的大数据框架都是java写的    MySQL        SQL on Hadoop    Linux        大数据的框架安装在Linux操作系统上 二.需要学什么 第一方面:大数据离线分析 一般处理T+1数据            Hadoop 2.X:(common.HDFS.MapReduce.YARN)                环境搭建,处理数据的思想            Hive:    

大数据技术学习路线,该怎么学?

如果你看完有信心能坚持学习的话,那就当下开始行动吧! 一.大数据技术基础 1.linux操作基础 linux系统简介与安装linux常用命令–文件操作linux常用命令–用户管理与权限linux常用命令–系统管理linux常用命令–免密登陆配置与网络管理linux上常用软件安装linux本地yum源配置及yum软件安装linux防火墙配置linux高级文本处理命令cut.sed.awklinux定时任务crontab2.shell编程 shell编程–基本语法shell编程–流程控制shell编