读写锁:ReadWriteLock

http://my.oschina.net/20076678/blog/173165

一、在JDK文档中关于读写锁的相关说明 
ReadWriteLock 维护了一对相关的  ,一个用于只读操作,另一个用于写入操作。只要没有 writer, 读取锁 可以由多个 reader 线程同时保持。 写入锁 是独占的。  
所有 ReadWriteLock 实现都必须保证 writeLock 操作的内存同步效果也要保持与相关 readLock 的联系。也就是说,成功获取读锁的线程会看到写入锁之前版本所做的所有更新。  
与互斥锁相比,读-写锁允许对共享数据进行更高级别的并发访问。虽然一次只有一个线程( writer  线程)可以修改共享数据,但在许多情况下,任何数量的线程可以同时读取共享数据( reader  线程),读-写锁利用了这一点。从理论上讲,与互斥锁相比,使用读-写锁所允许的并发性增强将带来更大的性能提高。在实践中,只有在多处理器上并且只在访问模式适用于共享数据时,才能完全实现并发性增强。  
与 互斥锁相比,使用读-写锁能否提升性能则取决于读写操作期间读取数据相对于修改数据的频率,以及数据的争用——即在同一时间试图对该数据执行读取或写入操 作的线程数。例如,某个最初用数据填充并且之后不经常对其进行修改的 collection,因为经常对其进行搜索(比如搜索某种目录),所以这样的 collection 是使用读-写锁的理想候选者。但是,如果数据更新变得频繁,数据在大部分时间都被独占锁,这时,就算存在并发性增强,也是微不足道的。更进一步地说,如果 读取操作所用时间太短,则读-写锁实现(它本身就比互斥锁复杂)的开销将成为主要的执行成本,在许多读-写锁实现仍然通过一小段代码将所有线程序列化时更 是如此。最终,只有通过分析和测量,才能确定应用程序是否适合使用读-写锁。  
尽管读-写锁的基本操作是直截了当的,但实现仍然必须作出许多决策,这些决策可能会影响给定应用程序中读-写锁的效果。这些策略的例子包括:

  • 在 writer 释放写入锁时,reader 和 writer 都处于等待状态,在这时要确定是授予读取锁还是授予写入锁。Writer 优先比较普遍,因为预期写入所需的时间较短并且不那么频繁。Reader 优先不太普遍,因为如果 reader 正如预期的那样频繁和持久,那么它将导致对于写入操作来说较长的时延。公平或者“按次序”实现也是有可能的。
  • 在 reader 处于活动状态而 writer 处于等待状态时,确定是否向请求读取锁的 reader 授予读取锁。Reader 优先会无限期地延迟 writer,而 writer 优先会减少可能的并发。
  • 确定是否重新进入锁:可以使用带有写入锁的线程重新获取它吗?可以在保持写入锁的同时获取读取锁吗?可以重新进入写入锁本身吗?
  • 可以将写入锁在不允许其他 writer 干涉的情况下降级为读取锁吗?可以优先于其他等待的 reader 或 writer 将读取锁升级为写入锁吗?

当评估给定实现是否适合您的应用程序时,应该考虑所有这些情况。  
二、读写锁的机制 
1、"读-读"不互斥,比如当前有10个线程去读(没有线程去写),这个10个线程可以并发读,而不会堵塞。 
2、 "读-写"互斥,当前有写的线程的时候,那么读取线程就会堵塞,反过来,有读的线程在使用的时候,写的线程也会堵塞,就看谁先拿到锁了。 
3、 "写-写"互斥,写线程都是互斥的,如果有两个线程去写,A线程先拿到锁就先写,B线程就堵塞直到A线程释放锁。 
三、读写锁的适用场景 
1、读多写少的高并发环境下,可以说这个场景算是最适合使用ReadWriteLock 了。 
四、使用方式 
我就以在统一监控平台中的数据接收中心的缓存操作做为使用例子了。 
  1、业务需求: 10分钟从数据库中读取所有的TP监控点到缓存中,数据接收中心实时接收到的TP数据需要判断数据中的TP监控KEY是否有效,如果无效就将此TP数据中的监控点放置到自动跑KEY中去。 
  2、程序要求 :在更新缓存时,判断KEY是否有效需要堵塞,直到缓存更新完成,程序启动时首先需要加载数据至缓存,然后每10分钟更新一次缓存。 
  3、程序代码: 
