Java 线程池 +生产者消费者+MySQL读取300 万条数据

1.1需求

数据库300 万条用户数据 ,遍历获取所有用户, 各种组合关联, 获取到一个新的json ,存到redis 上。

1.2 难点

数据库比较多, 不可能单线程查询所有的数据到内存。

1.3解决办法

多线程读取, 生产者 每次获取200 条数据, 消费者去消费。(这里 主要是根据MySQL分页去获取下一个200 条数据)

1.4 代码

1.4.1 调用方法

    /**
     * 线程启动
     */
    public void update() {        //redis操作类
        HashRedisUtil redisUtil= HashRedisUtil.getInstance();
        //生产者消费者
        ProducerConsumer pc = new ProducerConsumer();
        //数据仓库
        Storage s = pc.new Storage();

        ExecutorService service = Executors.newCachedThreadPool();
        //一个线程进行查询
        Producer p = pc.new Producer(s,userMapper);
        service.submit(p);
        System.err.println("生产线程正在生产中。。。。。。。。。");
        //是个线程进行修改
        for(int i=0;i<10;i++){
            System.err.println("消费线程"+i+"正在消费中。。。。。。。。。。");
            service.submit(pc.new Consumer( redisUtil,userMapper,s));
        }

    }

1.4.2 主要核心类

package com.ypp.thread;

import java.math.BigDecimal;
import java.util.Calendar;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.joda.time.LocalDateTime;

import com.alibaba.fastjson.JSONObject;
import com.ypp.constants.Constants;
import com.ypp.mapper.UserMapper;
import com.ypp.model.User;
import com.ypp.model.UserAlis;
import com.ypp.model.UserBaseModel;
import com.ypp.model.UserVip;
import com.ypp.util.HashRedisUtil;
import com.ypp.util.JsonUtils;
import com.ypp.util.PHPSerializer;

public class ProducerConsumer {
    private static Logger logger = Logger.getLogger(ProducerConsumer.class);    //这个page 是核心, 全局变量, 当生产者生产一次 ,获取200 个用户, 会把这个page++, 下次获取就是后一个200 条用户了
    private static Integer page = 0;

   //消费者
    public class Consumer implements Runnable {

        private HashRedisUtil redisUtil;
        private UserMapper userMapper;
        private Storage s = null;

        public Consumer(HashRedisUtil redisUtil, UserMapper userMapper, Storage s) {
            super();
            this.redisUtil = redisUtil;
            this.userMapper = userMapper;
            this.s = s;
        }

