Java开发FTP功能的apache工具包,小心使用为妙

本人原创,转载请注明出处!欢迎大家加入Giraph
技术交流群
: 228591158

Giraph中Aggregator的基本用法请参考官方文档:http://giraph.apache.org/aggregators.html ,本文重点在解析Giraph如何实现Aggregators后文用图示的方法描述了Aggregator的执行过程。

基本原理:在每个超级步中,每个Worker计算本地的聚集值。超级步计算完成后,把本地的聚集值发送给Master汇总。在MasterCompute()执行后,把全局的聚集值回发给所有的Workers。

缺点:当某个应用(或算法)使用了多个聚集器(Aggregators),Master要完成所有聚集器的计算。因为Master要接受、处理、发送大量的数据,无论是在计算方面还是网络通信层次,都会导致Master成为系统瓶颈。

改进:采用分片聚集 (sharded aggregators) . 在每个超级步的最后,每个聚集器被派发给一个Worker,该Worker接受和聚集其他Workers发送给该聚集器的值。然后Workers把自己的所有的聚集器发送给Master,这样Master就无需执行任何聚集,只是接收每个聚集器的最终值。在MasterCompute.compute执行后,Master不是直接把所有的聚集器发送给所有的Workers,而是发送给聚集器所属的Worker,然后每个Worker再把其上的聚集器发送给所有的Workers.

首先给出Master <-- > Worker间, Worker <--> Worker间通信协议,在每个类中的doRequest(ServerData serverData)方法中会解析并存储收到的消息。

1).  org.apache.giraph.comm.requests.SendWorkerAggregatorsRequest 类 . Worker --> Worker Owner

功能:每个worker把当前超步的局部 aggregated values 发送到该Aggregator的拥有者。

2).  org.apache.giraph.comm.requests.SendAggregatorsToMasterRequest 类. Worker Owner--> Master

功能:每个Worker把自己所拥有的Aggregator的最终 aggregated values 发送给 master。

3).  org.apache.giraph.comm.requests.SendAggregatorsToOwnerRequest 类. Master --> Worker Owner.

功能:master把最终的 aggregated values 或aggregators 发送给该Aggregator的拥有者。

4).  org.apache.giraph.comm.requests.SendAggregatorsToWorkerRequest 类。 Worker Owner--> Worker

功能: 发送最终的 aggregated values 到 其他workers。发送者为该Aggregator的拥有者,接受者为除发送者之外的所有workers。

Aggregator分类和 注册

Giraph中把Aggregator分为两类:regular aggregators和persistent aggregators。regular aggregators的值在每个超级步开始会被重置为初始值,然而persistent aggregators的值在整个应用(算法)中一直保持。举例来说,若LongSumAggregator在每个顶点的compute()方法中加1,如果使用regular
aggregators,在每个超级步中就可以读取前一个超级步的参与计算的顶点总数;如果使用persistent aggregators,就可以获取前面所有超级步中参与计算的顶点总和。

在使用aggregator之前,必须要在mastes上Registering aggregators。做法:继承org.apache.giraph.master.DefaultMasterCompute类,重写 void initalize() 方法。在该方法中注册aggregators,语法如下:

registerAggregator(aggregatorName, aggregatorClass)

registerPersistentAggregator(aggregatorName, aggregatorClass)

说明:MasterCompute.initalize()方法只在第 INPUT_SUPERSTEP (-1) 超级步中执行一次,具体在 BSPServiceMaster.runMasterCompute(long superstep)方法中。在MasterCompute.compute()方法中,可以使用下述方法读取或修改聚集器的值。

getAggregatedValue(aggregatorName) //获取前一个超级步的聚集器值

setAggregatedValue(aggregatorName, aggregatedValue) //修改聚集器的值

MasterCompute.compute()总是在Vertex.compute()前执行。 由于第 INPUT_SUPERSTEP ( -1)个超级步进行的是数据的加载和重分布过程,不计算Vertex.compute()。第0个超级步Vertex.compute()又是在MasterCompute.compute()方法后执行。故对第 -1 、 0个超级步MasterCompute.compute()方法中获得的聚集器值均为其初始值。从第1个超级步开始,MasterCompute.compute()方法才获得了所有Vertex.compute()在第0个超级步聚集的值。

1. 从第0个超级步开始,BspServiceMaster调用MasterAggregatorHandler类的finishSuperStep(MasterClient masterClient) 方法把聚集器派发给Worker,聚集器的value为上一个超级步的全局聚集值(final aggregated values),第一次为初始值。先给出MasterAggregatorHandler的类继承关系,如下:

finishSuperStep(MasterClient masterClient) 方法核心内容如下:

  /**
   * Finalize aggregators for current superstep and share them with workers
   */
  public void finishSuperstep(MasterClient masterClient) {
    for (AggregatorWrapper<Writable> aggregator : aggregatorMap.values()) {
      if (aggregator.isChanged()) {
        // if master compute changed the value, use the one he chose
        aggregator.setPreviousAggregatedValue(
            aggregator.getCurrentAggregatedValue());
        // reset aggregator for the next superstep
        aggregator.resetCurrentAggregator();
      }
    }

    /**
     * 把聚集器发送给所属的Worker。发送内容:
     * 1). Name of the aggregator
     * 2). Class of the aggregator
     * 3). Value of the aggretator
     */
    try {
      for (Map.Entry<String, AggregatorWrapper<Writable>> entry :
          aggregatorMap.entrySet()) {
        masterClient.sendAggregator(entry.getKey(),
            entry.getValue().getAggregatorClass(),
            entry.getValue().getPreviousAggregatedValue());
      }
      masterClient.finishSendingAggregatedValues();
    } catch (IOException e) {
      throw new IllegalStateException("finishSuperstep: " +
          "IOException occurred while sending aggregators", e);
    }
  }

问题1:如何确定aggregator的Worker Owner ?

答:根据aggregator的Name来确定它所属的Worker,计算方法如下:

/**
 * 根据aggregatorName和所有的workers列表来计算aggregator所属的Worker
 * 参数aggregatorName:Name of the aggregator
 * 参数workers: Workers的list列表
 * 返回值:Worker which owns the aggregator
 */
public static WorkerInfo getOwner(String aggregatorName,List<WorkerInfo> workers) {
    //用aggregatorName的HashCode()值模以 Workers的总数目
    int index = Math.abs(aggregatorName.hashCode() % workers.size());
    return workers.get(index);  //返回aggregator所属的Worker
}

问题2:Worker 如何判断自身是否接收完自己所拥有的aggregators?

答:Master给某个Worker发送aggregators时,同时发送到该Worker的aggregators数目。使用的 SendAggregatorsToOwnerRequest类对消息进行封装和解析。

2. Worker接受Master发送的Aggregator,Worker把接收到的聚集体值发送给其他所有Workers,然后每个Workers就会得到上一个超级步的全局聚集值。

由前文知道,每个Worker都有一个ServerData对象,ServerData类中关于Aggregator的两个成员变量如下:

// 保存Worker在当前超步拥有的aggregators
private final OwnerAggregatorServerData ownerAggregator;
// 保存前一个超步的aggregators
private final AllAggregatorServerData allAggregatorData;

可以看到,ownerAggregatorData用来存储在当前超步Master发送给Worker的聚集器,allAggregatorData用来保存上一个超级步全局的聚集值。ownerAggregatorData和allAggregatorData值的初始化在SendAggregatorsToOwnerRequest 类中的doRequest(ServerData serverData)方法中,如下:

public void doRequest(ServerData serverData) {
    DataInput input = getDataInput();
    AllAggregatorServerData aggregatorData = serverData.getAllAggregatorData();
    try {
      //收到的Aggregators数目。在CountingOutputStream类中有计数器counter,
      //每向输出流中添加一个聚集器对象,计数加1. 发送时,在flush方法中把该值插入到输出流最前面。
      int numAggregators = input.readInt();
      for (int i = 0; i < numAggregators; i++) {
        String aggregatorName = input.readUTF();
        String aggregatorClassName = input.readUTF();
        if (aggregatorName.equals(AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)) {
          LongWritable count = new LongWritable(0);
          //Master发送给该Worker的requests总数目.
          count.readFields(input);
          aggregatorData.receivedRequestCountFromMaster(count.get(),
              getSenderTaskId());
        } else {
          Class<Aggregator<Writable>> aggregatorClass =
              AggregatorUtils.getAggregatorClass(aggregatorClassName);
          aggregatorData.registerAggregatorClass(aggregatorName,
              aggregatorClass);
          Writable aggregatorValue =
              aggregatorData.createAggregatorInitialValue(aggregatorName);
          aggregatorValue.readFields(input);
          //把收到的上一次全局聚集的值赋值给allAggregatorData
          aggregatorData.setAggregatorValue(aggregatorName, aggregatorValue);
          //ownerAggregatorData只接受聚集器
          serverData.getOwnerAggregatorData().registerAggregator(
              aggregatorName, aggregatorClass);
        }
      }
    } catch (IOException e) {
      throw new IllegalStateException("doRequest: " +
          "IOException occurred while processing request", e);
    }
    //接受一个 request,计数减1,同时把收到的Data添加到allAggregatorServerData的List<byte[]> masterData中
    aggregatorData.receivedRequestFromMaster(getData());
 }

每个Worker在开始计算前,会调用BspServiceWorker类的prepareSuperStep()方法来进行聚集器值的派发和接受其他Workers发送的聚集器值。调用关系如下:

BspServiceWorker类的prepareSuperStep()方法如下:

@Override
public void prepareSuperstep() {
   if (getSuperstep() != INPUT_SUPERSTEP) {
     /*
      * aggregatorHandler为WorkerAggregatorHandler类型,
      * 可参考上文中MasterAggregatorHandler的类继承关系.
      * workerAggregatorRequestProcessor声明为WorkerAggregatorRequestProcessor(接口)
      * 类型,实际为NettyWorkerAggregatorRequestProcessor的实例,
      * 用于Worker间发送聚集器的值。
      */
      aggregatorHandler.prepareSuperstep(workerAggregatorRequestProcessor);
   }
}

WorkerAggregatorHandler类的prepareSuperstep( WorkerAggregatorRequestProcessor requestProcessor)方法如下:

public void prepareSuperstep(WorkerAggregatorRequestProcessor requestProcessor) {
    AllAggregatorServerData allAggregatorData =
        serviceWorker.getServerData().getAllAggregatorData();
    /**
     * 等待直到Master发送给该Worker的聚集器都已接受完,
     * 返回值为Master发送给该Worker的所有Data(聚集器)
     */
    Iterable<byte[]> dataToDistribute =
        allAggregatorData.getDataFromMasterWhenReady(
            serviceWorker.getMasterInfo());

    // 把从Master收到的Data(聚集器)发送给其他所有Workers
    requestProcessor.distributeAggregators(dataToDistribute);

    // 等待直到接受完其他Workers发送给该Workers的聚集器
    allAggregatorData.fillNextSuperstepMapsWhenReady(
        getOtherWorkerIdsSet(), previousAggregatedValueMap,
        currentAggregatorMap);
    // 只是清空allAggregatorServerData的List<byte[]> masterData对象
    // 为下一个超级步接受Master发送的聚集器做准备
    allAggregatorData.reset();
}

下面详述Worker如何判定已接收完所有Master发送的所有Request ? 主要目的在于描述分布式环境下线程间如何协作。在AllAggregatorServerData类中定义了TaskIdsPermitBarrier类型的变量masterBarrier,用来判断是否接收完Master发送的Request. TaskIdsPermitBarrier类中主要使用wait()、notifyAll()等方法来控制,当获得的aggregatorName等于AggregatorUtils.SPECIAL_COUNT_AGGREGATOR时,会调用requirePermits(long
permits,int taskId)来增加接收的arrivedTaskIds和需要等待的request数目waitingOnPermits. 接受一个Request

  /**
   * Require more permits. This will increase the number of times permits
   * were required. Doesn't wait for permits to become available.
   *
   * @param permits Number of permits to require
   * @param taskId Task id which required permits
   */
  public synchronized void requirePermits(long permits, int taskId) {
    arrivedTaskIds.add(taskId);
    waitingOnPermits += permits;
    notifyAll();
  }

