OpenStack_Swift源码分析——ObjectReplicator源码分析(2)

1、Replicator执行代码详细分析

上篇问中介绍了启动Replicator的具体过程,下面讲解Replicator的执行代码的具体实现,首先看replicate方法:

def replicate(self, override_devices=None, override_partitions=None):
        """Run a replication pass"""
        self.start = time.time()
        self.suffix_count = 0
        self.suffix_sync = 0
        self.suffix_hash = 0
        self.replication_count = 0
        self.last_replication_count = -1
        self.partition_times = []

        if override_devices is None:
            override_devices = []
        if override_partitions is None:
            override_partitions = []
        #heartbeat 为心跳函数 根据配置,配置没有 默认为 300
        stats = eventlet.spawn(self.heartbeat)
        #detect_lockup  检查死锁
        lockup_detector = eventlet.spawn(self.detect_lockups)
        eventlet.sleep()  # Give spawns a cycle

        try:
            #replication 的 woker 数量
            self.run_pool = GreenPool(size=self.concurrency)
            # Returns a sorted list of jobs (dictionaries) that specify the
            # partitions, nodes, etc to be synced.
            # 返回专门为分区,节点同步工作的排序的列表
            #
            jobs = self.collect_jobs()
            for job in jobs:
                #重写设备
                if override_devices and job[‘device‘] not in override_devices:
                    continue
                #重写分区
                if override_partitions and                         job[‘partition‘] not in override_partitions:
                    continue
                #如果重写设备及其重写分区在job 中
                dev_path = join(self.devices_dir, job[‘device‘])
                if self.mount_check and not ismount(dev_path):
                    self.logger.warn(_(‘%s is not mounted‘), job[‘device‘])
                    continue
                #ring没有改变
                if not self.check_ring():
                    self.logger.info(_("Ring change detected. Aborting "
                                       "current replication pass."))
                    return
                #如果
                if job[‘delete‘]:
                    self.run_pool.spawn(self.update_deleted, job)
                else:
                    #执行的是更新
                    self.run_pool.spawn(self.update, job)
            with Timeout(self.lockup_timeout):
                self.run_pool.waitall()
        except (Exception, Timeout):
            self.logger.exception(_("Exception in top-level replication loop"))
            self.kill_coros()
        finally:
            stats.kill()
            lockup_detector.kill()
            self.stats_line()

在replicate方法中,首先是为replicate方法执行的准备工作,其中最重要的是要收集要执行的job的collection_jobs方法,下面为其代码的具体实现:

def collect_jobs(self):
        """
        Returns a sorted list of jobs (dictionaries) that specify the
        partitions, nodes, etc to be synced.
        """
        jobs = []
        ips = whataremyips()
        #replication_ip 和replication_port 在  RingBuilder中 load添加
        #self.object_ring = Ring(self.swift_dir, ring_name=‘object‘)
        for local_dev in [dev for dev in self.object_ring.devs
                          if dev and dev[‘replication_ip‘] in ips and
                          dev[‘replication_port‘] == self.port]:
            dev_path = join(self.devices_dir, local_dev[‘device‘])
            obj_path = join(dev_path, ‘objects‘)
            tmp_path = join(dev_path, ‘tmp‘)
            if self.mount_check and not ismount(dev_path):
                self.logger.warn(_(‘%s is not mounted‘), local_dev[‘device‘])
                continue
        #Remove any file in a given path that that was last modified before mtime.
        #/srv/1/node/sdb1/tmp下的文件
            unlink_older_than(tmp_path, time.time() - self.reclaim_age)
            if not os.path.exists(obj_path):
                try:
                    mkdirs(obj_path)
                except Exception:
                    self.logger.exception(‘ERROR creating %s‘ % obj_path)
                continue
            #[email protected]:/srv/1/node/sdb1/objects# ls
            #13069  133971  4799  58208  94238
            for partition in os.listdir(obj_path):
                try:
                    job_path = join(obj_path, partition)
                    #判断当前路径是否为文件,如果是文件则删除
                    if isfile(job_path):
                        #
                        # Clean up any (probably zero-byte) files where a
                        # partition should be.
                        self.logger.warning(‘Removing partition directory ‘
                                            ‘which was a file: %s‘, job_path)
                        os.remove(job_path)
                        continue
                    #获得每个partion对应的设备
                    part_nodes =                         self.object_ring.get_part_nodes(int(partition))
                    #nodes为不是本机器nodes的其他replica-1个nodes
                    nodes = [node for node in part_nodes
                             if node[‘id‘] != local_dev[‘id‘]]
                    #对objects下所有partion遍历,故有jobs的长度最大为_replica2part2dev分区备份中出现此设备有此设备id的分区和
                    jobs.append(
                        dict(path=job_path,
                             device=local_dev[‘device‘],
                             nodes=nodes,
                             #len(nodes)>len(part_nodes)-1的情况是当前节点已经不再是 当前partition所对应的设备了,有可能删除了该设备
                             delete=len(nodes) > len(part_nodes) - 1,
                             partition=partition))
                except (ValueError, OSError):
                    continue
        #打乱顺序
        random.shuffle(jobs)
        if self.handoffs_first:
            # Move the handoff parts to the front of the list
            #将handoff 节点移到jobs队列的前边
            jobs.sort(key=lambda job: not job[‘delete‘])
        self.job_count = len(jobs)
        return jobs

