Storm中的LocalState 代码解析

官方的解释这个类为:

/**
 * A simple, durable, atomic K/V database. *Very inefficient*, should only be
 * used for occasional reads/writes. Every read/write hits disk.
 */

简单来理解就是这个类每次读写都会将一个Map<Object, Object>的对象序列化存储到磁盘中,读的时候将其反序列化。

构造函数指定的参数就是你在磁盘中存储的目录,同时也作为VersionedStore的构造函数的参数。

这些文件在目录中是以一个long类型的id进行命名

public LocalState(String backingDir) throws IOException {
        _vs = new VersionedStore(backingDir);
    }

snapshot函数,找到最近的版本,将其反序列化

    public synchronized Map<Object, Object> snapshot() throws IOException {
        int attempts = 0;
        while (true) {
            String latestPath = _vs.mostRecentVersionPath();   //获取最近的版本
            if (latestPath == null)
                return new HashMap<Object, Object>();
            try {
                return (Map<Object, Object>) Utils.deserialize(FileUtils
                        .readFileToByteArray(new File(latestPath)));
            } catch (IOException e) {
                attempts++;
                if (attempts >= 10) {
                    throw e;
                }
            }
        }
    }
    public Object get(Object key) throws IOException {
        return snapshot().get(key);
    }

    public synchronized void put(Object key, Object val) throws IOException {
        put(key, val, true);
    }

    public synchronized void put(Object key, Object val, boolean cleanup)
            throws IOException {
        Map<Object, Object> curr = snapshot();
        curr.put(key, val);
        persist(curr, cleanup);    //persist会将其写入到磁盘中
    }

    public synchronized void remove(Object key) throws IOException {
        remove(key, true);
    }

    public synchronized void remove(Object key, boolean cleanup)
            throws IOException {
        Map<Object, Object> curr = snapshot();
        curr.remove(key);
        persist(curr, cleanup);
    }

    public synchronized void cleanup(int keepVersions) throws IOException {
        _vs.cleanup(keepVersions);
    }

可以看到,基本暴露的接口都通过synchronized关键字来保证串行化的操作,同时多次调用了以下的persist方法,

    private void persist(Map<Object, Object> val, boolean cleanup)
            throws IOException {
        byte[] toWrite = Utils.serialize(val);
        String newPath = _vs.createVersion();    //创建一个新的版本号
        FileUtils.writeByteArrayToFile(new File(newPath), toWrite);
        _vs.succeedVersion(newPath);    //如果写入成功,那么会生成 id.version 文件来声明该文件写入成功
        if (cleanup)
            _vs.cleanup(4);  //默认保留4个版本
    }

接下来看看VersionedStore这个类,它是进行实际存储操作的类,提供了接口给LocalState

    public void succeedVersion(String path) throws IOException {
        long version = validateAndGetVersion(path);   //验证一下这个文件是否存在
        // should rewrite this to do a file move
        createNewFile(tokenPath(version));   //创建对应的 id.version 文件说明写入成功
    }

path的值是一个long类型的id,表示对应的文件

    private long validateAndGetVersion(String path) {
        Long v = parseVersion(path);
        if (v == null)
            throw new RuntimeException(path + " is not a valid version");
        return v;
    }

//解析出版本号,如果以.version结尾的,去掉.version

    private Long parseVersion(String path) {
        String name = new File(path).getName();
        if (name.endsWith(FINISHED_VERSION_SUFFIX)) {
            name = name.substring(0,
                    name.length() - FINISHED_VERSION_SUFFIX.length());
        }
        try {
            return Long.parseLong(name);
        } catch (NumberFormatException e) {
            return null;
        }
    }
 createNewFile(tokenPath(version));   //创建对应的 id.version 文件说明写入成功

token file就是一种标志文件,用于标志对应的文件已经写入成功,以.version 结尾

    private String tokenPath(long version) {
        return new File(_root, "" + version + FINISHED_VERSION_SUFFIX)
                .getAbsolutePath();
    }
    private void createNewFile(String path) throws IOException {
        new File(path).createNewFile();
    }

cleanup函数,保留versionsToKeep版本,清除其他的版本

    public void cleanup(int versionsToKeep) throws IOException {
        List<Long> versions = getAllVersions();   //获取所有的版本,这个返回的是以倒序排列的,最新的版本在最前面
        if (versionsToKeep >= 0) {
            versions = versions.subList(0,
                    Math.min(versions.size(), versionsToKeep));   //所以可以用subList来得到需要的版本
        }
        HashSet<Long> keepers = new HashSet<Long>(versions);   //存在HashSet中方便快速存取

        for (String p : listDir(_root)) {
            Long v = parseVersion(p);
            if (v != null && !keepers.contains(v)) {
                deleteVersion(v);    //删除其他的版本
            }
        }
    }

getAllVersions,注意这里是获取所有以version结尾的文件,也就是说所有写入成功的文件,不包括某些还没写成功的文件

    /**
     * Sorted from most recent to oldest
     */
    public List<Long> getAllVersions() throws IOException {
        List<Long> ret = new ArrayList<Long>();
        for (String s : listDir(_root)) {   //获取该目录下的所有文件
            if (s.endsWith(FINISHED_VERSION_SUFFIX)) {
                ret.add(validateAndGetVersion(s));   //验证该文件是否存在
            }
        }
        Collections.sort(ret);
        Collections.reverse(ret);  //逆序排列
        return ret;
    }