------------------------------------------------------------------------------------------------------------- 
package com.jd.ump.datareceivecenter.cache; 
/** 
*  
* @title 缓存管理接口 
* @description 主要包含两个功能:从缓存中查询数据,KEY类型为String。定时更新缓存 
@author cdwenxing 
* @date 2013-10-29 
*/

public interface CacheManager<T> extends Runnable { 
    T query(String key); 

------------------------------------------------------------------------------------------------------------- 
package com.jd.ump.datareceivecenter.cache.impl;

import java.sql.ResultSet; 
import java.sql.SQLException; 
import java.util.concurrent.ConcurrentHashMap; 
import java.util.List; 
import java.util.Map; 
import java.util.concurrent.locks.ReadWriteLock; 
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.log4j.Logger; 
import org.springframework.beans.factory.InitializingBean; 
import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.jdbc.core.JdbcTemplate; 
import org.springframework.jdbc.core.RowMapper; 
import org.springframework.stereotype.Component;

import com.jd.ump.datareceivecenter.cache.CacheManager; 
import com.jd.ump.datareceivecenter.vo.TpKeyInfo; 
import com.jd.ump.profiler.CallerInfo; 
import com.jd.ump.profiler.proxy.Profiler;

/** 
*  
* @title tp 所有的 key缓存 
* @description 包含key的查询的和更新 
@author cdwenxing 
* @date 2013-2-5 
*/ 
@Component("tpKeyCacheManager") 
public class TpKeyCacheManager implements CacheManager<TpKeyInfo>,InitializingBean  {

private   final static Logger   LOGGER    = Logger.getLogger(TpKeyCacheManager.class);

private final Map<String,TpKeyInfo> cache =  new ConcurrentHashMap<String,TpKeyInfo>();

@Autowired 
    private   JdbcTemplate jdbcTemplate;

public void setJdbcTemplate(JdbcTemplate jdbcTemplate) { 
        this.jdbcTemplate = jdbcTemplate; 
    } 
    /** 
     * 查询所有的tp key 
     */ 
    private static final String sql = "select mc_accesskey,mc_analysis_frequency from ump_method_config(nolock)  where mc_status = 1";

private final ReadWriteLock lock = new ReentrantReadWriteLock();

public TpKeyCacheManager(){ 
        LOGGER.info("TpKeyCacheManager create"); 
    } 
    public TpKeyCacheManager(JdbcTemplate  jdbcTemplate){ 
        this.jdbcTemplate = jdbcTemplate; 
    }

@Override 
    public TpKeyInfo query(String key) { 
        lock.readLock().lock(); 
        try{ 
            return cache.get(key); 
        }finally{ 
            lock.readLock().unlock(); 
        } 
    }

@Override 
    public void run() { 
        CallerInfo callerInfo = Profiler.registerInfo("ump.drc.web.cache.tpKeyCacheManager", false, true); 
        try{ 
            LOGGER.info("query all TP key sql is:" + sql); 
            //从数据库中获取数据时不需要加锁 
            final List<TpKeyInfo> rows = queryAllTpData();

//获取数据成功后开始加锁 
            lock.writeLock().lock(); 
            try{ 
                    updateCache(rows); 
            } 
            finally 
            { 
                //释放锁 
                    lock.writeLock().unlock(); 
            } 
            LOGGER.info("TpKeyCacheManager Cache init end"); 
        } 
        catch(Exception e){ 
            LOGGER.error("class[TpKeyCacheManager] method[run] invoke fail!",e); 
            Profiler.functionError(callerInfo); 
        }finally{ 
            Profiler.registerInfoEnd(callerInfo); 
        } 
    }

@Override 
    public void afterPropertiesSet() throws Exception { 
        //程序启动时就需要加载数据并同时加上锁,避免数据接收中心收到数据后在空的缓存中进行比较 
        lock.writeLock().lock(); 
        try{ 
            final List<TpKeyInfo> rows = queryAllTpData(); 
            updateCache(rows); 
        }catch(Exception e){ 
            LOGGER.error("load all tp key on error!",e); 
        } 
        finally 
        { 
            lock.writeLock().unlock(); 
        } 
    } 
    /** 
     * 更新缓存 
     * @param rows 
     */ 
    private void updateCache(final List<TpKeyInfo> rows){ 
        this.cache.clear(); 
        if(rows !=null && !rows.isEmpty()){ 
            LOGGER.info("Query results all TP key size:" + rows.size()); 
            for(TpKeyInfo bcVO:rows){ 
                this.cache.put(bcVO.getBusinessKey(), bcVO); 
                if(LOGGER.isDebugEnabled()){ 
                    LOGGER.debug("all TP key value:" + bcVO.getBusinessKey()); 
                } 
            } 
        } 
        else 
        { 
            LOGGER.warn("Query results all TP key size: 0 or is null"); 
        } 
    } 
    /** 
     * 从数据库中获取数据 
     * @return 
     */ 
    private List<TpKeyInfo> queryAllTpData(){ 
        final List<TpKeyInfo> rows = this.jdbcTemplate.query(sql,new RowMapper<TpKeyInfo>(){ 
            @Override 
            public TpKeyInfo mapRow(ResultSet rs, int rowNum) throws SQLException { 
                TpKeyInfo bcVO = new TpKeyInfo(); 
                bcVO.setBusinessKey(rs.getString("mc_accesskey")); 
                bcVO.setAnalysisFrequency(rs.getInt("mc_analysis_frequency")); 
                return bcVO; 
            } 
        }); 
        return rows; 
    }

}

------------------------------------------------------------------------------------------------------------- 
package com.jd.ump.datareceivecenter.task;

import java.util.Calendar; 
import java.util.concurrent.Executors; 
import java.util.concurrent.ScheduledExecutorService; 
import java.util.concurrent.TimeUnit;

import org.apache.log4j.Logger; 
import org.springframework.beans.factory.DisposableBean; 
import org.springframework.beans.factory.InitializingBean; 
import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.stereotype.Component; 
/** 
*  
* @title drc 定时器管理模块 
* @description 主要用于对缓存的定时更新进行调度 
@author cdwenxing 
* @date 2013-3-27 
*/ 
@Component("taskManager") 
public class TaskManager implements InitializingBean,DisposableBean{

private final static Logger LOGGER = Logger.getLogger(TaskManager.class); 
    /** 
     * 定时器线程池 
     */ 
    private  final ScheduledExecutorService timeTaskExecutor  = Executors.newScheduledThreadPool(5); 
    /** 
     * tpKeyCache定时更新 
     */ 
    @Autowired 
    private Runnable tpKeyCacheManager; 
    /** 
     * jvmKeyCache定时更新 
     */ 
    @Autowired 
    private Runnable jvmKeyCacheManager; 
    /** 
     * buKeyCache定时更新 
     */ 
    @Autowired 
    private Runnable businessCacheManager;

/** 
     * aliveKeyCache定时更新 
     */ 
    @Autowired 
    private Runnable aliveKeyCacheManager; 
    /** 
     * bizKeyCacheManager定时更新 
     */ 
    @Autowired 
    private Runnable bizKeyCacheManager;

//modified by cdxuxiaolong 2013-08-22 START 
    /** 
     * jvmIdCache定时刷新 
     */ 
    @Autowired 
    private Runnable jvmIdCacheManager; 
    //modified by cdxuxiaolong 2013-08-22 STOP

public TaskManager(){ 
        LOGGER.info("TaskManager cretate"); 
    }

/** 
     * 启动缓存定时器 
     * @return 
     */ 
    public boolean start(){ 
        LOGGER.info("start timer task begin"); 
        final Calendar calendar = Calendar.getInstance(); 
        //计算首次延迟时间,10分钟减少当前离10分钟整数时间的分钟差乘上60秒并且减少当前分钟的中秒数 
        final long initialDelay = (10 - (calendar.get(Calendar.MINUTE)%10))*60 - calendar.get(Calendar.SECOND); 
        //任务重复执行时间间隔 
        final long period = 600;//10分钟*60秒

//向线程池提交定时更新缓存的任务,定时时间的间隔为10分钟=600秒 
        timeTaskExecutor.scheduleAtFixedRate(tpKeyCacheManager, initialDelay, period, TimeUnit.SECONDS); 
        //10分钟更新一次 
        timeTaskExecutor.scheduleAtFixedRate(businessCacheManager, initialDelay + 60, period , TimeUnit.SECONDS); 
        //10分钟更新一次 
        timeTaskExecutor.scheduleAtFixedRate(jvmKeyCacheManager, initialDelay + 120, period , TimeUnit.SECONDS); 
        //10分钟更新一次 
        timeTaskExecutor.scheduleAtFixedRate(aliveKeyCacheManager, initialDelay + 180, period , TimeUnit.SECONDS);         
        //10分钟更新一次 
        timeTaskExecutor.scheduleAtFixedRate(bizKeyCacheManager, initialDelay + 240, period , TimeUnit.SECONDS); 
        //modified by cdxuxiaolong 2013-08-22 START 
        final long idPeriod = 300;//5分钟*60秒 
        timeTaskExecutor.scheduleAtFixedRate(jvmIdCacheManager, initialDelay + 30, idPeriod, TimeUnit.SECONDS); 
        //modified by cdxuxiaolong 2013-08-22 STOP 
        LOGGER.info("start timer task end"); 
        return true;

}

public boolean stop(){ 
        timeTaskExecutor.shutdown(); 
        return true; 
    }

@Override 
    public void afterPropertiesSet() throws Exception { 
        //启动定时任务 
        start(); 
    }

@Override 
    public void destroy() throws Exception { 
        stop(); 
    } 
}