对于第二层for循环,os.listdir(obj_path)列出objects目录下的所有partion,创建object是在objects文件夹下创建objects所映射的分区号的文件件,再在partion文件夹下创建以object的hash值后三位为名称的文件夹,然后再在后缀文件夹下创建以object的hash值为文件夹名的文件夹,object会存储为以object上传时间戳为名.data为文件后缀的文件。通过理解一致性hash算法可知,加入虚拟节点后每一个设备会多个虚拟节点和其对应,如果一个设备对应的分区为n则,obj_path下子文件夹数目会<=n,因为存入的所有文件并不一定都能映射到当前设备所对应的分区。for循环首先判读obj_path下是否为文件,若是文件则删除,若不是则获得该分区号,根据分区号获得该分区号所映射的三个备份设备,并将设备id和本地设备id不想等的加入到nodes中,将nodes、path等信息加入到jobs中,最后打乱jobs的顺序,再将handoff
节点移到队列前边。返回jobs。再到replicate方法,首先我们看job[delete]为False的情况。当job[delete]为False会执行update方法,下边看update方法的具体实现:

;

def update(self, job):
        """
        High-level method that replicates a single partition.

        :param job: a dict containing info about the partition to be replicated
        """
        self.replication_count += 1
        self.logger.increment(‘partition.update.count.%s‘ % (job[‘device‘],))
        begin = time.time()
        try:
            #get_hashes 从hashes.pkl获取hashes值并更新 获取本地的hashes job[path] 为 job_path = join(obj_path, partition) local_hash为hashes.pkl中的反序列化回来的内容 hashed为改变的
            hashed, local_hash = tpool_reraise(
                get_hashes, job[‘path‘],
                do_listdir=(self.replication_count % 10) == 0,
                reclaim_age=self.reclaim_age)
            self.suffix_hash += hashed
            self.logger.update_stats(‘suffix.hashes‘, hashed)
            #
            attempts_left = len(job[‘nodes‘])
            #此时的nodes为除去本节点外的所有节点 因为 job[‘nodes]不包含本地节点get_more_nodes(int(job[‘partition‘]))能获得除去本partion所对应节点 外的其他所有节点
            nodes = itertools.chain(
                job[‘nodes‘],
                self.object_ring.get_more_nodes(int(job[‘partition‘])))
           #此时attempts_left 为2 若果replica为3
            while attempts_left > 0:
                # If this throws StopIterator it will be caught way below
                node = next(nodes)
                attempts_left -= 1
                try:
                    with Timeout(self.http_timeout):
                        #REPLICARE方法 对应 sever里面的RELICATE方法
                        resp = http_connect(
                            node[‘replication_ip‘], node[‘replication_port‘],
                            node[‘device‘], job[‘partition‘], ‘REPLICATE‘,
                            ‘‘, headers=self.headers).getresponse()
                        if resp.status == HTTP_INSUFFICIENT_STORAGE:
                            self.logger.error(_(‘%(ip)s/%(device)s responded‘
                                                ‘ as unmounted‘), node)
                            attempts_left += 1
                            continue
                        if resp.status != HTTP_OK:
                            self.logger.error(_("Invalid response %(resp)s "
                                                "from %(ip)s"),
                                              {‘resp‘: resp.status,
                                               ‘ip‘: node[‘replication_ip‘]})
                            continue
                        #remote_hash 为 请求 ‘REPLICATE 返回的
                        remote_hash = pickle.loads(resp.read())
                        del resp
                    #找出本地后缀和远程后缀不同的
                    suffixes = [suffix for suffix in local_hash if
                                local_hash[suffix] !=
                                remote_hash.get(suffix, -1)]
                    #如果没有说明没有变动,则继续请求下一个节点
                    if not suffixes:
                        continue

                    #效果就是执行get_hashes方法
                    hashed, recalc_hash = tpool_reraise(
                        get_hashes,
                        job[‘path‘], recalculate=suffixes,
                        reclaim_age=self.reclaim_age)
                    self.logger.update_stats(‘suffix.hashes‘, hashed)
                    local_hash = recalc_hash
                    #假如 local_hash 为 123 321 122 remote_hash 123 321 124 则 122为变化的
                    #文件路径hash值后三位会不会重复
                    suffixes = [suffix for suffix in local_hash if
                                local_hash[suffix] !=
                                remote_hash.get(suffix, -1)]
                    #找到了不同的并知道其节点则将其同步到对应的节点,是基于推送模式的,故传的数据是自己本地的数据
                    self.sync(node, job, suffixes)  #同步变化的
                    with Timeout(self.http_timeout):
                        conn = http_connect(
                            node[‘replication_ip‘], node[‘replication_port‘],
                            node[‘device‘], job[‘partition‘], ‘REPLICATE‘,
                            ‘/‘ + ‘-‘.join(suffixes),
                            headers=self.headers)
                        conn.getresponse().read()
                    self.suffix_sync += len(suffixes)
                    self.logger.update_stats(‘suffix.syncs‘, len(suffixes))
                except (Exception, Timeout):
                    self.logger.exception(_("Error syncing with node: %s") %
                                          node)
            #后缀数量 写日志时会用到
            self.suffix_count += len(local_hash)
        except (Exception, Timeout):
            self.logger.exception(_("Error syncing partition"))
        finally:
            self.partition_times.append(time.time() - begin)
            self.logger.timing_since(‘partition.update.timing‘, begin)

