Paxos算法实现

package paxos;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Random;

import org.apache.commons.lang3.StringUtils;

import com.google.common.base.Charsets;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;

/**
 *
 * 〈一句话功能简述〉<br>
 * 〈功能详细描述〉Paxos算法实现
 *
 * @author shichang.liu
 * @date 2017年4月6日上午10:25:34
 * @see [相关类/方法](可选)
 * @since [产品/模块版本] (可选)
 */
public final class PaxosDemo {

    private static final HashFunction HASH_FUNCTION = Hashing.murmur3_32();
    private static final Random RANDOM = new Random();
    private static final String[] PROPOSALS = { "ProjectA", "ProjectB", "ProjectC" };

    public static void main(String[] args) {
        List<Acceptor> acceptors = new ArrayList<Acceptor>();
        Arrays.asList("A", "B", "C", "D", "E").forEach(name -> acceptors.add(new Acceptor(name)));
        Proposer.vote(new Proposal(1L, null), acceptors);
    }

    private static void printInfo(String subject, String operation, String result) {
        System.out.println(subject + ":" + operation + "<" + result + ">");
    }

    /**
     * 对于提案的约束,第三条约束要求: 如果maxVote不存在,那么没有限制,下一次表决可以使用任意提案; 否则,下一次表决要沿用maxVote的提案
     *
     * @param currentVoteNumber
     * @param proposals
     * @return
     */
    private static Proposal nextProposal(long currentVoteNumber, List<Proposal> proposals) {
        long voteNumber = currentVoteNumber + 1;
        if (proposals.isEmpty())
            return new Proposal(voteNumber, PROPOSALS[RANDOM.nextInt(PROPOSALS.length)]);
        Collections.sort(proposals);
        Proposal maxVote = proposals.get(proposals.size() - 1);
        long maxVoteNumber = maxVote.getVoteNumber();
        String content = maxVote.getContent();
        if (maxVoteNumber >= currentVoteNumber)
            throw new IllegalStateException("illegal state maxVoteNumber");
        if (content != null)
            return new Proposal(voteNumber, content);
        else
            return new Proposal(voteNumber, PROPOSALS[RANDOM.nextInt(PROPOSALS.length)]);
    }

    private static class Proposer {

        /**
         * @param proposal
         * @param acceptors
         */
        public static void vote(Proposal proposal, Collection<Acceptor> acceptors) {
            int quorum = Math.floorDiv(acceptors.size(), 2) + 1;
            int count = 0;
            while (true) {
                printInfo("VOTE_ROUND", "START", ++count + "");
                List<Proposal> proposals = new ArrayList<Proposal>();
                for (Acceptor acceptor : acceptors) {
                    Promise promise = acceptor.onPrepare(proposal);
                    if (promise != null && promise.isAck())
                        proposals.add(promise.getProposal());
                }
                if (proposals.size() < quorum) {
                    printInfo("PROPOSER[" + proposal + "]", "VOTE", "NOT PREPARED");
                    proposal = nextProposal(proposal.getVoteNumber(), proposals);
                    continue;
                }
                int acceptCount = 0;
                for (Acceptor acceptor : acceptors) {
                    if (acceptor.onAccept(proposal))
                        acceptCount++;
                }
                if (acceptCount < quorum) {
                    printInfo("PROPOSER[" + proposal + "]", "VOTE", "NOT ACCEPTED");
                    proposal = nextProposal(proposal.getVoteNumber(), proposals);
                    continue;
                }
                break;
            }
            printInfo("PROPOSER[" + proposal + "]", "VOTE", "SUCCESS");
        }

    }

    private static class Acceptor {

        // 上次表决结果
        private Proposal last = new Proposal();
        private String name;

        public Acceptor(String name) {
            this.name = name;
        }

