JAVA 并发编程-应用篇

提到java多线程不免有些人会头大,很多概念都是很理解但是真正到了实战的时候又是不知道如何操作了,下面就结合实际项目来说说多线程的应用。

业务需求:

举例:批量插入10万条用户的相关活动优惠券

操作方式:使用固定10个大小的线程池来做,并每次处理1000条插入数据

线程类:注实现Callable<Integer>接口的是能得到返回值的线程类

public class InsertBatchThread implements Callable<Integer> {

    private int vdate;
    private int uid;
    private int count;
    private FundsInfoMapper fundsInfoMapper;
    private WmpsDayInterMapper wmpsDayInterMapper;
    private DataSource dataSource;

    public WmpsDayInterMapper getWmpsDayInterMapper() {
        if (null == wmpsDayInterMapper) {
            synchronized (this) {
                if (null == wmpsDayInterMapper) {
                    wmpsDayInterMapper = SpringContextUtils.getBean("wmpsDayInterMapper");
                }
            }
        }
        return wmpsDayInterMapper;
    }

    public FundsInfoMapper getProCommFundsInfoMapper() {
        if (null == fundsInfoMapper) {
            synchronized (this) {
                if (null == fundsInfoMapper) {
                    fundsInfoMapper = SpringContextUtils.getBean("fundsInfoMapper");
                }
            }
        }
        return fundsInfoMapper;
    }

    
    /**
     * 无参构造函数
     */
    public InsertBatchThread(){

    }

    /**
     * 无参构造函数
     */
    public InsertBatchThread(int vdate,int uid,int count){
        this.vdate=vdate;
        this.uid=uid;
        this.count=count;
        this.fundsInfoMapper=getFundsInfoMapper();
        this.wmpsDayInterMapper=getWmpsDayInterMapper();
        
    }

    /**
     * 多线程规定好的方法,如果是继承Thread类则必须实现这个方法
     */
    public Integer call() {
        int result = -1;
        try {

            //操作
            List<Integer> listUsers = fundsInfoMapper.selectUserForInsertBatch(count * 1000, 1000);
            
           //批量插入用户活动优惠券记录表
            List<WmpsDayInter> listOnePageDayInner = wmpsDayInterMapper.selectByInsertBatch(vdate,listUsers.get(0),listUsers.get(listUsers.size() - 1));
            wmpsDayInterInsertBatch(listOnePageDayInner);

        }catch (Exception e){
            result=count;
        }
        return  result;
    }

    //批量插入用户活动优惠券记录表--JDBC
    public int[] wmpsDayInterInsertBatch(List<WmpsDayInter> listOnePage) throws Exception{
        dataSource=  SpringContextUtils.getBean("dataSource");
        PreparedStatement ps = dataSource.getConnection().prepareStatement("INSERT ignore INTO t_a (uid, inter, cdate) values(?,?,?)" );
        for(WmpsDayInter oneObj :listOnePage ){
            ps.setInt(1,oneObj.getUid());
            ps.setBigDecimal(2, oneObj.getInter());
            ps.setBigDecimal(3,oneObj.getCdate());
            ps.addBatch();
        }
        return ps.executeBatch();
    }

    
}

对应的业务实现类:

/**
 * 活动优惠券
 */
@Service("FundsInfoService")
public class FundsInfoServiceImpl extends  BaseService<ProCommFundsInfo, FundsInfoExample> implements IFundsInfoService {
    private final static Logger LOGGER = LoggerFactory.getLogger(FundsInfoServiceImpl.class);
    @Autowired
    private FundsInfoMapper fundsInfoMapper;

    public FundsInfoServiceImpl() {
    }

    @Override
    public void insertDayInter(Integer vdate, Integer uid) {

		//活动优惠券条件判断
        if (vdate == null || vdate.intValue() <= 0 || uid == null || uid.intValue() < 0) {
            LOGGER.error("=insertDayInter=>error,vdate=" + vdate + ",uid=" + uid);
        } else {

			//统计符合条件用户
            int totalNum = fundsInfoMapper.countForInsertBatchByUser();
            List<Integer> listException = new ArrayList<Integer>();

			//初始化第一批次起始值
			int startRow = 0;
            try {

				//固定大小线程池
                ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);

                for (int i = 0; i <= Math.floor(totalNum / 1000); i++) {
                    Future<Integer> future = fixedThreadPool.submit(new InsertBatchThread(vdate, uid, i));
                    if( future.get()>=0 ){
                        listException.add( future.get());
                    }
                }

				//活动优惠券发完,验证是否有遗漏,如果有则修复
                if(listException.size()>0){

                    for (int i = 0; i <= listException.size(); i++) {
                        Future<Integer> future = fixedThreadPool.submit(new InsertBatchThread(vdate, uid, listException.get(i)));
                        if( future.get()>=0 ){
                            listException.add( future.get());
                        }
                    }
                }

            }catch (Exception e){
                LOGGER.error("<=insertDayInter=>error",e);
            }

        }
        LOGGER.info("<=insertDayInter=>");

    }

}