  3、程序性能: 
     1、每次从数据库获取数据到更新完缓存所用时间99.9%用时在10毫秒内,从缓存中读取数据时性能就更高了,因为缓存采用的是Map(实现类为ConcurrentHashMap)。

时间: 2024-08-26 02:50:38

读写锁:ReadWriteLock的相关文章

多线程之Lock锁和读写锁ReadWriteLock

JDK1.5之后有提供了另一种加锁方式Lock锁.Lock 实现提供了比使用 synchronized 方法和语句可获得的更广泛的锁定操作.此实现允许更灵活的结构,可以具有差别很大的属性,可以支持多个相关的Condition 对象. 锁是控制多个线程对共享资源进行访问的工具.通常,锁提供了对共享资源的独占访问.一次只能有一个线程获得锁,对共享资源的所有访问都需要首先获得锁.不过,某些锁可能允许对共享资源并发访问,如ReadWriteLock 的读取锁. synchronized 方法或语句的使用

读写锁ReadWriteLock

为了提高性能,Java提供了读写锁,在读的地方使用读锁,在写的地方使用写锁,灵活控制,如果没有写锁的情况下,读是无阻塞的,在一定程度上提高了程序的执行效率. Java中读写锁有个接口java.util.concurrent.locks.ReadWriteLock,也有具体的实现ReentrantReadWriteLock,详细的API可以查看JavaAPI文档. ReentrantReadWriteLock 和 ReentrantLock 不是继承关系,但都是基于 AbstractQueuedS

【Java】读写锁 ReadWriteLock接口

和被synchronized修饰的对象同时只能被一个线程访问不同,ReadWriteLock接口提供了更细粒度锁机制.ReadWriteLock 维护了一对相关的锁,一个用于只读操作,另一个用于写入操作.只要没有 writer,读取锁可以由多个 reader 线程同时保持,但是写入锁是独占的. 下面是测试代码: import org.junit.Test; import java.text.SimpleDateFormat; import java.util.Date; import java

JDK读写锁ReadWriteLock的升级和降级问题

JDK提供了写锁接口ReadWriteLock和它的实现ReentrantReadWriteLock.要实现一个读写锁,需要考虑很多细节,其中之一就是锁升级和锁降级的问题.什么是升级和降级呢?ReadWriteLock的javadoc有一段话: Can the write lock be downgraded to a read lock without allowing an intervening writer? Can a read lock be upgraded to a write

读写锁ReadWriteLock和缓存实例

读写锁:多个读锁不互斥,读锁与些锁互斥,写锁与写锁互斥.即:读的时候不允许写,写的时候不允许读,可以同时读. synchronized关键字和普通的Lock构造的锁,会造成读与读之间的互斥,因此读写锁可提高性能. 例子1:三个线程同时对一个共享数据进行读写. import java.util.Random; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantRea

深入理解读写锁—ReadWriteLock源码分析

ReadWriteLock管理一组锁,一个是只读的锁,一个是写锁.读锁可以在没有写锁的时候被多个线程同时持有,写锁是独占的. 所有读写锁的实现必须确保写操作对读操作的内存影响.换句话说,一个获得了读锁的线程必须能看到前一个释放的写锁所更新的内容. 读写锁比互斥锁允许对于共享数据更大程度的并发.每次只能有一个写线程,但是同时可以有多个线程并发地读数据.ReadWriteLock适用于读多写少的并发情况. Java并发包中ReadWriteLock是一个接口,主要有两个方法,如下: public i

ReadWriteLock: 读写锁

ReadWriteLock: 读写锁 ReadWriteLock: JDK1.5提供的读写分离锁,采用读写锁分离可以有效帮助减少锁竞争. 特点: 1).使用读写锁.当线程只进行读操作时,可以允许多个线程同时读 2).写写操作,读写操作间依然需要相互等待和持有锁. 一).使用读写锁与使用重入锁进行读读写操作 开启200个线程,测试读写锁和重入锁的读效率. 使用重入锁进行读写操作:ReentrantLock_Rw import java.util.concurrent.locks.Reentrant

