【慕课网实战】Spark Streaming实时流处理项目实战笔记二十之铭文升级版

铭文一级:

Spring Boot整合Echarts动态获取HBase的数据
1) 动态的传递进去当天的时间
a) 在代码中写死
b) 让你查询昨天的、前天的咋办?
在页面中放一个时间插件(jQuery插件),默认只取当天的数据
2) 自动刷新展示图
每隔多久发送一个请求去刷新当前的数据供展示

统计慕课网当天实战课程从搜索引擎过来的点击量
数据已经在HBase中有的
自己通过Echarts整合Spring Boot方式自己来实现

铭文二级:

在Spring Boot项目pom.xml下引入<repositories>

<repositories>
  <repository>
    <id>cloudera</id>
    <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
  </repository>
</repositories>

  

引入HBase的依赖:org.apache.hbase:hbase-client:1.2.0-cdh5.7.0

拷贝HBaseUtils进来过来改,添加方法:

/**
     * 根据表名和输入条件获取HBase的记录数
     */
    public Map<String, Long> query(String tableName, String condition) throws Exception {
        Map<String, Long> map = new HashMap<>();
        HTable table = getTable(tableName);
        String cf = "info";
        String qualifier = "click_count";
        Scan scan = new Scan();
        Filter filter = new PrefixFilter(Bytes.toBytes(condition));
        scan.setFilter(filter);
        ResultScanner rs = table.getScanner(scan);
        for(Result result : rs) {
            String row = Bytes.toString(result.getRow());
            long clickCount = Bytes.toLong(result.getValue(cf.getBytes(), qualifier.getBytes()));
            map.put(row, clickCount);
        }
        return  map;
    }

重点:

先得到table,再根据查询条件condition过滤出rs,

然后遍历rs->直接获得row,根据cf、qualifier得到点击数clickCount

将row、clickCount添加put进map

过滤条件:

Scan scan = new Scan();

scan.setFilter(new PrefixFilter(Bytes.toBytes(condition)));

ResultScanner rs = getTable(tableName).getScanner(scan);

添加主函数进行测试:

public static void main(String[] args) throws Exception {
        Map<String, Long> map = HBaseUtils.getInstance().query("imooc_course_clickcount" , "20171022");

        for(Map.Entry<String, Long> entry: map.entrySet()) {
            System.out.println(entry.getKey() + " : " + entry.getValue());
        }
    }  

重点:

Map.Entry<String, Long> entry: map.entrySet()

entry.getKey() + " : " + entry.getValue()

此处用到的是第三种:Map集合的四种遍历方式

返回的结果为:20171022_128 :1066  //只是获得课程id无法满足要求要求

为了匹配Echarts的name、value属性,所以需要组装一个domain:CourseClickCount

新建一个数据访问类:CourseClickCountDAO

新建query方法,返回值为List<CourseClickCount>

参考代码:

package com.imooc.dao;
import com.imooc.domain.CourseClickCount;
import com.imooc.utils.HBaseUtils;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
 * 实战课程访问数量数据访问层
 */
@Component
public class CourseClickCountDAO {
    /**
     * 根据天查询
     */
    public List<CourseClickCount> query(String day) throws Exception {
        List<CourseClickCount> list = new ArrayList<>();
        // 去HBase表中根据day获取实战课程对应的访问量
        Map<String, Long> map = HBaseUtils.getInstance().query("imooc_course_clickcount","20171022");
        for(Map.Entry<String, Long> entry: map.entrySet()) {
            CourseClickCount model = new CourseClickCount();
            model.setName(entry.getKey());
            model.setValue(entry.getValue());
            list.add(model);
        }
        return list;
    }
    public static void main(String[] args) throws Exception{
        CourseClickCountDAO dao = new CourseClickCountDAO();
        List<CourseClickCount> list = dao.query("20171022");
        for(CourseClickCount model : list) {
            System.out.println(model.getName() + " : " + model.getValue());
        }
    }
}

但是,得到的还是课程Id,还没有具体的名称,所以还要加一层映射关系

建立Web层ImoocStatApp:

/**
 * web层
 */
@RestController
public class ImoocStatApp {
    private static Map<String, String> courses = new HashMap<>();
    static {
        courses.put("112","Spark SQL慕课网日志分析");
        courses.put("128","10小时入门大数据");
        courses.put("145","深度学习之神经网络核心原理与算法");
        courses.put("146","强大的Node.js在Web开发的应用");
        courses.put("131","Vue+Django实战");
        courses.put("130","Web前端性能优化");
    }
    @Autowired
    CourseClickCountDAO courseClickCountDAO;
//    @RequestMapping(value = "/course_clickcount_dynamic", method = RequestMethod.GET)
//    public ModelAndView courseClickCount() throws Exception {
//        ModelAndView view = new ModelAndView("index");
//        List<CourseClickCount> list = courseClickCountDAO.query("20171022");
//        for(CourseClickCount model : list) {
//            model.setName(courses.get(model.getName().substring(9)));
//        }
//        JSONArray json = JSONArray.fromObject(list);
//        view.addObject("data_json", json);
//        return view;
//    }
    @RequestMapping(value = "/course_clickcount_dynamic", method = RequestMethod.POST)
    @ResponseBody
    public List<CourseClickCount> courseClickCount() throws Exception {
        List<CourseClickCount> list = courseClickCountDAO.query("20171022");
        for(CourseClickCount model : list) {
            model.setName(courses.get(model.getName().substring(9)));
        }
        return list;
    }
    @RequestMapping(value = "/echarts", method = RequestMethod.GET)
    public ModelAndView echarts(){
        return new ModelAndView("echarts");
    }
}