问题在于,当我们需要使用多线程操作时,一般会先查询,再进行插入,但是此时如果我们没有一个合理的维度去分割数据的话,很容易造成死锁。例如如果我们没有将维度分配合理,就有可能批次一与批次二同时处理相同的数据,既要查询又要更新,互相均需要对方的锁释放才可以继续执行,导致死锁。

时间: 2024-10-13 22:23:19

JAVA 并发编程-应用篇的相关文章

《Java并发编程之美》(翟陆续著)高清pdf

<Java并发编程之美> 阿里巴巴技术专家力作,用代码说话.用实例验证,并发编程没有这么难!<Java并发编程的艺术>*作者方腾飞老师好评推荐! ? 百度网盘链接: https://pan.baidu.com/s/12oEEeDEO_YofImkpQA1bLA 提取码: pmkh  内容简介  · · · · · · 并发编程相比 Java 中其他知识点的学习门槛较高,从而导致很多人望而却步.但无论是职场面试,还是高并发/ 高流量系统的实现,却都离不开并发编程,于是能够真正掌握并发

转: 【Java并发编程】之十八:第五篇中volatile意外问题的正确分析解答(含代码)

转载请注明出处:http://blog.csdn.net/ns_code/article/details/17382679 在<Java并发编程学习笔记之五:volatile变量修饰符-意料之外的问题>一文中遗留了一个问题,就是volatile只修饰了missedIt变量,而没修饰value变量,但是在线程读取value的值的时候,也读到的是最新的数据.但是在网上查了很多资料都无果,看来很多人对volatile的规则并不是太清晰,或者说只停留在很表面的层次,一知半解. 这两天看<深入Ja

Java并发编程:锁的释放

.title { text-align: center } .todo { font-family: monospace; color: red } .done { color: green } .tag { background-color: #eee; font-family: monospace; padding: 2px; font-size: 80%; font-weight: normal } .timestamp { color: #bebebe } .timestamp-kwd

JAVA并发编程J.U.C学习总结

前言 学习了一段时间J.U.C,打算做个小结,个人感觉总结还是非常重要,要不然总感觉知识点零零散散的. 有错误也欢迎指正,大家共同进步: 另外,转载请注明链接,写篇文章不容易啊,http://www.cnblogs.com/chenpi/p/5614290.html 本文目录如下,基本上涵盖了J.U.C的主要内容: JSR 166及J.U.C Executor框架(线程池. Callable .Future) AbstractQueuedSynchronizer(AQS框架) Locks & C

JAVA并发编程2_线程安全&amp;内存模型

"你永远都不知道一个线程何时在运行!" 在上一篇博客JAVA并发编程1_多线程的实现方式中后面看到多线程中程序运行结果往往不确定,和我们预期结果不一致.这就是线程的不安全.线程的安全性是非常复杂的,没有任何同步的情况下,多线程的执行顺序是不可预测的.当多个线程访问同一个资源时就会出现线程安全问题.例如有一个银行账户,一个线程往里面打钱,一个线程取钱,要是得到不确定的结果那是多么可怕的事情. 引入: 例如下面的程序,在单线程下,执行两次i++理论上i的最终值是12,但是在多线程环境下则不

【转】Java并发编程:线程池的使用

Java并发编程:线程池的使用 在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间. 那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务? 在Java中可以通过线程池来达到这样的效果.今天我们就来详细讲解一下Java的线程池,首先我们从最核心的ThreadPool

Java并发编程(一)

Java并发编程(一) 之前看<Thinking In Java>时,并发讲解的挺多的,自己算是初步了解了并发.但是其讲解的不深入,自己感觉其讲解的不够好.后来自己想再学一学并发,买了<Java并发编程实战>,看了一下讲的好基础.好多的理论,而且自我感觉讲的逻辑性不强.最后,买了本<Java并发编程的艺术>看,这本书挺好的,逻辑性非常强. 1. 概述 本篇文章主要内容来自<Java并发编程的艺术>,其讲解的比较深入,自己也有许多不懂的地方,然后自己主要把它讲

12、Java并发编程:阻塞队列

Java并发编程:阻塞队列 在前面几篇文章中,我们讨论了同步容器(Hashtable.Vector),也讨论了并发容器(ConcurrentHashMap.CopyOnWriteArrayList),这些工具都为我们编写多线程程序提供了很大的方便.今天我们来讨论另外一类容器:阻塞队列. 在前面我们接触的队列都是非阻塞队列,比如PriorityQueue.LinkedList(LinkedList是双向链表,它实现了Dequeue接口). 使用非阻塞队列的时候有一个很大问题就是:它不会对当前线程产

Java并发编程-非阻塞同步方式原子类(Atomic)的使用

非阻塞同步 在大多数情况下,我们为了实现线程安全都会使用Synchronized或lock来加锁进行线程的互斥同步,但互斥同步的最主要的问题就是进行线程的阻塞和唤醒所带来的性能问题,因此这种阻塞也称作阻塞同步.从处理问题的方式上说,互斥同步属于一种悲观的并发策略,总是认为只要不去做正确的同步措施,那就肯定会出现问题,无论共享数据是否真的会出现竞争,它都会进行加锁.用户态核心态转换.维护锁的计数器和检查是否有被阻塞的线程需要被唤醒等操作. 随着硬件指令集的发展,我们有了另一个选择:基于冲突检测的乐