Java 1.7 ThreadPoolExecutor源代码解析

相比1.6,1.7有些变化:

1、        添加了一个TIDYING状态。这个状态是介于STOP和TERMINATED之间的。假设运行完terminated钩子函数后状态就变成TERMINATED了;

2、        内部类Worker继承了AQS类作为一个独享锁,在执行每一个任务前会获取自己的锁。

3、        runState和poolSize两个字段被合并成一个原子字段ctl了,不再使用mainLock保护了。

一、成员变量介绍

public class ThreadPoolExecutor extends AbstractExecutorService {
    /**
     * ctl字段事实上表示两个含义:runState和workerCount(近似1.6中的poolSize)
     * int类型。高3位表示runState,低29位表示workerCount。

眼下这个版本号也就限
     * 制了线程个数不会超过2^29-1。
     * RUNNING: 能接受新的任务且能处理队列里的请求
     * SHUTDOWN: 不能接受新的任务可是能处理队列里的请求
     * STOP: 不能接受新的任务、不能处理队列里的请求。workers会被interrupt
     * TIDYING: 全部的线程都已经terminated了,正准备调用terminated()方法
     * TERMININATED: terminated()方法已经调用结束了
     *
     * RUNNING->SHUTDOWN: 调用shutdown方法
     * (RUNNING/SHUTDOWN)>STOP: 调用shutdownNow方法
     * SHUTDOWN->TIDYING: 当workers和queue都空的时候
     * STOP->TIDYING: 当workers为空的时候
     * TIDYING->TERMINATED: 当terminated方法调用结束的时候。
     * awaitTermination()直到状态为TERMINATED时才会返回。
     * 

     */
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // 取ctl的高三位。获取runState(执行状态)
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 取ctl的低29位,获取workerCount(worker的数量)
private static int workerCountOf(int c)  { return c & CAPACITY; }
// 把runState和workerCount合并成ctl,上面两个函数的反操作
    private static int ctlOf(int rs, int wc) { return rs | wc; }

二、execute函数

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * 三步走:
         * 1. 假设RUNNING的线程数目小于corePoolSize,直接调用addWorker方法
         * 启动一个新线程。addWorker函数会检查runState和workerCount。假设不
         * 须要新建一个thread就会返回false了
         *
         * 2. 假设任务被成功的放入了workQueue,我们仍然须要做个double-check
         * 由于调用完isRunning(c)后池中的线程可能都退出了或者线程池被shut
         * down了。又一次检查状态看是要remove掉新来的任务还是创建一个新线程来执
         * 行(假设没有活动的线程了)
         *
         * 3. 假设放入workQueue失败了,我们尝试创建一个新worker。

假设失败了,
         * 说明线程池被关闭了或者饱和了(超过最大值了)。就直接拒了。
         *
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
// addWorker有可能会失败,失败后又一次获取状态并继续往下走
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
// 假设isRunning(c)&&workQueue.offer中间并发发生了shutdown,须要remove
// 掉刚放入workQueue的command任务。注意:此时假设有一个worker刚运行完一个task
// 然后从workQueue获取下一个task时,这里的remove就会失败了。
            if (! isRunning(recheck) && remove(command))
                reject(command);
// 假设是RUNNING状态可是没有可工作的线程。须要直接new一个
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

execute函数大体思路和1.6一致,就三种情况:

①          当前线程池中线程数目小于corePoolSize,直接new一个thread。

②          当先线程池数据大于corePoolSize,则放入workQueue中;

③          假设workQueue满了且线程池中线程数小于maximumPoolSize,则new一个thread。

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

// 假设被shutdown了。一般就直接返回false。可是须要排除一个特例情况:当线程池状
// 态是shutdown。但workQueue不空且workers空了,会调用addWorker(null,false)
// 方法创建一个线程处理workQueue里的任务,这时不能直接返回false。
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
// 假设当前workers数目大于CAPACITY或者大于用户设置了。直接返回false
                if (wc >= CAPACITY ||
                    wc >= (core ?

corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
// 假设不过workerCount变化了,那么继续内层的循环;假设连runState也变化了,
// 则要又一次继续外层的循环。

if (runStateOf(c) != rs)
                    continue retry;
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            final ReentrantLock mainLock = this.mainLock;
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
                    int rs = runStateOf(c);
// 再次检查runState的状态,假设是RUNNING或者SHUTDOWN可是firstTask不空,则
// 把new出来的worker放入workers中。
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
// 创建worker成功后直接启动线程了
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
// 创建失败要做清理操作
                addWorkerFailed(w);
        }
        return workerStarted;
    }

addWorker函数尝试新建一个thread来执行传递给它的task。当线程池被STOP或SHUTDOWN或threadFactory返回null时或者OOM时。会返回false并做对应的清理。整个过程分为两步:1、尝试设置workerCount,成功了就到步骤2;2、尝试创建一个worker并增加到workers里。

private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null)
                workers.remove(w);
            decrementWorkerCount();
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }

addWorkerFailed函数做些清理操作:1、把创建的worker从workers中删除;2、把workerCount减1;3、检查能否够terminated线程池,防止这个worker的存在导致运行awaitTermination操作的client线程堵塞了。

  final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
// 假设是以下三种情况直接返回:
// 1.RUNNING状态; 2.runState>=TIDYING。说明有其它线程运行了tryTerminate操
// 作; 3.SHUTDOWN状态且workQueue不空
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
// 假设workerCount大于0。则中断一个空暇的worker,就返回了。为啥仅仅中断一个呢?
// 由于worker线程退出时也会调用tryTerminate方法(一个接一个的传播)
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
// 走到这里说明workers数量为0了,尝试把线程池状态改成TIDYING并调用terminated
// 函数->状态再设置成TERMINATED。

假设设置TIDYING失败,则继续循环。
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
// terminated函数抛异常也须要运行以下的操作。
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

tryTerminate函数尝试TERMINATED线程池(当a、SHUTDOWN且queue和pool都空;b、STOP且queue为空了)。假设workers不为0,则中断随意一个空暇的worker后直接返回。否则:首先,将线程池状态改成TIDYING;其次,调用用户的钩子函数terminated;最后,将状态设置成TERMINATED。

private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
// 假设tryLock成功,就说明这个worker是空暇的。

if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
// 假设仅仅中断一个就break,仅仅有tryTerminate函数中使用到这样的情况。
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

interruptIdleWorkers函数依据onlyOne參数决定中断一个或全部空暇的workers(这些workers都堵塞在getTask方法中)。

三、shutdown函数

public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
// 检查调用者是否有权限运行shutdown
            checkShutdownAccess();
// 将线程池的状态改成SHUTDOWN
            advanceRunState(SHUTDOWN);
// 中断全部空暇的workers
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
// 尝试终止线程池
        tryTerminate();
    }

shutdown函数就运行几步:把状态改成SHUTDOWN。中断全部空暇的workers,调用onShutdown钩子函数,最后调用tryTerminate尝试终止线程池。

private void advanceRunState(int targetState) {
        for (;;) {
            int c = ctl.get();
            if (runStateAtLeast(c, targetState) ||
                ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                break;
        }
    }

advanceRunState函数将线程池的状态改成指定状态值。假设如今状态值比target值大就直接返回。targeState的值是SHUTDOWN或者STOP,不能是TIDYING或者TERMINATED(这两种状态须要调用tryTerminate函数设置)。

四、shutdownNow函数

public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
// 检查调用者是否有权限运行关闭
            checkShutdownAccess();
// 将线程池的状态改成STOP
            advanceRunState(STOP);
// 和shutdown不同,这里中断全部的worker线程
            interruptWorkers();
// 删除workQueue里的任务并返回任务列表
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
// 尝试终止线程池
        tryTerminate();
        return tasks;
    }

shutdownNow函数会中断全部的worker线程,删除workQueue里的任务,最后尝试终止线程池并返回workQueue里的任务。

private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
// 中断全部的worker线程
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }
private List<Runnable> drainQueue() {
        BlockingQueue<Runnable> q = workQueue;
        List<Runnable> taskList = new ArrayList<Runnable>();
        q.drainTo(taskList);
        if (!q.isEmpty()) {
            for (Runnable r : q.toArray(new Runnable[0])) {
                if (q.remove(r))
                    taskList.add(r);
            }
        }
        return taskList;
    }

五、Worker内部类

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
// 初始值为-1,防止worker还没启动就被interrupt了;在start開始时会将状态改成0
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
// 參数1没有意义,是独占锁
        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

Worker类主要维护着中断的管理和其它操作(runWorker函数),继承了AQS类实现了一个不可重入的Lock。在获取到一个任务后,准备运行前首先要获取这个锁。

同一时候。在中断空暇的worker时也要先获取到这个锁。