        public Promise onPrepare(Proposal proposal) {
            // 假设这个过程有50%的几率失败
            if (Math.random() - 0.5 > 0) {
                printInfo("ACCEPTER_" + name, "PREPARE", "NO RESPONSE");
                return null;
            }
            if (proposal == null)
                throw new IllegalArgumentException("null proposal");
            if (proposal.getVoteNumber() > last.getVoteNumber()) {
                Promise response = new Promise(true, last);
                last = proposal;
                printInfo("ACCEPTER_" + name, "PREPARE", "OK");
                return response;
            } else {
                printInfo("ACCEPTER_" + name, "PREPARE", "REJECTED");
                return new Promise(false, null);
            }
        }

        public boolean onAccept(Proposal proposal) {
            // 假设这个过程有50%的几率失败
            if (Math.random() - 0.5 > 0) {
                printInfo("ACCEPTER_" + name, "ACCEPT", "NO RESPONSE");
                return false;
            }
            printInfo("ACCEPTER_" + name, "ACCEPT", "OK");
            return last.equals(proposal);
        }
    }

    private static class Promise {

        private final boolean ack;
        private final Proposal proposal;

        public Promise(boolean ack, Proposal proposal) {
            this.ack = ack;
            this.proposal = proposal;
        }

        public boolean isAck() {
            return ack;
        }

        public Proposal getProposal() {
            return proposal;
        }
    }

    private static class Proposal implements Comparable<Proposal> {

        private final long voteNumber;
        private final String content;

        public Proposal(long voteNumber, String content) {
            this.voteNumber = voteNumber;
            this.content = content;
        }

        public Proposal() {
            this(0, null);
        }

        public long getVoteNumber() {
            return voteNumber;
        }

        public String getContent() {
            return content;
        }

        @Override
        public int compareTo(Proposal o) {
            return Long.compare(voteNumber, o.voteNumber);
        }

        @Override
        public boolean equals(Object obj) {
            if (obj == null)
                return false;
            if (!(obj instanceof Proposal))
                return false;
            Proposal proposal = (Proposal) obj;
            return voteNumber == proposal.voteNumber && StringUtils.equals(content, proposal.content);
        }

        @Override
        public int hashCode() {
            return HASH_FUNCTION.newHasher().putLong(voteNumber).putString(content, Charsets.UTF_8).hash().asInt();
        }

        @Override
        public String toString() {
            return new StringBuilder().append(voteNumber).append(‘:‘).append(content).toString();
        }
    }

}
时间: 2024-10-25 18:43:24

Paxos算法实现的相关文章

【转】Paxos算法细节详解

最近研究zookeeper因此就学习了一下paxos算法,一直没太看懂,今天找到了这篇文章描述的很清晰,转自:http://www.cnblogs.com/endsock/p/3480093.html Paxos分析 最近研究paxos算法,看了许多相关的文章,概念还是很模糊,觉得还是没有掌握paxos算法的精髓,所以花了3天时间分析了libpaxos3的所有代码,此代码可以从https://bitbucket.org/sciascid/libpaxos 下载.对paxos算法有初步了解之后,再

Paxos算法

转载请注明出处http://www.cnblogs.com/dvd0423/p/4185697.html 浮躁的人请绕道,这篇文章不适合你. 把Paxos作为分布式系统系列的第一篇,是因为我迫不及待的想告诉别人这个算法是多么牛逼.相比于Map/Reduce架构.一致性Hash.两阶段提交等等,我觉得Paxos是一个让人第一次看到会感觉废话连篇,再仔细品味会拍手称赞的算法.Paxos算法最初由lamport提出,他第一次提出时模拟的是一个叫Paxos的希腊城邦立法的问题.由于这个算法太抽象了,所以

一步一步理解Paxos算法

一步一步理解Paxos算法 背景 Paxos 算法是Lamport于1990年提出的一种基于消息传递的一致性算法.由于算法难以理解起初并没有引起人们的重视,使Lamport在八年后重新发表到 TOCS上.即便如此paxos算法还是没有得到重视,2001年Lamport用可读性比较强的叙述性语言给出算法描述.可见Lamport对 paxos算法情有独钟.近几年paxos算法的普遍使用也证明它在分布式一致性算法中的重要地位.06年google的三篇论文初现“云”的端倪,其中的chubby锁服务使用p

