简单的生产消费者模型

案例场景:httpclient4.3.5抓取网页,用自带的线程池进行多线程测试。

httpclient4.3.5简单介绍:对于同一主机的请求,会保存路由信息,下次的请求会根据保存的路由走,减少了查找主机的时间。

类介绍:数据结构用的阻塞队列结构;监控线程、生产线程、消费线程

代码如下:

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.conn.HttpClientConnectionManager;
import org.apache.http.conn.routing.HttpRoute;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils;

public class AQSTest {

    public static void main(String[] args){

        BlockQueue queue = new BlockQueue();
        monitor motor = new monitor(queue) ;
        motor.start() ;// 监控队列大小
        Producer prod1 = new Producer(queue) ;
        Producer prod2 = new Producer(queue) ;
        Producer prod3 = new Producer(queue) ;

        prod1.start();
        prod2.start();
        prod3.start();

        Crawel crawel = new Crawel(queue);
        crawel.start() ;

        try {
            Thread.sleep(60*1000*2);
            prod1.shutdown();
            prod2.shutdown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        /*
        int i = new Random().nextInt()%5;
        System.out.println(i);
        */
    }

    public static class Crawel extends Thread{

        private BlockQueue queue ;
        public Crawel(BlockQueue _queue){
            queue = _queue ;
        }
        @Override
        public void run() {

            PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
            // 将最大连接数增加到200
            cm.setMaxTotal(200);
            // 将每个路由基础的连接增加到20
            cm.setDefaultMaxPerRoute(20);
            //将目标主机的最大连接数增加到50
            HttpHost localhost = new HttpHost("www.yeetrack.com", 80);
            cm.setMaxPerRoute(new HttpRoute(localhost), 50);

            CloseableHttpClient httpClient = HttpClients.custom()
                    .setConnectionManager(cm)
                    .build();
         // 为每个url创建一个线程,GetThread是自定义的类
            GetThread[] threads = new GetThread[50];
            for (int i = 0; i < threads.length; i++) {
                threads[i] = new GetThread(httpClient,queue );
            }

            // 启动线程
            for (int j = 0; j < threads.length; j++) {
                threads[j].start();
            }

            // join the threads
            for (int j = 0; j < threads.length; j++) {
                try {
                    threads[j].join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static class GetThread extends Thread {

        private final CloseableHttpClient httpClient;
        private final HttpContext context;
        private  HttpGet httpget;
        private volatile boolean stop = false;
        private final BlockQueue queue ;

        public GetThread(CloseableHttpClient httpClient,BlockQueue _queue) {
            this.httpClient = httpClient;
            this.context = HttpClientContext.create();
            this.queue = _queue ;
        }

        public void shutdown(){
            synchronized (this) {
                stop  = true ;
            }
        }

        @Override
        public void run() {
            while(!stop){
                try {
                    String url = queue.take() ;
                    httpget = new HttpGet(url);
                    CloseableHttpResponse response = httpClient.execute(
                            httpget, context);
                    try {
                        HttpEntity entity = response.getEntity();
                        String resp = EntityUtils.toString(entity);

                        System.out.println("成功获取源码");
                        EntityUtils.consume(entity);
                    } finally {
                        response.close();
                    }
                } catch (ClientProtocolException ex) {
                    // Handle protocol errors
                } catch (IOException ex) {
                    // Handle I/O errors
                }
            }

        }

    }

    public static class monitor extends  Thread{

        private BlockQueue queue ;
        private volatile boolean shutdown = false;
        public monitor(BlockQueue _queue){
            queue = _queue ;
        }
        public void run() {
                while(!shutdown){
                    System.out.println("Queue Size: "+queue.size());
                    try {
                        sleep(500) ;
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
        }
        public void shutdown(){
            synchronized (this) {
                shutdown = true;
            }
        }
    }

    public static class Const {

        private static String[] urls = {"http://www.baidu.com","http://www.bing.com","http://www.hao123.com","http://www.163.com"
                ,"http://www.csdn.net"};

        private static Lock lock = new ReentrantLock() ;
        public static String get(){
            lock.lock() ;
            int i = new Random().nextInt()%urls.length;
            i = Math.abs(i);
            String str = urls[i] ;
            lock.unlock() ;
            return str ;
        }
    }

    public static class Producer extends Thread {

        private volatile boolean stop = false;
        private BlockQueue queue ;
        public Producer(BlockQueue _queue){
            queue = _queue ;
        }
        @Override
        public void run() {
            while(!stop){
                queue.put(Const.get()) ;
                try {
                    sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

        public void shutdown(){
            synchronized (this) {
                stop = true;
            }
        }

    }

    public static class BlockQueue {
        public  LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>(1000);

        public void put(String url){
            try {
                queue.put(url) ;
            } catch (InterruptedException e) {
                e.printStackTrace();
                queue.clear();
            }
        }

        public String take(){
            String element = null ;
            try {
                element = queue.take() ;
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return element ;
        }

        public int size(){
            int size = queue.size() ;
            return size ;
        }
    }

    public static class IdleConnectionMonitorThread extends Thread {

        private final HttpClientConnectionManager connMgr;
        private volatile boolean shutdown;

        public IdleConnectionMonitorThread(HttpClientConnectionManager connMgr) {
            super();
            this.connMgr = connMgr;
        }

        @Override
        public void run() {
            try {
                while (!shutdown) {
                    synchronized (this) {
                        wait(5000);
                        // 关闭失效的连接
                        connMgr.closeExpiredConnections();
                        // 可选的, 关闭30秒内不活动的连接
                        connMgr.closeIdleConnections(30, TimeUnit.SECONDS);
                    }
                }
            } catch (InterruptedException ex) {
                // terminate
            }
        }

        public void shutdown() {
            shutdown = true;
            synchronized (this) {
                notifyAll();
            }
        }

    }

}
时间: 2024-10-09 17:17:08

简单的生产消费者模型的相关文章

4.利用python生成器实现简单的“生产者消费者”模型

假如说,没有生成器这种对象,那么如何实现这种简单的"生产者消费者"模型呢? import time def producer(): pro_list = [] for i in range(10000): print "包子%s制作ing" %(i) time.sleep(0.5) pro_list.append("包子%s" %i) return pro_list def consumer(pro_list): for index,stuffe

java多线程 生产消费者模型

[seriesposts sid=500] 下面的代码讲述了一个故事 一个面包生产铺里目前有30个面包,有三个人来买面包,第一个人要买50个,第二个要买20个,第三个要买30个. 第一个人不够,所以等着,让第二个买了.面包铺继续生产面包.有7个人在生产. package com.javaer.thread; public class CPMode { public static void main(String[] args) { Godown godown = new Godown(30);

[golang]单向channel的应用“生产消费者模型”

单向channel应用"生产消费者模型" 单向channel最典型的应用是"生产者消费者模型" 所谓"生产者消费者模型": 某个模块(函数等)负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类.函数.协程.线程.进程等).产生数据的模块,就形象地称为生产者:而处理数据的模块,就称为消费者. 单单抽象出生产者和消费者,还够不上是生产者/消费者模型.该模式还需要有一个缓冲区处于生产者和消费者之间,作为一个中介.生产者把数据放入

Java 多线程系列2——多线程的生命周期及生产消费者模型

一.线程的生命周期及五种基本状态 关于Java中线程的生命周期,首先看一下下面这张较为经典的图: 上图中基本上囊括了Java中多线程各重要知识点.掌握了上图中的各知识点,Java中的多线程也就基本上掌握了.主要包括: Java线程具有五中基本状态 新建状态(New):当线程对象对创建后,即进入了新建状态,如:Thread t = new MyThread(); 就绪状态(Runnable):当调用线程对象的start()方法(t.start();),线程即进入就绪状态.处于就绪状态的线程,只是说

使用队列queue实现一个简单的生产者消费者模型

一.生产者消费者模型 我们去超市商店等地购买商品时,我们大部分人都会说自己是消费者,而超市的各大供货商.工厂等,自然而然地也就成了我们的生产者.如此一来,生产者有了,消费者也有了,那么将二者联系起来的超市又该作何理解呢?诚然,它本身是作为一座交易场所而诞生. 上述情形类比到实际的软件开发过程中,经常会发现:某个线程或模块的代码负责生产数据(工厂),而生产出来的数据却不得不交给另一模块(消费者)来对其进行处理,在这之间使用了队列.栈等类似超市的东西来存储数据(超市),这就抽象除了我们的生产者/消费

简单生产消费者模型

import java.util.ArrayList; import java.util.List; public class ProduceAndConsume { public static final Object signal = new Object(); public static List<String> list = new ArrayList<String>(); public static void main(String args[]) { Thread pr

9 异常处理 操作系统 进程线程 队列+生产消费者模型 进程同步 回调函数

异常处理 异常就是程序运行时发生错误的信号,在python中,错误触发的异常如下 异常的种类: AttributeError 试图访问一个对象没有的树形,比如foo.x,但是foo没有属性x IOError 输入/输出异常:基本上是无法打开文件 ImportError 无法引入模块或包:基本上是路径问题或名称错误 IndentationError 语法错误(的子类) :代码没有正确对齐 IndexError 下标索引超出序列边界,比如当x只有三个元素,却试图访问x[5] KeyError 试图访

简单的生产-消费者程序

1 #include <stdlib.h> 2 #include <pthread.h> 3 #include <stdio.h> 4 #include <sys/unistd.h> 5 6 #define PRINT_LINE printf("FILE: %s, LINE: %d\n", __FILE__, __LINE__); 7 #define CYCLE_TIME (5) 8 typedef struct msg { 9 stru

python2.0_s12_day9之day8遗留知识(queue队列&amp;生产者消费者模型)

4.线程 1.语法 2.join 3.线程锁之Lock\Rlock\信号量 4.将线程变为守护进程 5.Event事件 * 6.queue队列 * 7.生产者消费者模型 4.6 queue队列 queue非常有用,当信息必须安全的在多个线程之间进行数据交换的时候就应该想到queue 所以,queue它能保证数据被安全的在多个线程之间进行交换,那他就是天生的线程安全. queue有那么几种: class queue.Queue(maxsize=0) # 先入先出 class queue.LifoQ