public void run() {
            runWorker(this);
        }
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
// 有时我们不想从workQueue取第一个任务,直接运行刚提交的任务
        Runnable task = w.firstTask;
        w.firstTask = null;
// 把state设置成0,同意中断
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
// 进入循环了
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // 假设是STOP状态,须要保证线程是被中断了的;
                // 假设不是须要清空中断状态,可是须要又一次检查下状态防止在清除
                // 中断时发生了shutdownNow
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
// 运行前的钩子函数
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
// 运行后的钩子函数
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

runWorker函数循环从workQueue里获取task并运行,可是须要注意下面几个问题:1.假设不想从workQueue里获取第一个任务运行,那就给worker.firstTask赋值。2、假设getTask获取的值为null,或者你的task里抛异常了,那循环就退出了,然后worker线程也就退出了。3、在运行任务前先要获取worker的锁,这里防止中断正在运行的线程。4、假设你的钩子函数beforeExecute函数抛异常了,那么你的任务就不会被运行了,worker线程也会退出。5、假设task.run方法抛出Runtime或Error异常,会原样抛出。假设是Throwable,则会包装成一个Error抛出,抛出异常前会运行afterExecute钩子函数。最后线程会退出。6、假设afterExecute钩子函数抛出异常。那么worker线程也会退出。

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 假设SHUTDOWN且workQueue为空,或者STOP了。worker线程直接退出
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
// 是否要回收这个worker线程?
            boolean timed;      // Are workers subject to culling?

            for (;;) {
                int wc = workerCountOf(c);
                timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 假设还没有超时过(循环第一次运行到这里)直接break
                if (wc <= maximumPoolSize && ! (timedOut && timed))
                    break;
// 否则。假设线程数大于最大限制或者已经超时过了说明这个worker线程要准备退出了
// 先设置workerCount-1,成功的话直接退出。否则,看下runState是否和rs一样,如
// 果一样就在内部循环,不一样就要到外部循环
                if (compareAndDecrementWorkerCount(c))
                    return null;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }

            try {
// 无限堵塞或超时堵塞
                Runnable r = timed ?

workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
// 没有获取到task肯定是超时了
                timedOut = true;
            } catch (InterruptedException retry) {
// 假设被中断了,不能算作超时
                timedOut = false;
            }
        }
    }

getTask函数是从workQueue里获取一个task,有两种策略(无限堵塞或者超时,详细要看client的配置)。

假设这个函数返回了null,那么worker线程就会退出了。退出的原因不外乎下面几种:

1.   当前线程池中worker数量大于maximumPoolSize了。

2.   线程池被STOP了(workQueue.poll/take时会捕获到InterruptedException异常);

3.   线程池被SHUTDOWN了且workQueue为空(workQueue.poll/take时会捕获到InterruptedException异常);

4.   获取task超时了(timedOut)&&(timed)。

private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 用户的函数抛异常了,须要调整workerCount的值,由于worker线程准备退出了
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

// 做些统计操作(bookkeeping)
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
// 尝试终止线程池
        tryTerminate();

        int c = ctl.get();
// 假设是RUNNING或SHUTDOWN状态,要看下workQueue是否为空,
// 不能直接退出。

假设workQueue不空,至少要保留1或corePoolSize个
// 线程(看allowCoreThreadTimeOut配置)。少于这个数目,就须要通过
// addWorker(null,false)方法补充新的线程进来。

if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }

processWorkerExit函数是在runWork循环退出后做的清理和bookkeeping(应该就是指completedTaskCount等变量的操作吧)操作。

completedAbruptly參数的含义是指用户的函数是否抛异常了(before/after/run等)。注意下函数最后会依据线程池的状态和配置决定是否新建一个worker线程。

时间: 2024-07-30 21:33:58

Java 1.7 ThreadPoolExecutor源代码解析的相关文章

Android Java 线程池 ThreadPoolExecutor源代码篇