读写锁之ReadWriteLock

你可能有这样一个疑问,Java SDK 并发包里为什么还有很多其他的工具类呢?原因很简单:分场景优化性能,提升易用性. 接下来我们聊聊,针对读多写少这种并发场景,Java SDK 并发包提供了读写锁——ReadWriteLock 读写锁,并不是 Java 语言特有的,而是一个广为使用的通用技术,所有的读写锁都遵守以下三条,尤其注意第三条,之后我们讲跟其他锁的对比会用到. 基本原则: 1. 允许多个线程同时读共享变量: 2.只允许一个线程写共享变量: 3. 如果一个写线程正在执行写操作,此时禁止读

深入浅出 Java Concurrency (13): 锁机制 part 8 读写锁 (ReentrantReadWriteLock) (1)[转]

从这一节开始介绍锁里面的最后一个工具:读写锁(ReadWriteLock). ReentrantLock 实现了标准的互斥操作,也就是一次只能有一个线程持有锁,也即所谓独占锁的概念.前面的章节中一直在强调这个特点.显然这个特点在一定程度上面减低了吞吐量,实际上独占锁是一种保守的锁策略,在这种情况下任何“读/读”,“写/读”,“写/写”操作都不能同时发生.但是同样需要强调的一个概念是,锁是有一定的开销的,当并发比较大的时候,锁的开销就比较客观了.所以如果可能的话就尽量少用锁,非要用锁的话就尝试看能

锁,同步,可重入锁,读写锁(转)

1.synchronized 把代码块声明为 synchronized,有两个重要后果,通常是指该代码具有 原子性(atomicity)和 可见性(visibility). 1.1 原子性 原子性意味着个时刻,只有一个线程能够执行一段代码,这段代码通过一个monitor object保护.从而防止多个线程在更新共享状态时相互冲突. 1.2 可见性 可见性则更为微妙,它要对付内存缓存和编译器优化的各种反常行为.它必须确保释放锁之前对共享数据做出的更改对于随后获得该锁的另一个线程是可见的 . 作用: