Storm常见模式——批处理

Storm对流数据进行实时处理时,一种常见场景是批量一起处理一定数量的tuple元组,而不是每接收一个tuple就立刻处理一个tuple,这样可能是性能的考虑,或者是具体业务的需要。

例如,批量查询或者更新数据库,如果每一条tuple生成一条sql执行一次数据库操作,数据量大的时候,效率会比批量处理的低很多,影响系统吞吐量。

当然,如果要使用Storm的可靠数据处理机制的话,应该使用容器将这些tuple的引用缓存到内存中,直到批量处理的时候,ack这些tuple。

下面给出一个简单的代码示例:

现在,假设我们已经有了一个DBManager数据库操作接口类,它至少有两个接口:

(1)getConnection(): 返回一个java.sql.Connection对象;

(2)getSQL(Tuple tuple): 根据tuple元组生成数据库操作语句。

为了在Bolt中缓存一定数量的tuple,构造Bolt时传递int n参数赋给Bolt的成员变量int count,指定每个n条tuple批量处理一次。

同时,为了在内存中缓存缓存Tuple,使用java concurrent中的ConcurrentLinkedQueue来存储tuple,每当攒够count条tuple,就触发批量处理。

另外,考虑到数据量小(如很长时间内都没有攒够count条tuple)或者count条数设置过大时,因此,Bolt中加入了一个定时器,保证最多每个1秒钟进行一次批量处理tuple。

下面是Bolt的完整代码(仅供参考):

import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class BatchingBolt implements IRichBolt {
    private static final long serialVersionUID = 1L;
    private OutputCollector collector;
    private Queue<Tuple> tupleQueue = new ConcurrentLinkedQueue<Tuple>();
    private int count;
    private long lastTime;
    private Connection conn;

    public BatchingBolt(int n) {
        count = n; //批量处理的Tuple记录条数
        conn = DBManger.getConnection(); //通过DBManager获取数据库连接
        lastTime = System.currentTimeMillis(); //上次批量处理的时间戳
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        tupleQueue.add(tuple);
        long currentTime = System.currentTimeMillis();
        // 每count条tuple批量提交一次,或者每个1秒钟提交一次
        if (tupleQueue.size() >= count || currentTime >= lastTime + 1000) {
            Statement stmt = conn.createStatement();
            conn.setAutoCommit(false);
            for (int i = 0; i < count; i++) {
                Tuple tup = (Tuple) tupleQueue.poll();
                String sql = DBManager.getSQL(tup); //生成sql语句
                stmt.addBatch(sql); //加入sql
                collector.ack(tup); //进行ack
            }
            stmt.executeBatch(); //批量提交sql
            conn.commit();
            conn.setAutoCommit(true);
            System.out.println("batch insert data into database, total records: " + count);
            lastTime = currentTime;
        }
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        // TODO Auto-generated method stub
        return null;
    }
}

时间: 2024-12-09 18:25:45

Storm常见模式——批处理的相关文章

Storm入门(九)Storm常见模式之流聚合

流聚合(stream join)是指将具有共同元组(tuple)字段的数据流(两个或者多个)聚合形成一个新的数据流的过程. 从定义上看,流聚合和SQL中表的聚合(table join)很像,但是二者有明显的区别:table join的输入是有限的,并且join的语义是非常明确的:而流聚合的语义是不明确的并且输入流是无限的. 数据流的聚合类型跟具体的应用有关.一些应用把两个流发出的所有的tuple都聚合起来--不管多长时间:而另外一些应用则只会聚合一些特定的tuple.而另外一些应用的聚合逻辑又可

Storm常见模式——流聚合

转自:http://www.cnblogs.com/panfeng412/archive/2012/06/04/storm-common-patterns-of-stream-join.html 流聚合(stream join)是指将具有共同元组(tuple)字段的数据流(两个或者多个)聚合形成一个新的数据流的过程. 从定义上看,流聚合和SQL中表的聚合(table join)很像,但是二者有明显的区别:table join的输入是有限的,并且join的语义是非常明确的:而流聚合的语义是不明确的

Twitter Storm: storm的一些常见模式

这篇文章列举出了storm topology里面的一些常见模式: 流聚合(stream join) 批处理(Batching) BasicBolt 内存内缓存 + fields grouping 组合 计算top N 用TimeCacheMap来高效地保存一个最近被更新的对象的缓存 分布式RPC: CoordinatedBolt和KeyedFairBolt 流聚合(stream join) 流聚合把两个或者多个数据流聚合成一个数据流 — 基于一些共同的tuple字段.流聚合和SQL里面table

Storm本地模式异常

来自:http://isuifengfei.iteye.com/blog/1998265 问题1 java.net.SocketException: Address family not supported by protocol family: connect 查了下 http://stackoverflow.com/questions/16373906/address-family-not-supported-by-protocol-family-socketexception-on-a-s

《JavaScript高级程序设计》之面向对象创建与继承常见模式

1 //=================method 工厂模式================ 2 var Person = function (name, age) { 3 var object = {}; 4 5 object.name = name; 6 object.age = age; 7 object.say = function (words) { 8 console.info(words); 9 }; 10 object.getName = function () { 11 c

storm本地模式运行无执行问题

在本地编辑工具写了strom的例子,用 LocalCluster 运行后,并没有打印出预期的内容,反而在日志里有出现错误: org.apache.storm.shade.org.apache.zookeeper.server.ServerCnxn$EndOfStreamException: Unable to read additional data from client sessionid 0x164258d72a2000b, likely client has closed socket

RabbitMQ Queue一些常见模式

懒队列:lazy Queue,即用到的时候才会加载,3.6.0及之后新添加的.当新添加数据后,不会将其放入到内存中,而是将其放入到磁盘中. 普通队列:1).in-memory,数据直接放入到内存中. 2).on-desk,将数据放入到内存中的同时,还会将消息放到磁盘中,当内存压力升高或GC需要回收内存等情况,会将内存中的数据保存到磁盘中. 一些常见的组合: 1). 原文地址:https://www.cnblogs.com/fanqisoft/p/10397172.html

LB-LVS常见模式NAT/DR部署

Linux Viryual server 项目 LVS NAT模式部署 我们要准备四台虚拟机: 2台webserver 1台分发器 1台client 1.两台server 配置好相同网段 172.16.1.102:172.16.1.103: 网关指向分发器172.16.1.101 (安装webserver) webserver配置ip: [[email protected]1 ~]# ip addr add 172.16.1.102/24 dev ens33 [[email protected]

Node.js 全栈开发的常见模式探究

参考:https://myslide.cn/slides/9954# 原文地址:https://www.cnblogs.com/mengfangui/p/12349319.html