update方法,中首先是获得本地文件中当前设备所对应hashes.pkl文件中每个后缀所对应的hahes值,形如{‘a83‘: ‘0db7b416c9808517a1bb2157af20b09b‘},其中key为文件内容hash值的后三字节,value为后缀文件夹下所有子文件夹下(即以文件内容的md5值为名字的文件夹)所有.data文件的文件名字的md5值,可以理解为所有文件名的md5值和。

            hashed, local_hash = tpool_reraise(
                get_hashes, job[‘path‘],
                do_listdir=(self.replication_count % 10) == 0,
                reclaim_age=self.reclaim_age)

如上代码片段会执行get_hashes方法,并将后边参数传递给get_hashes

def get_hashes(partition_dir, recalculate=None, do_listdir=False,
               reclaim_age=ONE_WEEK):
    """
    Get a list of hashes for the suffix dir.  do_listdir causes it to mistrust
    the hash cache for suffix existence at the (unexpectedly high) cost of a
    listdir.  reclaim_age is just passed on to hash_suffix.

    :param partition_dir: absolute path of partition to get hashes for
    :param recalculate: 形如 recalculate=[‘a83‘]
      list of suffixes(后缀,即 hash值的后缀  310即为后缀  [email protected]:/srv/1/node/sdb1/objects/94238# ls
   310  hashes.pkl   ) which should be recalculated(重新计算) when got
    :param do_listdir: force existence check for all hashes in the partition(对partion中的hashe强行执行检查)
    :param reclaim_age: age at which to remove tombstones

    :returns: tuple of (number of suffix dirs hashed, dictionary of hashes)
    """

因没有传递recalulate这个参数故只有do_listdir为True时会强制执行重新计算后缀文件下所有文件名字的hash值。文件名字是时间戳,时间戳变了说明文件有更新,故需要和远程同步,检查是否为同一个版本,不是同一个版本的需要把本地版本传递给远程服务器。

           attempts_left = len(job[‘nodes‘])
            #此时的nodes为除去本节点外的所有节点 因为 job[‘nodes]不包含本地节点get_more_nodes(int(job[‘partition‘]))能获得除去本partion所对应节点 外的其他所有节点
            nodes = itertools.chain(
                job[‘nodes‘],
                self.object_ring.get_more_nodes(int(job[‘partition‘])))

如上代码片段,attempts_left为当前job对应的分区去掉本地节点的其他的备份节点的个数。得到attempts_left后,下边接着更新了nodes,其中get_more_nodes方法会得到出去本分区所对应节点之外的其他所有节点的迭代器,所有nodes是除去本节点外所有节点的一个迭代器。

下边就是while循环,循环attempts_left次,

                        resp = http_connect(
                            node[‘replication_ip‘], node[‘replication_port‘],
                            node[‘device‘], job[‘partition‘], ‘REPLICATE‘,
                            ‘‘, headers=self.headers).getresponse()

根据迭代得到的node请求,因副本节点首先被迭代到,故首先请求副本节点。若果成功请求读取resp返回的内容,得到远程设备同一个partion下的remote_hash

 suffixes = [suffix for suffix in local_hash if
                                local_hash[suffix] !=
                                remote_hash.get(suffix, -1)]

对比两个设备相同partion下的hashes.pkl文件相同key而value不同的key。suffixes则说明和远程备份文件都是同一个版本,继续请求下一个备份。如果不为空,则需要处理,同时再一次得到自己hashes.pkl文件夹中的内容,因为上一次请求时间中可能有其他的备份已经有新的更新推送到本服务器了。得到本地最新的hashes.pkl内容后再一次对比,得到不同的相同分区下的不同后缀

执行同步:

    self.sync(node, job, suffixes)  #同步变化的

在同步变化时作者现在使用rsync方法,没有使用ssync,不过已经留出了ssync的实现,当ssync方法文档时就会把rsync替换掉。(敬请期待)

 def sync(self, node, job, suffixes):  # Just exists for doc anchor point
        """
        Synchronize local suffix directories from a partition with a remote
        node.

        :param node: the "dev" entry for the remote node to sync with
        :param job: information about the partition being synced
        :param suffixes: a list of suffixes which need to be pushed

        :returns: boolean indicating success or failure
        """
        # self.sync_method = getattr(self, conf.get(‘sync_method‘) or ‘rsync‘)
        #配置没有 sync_method方法 则执行类自己的rsync方法
        return self.sync_method(node, job, suffixes)

sync_method方法从如下获得,没有配置则执行rsync方法

self.sync_method = getattr(self, conf.get(‘sync_method‘) or ‘rsync‘)