接受一个Request后,会调用releaseOnePermit()方法把waitingOnPermits减1。

3. 在Vertex.compute()方法中,每个Worker聚集自身的值。计算完成后,调用WorkerAggregatorHandler类的finishSuperstep( WorkerAggregatorRequestProcessor requestProcessor)方法,把本地的聚集器的值给句聚集器的aggregatorName发送给该aggregator所属的Worker. Aggregator的属主Worker接受其他所有Workers发送的本地聚集值进行汇总,汇总完毕后发送给Master,供下一次超级步的MasterCompute.compute()方法使用。finishSuperstep方法如下:

 /**
   * Send aggregators to their owners and in the end to the master
   *
   * @param requestProcessor Request processor for aggregators
   */
  public void finishSuperstep(
      WorkerAggregatorRequestProcessor requestProcessor) {
    OwnerAggregatorServerData ownerAggregatorData =
        serviceWorker.getServerData().getOwnerAggregatorData();
    // First send partial aggregated values to their owners and determine
    // which aggregators belong to this worker
    for (Map.Entry<String, Aggregator<Writable>> entry :
        currentAggregatorMap.entrySet()) {
        boolean sent = requestProcessor.sendAggregatedValue(entry.getKey(),
            entry.getValue().getAggregatedValue());
        if (!sent) {
          // If it's my aggregator, add it directly
          ownerAggregatorData.aggregate(entry.getKey(),
              entry.getValue().getAggregatedValue());
        }
    }
    // Flush
    requestProcessor.flush();
    // Wait to receive partial aggregated values from all other workers
    Iterable<Map.Entry<String, Writable>> myAggregators =
        ownerAggregatorData.getMyAggregatorValuesWhenReady(
            getOtherWorkerIdsSet());

    // Send final aggregated values to master
    AggregatedValueOutputStream aggregatorOutput =
        new AggregatedValueOutputStream();
    for (Map.Entry<String, Writable> entry : myAggregators) {
        int currentSize = aggregatorOutput.addAggregator(entry.getKey(),
            entry.getValue());
        if (currentSize > maxBytesPerAggregatorRequest) {
          requestProcessor.sendAggregatedValuesToMaster(
              aggregatorOutput.flush());
        }
    }
    requestProcessor.sendAggregatedValuesToMaster(aggregatorOutput.flush());
    // Wait for master to receive aggregated values before proceeding
    serviceWorker.getWorkerClient().waitAllRequests();
    ownerAggregatorData.reset();
  }

调用关系如下:

4. 大同步后,Master调用MasterAggregatorHandler类的prepareSusperStep(masterClient)方法,收集聚集器的值。方法内容如下:

  public void prepareSuperstep(MasterClient masterClient) {

    // 收集上次超级步的聚集值,为master compute 做准备
    for (AggregatorWrapper<Writable> aggregator : aggregatorMap.values()) {
	// 如果是 Persistent Aggregator,则累加
	if (aggregator.isPersistent()) {
        aggregator.aggregateCurrent(aggregator.getPreviousAggregatedValue());
      }
      aggregator.setPreviousAggregatedValue(
          aggregator.getCurrentAggregatedValue());
      aggregator.resetCurrentAggregator();
      progressable.progress();
    }
  }

然后调用MasterCompute.compute()方法(可能会修改聚集器的值),在该方法内若根据聚集器的值调用了MasterCompute类的haltCompute()方法来终止MaterCompute,则表明要结束整个Job。那么Master就会通知所有Workers要结束整个作业;在该方法内若没有调用MasterCompute类的haltCompute()方法,则回到步骤1继续进行迭代。

说明:Job迭代结束条件有三,满足其一就行:

1) 达到最大迭代次数

2) 没有活跃顶点且没有消息在传递

3) 终止MasterCompute计算

总结:为解决在多个Aggregator条件下,Master成为系统瓶颈的问题。采取了把所有Aggregator派发给某一部分Workers,由这些Workers完成全局的聚集值的计算与发送,Master只需要与这些Workers进行简单数据通信即可,大大降低了Master的工作量。

附加:下面用图示方法说明上述执行过程。

实验条件:

1). 一个Master,四个Worker

2). 两个Aggregators,记为A1和A2。

1. Master把Aggregators发送给Workers,收到Aggregator的Worker就作为该Aggregator的Owner。下图中Master把A1发送给Worker1,A2发送给Worker3.那么Worker1就作为A1的Owner,Worker3就是A2的Owner。该步骤在MasterAggregatorHandler类的finishSuperStep(MasterClient masterClient) 方法中完成,使用的是SendAggregatorsToOwnerRequest
通信协议。注:每个Owner Worker 可能有多个聚集器。

图1 Master分发Aggregator

2. Workers接受Master发送的Aggregator,然后把Aggregator发送给其他Workers。Worker1要把A1分别发送给Worker2、Worker3和Worker4;Worker3要把A2分别发送给Worker1、Worker2和Worker4。该步骤在WorkerAggregatorHandler类的prepareSuperstep( WorkerAggregatorRequestProcessor requestProcessor)方法中完成,使用的是SendAggregatorsToMasterRequest
通信协议。此步骤完成后,每个Worker上都有了聚集器A1和A2(具体为上一个超步的全局最终聚集值)。

3. 每个Worker调用Vertex.compute()方法开始计算,收集本地的Aggregator聚集值。对聚集体A1来说,Worker1、Worker2、Worker3、Worker4的本地聚集值依次记为:A1、A12
A13A14;对聚集器A2来说,Worker1、Worker2、Worker3、Worker4的本地聚集值依次记为:A2、A22
A23、A24。计算完成后,每个Worker就要把本地的聚集值发送给聚集器的Owner,聚集器的Owner在接收的时候会合并聚集。那么A1、A12
A13、A14要发送给Worker1进行全局聚集得到A1’,A21 、A22
A23、A24要发送给Worker3进行全局聚集得到A2’。计算公式如下:

此部分采用的是SendWorkerAggregatorsRequest通信协议。Worker1和Worker3要把汇总的A1和A2的新值:A1’ 和A2’发送给Master,供下一次超级步的MasterCompute.compute()方法使用采用的是SendAggregatorsToMasterRequest通信协议。此部分在WorkerAggregatorHandler类的finishSuperstep( WorkerAggregatorRequestProcessor requestProcessor)方法中完成。过程如下图所示:

4. Master收到Worker1发送的A1’ 和Woker3发送的A2’后,此步骤在MasterAggregatorHandler类的prepareSusperStep(masterClient)方法中完成。然后调用MasterCompute.compute()方法,此方法可能会修改聚集器的值,如得到A1’’和A2’’。在masterCompute.compute()方法内若根据聚集器的值调用了MasterCompute类的haltCompute()方法来终止MaterCompute,则表明要结束整个Job。那么Master就会通知所有Workers要结束整个作业;在该方法内若没有调用MasterCompute类的haltCompute()方法,则回到步骤1继续进行迭代,继续把A1’’发送给Worker1,A2’’发送给Worker3。

完!

本人原创,转载请注明出处!欢迎大家加入Giraph
技术交流群
: 228591158

Java开发FTP功能的apache工具包,小心使用为妙

时间: 2024-10-16 04:46:10

Java开发FTP功能的apache工具包,小心使用为妙的相关文章

阿里云短信服务发送短信验证码(JAVA开发此功能)

开发此功能需注册阿里云账号,并开通短信服务(免费开通) 充值后,不会影响业务的正常使用!(因为发送验证类短信:1-10万范围的短信是0.045元/条).开发测试使用,充2块钱测试足够了 可参考阿里云官方开发文档了解详情,文档中写的也是很详细了... https://help.aliyun.com/product/44282.html 代码编写之前需要准备几个东西 1,aliyun-java-sdk-core.jar ,  aliyun-java-sdk-dysmsapi.jar  这2个jar包

Apache Solr采用Java开发、基于Lucene的全文搜索服务器

http://docs.spring.io/spring-data/solr/ 首先介绍一下solr: Apache Solr (读音: SOLer) 是一个开源.高性能.采用Java开发.基于Lucene的全文搜索服务器,文档通过Http利用XML加到一个搜索集合中,查询该集合也是通过 http收到一个XML/JSON响应来实现.Solr 中存储的资源是以 Document 为对象进行存储的.每个文档由一系列的 Field 构成,每个 Field 表示资源的一个属性.Solr 中的每个 Doc

JAVA开发常用工具包

一个有经验的Java开发人员特征之一就是善于使用已有的轮子来造车.<Effective Java>的作者Joshua Bloch曾经说过:“建议使用现有的API来开发,而不是重复造轮子”.在本文中,我将分享一些Java开发人员应该熟悉的最有用的和必要的库和API.顺便说一句,这里不包括框架,如Spring和Hibernate因为他们非常有名,都有特定的功能. 本文总结了日志.JSON解析.单测.XML解析.字节码处理.数据库连接池.集合类.邮件.加密.嵌入式SQL数据库.JDBC故障诊断以及序

常用jdk【java开发工具包】下载

学习和开发办公常用的java开发工具包下载,jdk全系列 收集了常用的jdk1.4(32位).jdk5.0(32位).jdk6.0(32/64位).jdk7.(32/64位)0.jdk8.0(32/64位)各种版本的安装包.提供给需要的童鞋们下载了 下载地址:链接: http://pan.baidu.com/s/1pJlxPBP 提取码:s4la

java servlet开发购物车功能,实现增删改查结算等功能。

原文:java servlet开发购物车功能,实现增删改查结算等功能. 源代码下载地址:http://www.zuidaima.com/share/1550463494130688.htm 购物车功能:增删改查,结算等功能,主要技术为:servlet对数据库的访问... 源代码截图:

Java开发和运行环境的搭建

Java开发需要准备的东西? JDK+Eclipse 其中JDK的意思是Java开发工具包,Eclipse是进行用于做Java程序开发的工具(当然你也可以用记事本什么的去做). 其他开发工具:JCreator,JBuilder,... jdk的介绍和安装教程度娘里面到处都是,这里自己也在啰嗦一下吧. 关于jdk的详细介绍: JDK是Java Development Kit的缩写,即Java开发工具集.JDK是整个Java的核心,包括了Java运行环境(JRE).Java开发工具和Java基础类库

打造Linux三流娱乐环境,二流办公环境,一流Java开发环境

写这篇文章的目的首先是为让自己以后再装linux环境时候,不用再通宵google+百度,做个备忘录,其次,给新入Linux环境的同学分享一点个人经验,再高尚点的动机也算是想做为开源技术的传播布道者.我在一开始,准备使用ubuntu,其实如果不是特别介意debian系和redhat系的区别完全可以装ubuntu,在通用linux命令和Shell脚本而言,尤其是做Java开发而言,其实二者差别不大,而且 ubuntu中文社区支持的更好,更适合个人机器安装,但我这个人有点强迫症,所以坚持了redhat

Java开发—乘风破浪

最近需要上线很多新的JAVA项目,然而很多JAVA的相关库都不太熟悉,项目实现起来遇到了不小阻力,熬了好几天夜.现在手头的工作基本完成了,因此打算好好来归纳下java的相关工具库,将来需要借助你们,好好的在JAVA的汪洋下,乘风破浪.(希望成为电影中如小马一样程序员J) 乘风破浪会有时,直挂云帆济苍海—李白 首先通过一张思维导图,来熟悉常用的java基础工具包,掌握好工具是对一个优秀工程师的基本要求哦!J 图中标红星表示必须精通的内容,黄星为需要熟练掌握的内容,其他为补充内容,本文主要会介绍下基

在MacBook Pro上设置Java开发环境

好吧,我去了地球的另一边,并且因为我的PC不在旁边,只有一台MacBook Pro可以用于开发.这篇文章应该被看作是一个加强书签,我列出了使得MacBook能实现目的的所有必需安装的工具,即用于Java和稍后也会用于JavaScript的开发. 需要提一下的是,直到现在,我仍然是Windows用户(XP / 7)和Linux(Ubuntu /Mint/Cent OS).在写这篇文章的时候,我的MacBook Pro上运行的是OS X Yosemite Version 10.10.5. JDK 所