封装自己的ThreadPool

/**
 * Created by lzd on 2016年6月6日 下午5:06:56
 */
public class JdPriceUtils{

    private static final Logger log = Logger.getLogger(JdPriceUtils.class);

    private int threads = 1;
    private ThreadPool threadPool;
    private DbStore store;

    private IRedis redis = RedisFactory.getShardedRedis();
    private ReentrantLock LOCK = new ReentrantLock();
    private Condition condition = this.LOCK.newCondition();
    private final LinkedList<JdBookInfo> list = new LinkedList<>();
    private final Object LIST_LOCK = new Object();
//    private static final CloseableHttpClient client = HttpClients.createDefault();

    public interface DbStore{
        List<JdBookInfo> get();
        void save(JdBookInfo book);
    }

    public JdPriceUtils(DbStore dbStore){
        this.store = dbStore;
    }

    public void init(){
        threadPool = new ThreadPool(this.getThreads());
        while (true) {
            synchronized (LIST_LOCK) {
                list.addAll(product());
                log.info("list container init size = " + list.size());
            }
            if (list.size() == 0){
                this.await();
            }else {
                consume();
                this.signal();
            }
        }
    }

    protected List<JdBookInfo> product(){
        return store.get();
    }

    protected void consume(){
        while (list.size() > 0) {
            final JdBookInfo jdBookInfo;
            synchronized (LIST_LOCK) {
                jdBookInfo = list.removeFirst();
            }
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        handlePrice(jdBookInfo);
                    } catch (Exception e) {
                        log.error("handle price is error",e);
                    }
                }
            });
        }
    }

    protected void handlePrice(JdBookInfo jdBookInfo){
        log.info(Thread.currentThread().getName() + "--处理--" + jdBookInfo.getJsin() + "--的价格");
        if( jdBookInfo == null || jdBookInfo.getJsin() == null) {
            new IllegalArgumentException("param is not valid");
        }
        String code = jdBookInfo.getJsin();
        String res = null;
        while (StringUtils.isEmpty(res)) {
             res = getPrice(code);
        }

        Map map = Collections.EMPTY_MAP;;
        try {
            String json = res.substring(res.indexOf("{"), res.lastIndexOf("}")+1);
            map = JSON.parseObject(json,Map.class);
            log.info("request return json = [" + json + "]");
        } catch (Exception e) {
            addList(jdBookInfo);
            log.warn("json parse is error , again back list");
            return;
        }
        String error = String.valueOf(map.get("error"));
        if(!"null".equals(error)){
            addList(jdBookInfo);
            log.warn("get pdos_captcha,list add book = [jsin = "+jdBookInfo.getJsin()+";price = "+jdBookInfo.getPreface() + " or " + jdBookInfo.getSalePrice());
            return;
        }

        String price = String.valueOf(map.get("m"));
        String salePrice = String.valueOf(map.get("p"));
        //如果价格为null时候,设默认值为0;
        if ("null".equals(price)) {
            price = "0";
        } if ("null".equals(salePrice)) {
            salePrice = "0";
        }
        JdBookInfo book = new JdBookInfo();
        book.setJsin(jdBookInfo.getJsin());
        book.setPrice(price);
        book.setSalePrice(salePrice);
        book.setId(jdBookInfo.getId());
        store.save(book);
    }

    private void addList(JdBookInfo jdBookInfo){
        synchronized (LIST_LOCK) {
            list.addLast(jdBookInfo);
        }
    }

    protected String getPrice(String code){
        String url = "http://p.3.cn/prices/get?skuid=J_"+code+"&type=1&area=1_72_2840&callback=cnp";
        String res = null;
        try {
            String sRandMember = null;
            String[] split = null;
            while (sRandMember == null) {
                sRandMember = redis.sRandMember("proxies");
                if(sRandMember == null) continue;
                split = sRandMember.split(":");
            }

            HttpHost httpHost = new HttpHost(split[0],Integer.parseInt(split[1]));
            RequestConfig reqConfig = RequestConfig.custom().setProxy(httpHost)
                    .setConnectionRequestTimeout(10000)
                    .setConnectTimeout(10000)
                    .setSocketTimeout(10000).build();
            log.info("request httpHost = " + split[0]+":"+split[1]);
            RequestBuilder requestBuilder = RequestBuilder.get(url).setConfig(reqConfig);
            CloseableHttpResponse response = HttpClients.createDefault().execute(requestBuilder.build());
            InputStream in = response.getEntity().getContent();
            byte[] byteArray = IOUtils.toByteArray(in);
            res = new String(byteArray,"utf-8");
        } catch (Exception e) {
            log.error("get conntion price url is error");
        }

        return res;
    }

    private void await(){
        this.LOCK.lock();
        try {
            log.info("main thrad is await ...");
            this.condition.await(30000L,TimeUnit.MILLISECONDS);
        } catch (Exception e) {

        } finally {
            this.LOCK.unlock();
        }
    }

    private void signal(){
        this.LOCK.lock();
        try {
            this.condition.signal();
        } catch (Exception e) {

        } finally {
            this.LOCK.unlock();
        }
    }

    public int getThreads() {
        return threads;
    }

    public JdPriceUtils setThreads(int threads) {
        this.threads = threads;
        return this;
    }

    public void start(){
        this.init();
    }

    public class ThreadPool{
        private int threads;
        private ExecutorService executorService;
        private AtomicInteger threadActive = new AtomicInteger(0);
        private ReentrantLock poolLock = new ReentrantLock();
        private Condition poolCondition = this.poolLock.newCondition();

        public ThreadPool(int threads){
            this.threads = threads;
            executorService = Executors.newFixedThreadPool(threads);
        }

        public void execute(final Runnable runnable){
            try {
                poolLock.lock();
                if (threadActive.get() >= threads ) {
                    while (threadActive.get() >= threads ){
                        try {
                            poolCondition.await();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
            } catch (Exception e) {

            } finally {
                poolLock.unlock();
            }

            threadActive.incrementAndGet();

            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    runnable.run();

                    try {
                        poolLock.lock();
                        poolCondition.signal();
                        threadActive.decrementAndGet();
                    } catch (Exception e) {
                    } finally {
                        poolLock.unlock();
                    }
                }
            });
        }
    }
}
时间: 2024-11-08 01:54:40