【转】Paxos算法深入分析

http://blog.csdn.net/anderscloud/article/details/7175209 在分布式系统设计领域,Paxos可谓是最重要一致性的算法.Google的大牛们称   All working protocols for asynchronous consensus we have so far encountered have Paxos at their core. 可见此算法的地位.网络上讨论此算法的文章多如牛毛,但大多数让人看了之后仍然是一头雾水,就连维基百

【转】Paxos算法1-算法形成理论

——转自:{老码农的专栏} Paxos算法的难理解与算法的知名度一样令人敬仰,从我个人的经历而言,难理解的原因并不是该算法高深到大家智商不够,而在于Lamport在表达该算法时过于晦涩且缺乏一个完整的应用场景.如果大师能换种思路表达该算法,大家可能会更容易接受: 首先提出算法适用的场景,给出一个多数读者能理解的案例 其次描述Paxos算法如何解决这个问题 再次给出算法的起源(就是那些希腊城邦的比喻和算法过程) Lamport首先提出算法的起源,在没有任何辅助场景下,已经让很多人陷于泥潭,在满脑子

分布式理论之一:Paxos算法的通俗理解

维基的简介:Paxos算法是莱斯利·兰伯特(Leslie Lamport,就是 LaTeX 中的"La",此人现在在微软研究院)于1990年提出的一种基于消息传递且具有高度容错特性的一致性算法. Paxos算法目前在Google的Chubby.MegaStore.Spanner等系统中得到了应用,Hadoop中的ZooKeeper也使用了Paxos算法,在上面的各个系统中,使用的算法与Lamport提出的原始Paxos并不完全一样,这个以后再慢慢分析.本博文的目的是,如何让一个小白在半

底层算法系列:Paxos算法

关于算法,面太广.本系列只研究实际应用中遇到的核心算法.了解这些算法和应用,对java码农进阶是很有必要的. 对于Paxos学习论证过程中,证实一句话:有史以来学习paxos最好的地方wiki:Paxos (computer science) 目录 1.背景 2.Paxos算法 3.Muti-Paxos算法 4.Muti-Paxos在google chubby中的应用 ===============正文分割线============================ 一.背景 Paxos 协议是一

Paxos算法介绍

Paxos算法是为了实现分布式环境中为保证数据的一致性而设计的,在一个分布式系统中,通过投票的方式来确定一个值(决议).只不过这个步骤分为两步: 准备阶段(prepare)和批准阶段(accept). 同时又分为若干角色,以下都可以兼任. proposer 信使 acceptor 决策者 learner 学习者(学习最终决策) 准备阶段: 一个proposer向多个acceptor发出提案,每个提案有一个数字编号(递增的), 如果多数acceptor接受了该提案,则进入批准阶段. 当然也可以拒绝

Paxos算法之旅(四)zookeeper代码解析

ZooKeeper是近期比较热门的一个类Paxos实现.也是一个逐渐得到广泛应用的开源的分布式锁服务实现.被认为是Chubby的开源版,虽然具体实现有很多差异.ZooKeeper概要的介绍可以看官方文档:http://hadoop.apache.org/zookeeper 这里我们重点来看下它的内部实现. ZooKeeper集群中的每个server都要知道其他成员,通过在配置文件zoo.cfg中作如下配置实现: tickTime=2000 dataDir=/var/zookeeper/ clie

【转】Paxos算法3-实现探讨

——转自:{老码农的专栏} 前两篇Paxos算法的讨论,让我们对paxos算法的理论形成过程有了大概的了解,但距离其成为一个可执行的算法程序还有很长的路要走,原因是很多的细节和错误未被考虑.Google Chubby的作者说,paxos算法实现起来远没有看起来简单,原因是paxos的容错仅限于server crash这一种情况,但在实际工程实现时要考虑磁盘损坏.文件损坏.Leader身份丢失等诸多的错误. 1. Paxos各角色的职能 在paxos算法中存在Client.Proposer.Pro