def rsync(self, node, job, suffixes):
        """
        Uses rsync to implement the sync method. This was the first
        sync method in Swift.
        """
        if not os.path.exists(job[‘path‘]):
            return False
        args = [
            ‘rsync‘,
            ‘--recursive‘,
            ‘--whole-file‘,
            ‘--human-readable‘,
            ‘--xattrs‘,
            ‘--itemize-changes‘,
            ‘--ignore-existing‘,
            ‘--timeout=%s‘ % self.rsync_io_timeout,
            ‘--contimeout=%s‘ % self.rsync_io_timeout,
            ‘--bwlimit=%s‘ % self.rsync_bwlimit,
        ]
        node_ip = rsync_ip(node[‘replication_ip‘])
        #包含了ip信息
        if self.vm_test_mode:
            rsync_module = ‘%s::object%s‘ % (node_ip, node[‘replication_port‘])
        else:
            rsync_module = ‘%s::object‘ % node_ip
        had_any = False
        for suffix in suffixes:
            spath = join(job[‘path‘], suffix)
            if os.path.exists(spath):
                args.append(spath)
                had_any = True
        if not had_any:
            return False
        args.append(join(rsync_module, node[‘device‘],
                    ‘objects‘, job[‘partition‘]))
        #args里面包含了通的所有信息 包括设备名称,设备分区
        return self._rsync(args) == 0

rsync方法将接受的参数都放到args中,然后执行_rsync方法。

    def _rsync(self, args):
        """
        Execute the rsync binary to replicate a partition.

        :returns: return code of rsync process. 0 is successful
        """
        start_time = time.time()
        ret_val = None
        try:
            with Timeout(self.rsync_timeout):
                #此处即为同步操作了,推送模式
                proc = subprocess.Popen(args,
                                        stdout=subprocess.PIPE,
                                        stderr=subprocess.STDOUT)
                results = proc.stdout.read()
                ret_val = proc.wait()
        except Timeout:
            self.logger.error(_("Killing long-running rsync: %s"), str(args))
            proc.kill()
            return 1  # failure response code
        total_time = time.time() - start_time
        for result in results.split(‘\n‘):
            if result == ‘‘:
                continue
            if result.startswith(‘cd+‘):
                continue
            if not ret_val:
                self.logger.info(result)
            else:
                self.logger.error(result)
        if ret_val:
            error_line = _(‘Bad rsync return code: %(ret)d <- %(args)s‘) %                 {‘args‘: str(args), ‘ret‘: ret_val}
            if self.rsync_error_log_line_length:
                error_line = error_line[:self.rsync_error_log_line_length]
            self.logger.error(error_line)
        elif results:
            self.logger.info(
                _("Successful rsync of %(src)s at %(dst)s (%(time).03f)"),
                {‘src‘: args[-2], ‘dst‘: args[-1], ‘time‘: total_time})
        else:
            self.logger.debug(
                _("Successful rsync of %(src)s at %(dst)s (%(time).03f)"),
                {‘src‘: args[-2], ‘dst‘: args[-1], ‘time‘: total_time})
        return ret_val

其中如下代码片段就是执行具体的推送:

  #此处即为同步操作了,推送模式
                proc = subprocess.Popen(args,
                                        stdout=subprocess.PIPE,
                                        stderr=subprocess.STDOUT)

至此 更新操作就讲解完毕,下一篇将讲解job[delete]为True的具体代码实现。

文中若有理解不合理之处,请指正,谢谢!

OpenStack_Swift源码分析——ObjectReplicator源码分析(2)

时间: 2024-11-05 18:31:13

OpenStack_Swift源码分析——ObjectReplicator源码分析(2)的相关文章

OpenStack_Swift源码分析——ObjectReplicator源码分析(1)