删除对应的version文件和token文件

    public void deleteVersion(long version) throws IOException {
        File versionFile = new File(versionPath(version));
        File tokenFile = new File(tokenPath(version));

        if (versionFile.exists()) {
            FileUtils.forceDelete(versionFile);
        }
        if (tokenFile.exists()) {
            FileUtils.forceDelete(tokenFile);
        }
    }

在最开始的地方,snapshot()函数调用了 mostRecentVersionPath() 来获取最近的版本,也就是调用getAllVersions,然后拿到最新的version

    public String mostRecentVersionPath() throws IOException {
        Long v = mostRecentVersion();
        if (v == null)
            return null;
        return versionPath(v);
    }
    public Long mostRecentVersion() throws IOException {
        List<Long> all = getAllVersions();
        if (all.size() == 0)
            return null;
        return all.get(0);
    }

如果提供了version号的话,可以看到是取出了比这个version号小的最大的version

    public String mostRecentVersionPath(long maxVersion) throws IOException {
        Long v = mostRecentVersion(maxVersion);
        if (v == null)
            return null;
        return versionPath(v);
    }
    public Long mostRecentVersion(long maxVersion) throws IOException {
        List<Long> all = getAllVersions();
        for (Long v : all) {
            if (v <= maxVersion)   //取出比maxVersion小的最大version
                return v;
        }
        return null;
    }

Storm中的LocalState 代码解析,布布扣,bubuko.com

时间: 2024-10-24 19:51:19

Storm中的LocalState 代码解析的相关文章

Delphi中methodaddress的代码解析

class function TObject.MethodAddress(const Name: ShortString): Pointer;asm        { ->    EAX     Pointer to class        }        {       EDX     Pointer to name }        PUSH    EBX        PUSH    ESI        PUSH    EDI        XOR     ECX,ECX      

捕捉WPF应用程序中XAML代码解析异常

原文:捕捉WPF应用程序中XAML代码解析异常 由于WPF应用程序中XAML代码在很多时候是运行时加载处理的.比如DynamicResource,但是在编译或者运行的过程中,编写的XAML代码很可能有错误,此时XAML代码解析器通常会抛出称为XamlParseException的异常.但是抛出的XamlParseException异常提供的信息非常简单,或者是很不准确.此时我们关于通过对变通的方法来获取更多的异常信息: 我们知道,WPF应用程序中的XAML代码是在InitializeCompon

Ajax.ActionLink浏览器中代码解析问题

<%=Ajax.ActionLink("动态", "Index", "Index", new { id = 1 }, new AjaxOptions() { LoadingElementId = "Loading", UpdateTargetId = "Main", HttpMethod = "Get", OnFailure = "dialogOpen" })%

storm源码之理解Storm中Worker、Executor、Task关系【转】

[原]storm源码之理解Storm中Worker.Executor.Task关系 Storm在集群上运行一个Topology时,主要通过以下3个实体来完成Topology的执行工作:1. Worker(进程)2. Executor(线程)3. Task 下图简要描述了这3者之间的关系:                                                    1个worker进程执行的是1个topology的子集(注:不会出现1个worker为多个topology服

Gradle中的buildScript代码块

在编写Gradle脚本的时候,在build.gradle文件中经常看到这样的代码: build.gradle 1 2 3 4 5 6 7 8 9 buildScript { repositories { mavenCentral() } } repositories { mavenCentral() } 这样子很容易让人奇怪,为什么repositories要声明两次哪?buildscript代码块中的声明与下半部分声明有什么不同? 其实答案非常简单.buildscript中的声明是gradle脚

微信中QQ表情的解析(php)

微信公众平台接受的消息中,标签是用'/:'开头的字符串表示的,假设要在网页上显示(比方制作微信大屏幕),就须要进行转换. 所以我向微信公众平台按顺序发送了各个QQ表情,在微信公众平台后台能够看到接受的表情会被解析成https://res.wx.qq.com/mpres/htmledition/images/icon/emotion/0.gif 这种图片.所以自己写一套解析函数就可以. 在微信公众平台后台发现,腾讯自己干了一件错误的事情:有一些表情没有被正确解析,这些标签的特点是有括号.引號这种字

Twitter Storm中Topology的状态

Twitter Storm中Topology的状态 状态转换如下,Topology 的持久化状态包括: active, inactive, killed, rebalancing 四个状态. 代码上看到每种状态都可以转换成一些持久化 ( 写入到 zk 中的状态 ) 或者中间状态. Java代码 (defn state-transitions [nimbus storm-id status] {:active {:monitor (reassign-transition nimbus storm-

ABP中动态WebAPI原理解析

ABP中动态WebAPI原理解析 动态WebAPI应该算是ABP中最Magic的功能之一了吧.开发人员无须定义继承自ApiController的类,只须重用Application Service中的类就可以对外提供WebAPI的功能,这应该算是对DRY的最佳诠释了. 如下图所示,一行代码就为所有实现了IApplicationService的类型,自动创建对应的动态WebAPI. 这么Magic的功能是如何实现的呢? 本文为你揭开其Magic的外表.你会发现,实现如此Magic的功能,最关键的代码

Storm系统架构以及代码结构学习

转自:http://blog.csdn.net/androidlushangderen/article/details/45955833 storm学习系列:http://blog.csdn.net/Androidlushangderen/article/category/2647213 前言 什么是storm,storm是做什么的,一个简单的描述,你可以理解为是一个“准实时”的Hadoop,Hadoop是专门做的是离线数据处理,而storm则弥补了Hadoop在这方面的不足,他是一个实时数据处