static{}内容应该配到数据库里面去,此处只是为了方便

另外需要添加注解

ImoocStatApp:@RestController

CourseClickCountDAO层:@Component

HelloBoot:@RestController

domain/CourseClickCount:@Component

自动装载:@Autowired

如果不是用ajax获取数据则:ModelAndView courseClickCount()

若是则升级成:List<CourseClickCount> courseClickCount()

注意:json格式需要添加依赖

net.sf.json-lib:json-lib:2.4:jdk1.5(classifier)

原文地址:https://www.cnblogs.com/kkxwz/p/8408895.html

时间: 2024-11-03 19:23:50

【慕课网实战】Spark Streaming实时流处理项目实战笔记二十之铭文升级版的相关文章

【慕课网实战】Spark Streaming实时流处理项目实战笔记十之铭文升级版

铭文一级: 第八章:Spark Streaming进阶与案例实战 updateStateByKey算子需求:统计到目前为止累积出现的单词的个数(需要保持住以前的状态) java.lang.IllegalArgumentException: requirement failed: The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint(). 需求:将统计结果写入到MySQLcre

【慕课网实战】Spark Streaming实时流处理项目实战笔记八之铭文升级版

铭文一级: Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Spark Streaming个人的定义: 将不同的数据源的数据经过Spark Streaming处理之后将结果输出到外部文件系统 特点 低延时 能从错误中高效的恢复:fault-toler

【慕课网实战】Spark Streaming实时流处理项目实战笔记十五之铭文升级版

铭文一级:[木有笔记] 铭文二级: 第12章 Spark Streaming项目实战 行为日志分析: 1.访问量的统计 2.网站黏性 3.推荐 Python实时产生数据 访问URL->IP信息->referer和状态码->日志访问时间->写入到文件中 本地与虚拟机都要装了python才能运行 重要代码: #coding=UTF-8 #数组最后一个没有"," url_paths = [ "class/128.html", "class

【慕课网实战】Spark Streaming实时流处理项目实战笔记十六之铭文升级版

铭文一级: linux crontab 网站:http://tool.lu/crontab 每一分钟执行一次的crontab表达式: */1 * * * * crontab -e */1 * * * * /home/hadoop/data/project/log_generator.sh 对接python日志产生器输出的日志到Flumestreaming_project.conf 选型:access.log ==> 控制台输出 exec memory logger exec-memory-log

【慕课网实战】Spark Streaming实时流处理项目实战笔记十八之铭文升级版

铭文一级: 功能二:功能一+从搜索引擎引流过来的 HBase表设计create 'imooc_course_search_clickcount','info'rowkey设计:也是根据我们的业务需求来的 20171111 +search+ 1 项目打包:mvn clean package -DskipTests 报错:[ERROR] /Users/rocky/source/work/sparktrain/src/main/scala/com/imooc/spark/project/dao/Cou

【慕课网实战】Spark Streaming实时流处理项目实战笔记七之铭文升级版

铭文一级: 第五章:实战环境搭建 Spark源码编译命令:./dev/make-distribution.sh \--name 2.6.0-cdh5.7.0 \--tgz \-Pyarn -Phadoop-2.6 \-Phive -Phive-thriftserver \-Dhadoop.version=2.6.0-cdh5.7.0 铭文二级: 第五章:实战环境搭建(所有都配置到环境变量) 1.Scala的安装:Download->previous releases  //课程使用2.11.8

Spark Streaming实时流处理项目实战

第1章 课程介绍   1-1 -导学-   1-2 -授课习惯和学习建议   1-3 -OOTB环境使用演示   1-4 -Linux环境及软件版本介绍   1-5 -Spark版本升级第2章 初识实时流处理   2-1 -课程目录   2-2 -业务现状分析   2-3 -实时流处理产生背景   2-4 -实时流处理概述   2-5 -离线计算和实时计算对比   2-6 -实时流处理框架对比   2-7 -实时流处理架构及技术选型   2-8 -实时流处理在企业中的应用第3章 分布式日志收集框

【慕课网实战】Spark Streaming实时流处理项目实战笔记三之铭文升级版

铭文一级: Flume概述Flume is a distributed, reliable, and available service for efficiently collecting(收集), aggregating(聚合), and moving(移动) large amounts of log data webserver(源端) ===> flume ===> hdfs(目的地) 设计目标: 可靠性 扩展性 管理性 业界同类产品的对比 (***)Flume: Cloudera/A

【慕课网实战】Spark Streaming实时流处理项目实战笔记十九之铭文升级版

铭文一级:(没有内容) 铭文二级: 创建Spring boot项目: 看官网,Quick Start下面有两个依赖,必须得使用 但是如果用IDEA构建Spring boot,则会自动添加 New Project->Spring Initializr->Next 任意确定: com.imooc.spark web 选版本.点击左边的web->勾上web project name:imooc_web Reference里修改成本地的maven版本 删除多余的文件:mvn.mvnw.mvnw.