1.ObjectorReplicator的启动 首先运行启动脚本 swift-init object-replicator start 此运行脚本的运行过程和ring运行脚本运行过程差不多,找到swift 源码bin下的swift-object-replicator其代码如下所示 if __name__ == '__main__': parser = OptionParser("%prog CONFIG [options]") parser.add_option('-d', '--de

OpenStack_Swift源码分析——Object-auditor源码分析(1)

1 Object-auditor 的启动 Object-auditor的启动和object-replicator的启动过程是一样的,首先是执行启动脚本 swift-init object-auditor start 启动脚本会运行swift源码bin目录下的swift-ojbect-auditor if __name__ == '__main__': parser = OptionParser("%prog CONFIG [options]") parser.add_option('-

OpenStack_Swift源码分析——Object-auditor源码分析(2)

1 Object-aduitor审计具体分析 上一篇文章中,讲解了Object-aduitor的启动,其中审计的具体执行是AuditorWorker实现的,在run_audit中实例化了AuditorWorker类,并调用audit_all_objects方法,下面看此方法的具体代码实现: def audit_all_objects(self, mode='once', device_dirs=None): #run_forever传过来的mode 为forever description =

HBase1.0.0源码分析之请求处理流程分析以Put操作为例(二)

HBase1.0.0源码分析之请求处理流程分析以Put操作为例(二) 1.通过mutate(put)操作,将单个put操作添加到缓冲操作中,这些缓冲操作其实就是Put的父类的一个List的集合.如下: private List<Row> writeAsyncBuffer = new LinkedList<>(); writeAsyncBuffer.add(m); 当writeAsyncBuffer满了之后或者是人为的调用backgroundFlushCommits操作促使缓冲池中的

nginx源码分析--从源码看nginx框架总结

nginx源码总结: 1)代码中没有特别绕特别别扭的编码实现,从变量的定义调用函数的实现封装,都非常恰当,比如从函数命名或者变量命名就可以看出来定义的大体意义,函数的基本功能,再好的架构实现在编码习惯差的人实现也会黯然失色,如果透彻理解代码的实现,领悟架构的设计初衷,觉得每块代码就想经过耐心雕琢一样,不仅仅实现了基本的功能给你,为其他人阅读也会提供很好的支持.细致恰当的命名规则就可以看出作者的功力. 2)更好更高的软件性能体现在架构设计上,好的架构会让软件更加稳定.容易维护.便于扩展.从核心模块

kafka源码分析之一server启动分析

1. 分析kafka源码的目的 深入掌握kafka的内部原理 深入掌握scala运用 2. server的启动 如下所示(本来准备用时序图的,但感觉时序图没有思维图更能反映,故采用了思维图): 2.1 启动入口Kafka.scala 从上面的思维导图,可以看到Kafka的启动入口是Kafka.scala的main()函数: def main(args: Array[String]): Unit = { try { val serverProps = getPropsFromArgs(args)

老李推荐:第6章5节《MonkeyRunner源码剖析》Monkey原理分析-事件源-事件源概览-事件

老李推荐:第6章5节<MonkeyRunner源码剖析>Monkey原理分析-事件源-事件源概览-事件 从网络过来的命令字串需要解析翻译出来,有些命令会在翻译好后直接执行然后返回,但有一大部分命令在翻译后需要转换成对应的事件,然后放入到命令队列里面等待执行.Monkey在取出一个事件执行的时候主要是执行其injectEvent方法来注入事件,而注入事件根据是否需要往系统注入事件分为两种: 需要通过系统服务往系统注入事件:如MonkeyKeyEvent事件会通过系统的InputManager往系

老李推荐:第6章3节《MonkeyRunner源码剖析》Monkey原理分析-事件源-事件源概览-命令翻译类

老李推荐:第6章3节<MonkeyRunner源码剖析>Monkey原理分析-事件源-事件源概览-命令翻译类 每个来自网络的字串命令都需要进行解析执行,只是有些是在解析的过程中直接执行了事,而有些是需要在解析后创建相应的事件类实例并添加到命令队列里面排队执行.负责这部分工作的就是命令翻译类.那么我们往下还是继续在MonkeySourceNetwork这个范畴中MonkeyCommand类是怎么一回事: 图6-3-1 MonkeyCommand族谱 图中间的MonkeyCommand是一个接口,

[ASP.NET]分析MVC5源码,并实现一个ASP.MVC

本节内容不是MVC入门教程,主要讲MVC原理,实现一个和ASP.NET MVC类似基本原理的项目. MVC原理是依赖于ASP.NET管道事件基础之上的.对于这块,可阅读上节内容 [ASP.NET]谈谈IIS与ASP.NET管道 本节目录: MVC简介 MVC5源码 实现一个MVC MVC简介 随着技术的发展,现在已经将MVC模式等同于三层模式. 如果要严格区分的话,UI层指View和Controller,BLL,DAL层和模型层都属于Model中. 在建立MVC项目的时候,选择空的项目,会建立一