封装自己的ThreadPool的相关文章

使用C++11封装线程池ThreadPool

读本文之前,请务必阅读: 使用C++11的function/bind组件封装Thread以及回调函数的使用 Linux组件封装(五)一个生产者消费者问题示例   线程池本质上是一个生产者消费者模型,所以请熟悉这篇文章:Linux组件封装(五)一个生产者消费者问题示例. 在ThreadPool中,物品为计算任务,消费者为pool内的线程,而生产者则是调用线程池的每个函数. 搞清了这一点,我们很容易就需要得出,ThreadPool需要一把互斥锁和两个同步变量,实现同步与互斥. 存储任务,当然需要一个

C#中实现并发

C#中实现并发的几种方法的性能测试 0x00 起因 去年写的一个程序因为需要在局域网发送消息支持一些命令和简单数据的传输,所以写了一个C/S的通信模块.当时的做法很简单,服务端等待链接,有用户接入后开启一个线程,在线程中运行一个while循环接收数据,接收到数据就处理.用户退出(收到QUIT命令)后线程结束.程序一直运行正常(当然还要处理“TCP粘包”.消息格式封装等问题,在此不作讨论),不过随着使用的人越来越多,而且考虑到线程开销比较大,如果有100个用户链接那么服务端就要多创建100个线程,

C#中实现并发的几种方法的性能测试