        public void run() {
            try {
                while (true) {
                    User users = s.pop();

                    long bbb = System.currentTimeMillis();
                    // 获取一个用户的粉丝列表 并存到redis
                    try {
                        fansUpdate(users.getToken(), users.getUserId(), redisUtil);
                    } catch (Exception e1) {
                        e1.printStackTrace();
                    }
                    // 获取一个用户的关注列表, 并存到redis
                    try {
                        followUpdate(users.getToken(), users.getUserId(), redisUtil);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    // 获取一个用户的黑名单, 并存到redis
                    try {
                        blackUpdate(users.getToken(), users.getUserId(), redisUtil);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    // 用户基本信息
                    try {
                        userbaseUpdate(users.getToken(), users.getUserId(), redisUtil);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    long ccc = System.currentTimeMillis();
                    System.out.println("用户:" + users.getToken() + " 全部总共耗时:" + (ccc - bbb) + "毫秒");

                    Thread.sleep(500);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }

        public List<User> getUserInfo(Integer iThread) {
            return userMapper.findUserInfo((iThread - 1) * 200 + 1);
        }

    /**
         * 用户基本信息修改
         *
         * @param token
         * @param myuserId
         * @param redisUtil
         * @throws Exception
         */
        private void userbaseUpdate(String token, String myUserId, HashRedisUtil redisUtil) throws Exception {

        }

        /**
         * 更新一个用户的黑名单(原来的token改成userID)
         *
         * @param token
         * @param string
         * @param redisUtil
         * @throws Exception
         */
        private void blackUpdate(String token, String myUserId, HashRedisUtil redisUtil) throws Exception {

        }

        /**
         * 获取一个用户的关注
         *
         * @param token
         * @param string
         * @param redisUtil
         * @throws Exception
         */
        private void followUpdate(String token, String myUserId, HashRedisUtil redisUtil) throws Exception {

        }

        /**
         * 获取一个用户的粉丝列表
         *
         * @param token
         * @param userId
         * @param redisUtil
         * @throws Exception
         */
        private void fansUpdate(String token, String myUserId, HashRedisUtil redisUtil) throws Exception {

        }

//生产者
    public class Producer implements Runnable {
        private Storage s = null;
        private UserMapper mapper ;
        public Producer( Storage s, UserMapper mapper) {

            this.s = s;
            this.mapper = mapper;
        }
        public void run() {
            try {

                while (true) {

                    System.err.println("当前分页是:"+page+"****************************************");
                     List<User> list= mapper.findUserInfo(page);
                    s.push(list);
                    page++;
                }
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }

        }
    }
//数据仓库
    public class Storage {
        BlockingQueue<User> queues = new LinkedBlockingQueue<User>(200);

        /**
         * 生产
         *
         * @param p
         *            产品
         * @throws InterruptedException
         */
        public void push(List<User> p) throws InterruptedException {
            for(User user:p){
                queues.put(user);
            }
        }

        /**
         * 消费
         *
         * @return 产品
         * @throws InterruptedException
         */
        public User pop() throws InterruptedException {
            return queues.take();
        }
    }

}
时间: 2024-10-23 09:11:49

Java 线程池 +生产者消费者+MySQL读取300 万条数据的相关文章

通过存储过程,插入300万条数据的一点思考?

1.今天凌晨1点多开始插入数据,到现在为止,一共插入的数据大小,大约是30M数据,但是总量在190M数据左右 2.中间我去睡觉,电脑可能也是处于睡眠状态. 3.电脑的性能也很大程度决定了这个处理数据的速度(比如,位宽,是否支持超频等等吧.) 4.灵活处理问题吧,300万条数据太多,可以选择3万条数据处理,计时处理,我们可以选择3000条数据处理,做实验就是要一个模拟环境 5.千万不要死板教条, 6.及时回顾之前的知识点,核心知识点,经常性回顾.(一定会有新收获的) 7.注意身体,身体是革命的本钱

java线程之生产者消费者

看了毕向东老师的生产者消费者,就照着视频参考运行了一下,感觉还好 这个值得学习的是条理特别清晰: ProducterConsumerDemo.java中,一个资源类Resources,生产者消费者都可以访问的到. 生产者类Producter,消费者Consumer都实现了Runnable接口,在其中的run方法中实现重载,对共享资源进行生产和消费 优化: 如果以后需要加入项目中,对ProducterConsumerDemo类中加一个构造方法,public ProducterConsumerDem

Java线程通信-生产者消费者问题

线程通信示例--生产者消费者问题 这类问题描述了一种情况,假设仓库中只能存放一件产品,生产者将生产出来的产品放入仓库,消费者将仓库中的产品取走消费.假设仓库中没有产品,则生产者可以将 产品放入仓库,有产品,则停止生产并等待,直到仓库中的产品被消费这取走为止. 如果仓库中放油产品,则消费者可以将产品取走消费,否则停止消费并等待,直到 仓库中再次放入产品为止. 显然,这是一个同步问题,生产者和消费这共享同一资源, 并且生产者和消费这之间彼此依赖,互为条件向前推进.Java提供了3个方法解决了线程间的

Java多线程-同步:synchronized 和线程通信:生产者消费者模式

大家伙周末愉快,小乐又来给大家献上技术大餐.上次是说到了Java多线程的创建和状态|乐字节,接下来,我们再来接着说Java多线程-同步:synchronized 和线程通信:生产者消费者模式. 一.同步:synchronized 多个线程同时访问一个对象,可能造成非线程安全,数据可能错误,所谓同步:就是控制多个线程同时访就是控制多线程操作同一个对象时,注意是同一个对象,数据的准确性, 确保数据安全,但是加入同步后因为需要等待,所以效率相对低下. 如:一个苹果,自己一个人去咬怎么都不会出问题,但是

Java线程池与java.util.concurrent

Java(Android)线程池 介绍new Thread的弊端及Java四种线程池的使用,对Android同样适用.本文是基础篇,后面会分享下线程池一些高级功能. 1.new Thread的弊端执行一个异步任务你还只是如下new Thread吗? new Thread(new Runnable() { @Override public void run() { // TODO Auto-generated method stub } }).start(); 那你就out太多了,new Thre

Java 线程池的原理与实现

最近在学习线程池.内存控制等关于提高程序运行性能方面的编程技术,在网上看到有一哥们写得不错,故和大家一起分享. [分享]Java 线程池的原理与实现 这几天主要是狂看源程序,在弥补了一些以前知识空白的同时,也学会了不少新的知识(比如 NIO),或者称为新技术吧.线程池就是其中之一,一提到线程,我们会想到以前<操作系统>的生产者与消费者,信号量,同步控制等等.一提到池,我们会想到数据库连接池,但是线程池又如何呢? 建议:在阅读本文前,先理一理同步的知识,特别是syncronized同步关键字的用

Java线程池介绍

Java线程池介绍 2015-10-24 ImportNew (点击上方公号,可快速关注) 原文:allegro 译文:ImportNew - paddx 链接:http://www.importnew.com/16845.html 根据摩尔定律(Moore’s law),集成电路晶体管的数量差不多每两年就会翻一倍.但是晶体管数量指数级的增长不一定会导致 CPU 性能的指数级增长.处理器制造商花了很多年来提高时钟频率和指令并行.在新一代的处理器上,单线程程序的执行速率确实有所提高.但是,时钟频率

Java 线程池的原理与实现 (转)

  最近在学习线程池.内存控制等关于提高程序运行性能方面的编程技术,在网上看到有一哥们写得不错,故和大家一起分享. [分享]Java 线程池的原理与实现 这几天主要是狂看源程序,在弥补了一些以前知识空白的同时,也学会了不少新的知识(比如 NIO),或者称为新技术吧.线程池就是其中之一,一提到线程,我们会想到以前<操作系统>的生产者与消费者,信号量,同步控制等等.一提到池,我们会想到数据库连接池,但是线程池又如何呢? 建议:在阅读本文前,先理一理同步的知识,特别是syncronized同步关键字

Java里的生产者-消费者模型(Producer and Consumer Pattern in Java)

生产者-消费者模型是多线程问题里面的经典问题,也是面试的常见问题.有如下几个常见的实现方法: 1. wait()/notify() 2. lock & condition 3. BlockingQueue 下面来逐一分析. 1. wait()/notify() 第一种实现,利用根类Object的两个方法wait()/notify(),来停止或者唤醒线程的执行:这也是最原始的实现. 1 public class WaitNotifyBroker<T> implements Broker&