Java总结(十一)——通过Callable接口实现多线程,生产者消费者问题,多线下载(复制)文件

一.通过Callable接口实现多线程

1.Callable接口介绍:

(1)java.util.concurrent.Callable是一个泛型接口,只有一个call()方法

(2)call()方法抛出异常Exception异常,且返回一个指定的泛型类对象

2.Callable接口实现多线程的应用场景

(1)当父线程想要获取子线程的运行结果时

3.使用Callable接口实现多线程的步骤

(1)第一步:创建Callable子类的实例化对象

(2)第二步:创建FutureTask对象,并将Callable对象传入FutureTask的构造方法中

(注意:FutureTask实现了Runnable接口和Future接口)

(3)第三步:实例化Thread对象,并在构造方法中传入FurureTask对象

(4)第四步:启动线程

例1(利用Callable接口实现线程):

利用Callable接口创建子线程类:

package call;

import java.util.concurrent.Callable;

/*

* 实现Callable接口创建子线程,指明范型为返回的数据类型

* */

public class CallDemo implements Callable<String> {

@Override

public String call() throws Exception {

String th_name = Thread.currentThread().getName();

System.out.println(th_name + "遭遇大规模敌军突袭...");

System.out.println(th_name + "迅速变换阵型...");

System.out.println(th_name + "极速攻杀敌军...");

return "敌军损失惨重,我军大获全胜";

}

}

实线程类:

package call;

import java.util.concurrent.ExecutionException;

import java.util.concurrent.FutureTask;

public class TestCallable {

public static void main(String[] args) {

CallDemo cl = new CallDemo();// 实例化Callable子类对象

FutureTask<String> ft = new FutureTask<String>(cl);// 实例化FutureTask对象,并将Callable子类对象传入FutureTask的构造方法中

new Thread(ft, "李存孝部队——>").start();// 启动线程

Thread.currentThread().setName("李存勖部队——>");// 设置父线程名

try {

System.out.println(Thread.currentThread().getName() + "休整5000ms");

Thread.sleep(5000);

} catch (InterruptedException e) {

e.printStackTrace();

}

System.out.println(Thread.currentThread().getName() + "休整完毕..");

try {

String str = ft.get();// 利用FutureTask对象调用get()方法获取子线程的返回值

System.out.println(Thread.currentThread().getName() + "获取友军消息"

+ str);

} catch (InterruptedException | ExecutionException e) {

e.printStackTrace();

}

}

}

运行结果:

李存勖部队——>休整5000ms

李存孝部队——>遭遇大规模敌军突袭...

李存孝部队——>迅速变换阵型...

李存孝部队——>极速攻杀敌军...

李存勖部队——>休整完毕..

李存勖部队——>获取友军消息敌军损失惨重,我军大获全胜

例2(匿名类部类实现Callable接口创建子线程):

匿名类部类实现Callable接口创建子线程类并实现:

package call;

import java.util.concurrent.Callable;

import java.util.concurrent.ExecutionException;

import java.util.concurrent.FutureTask;

//匿名类部类实现Callable接口创建子线程

public class AnonyCallable {

public static void main(String[] args) {

Callable<String> cl = new Callable<String>() {

@Override

public String call() throws Exception {

System.out.println(Thread.currentThread().getName() + "正在行军~~~");

System.out.println(Thread.currentThread().getName() + "遭遇敌军~~~");

System.out.println(Thread.currentThread().getName() + "奋勇杀敌!!!!");

return "战斗胜利,俘虏敌军50000人";

}

};

FutureTask<String> ft = new FutureTask(cl);

new Thread(ft, "李存孝部队").start();

try {

Thread.currentThread().setName("李存勖部队");

Thread.sleep(3000);

System.out.println(Thread.currentThread().getName() + "休整3000ms");

} catch (InterruptedException e) {

e.printStackTrace();

}

System.out.println(Thread.currentThread().getName() + "整顿完毕,等待友军消息...");

try {

String str = ft.get();

System.out.println("李存勖部队得知友军消息为:" + str);

} catch (InterruptedException | ExecutionException e) {

e.printStackTrace();

}

}

}

运行结果:

李存孝部队正在行军~~~

李存孝部队遭遇敌军~~~

李存孝部队奋勇杀敌!!!!

李存勖部队休整3000ms

李存勖部队整顿完毕,等待友军消息...

李存勖部队得知友军消息为:战斗胜利,俘虏敌军50000人

二.生产者——消费者问题

  1. 生产者线程不断生产,消费者线程不断取走生产者生产的产品
  2. Object中的几个方法支持:

    (1)wait():线程等待,当前线程进入调用对象的线程——等待池

    (2)Notify():唤醒一个等待线程

    (3)notifyAll():唤醒全部的等到线程

    注意:以上三个方法都必须在同步机制中调用

    例3(生产者消费者问题(一对一)):

    早餐基础类:

    package one2one.producer;

    // 早餐基础类

    public class Breakfast {

    private String food;   // 吃的

    private String drink;  // 喝的

    private boolean flag=false;

    public synchronized void makeBreakfast(String food,String drink){

    if(flag){

    try {

    wait();   // 生产者线程进入同步对象维护的“线程等待池”

    } catch (InterruptedException e) {

    e.printStackTrace();

    }

    }

    this.food=food;

    try {

    Thread.sleep(1000);   // 休眠,但不释放“锁”

    } catch (InterruptedException e) {

    e.printStackTrace();

    }

    this.drink=drink;

    flag=true;

    notify();

    }

    public synchronized void eatBreakfast(){

    if(!flag){

    try {

    wait();   // 消费者线程进入同步对象维护的“线程等待池”,而且当前线程释放"锁"

    } catch (InterruptedException e) {

    e.printStackTrace();

    }

    }

    try {

    Thread.sleep(1000);

    } catch (InterruptedException e) {

    e.printStackTrace();

    }

    System.out.println(this.food+"=============>"+this.drink);

    flag=false;

    notify();

    }

    }

    生产者线程类:

    package one2one.producer;

    // 生产者线程

    public class Producer implements Runnable{

    private Breakfast bf;

    public Producer(Breakfast bf){

    this.bf=bf;

    }

    @Override

    public void run() {

    for (int i = 1; i <=7; i++) {

    if(i%2==0){

    this.bf.makeBreakfast("bread","milk");

    }else{

    this.bf.makeBreakfast("馒头","稀饭");

    }

    }

    }

    }

    消费者线程类:

    package one2one.producer;

    // 消费者线程

    public class Consumer implements Runnable{

    private Breakfast bf;

    public Consumer(Breakfast bf){

    this.bf=bf;

    }

    @Override

    public void run() {

    for (int i = 1; i <=7; i++) {

    System.out.println("星期"+i+"早餐种类:food======>drink");

    this.bf.eatBreakfast();

    }

    }

    }

    测试类:

    package one2one.producer;

    public class Test {

    public static void main(String[] args) {

    Breakfast bf=new Breakfast();

    new Thread(new Producer(bf)).start();   // 启动生产者线程

    new Thread(new Consumer(bf)).start();   // 启动消费者线程

    }

    }

    运行结果:

    星期1早餐种类:food======>drink

    馒头=============>稀饭

    星期2早餐种类:food======>drink

    bread=============>milk

    星期3早餐种类:food======>drink

    馒头=============>稀饭

    星期4早餐种类:food======>drink

    bread=============>milk

    星期5早餐种类:food======>drink

    馒头=============>稀饭

    星期6早餐种类:food======>drink

    bread=============>milk

    星期7早餐种类:food======>drink

    馒头=============>稀饭

  3. (生产者消费者问题(many2many))

    生产消费基础类:

    package manytomany.product;

    public class Product {

    private int count = 0;// 商品数量

    private int MAX = 10;// 最大库存

    // 生产商品

    public synchronized void makeProduct() {

    String thread_name = Thread.currentThread().getName();// 获取生产者线程名

    if (count > MAX) {

    System.out.println("货物已满" + thread_name + "停止生产...");

    try {

    notifyAll(); // 唤醒所有的消费者线程

    wait(); // 生产者线程停止生产

    } catch (InterruptedException e) {

    e.printStackTrace();

    }

    } else {

    try {

    Thread.sleep(1000);

    } catch (InterruptedException e) {

    e.printStackTrace();

    }

    count++; // 生产者线程生产商品

    System.out.println(thread_name + "生产了产品,目前商品总量:" + count);

    notifyAll();// 唤醒所有消费者线程,模拟消费

    }

    }

    // 消费商品

    public synchronized void buyProduct() {

    String thread_name = Thread.currentThread().getName();// 获取线程名

    if (count <= 0) {

    System.out.println("已无货," + thread_name + "停住消费...");

    try {

    notifyAll();// 唤醒生产者线程 生产商品

    wait();// 消费者线程休眠,停止消费

    } catch (InterruptedException e) {

    e.printStackTrace();

    }

    } else {

    try {

    Thread.sleep(1000);

    } catch (InterruptedException e) {

    e.printStackTrace();

    }

    count--;// 开始 消费商品

    System.out.println(thread_name + "消费了一个商品,目前商品数量为:" + count);

    }

    }

    }

    生产者线程:

    package manytomany.product;

    public class Producer implements Runnable {

    private Product product; // 获取Product对象

    public Producer(Product product) {

    this.product = product;

    }

    @Override

    public void run() {

    while (true) {

    product.makeProduct();// 调用生产方法

    }

    }

    }

    消费者线程:

    package manytomany.product;

    public class Consumer implements Runnable{

    private Product product;

    public Consumer(Product product) {

    this.product = product;

    }

    @Override

    public void run() {

    while(true){

    product.buyProduct();//调用消费方法

    }

    }

    }

    测试类:

    package manytomany.product;

    public class Test {

    public static void main(String[] args) {

    Product pro = new Product();

    new Thread(new Producer(pro), "生产者1号——>").start();

    new Thread(new Producer(pro), "生产者2号——>").start();

    new Thread(new Producer(pro), "生产者3号——>").start();

    new Thread(new Producer(pro), "生产者4号——>").start();

    new Thread(new Consumer(pro), "消费者A——>").start();

    new Thread(new Consumer(pro), "消费者B——>").start();

    new Thread(new Consumer(pro), "消费者C——>").start();

    new Thread(new Consumer(pro), "消费者D——>").start();

    new Thread(new Consumer(pro), "消费者E——>").start();

    }

    }

    运行结果(截取部分):

    消费者E——>消费了一个商品,目前商品数量为:7

    消费者E——>消费了一个商品,目前商品数量为:6

    消费者E——>消费了一个商品,目前商品数量为:5

    消费者A——>消费了一个商品,目前商品数量为:4

    消费者A——>消费了一个商品,目前商品数量为:3

    消费者A——>消费了一个商品,目前商品数量为:2

    消费者A——>消费了一个商品,目前商品数量为:1

    消费者A——>消费了一个商品,目前商品数量为:0

    已无货,消费者A——>停住消费...

    已无货,消费者C——>停住消费...

    已无货,消费者D——>停住消费...

    已无货,消费者B——>停住消费...

    生产者1号——>生产了产品,目前商品总量:1

    生产者1号——>生产了产品,目前商品总量:2

    生产者1号——>生产了产品,目前商品总量:3

    生产者1号——>生产了产品,目前商品总量:4

    生产者1号——>生产了产品,目前商品总量:5

  4. 多线程下载(复制)文件

    1.使用RandomAccessFile与InputStream的skip(long n)方法使每个线程负责文件的每一部分读写。

    例(开启6个线程断点下载(复制)电影).

    下载复制线程:

    package download;

    import java.io.*;

    public class DownloadRunnable implements Runnable{

    private File srcFile;   // 源文件路径

    private long startPos;   // 每个线程的开始下载位置

    private long partTask;   // 每个线程的下载任务

    private RandomAccessFile raf;  // 用来写入

    public DownloadRunnable(File srcFile,long startPos,long partTask,RandomAccessFile raf){

    this.srcFile=srcFile;

    this.startPos=startPos;

    this.partTask=partTask;

    this.raf=raf;

    }

    @Override

    public void run() {

    System.out.println(Thread.currentThread().getName()+"准备从第"+startPos+"个字节开始读...");

    InputStream input=null;

    try {

    input=new FileInputStream(srcFile);

    input.skip(startPos);  // 跳过输入流的startPos个字节

    byte[] b=new byte[1024*1024*10];

    int len=0;

    int count=0;   // 用来记录已经读写的字节数

    while((len=input.read(b))!=-1 && count<partTask){

    raf.write(b, 0, len);

    count+=len;

    }

    System.out.println(Thread.currentThread().getName()+"已经写入了"+count+"个字节");

    } catch (Exception e) {

    e.printStackTrace();

    }finally{

    try {

    input.close();

    raf.close();

    } catch (IOException e) {

    e.printStackTrace();

    }

    }

    }

    }

    开启线程类:

    package dowload;

    import java.io.*;

    public class TestDown {

    public static void main(String[] args) {

    File srcFile = new File("e:" + File.separator + "哈利波特" + File.separator

    + "哈利波特与死亡圣器上.mkv");

    long pathTask = srcFile.length() / 6;

    for (int i = 0; i < 6; i++) {

    RandomAccessFile raf = null;

    try {

    raf = new RandomAccessFile("d:" + File.separator + "Movies.mkv", "rw");

    long startPos = pathTask * i;

    raf.seek(startPos);

    new Thread(

    new DownloadRunnable(srcFile, startPos, pathTask, raf),

    "第" + (i + 1) + "条下载线程——>").start();

    } catch (Exception e) {

    e.printStackTrace();

    }

    }

    }

    }

    【本次总结完毕】

     

原文地址:http://blog.51cto.com/13501268/2087975

时间: 2024-08-01 01:10:45

Java总结(十一)——通过Callable接口实现多线程,生产者消费者问题,多线下载(复制)文件的相关文章

Java并发程序设计(十一)设计模式与并发之生产者-消费者模式

设计模式与并发之生产者-消费者模式 生产者-消费者模式是一个经典的多线程设计模式.它为多线程间的协作提供了良好的解决方案.在生产者-消费者模式中,通常由两类线程,即若干个生产者线程和若干个消费者线程.生产者线程负责提交用户请求,消费者线程则负责具体处理生产者提交的任务.生产者和消费者之间则通过共享内存缓冲区进行通信.

多线程-生产者消费者

正解博客:https://blog.csdn.net/u011863767/article/details/59731447 永远在循环(loop)里调用 wait 和 notify,不是在 If 语句 现在你知道wait应该永远在被synchronized的背景下和那个被多线程共享的对象上调用,下一个一定要记住的问题就是,你应该永远在while循环,而不是if语句中调用wait.因为线程是在某些条件下等待的--在我们的例子里,即"如果缓冲区队列是满的话,那么生产者线程应该等待",你可

多线程生产者/消费者模式实现

参考书籍<java多线程编程核心技术> 都是基于wait/notify实现的 一个生产者和一个消费者:操作值 1 package com.qf.test10.pojo; 2 3 /** 4 * @author qf 5 * @create 2018-09-18 15:59 6 */ 7 public class Entity { 8 public static String value = ""; 9 } 1 package com.qf.test10; 2 3 impor

多线程生产者消费者问题处理

一.比较低级的办法是用wait和notify来解决这个问题. 消费者生产者问题: 这个问题是一个多线程同步问题的经典案例,生产者负责生产对象,消费者负责将生成者产生的对象取出,两者不断重复此过程.这过程需要注意几个问题: 不论生产者和消费者有几个,必须保证: 1.生产者每次产出的对象必须不一样,产生的对象有且仅有出现一次: 2.消费者每次取出的对象必须不一样,取出的对象有且仅有出现一次: 3.一定是先产生该对象,然后对象才能被取出,顺序不能乱: 第一种情况:多个生产者轮流负责生产,多个消费者负责

callable接口的多线程实现方式

package com.cxy.juc; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; class MyThread implements Callable<Integer> { @Override public Integer call() throws Exception { retur

java多线程 生产者消费者案例-虚假唤醒

package com.java.juc; public class TestProductAndConsumer { public static void main(String[] args) { Clerk clerk = new Clerk(); Produce produce = new Produce(clerk); Consumer consumer = new Consumer(clerk); new Thread(produce, "线程A").start(); ne

java多线程生产者消费者

//Java Thread producer customer class ThreadTest { public static void main(String[] args) { Q q=new Q(); Producer p=new Producer(q); Customer c=new Customer(q); Thread t0=new Thread(p); Thread t1=new Thread(c); t0.start(); t1.start(); for(int i=0;i<5

java多线程 生产者消费者模式

package de.bvb; /** * 生产者消费者模式 * 通过 wait() 和 notify() 通信方法实现 * */ public class Test1 { public static void main(String[] args) { Godown godown = new Godown(50); for (int i = 0; i < 5; i++) { new ProducerThread(i * 10, godown).start(); new ConsumerThre

Java 多线程 生产者消费者问题

1 package producer; 2 3 public class SyncStack { 4 int index =0; 5 SteamedBun[] bunArr = new SteamedBun[6]; //栈里只能放6个元素 6 7 public synchronized void push(SteamedBun bun) 8 { 9 while(index >= bunArr.length) //栈满等待 10 { 11 bun.setIndex(bunArr.length -1