storm-jdbc的使用

最近项目组分配到研究storm-jdbc用法

发现网上关于insert和query方法挺多的,但是自定义方法很少。而且用法上也挺多缺陷。在此自己总结记录一下

JdbcInsertBolt 的核心代码

/*
*
* JdbcInsertBolt org.apache.storm 默认包下 此类不会进行emit。所以插入数据后不能接下层bolt
* 如果需要在插入数据后继续往下游流数据必须在源码中进行添加 在github可以获得类(JdbcLookupBolt也是类似,响应还有hbase和redis)
* */
public static JdbcInsertBolt getJdbcInsertBolt() {
System.out.println("had run.........==============");
//Fields outputFields = new Fields("session","time","count");//输出给下层bolt
//get session time
List<Column> schemaColumns = Lists.newArrayList(new Column("session", Types.VARCHAR),//上游过来的字段
new Column("time", Types.INTEGER));
JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(schemaColumns);
//SimpleJdbcLookupMapper lookupMapper = new SimpleJdbcLookupMapper(outputFields, schemaColumns);
JdbcInsertBolt jdbcInsertBolt = new JdbcInsertBolt(connectionProvider,simpleJdbcMapper)
.withInsertQuery("insert into webinfo(session,time) values(?,?)")
.withQueryTimeoutSecs(50);
return jdbcInsertBolt;
}

//query的核心代码

public static JdbcLookupBolt getJdbcLookupBlot() {
Fields outputFields = new Fields("session","time","count");//输出给下层bolt
List<Column> queryParamColumns = Lists.newArrayList(new Column("session", Types.VARCHAR),
new Column("time", Types.INTEGER)//上层bolt流入
);
//String insertSQL = "insert into student_infos(name,age) values(?,?)";
String selectSql = "select count(*) count from student_infos where session=? and time=?";
SimpleJdbcLookupMapper lookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns);
JdbcLookupBolt userNameLookupBolt = new JdbcLookupBolt(connectionProvider, selectSql, lookupMapper)
.withQueryTimeoutSecs(30);

//return userNameLookupBolt;
// JdbcState.Options options = new JdbcState.Options()
// .withConnectionProvider(connectionProvider)
// .withJdbcLookupMapper(new SimpleJdbcLookupMapper(new Fields("user_name"), Lists.newArrayList(new Column("user_id", Types.INTEGER))))
// .withSelectQuery("select user_name from user_details where user_id = ?")
// .withQueryTimeoutSecs(30);
// JdbcStateFactory jdbcStateFactory = new JdbcStateFactory(options);
return userNameLookupBolt;
}

//自定义查询方法的核心代码

private JdbcClient client;
public void setup() {
// Map map = Maps.newHashMap();
// map.put("dataSourceClassName","org.hsqldb.jdbc.JDBCDataSource");//com.mysql.jdbc.jdbc2.optional.MysqlDataSource
// map.put("dataSource.url", "jdbc:hsqldb:mem:test");//jdbc:mysql://localhost/test
// map.put("dataSource.user","SA");//root
// map.put("dataSource.password","");//password
// ConnectionProvider connectionProvider = new HikariCPConnectionProvider(map);
connectionProvider.prepare();

int queryTimeoutSecs = 60;
this.client = new JdbcClient(connectionProvider, queryTimeoutSecs);
client.executeSql("UPDATE webinfo s SET s.session=‘haitao‘ where s.time = ‘4‘ ");
}

关于用stormJDBC 效率提升上还是挺大的,项目组兄弟自己写插入一条在50MS左右,大数据下这种情况是不允许的,太慢,然后通过storm开源工具插入,大概在初始化50MS,后面每条是5MS。当然想更快应该也可以做事务JDBC。暂时还没研究

更多参考资料:

https://insight.io/github.com/apache/storm/tree/HEAD/external/storm-jdbc/

时间: 2024-12-29 19:19:02

storm-jdbc的使用的相关文章

Storm集群上的开发 ,任务计算输出到mysql数据库,集成jdbc(十)

storm集成jdbc,把计算结果保存到mysql中. 首先在mysql中建表 ,表的字段与输出的tuple的schema一致: create table result( word varchar(20), total int ); 编写一个连接提供器,用于获取mysql数据库连接: 需要引入jar :/usr/local/apps/apache-storm-1.0.3/external/storm-jdbc 的 storm-jdbc-1.0.3.jar package mystorm.word

storm热力图项目(后台)

1.安装启动zookeeper /home/hadoop/app/zookeeper-3.4.5-cdh5.7.0/conf/zoo.cfg bin/zkServer start 2.安装启动logstash /home/hadoop/app/logstash-2.4.1/project.conf bin/logstash -f  project.conf 3.安装启动kafka /home/hadoop/app/kafka_2.11-0.9.0.0/config/server.properti

Apache Storm 1.1.0 中文文档 | ApacheCN

前言  Apache Storm 是一个免费的,开源的,分布式的实时计算系统. 官方文档: http://storm.apache.org 中文文档: http://storm.apachecn.org ApacheCN 最近组织了翻译 Storm 1.1.0 中文文档 的活动,整体 翻译进度 为 96%. 感谢大家参与到该活动中来 感谢无私奉献的 贡献者,才有了这份 Storm 1.1.0 中文文档 感谢一路有你的陪伴,我们才可以做的更好,走的更快,走的更远,我们一直在努力 ... 网页地址:

使用Storm实现实时大数据分析

摘要:随着数据体积的越来越大,实时处理成为了许多机构需要面对的首要挑战.Shruthi Kumar和Siddharth Patankar在Dr.Dobb’s上结合了汽车超速监视,为我们演示了使用Storm进行实时大数据分析.CSDN在此编译.整理. 简单和明了,Storm让大数据分析变得轻松加愉快. 当今世界,公司的日常运营经常会生成TB级别的数据.数据来源囊括了互联网装置可以捕获的任何类型数据,网站.社交媒体.交易型商业数据以及其它商业环境中创建的数据.考虑到数据的生成量,实时处理成为了许多机

Flume-ng+Kafka+storm的学习笔记

Flume-ng Flume是一个分布式.可靠.和高可用的海量日志采集.聚合和传输的系统. Flume的文档可以看http://flume.apache.org/FlumeUserGuide.html 官方的英文文档 介绍的比较全面. 不过这里写写自己的见解 这个是flume的架构图 从上图可以看到几个名词: Agent: 一个Agent包含Source.Channel.Sink和其他的组件.Flume就是一个或多个Agent构成的. Source:数据源.简单的说就是agent获取数据的入口

Storm构建分布式实时处理应用初探(转)

最近利用闲暇时间,又重新研读了一下Storm.认真对比了一下Hadoop,前者更擅长的是,实时流式数据处理,后者更擅长的是基于HDFS,通过MapReduce方式的离线数据分析计算.对于Hadoop,本身不擅长实时的数据分析处理.两者的共同点都是分布式的架构,而且,都类似有主/从关系的概念.本文中我就不具体阐述Strom集群和Zookeeper集群如何部署的问题,我想通过一个实际的案例切入,分析一下如何利用Storm,完成实时分析处理数据的. Storm本身是Apache托管的开源的分布式实时计

Storm构建分布式实时处理应用初探

最近利用闲暇时间,又重新研读了一下Storm.认真对比了一下Hadoop,前者更擅长的是,实时流式数据处理,后者更擅长的是基于HDFS,通过MapReduce方式的离线数据分析计算.对于Hadoop,本身不擅长实时的数据分析处理.两者的共同点都是分布式的架构,而且,都类似有主/从关系的概念.本文中我就不具体阐述Strom集群和Zookeeper集群如何部署的问题,我想通过一个实际的案例切入,分析一下如何利用Storm,完成实时分析处理数据的. Storm本身是Apache托管的开源的分布式实时计

大数据架构:flume-ng+Kafka+Storm+HDFS 实时系统组合

做软件开发的都知道模块化思想,这样设计的原因有两方面: 一方面是可以模块化,功能划分更加清晰,从"数据采集--数据接入--流失计算--数据输出/存储" 1).数据采集 负责从各节点上实时采集数据,选用cloudera的flume来实现 2).数据接入 由于采集数据的速度和数据处理的速度不一定同步,因此添加一个消息中间件来作为缓冲,选用apache的kafka 3).流式计算 对采集到的数据进行实时分析,选用apache的storm 4).数据输出 对分析后的结果持久化,暂定用mysql

storm 与mysql整合问题

首先说明下问题的情况, 1.我storm 环境已经搭建完成,在本地测试wordcount是没问题的, 2.我在wordcount中加入一个MysqlBolt,此Bolt只是简单的把 wordcount的结果存入mysql数据库中,在本地模式测试测试时,完全可以把结果插入指定表. 3.我的每个storm 节点都已经把mysql-connector-java-5.1.23.jar 放到storm的lib目录下. 4.每个节点均可以访问指定数据库,都已经开通相应权限 5.并且在远程模式下执行原始wor

Log Processing With Storm

有代码的书籍看起来就是爽,看完顺便跑个demo,感觉很爽! 场景分析 主要是利用apache的访问日志来进行分析统计 如用户的IP来源,来自哪个国家或地区,用户使用的Os,浏览器等信息,以及像搜索的热词等信息的统计 这里日志信息如下 24.25.135.19 - - [1-01-2011:06:20:31 -0500] "GET / HTTP/1.1" 200 864 "http://www.adeveloper.com/resource.html" "M