线程池简单点就是任务队列+线程组成的. 接下来我们来简单的了解下ThreadPoolExecutor的源代码. 先看ThreadPoolExecutor的简单类图,对ThreadPoolExecutor总体来个简单的认识. 为了分析ThreadPoolExecutor我们得下扯点队列和队列里面的任务这东西. 常见三种BlockingQueue堵塞队列SynchronousQueue,LinkedBlockingQueue,ArrayBlockingQueue当然还有其它的,简单类图(仅仅画了Sy

[源码]源代码解析 SynchronousQueue

简析SynchronousQueue,LinkedBlockingQueue,ArrayBlockingQueue 三者都是blockingQueue. LinkedBlockingQueue,ArrayBlockingQueue 有界,默认是Integer.Max; SynchronousQueue没什么界不界的概念.之所以这么说.是因为它的操作必须成对. 注记方案: oppo(oppo手机)是一对,offer和pool不阻塞 ppt是一对.put和take都阻塞. 1. 里面没有任何数据.调

Android源代码解析之(三)--&amp;gt;异步任务AsyncTask

转载请标明出处:一片枫叶的专栏 上一篇文章中我们解说了android中的异步消息机制. 主要解说了Handler对象的使用方式.消息的发送流程等.android的异步消息机制是android中多任务处理的基础,Handler是整个android应用层体系异步消息传递的基础组件,通过对Handler源代码的解析的解析相信大家对android中的异步消息机制有了一个大概的了解.很多其它关于android中的异步消息机制的知识可參考我的:android源代码解析之(二)–>异步消息机制 android

Spark MLlib LDA 源代码解析

1.Spark MLlib LDA源代码解析 http://blog.csdn.net/sunbow0 Spark MLlib LDA 应该算是比較难理解的,当中涉及到大量的概率与统计的相关知识,并且还涉及到了Spark GraphX图计算方面的知识.要想明确当中的原理得要下一番功夫. LDA源代码解析前的基础知识: 1)LDA主题模型的理论知识 參照:LDA数学八卦 2)SparkGraphX 基础知识 http://blog.csdn.net/sunbow0/article/details/

【java项目实战】dom4j解析xml文件,连接Oracle数据库

简介 dom4j是由dom4j.org出品的一个开源XML解析包.这句话太官方,我们还是看一下官方给出的解释.如下图: dom4j是一个易于使用的.开源的,用于解析XML,XPath和XSLT等语言的库.它应用于Java平台,采用了Java集合框架并完全支持DOM,SAX和JAXP等编程标准. 特点 dom4j是一个非常非常优秀的Java XML API,具有性能优异.功能强大和极端易用的特点,同时它也是一个开放源代码的软件.如今你可以看到越来越多的Java软件都在使用dom4j来读写XML,例

android7.x Launcher3源代码解析(3)---workspace和allapps载入流程

Launcher系列目录: 一.android7.x Launcher3源代码解析(1)-启动流程 二.android7.x Launcher3源代码解析(2)-框架结构 三.android7.x Launcher3源代码解析(3)-workspace和allapps载入流程 前两篇博客分别对Lancher的启动和Launcher的框架结构进行了一些分析.这一篇.将着重開始分析界面的载入流程. 1.总体流程 先上一张总体的流程图吧.(图片看不清能够下载下来看或者右击新开个页面查看图片) wate

Spark技术内幕:Client,Master和Worker 通信源代码解析

Spark的Cluster Manager能够有几种部署模式: Standlone Mesos YARN EC2 Local 在向集群提交计算任务后,系统的运算模型就是Driver Program定义的SparkContext向APP Master提交,有APP Master进行计算资源的调度并终于完毕计算.具体阐述能够阅读<Spark:大数据的电花火石!>. 那么Standalone模式下,Client.Master和Worker是怎样进行通信,注冊并开启服务的呢? 1. node之间的RP

Android源代码解析之(六)--&amp;gt;Log日志

转载请标明出处:一片枫叶的专栏 首先说点题外话,对于想学android framework源代码的同学,事实上能够在github中fork一份,详细地址:platform_frameworks_base 这里面基本都是android framework层的源代码了.并且近期发现了一个比較不错的github插件:OctoTree,它 是一个浏览器插件,它能够让你在Github 看代码时,左边栏会出现一个树状结构.就像我们在IDE 一样.当我们看一个项目的结构,或者想看详细的某个文件,这样就会非常方

使用NoSQL实现高并发CRM系统实践(源代码+解析)

又想速度快,又要大数据,又要保证数据不出错,还要拥抱变化,改需求的时候不那么痛苦,特别是字段的调整,按照以前的做法,想想就头疼.使用NoSQL,简直就是随心所欲,再奇葩的数据结构,处理起来也很容易.下面看我如何用NoSQL数据库实现高并发,高可靠的CRM系统. 1.前言 随着facebook.微博等WEB2.0互联网网站的兴起,传统的关系数据库在应付web2.0网站,特别是超大规模和高并发的SNS类型的web2.0纯动态网站已经显得力不从心,暴露了很多难以克服的问题,而非关系型的数据库则由于其本