0x00 起因 去年写的一个程序因为需要在局域网发送消息支持一些命令和简单数据的传输,所以写了一个C/S的通信模块.当时的做法很简单,服务端等待链接,有用户接入后开启一个线程,在线程中运行一个while循环接收数据,接收到数据就处理.用户退出(收到QUIT命令)后线程结束.程序一直运行正常(当然还要处理“TCP粘包”.消息格式封装等问题,在此不作讨论),不过随着使用的人越来越多,而且考虑到线程开销比较大,如果有100个用户链接那么服务端就要多创建100个线程,500个用户就是500个线程,确实太

基于线程池、消息队列和epoll模型实现Client-Server并发架构

引言 并发是什么?企业在进行产品开发过程中为什么需要考虑这个问题?想象一下天猫的双11和京东的618活动,一秒的点击量就有几十万甚至上百万,这么多请求一下子涌入到服务器,服务器需要对这么多的请求逐个进行消化掉,假如服务器一秒的处理能力就几万,那么剩下的不能及时得到处理的这些请求作何处理?总不能让用户界面一直等着,因此消息队列应运而生,所有的请求都统一放入消息队列,工作线程从消息队列不断的消费,消息队列相当于一个缓冲区,可达到解藕.异步和削峰的目的. Kafka.ActiveMQ.RabbitMQ

boost之ThreadPool

boost之ThreadPool 版权声明:本文为博主原创文章,未经博主允许不得转载. threadpool是基于boost库实现的一个线程池子库,但线程池实现起来不是很复杂.我们从threadpool中又能学到什么东西呢? 它是基于boost库实现的,如果大家对boost库有兴趣,看看一个简单的实现还是可以学到点东西的. threadpool基本功能 1.任务封装,包括普通任务(task_func)和优先级任务(prio_task_func). 2.调度策略,包括fifo_scheduler(

封装一个简单的solrserver组件

一个简单的solrserver组件 实现索引更新的异步处理,以及查询接口,日志/线程池/队列监控没有加上. SolrDocment封装 接口: public interface ISolrDocument { public SolrInputDocument convertToInputDocument() throws Exception; public void buildSolrDocument(SolrDocument document) throws Exception; } 实现:

封装Socket.BeginReceive/EndReceive以支持Timeout

Socket .NET中的Socket类提供了网络通信常用的方法,分别提供了同步和异步两个版本,其中异步的实现是基于APM异步模式实现,即BeginXXX/EndXXX的方式.异步方法由于其非阻塞的特性,在需考虑程序性能和伸缩性的情况下,一般会选择使用异步方法.但使用过Socket提供的异步方法的同学,应该都会注意到了Socket的异步方法是无法设置Timeout的.以Receive操作为例,Socket提供了一个ReceiveTimeout属性,但该属性设置的是同步版本的Socket.Rece

简单看看ThreadPool的源码以及从中看出线程间传值的另一种方法

这几天太忙没时间写博客,今天回家就简单的看了下ThreadPool的源码,发现有一个好玩的东西,叫做”执行上下文“,拽名叫做:”ExecutionContext“. 一:ThreadPool的大概流程. 第一步:它会调用底层一个helper方法. 第二步:走进这个helper方法,我们会发现有一个队列,并且这个队列的item必须是QueueUserWorkItemCallback的实例,然后这就激发了我的 兴趣,看看QueueUserWorkItemCallback到底都有些什么? 第三步:走到

野生程序员对.NETFramework 4.0 ThreadPool的理解

ThreadPool 类 提供一个线程池,该线程池可用于执行任务.发送工作项.处理异步 I/O.代表其他线程等待以及处理计时器. 命名空间:   System.Threading程序集:  mscorlib(位于 mscorlib.dll) 版本信息 .NET Framework 自 1.1 起可用可移植类库 在 可移植 .NET 平台 中受支持Silverlight 自 2.0 起可用Windows Phone Silverlight 自 7.0 起可用 线程安全 This type is t