java 多线程 同步 观察者 并发集合的一个例子

//第一版
package com.hra.riskprice;

import com.hra.riskprice.SysEnum.Factor_Type;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import javax.swing.text.html.HTMLDocument;
import java.math.BigDecimal;
import java.util.*;
import java.util.concurrent.TimeUnit;

 class ForwardingSet<E> implements  Set<E>{
    private final Set<E> s;
    public ForwardingSet(Set<E> s){this.s=s;}

    @Override
    public void clear() {
        s.clear();
    }

    public boolean isEmpty(){return s.isEmpty();}
    public int size(){return s.size();}
    public Iterator<E> iterator(){return s.iterator();}
    public boolean add(E e){return s.add(e);}
    public boolean remove(Object o){return s.remove(o);}
    public boolean containsAll(Collection<?> c){return s.containsAll(c);}
    public boolean addAll(Collection<? extends  E> c){
        return s.addAll(c);
    }
    public boolean removeAll(Collection<?> c){
        return s.removeAll(c);
    }
    public boolean retainAll(Collection<?> c){
        return s.retainAll(c);
    }

    @Override
    public Object[] toArray() {
        return s.toArray();
    }

    @Override
    public <T> T[] toArray(T[] a) {
        return s.toArray(a);
    }

    @Override
    public boolean equals(Object o) {
        return s.equals(o);
    }

    @Override
    public int hashCode() {
        return s.hashCode();
    }

    @Override
    public String toString() {
        return s.toString();
    }

    @Override
    public boolean contains(Object o) {
        return s.contains(o);
    }
}

 interface  SetObserver<E>{

    void added(ObservableSet<E> set,E element);
}

 class ObservableSet<E> extends  ForwardingSet<E>{

    public ObservableSet(Set<E> set){
        super(set);
    }

    private final  List<SetObserver<E>> observers=new ArrayList<SetObserver<E>>();

    public void addObserver(SetObserver<E> observer){
        synchronized(observers){
            observers.add(observer);
        }
    }
    public boolean removeObserver(SetObserver<E> observer){
        synchronized (observers){
            return  observers.remove(observer);
        }
    }
    public void notifyElementAdded(E element){
        synchronized (observers){
            for(SetObserver<E> observer:observers){
                observer.added(this,element);
            }
        }
    }

    @Override
    public boolean add(E e) {
        boolean added=super.add(e);
        if(added){
            notifyElementAdded(e);
        }
        return added;
    }

    @Override
    public boolean addAll(Collection<? extends E> c) {
        boolean result=false;
        for(E element:c){
            result|=add(element);
        }
        return result;
    }
}

@SpringBootApplication
public class RiskpriceApplication {

    public static void main(String[] args)  {

        ObservableSet<Integer> set=new ObservableSet<Integer>(new HashSet<Integer>());
        set.addObserver(new SetObserver<Integer>() {
            @Override
            public void added(ObservableSet<Integer> s, Integer e) {
                System.out.println(e);
                if(e==23){
                    s.removeObserver(this);
                }
            }
        });
        for(int i=0;i<100;i++){
            set.add(i);
        }
    }
}
你觉得会打印0~23吗,实际上运行后就挂了,for循环遍历过程中,不允许修改枚举列表,我们可以考虑通过另外一个线程去移除这个观察者,也是下面过度得第二版了 通过 ExecutorService 

//第二版
package com.hra.riskprice;

import com.hra.riskprice.SysEnum.Factor_Type;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import javax.swing.text.html.HTMLDocument;
import java.math.BigDecimal;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

 class ForwardingSet<E> implements  Set<E>{
    private final Set<E> s;
    public ForwardingSet(Set<E> s){this.s=s;}

    @Override
    public void clear() {
        s.clear();
    }

    public boolean isEmpty(){return s.isEmpty();}
    public int size(){return s.size();}
    public Iterator<E> iterator(){return s.iterator();}
    public boolean add(E e){return s.add(e);}
    public boolean remove(Object o){return s.remove(o);}
    public boolean containsAll(Collection<?> c){return s.containsAll(c);}
    public boolean addAll(Collection<? extends  E> c){
        return s.addAll(c);
    }
    public boolean removeAll(Collection<?> c){
        return s.removeAll(c);
    }
    public boolean retainAll(Collection<?> c){
        return s.retainAll(c);
    }

    @Override
    public Object[] toArray() {
        return s.toArray();
    }

    @Override
    public <T> T[] toArray(T[] a) {
        return s.toArray(a);
    }

    @Override
    public boolean equals(Object o) {
        return s.equals(o);
    }

    @Override
    public int hashCode() {
        return s.hashCode();
    }

    @Override
    public String toString() {
        return s.toString();
    }

    @Override
    public boolean contains(Object o) {
        return s.contains(o);
    }
}

 interface  SetObserver<E>{

    void added(ObservableSet<E> set,E element);
}

 class ObservableSet<E> extends  ForwardingSet<E>{

    public ObservableSet(Set<E> set){
        super(set);
    }

    private final  List<SetObserver<E>> observers=new ArrayList<SetObserver<E>>();

    public void addObserver(SetObserver<E> observer){
        synchronized(observers){
            observers.add(observer);
        }
    }
    public boolean removeObserver(SetObserver<E> observer){
        synchronized (observers){
            return  observers.remove(observer);
        }
    }
    public void notifyElementAdded(E element){
        synchronized (observers){
            for(SetObserver<E> observer:observers){
                observer.added(this,element);
            }
        }
    }

    @Override
    public boolean add(E e) {
        boolean added=super.add(e);
        if(added){
            notifyElementAdded(e);
        }
        return added;
    }

    @Override
    public boolean addAll(Collection<? extends E> c) {
        boolean result=false;
        for(E element:c){
            result|=add(element);
        }
        return result;
    }
}

@SpringBootApplication
public class RiskpriceApplication {

    public static void main(String[] args)  throws  InterruptedException{

        ObservableSet<Integer> set=new ObservableSet<Integer>(new HashSet<Integer>());
        set.addObserver(new SetObserver<Integer>() {
            @Override
            public void added(ObservableSet<Integer> s, Integer e) {
                System.out.println(e);
                if(e==23){
                    ExecutorService excutor= Executors.newSingleThreadExecutor();
                    final SetObserver<Integer> observer=this;
                    try{
                        excutor.submit(new Runnable() {
                            @Override
                            public void run() {
                                s.removeObserver(observer);
                            }
                        }).get();

                    }catch (ExecutionException ex){
                        throw new AssertionError(ex.getCause());
                    }catch (InterruptedException ex){
                        throw new AssertionError(ex.getCause());
                    }finally {
                        excutor.shutdown();
                    }
                }
            }
        });
        for(int i=0;i<100;i++){
            set.add(i);
        }
    }
}
第二版虽然会打印到23但是实际上并没有成功,
 public void run() {
                                s.removeObserver(observer);
                            }
进入
    public boolean removeObserver(SetObserver<E> observer){
        synchronized (observers){
            return  observers.remove(observer);
        }

    }
经过同步快synchronized 的时候将会遭遇死锁,因为主线程已经锁定了observers,只有等待子线程执行完成后才会释放锁,而子线程又在等待锁的释放,这样相互的等待就造成了死锁,但是由于Java设计的锁是可重入的,这种调用不会产生死锁,但会产生一个异常,因为调用线程正在该锁所保护的线程上进行着。这种失败可能是灾难性的,本质来说这个锁,没有尽到它的职责。可重入的锁简化了多线程的面向对象程序构造,但是它可能会将活性失败,变成安全性失败(参考自Effective java)
什么解决呢,来个2.1版本吧
我们建立个快照,而不使用原observers,这样每个通知都使用了自己的快照观察者列表引用就不会死锁了
 public void notifyElementAdded(E element){
        List<SetObserver<E>> snaphot=null;//快照
        synchronized (observers){
            snaphot=new ArrayList<SetObserver<E>>(observers);
        }
        for(SetObserver<E> observer:snaphot){
            observer.added(this,element);
        }
    }

//第三版
事实上,要将外来方法的调用移出同步代码块还有更好的方法,从java1.5发行版以来,提供了并发集合 corrent collection ,称作 CopyOnWriteArrayList,
这是专门为此定制的,他是Arraylist的一种变体,通过重新拷贝整个底层数组,在这里实现所有的操作,由于内部数组永远不动(归功于重新拷贝),因此迭代不需要锁定,大量使用有性能影响,但对于观察者列表几乎不变来说却是很好的,因为他们几乎不改动,并且经常遍历
第三版较之前2.1版本更改如下:
  private final  List<SetObserver<E>> observers=new CopyOnWriteArrayList<SetObserver<E>>();

    public void addObserver(SetObserver<E> observer){
        //synchronized(observers){
            observers.add(observer);
        //}
    }
    public boolean removeObserver(SetObserver<E> observer){
        //synchronized (observers){
            return  observers.remove(observer);
        //}

    }
    public void notifyElementAdded(E element){
        //List<SetObserver<E>> snaphot=null;//快照
        //synchronized (observers){
          //  snaphot=new ArrayList<SetObserver<E>>(observers);
        //}
        for(SetObserver<E> observer:observers){
            observer.added(this,element);
        }
    }

当然这个方法也可以改了,因为实际操作的时候底层是重新拷贝,所以也就不需要通过另外一个线程去移除引用了 修改如下:
 set.addObserver(new SetObserver<Integer>() {
            @Override
            public void added(ObservableSet<Integer> s, Integer e) {
                System.out.println(e);
                if(e==23){
                    s.removeObserver(this);
//                    ExecutorService excutor= Executors.newSingleThreadExecutor();
//                    final SetObserver<Integer> observer=this;
//                    try{
//                        excutor.submit(new Runnable() {
//                            @Override
//                            public void run() {
//                                s.removeObserver(observer);
//                            }
//                        }).get();
//
//                    }catch (ExecutionException ex){
//                        throw new AssertionError(ex.getCause());
//                    }catch (InterruptedException ex){
//                        throw new AssertionError(ex.getCause());
//                    }finally {
//                        excutor.shutdown();
//                    }
                }
            }
        });

原文地址:https://www.cnblogs.com/kexb/p/10162685.html

时间: 2024-10-29 15:31:50

java 多线程 同步 观察者 并发集合的一个例子的相关文章

Java多线程与高并发:高并发解决思路

Java多线程与高并发:高并发解决思路 小玲子之凌空蹈虚关注 122018.11.21 09:55:30字数 1,553阅读 4,228 來源:http://www.wangtianyi.top/blog/2018/05/11/javaduo-xian-cheng-yu-gao-bing-fa-liu-gao-bing-fa-jie-jue-si-lu/ 缓存并发 image.png 当大量请求访问同一个没有被缓存的数据的时候,会发送大量请求给数据库,导致数据库压力过大,还会导致一致性问题,所以

Java线程同步和并发第1部分

通过优锐课核心java学习笔记中,我们可以看到,码了很多专业的相关知识, 分享给大家参考学习.我们将分两部分介绍Java中的线程同步,以更好地理解Java的内存模型. 介绍 Java线程同步和并发是复杂应用程序各个设计阶段中讨论最多的主题. 线程,同步技术有很多方面,它们可以在应用程序中实现高并发性. 多年来,CPU(多核处理器,寄存器,高速缓存存储器和主内存(RAM))的发展已导致通常是开发人员往往忽略的某些领域-例如线程上下文,上下文切换,变量可见性,JVM内存 型号与CPU内存型号. 在本

java基础知识回顾之java Thread类学习(六)--java多线程同步函数用的锁

1.验证同步函数使用的锁----普通方法使用的锁 思路:创建两个线程,同时操作同一个资源,还是用卖票的例子来验证.创建好两个线程t1,t2,t1线程走同步代码块操作tickets,t2,线程走同步函数封装的代码操作tickets,同步代码块中的锁我们可以指定.假设我们事先不知道同步函数用的是什么锁:如果在同步代码块中指定的某个锁(测试)和同步函数用的锁相同,就不会出现线程安全问题,如果锁不相同,就会发生线程安全问题. 看下面的代码:t1线程用的同步锁是obj,t2线程在操作同步函数的资源,假设不

转:关于JAVA多线程同步

转:http://lanvis.blog.163.com/blog/static/26982162009798422547/ 因为需要,最近关注了一下JAVA多线程同步问题.JAVA多线程同步主要依赖于若干方法和关键字.将心得记录如下: 1  wait方法:        该方法属于Object的方法,wait方法的作用是使得当前调用wait方法所在部分(代码块)的线程停止执行,并释放当前获得的调用wait所在的代码块的锁,并在其他线程调用notify或者notifyAll方法时恢复到竞争锁状态

Java多线程-同步:synchronized 和线程通信:生产者消费者模式

大家伙周末愉快,小乐又来给大家献上技术大餐.上次是说到了Java多线程的创建和状态|乐字节,接下来,我们再来接着说Java多线程-同步:synchronized 和线程通信:生产者消费者模式. 一.同步:synchronized 多个线程同时访问一个对象,可能造成非线程安全,数据可能错误,所谓同步:就是控制多个线程同时访就是控制多线程操作同一个对象时,注意是同一个对象,数据的准确性, 确保数据安全,但是加入同步后因为需要等待,所以效率相对低下. 如:一个苹果,自己一个人去咬怎么都不会出问题,但是

(转) Java多线程同步与异步

Java线程 同步与异步 线程池1)多线程并发时,多个线程同时请求同一个资源,必然导致此资源的数据不安全,A线程修改了B线 程的处理的数据,而B线程又修改了A线程处理的数理.显然这是由于全局资源造成的,有时为了解 决此问题,优先考虑使用局部变量,退而求其次使用同步代码块,出于这样的安全考虑就必须牺牲 系统处理性能,加在多线程并发时资源挣夺最激烈的地方,这就实现了线程的同步机制 同步:A线程要请求某个资源,但是此资源正在被B线程使用中,因为同步机制存在,A线程请求 不到,怎么办,A线程只能等待下去

聊聊Java里常用的并发集合

前言 在我们的程序开发过程中,如果涉及到多线程环境,那么对于集合框架的使用就必须更加谨慎了,因为大部分的集合类在不施加额外控制的情况下直接在并发环境中直接使用可能会出现数据不一致的问题,所以为了解决这个潜在的问题,我们要么在自己的业务逻辑中加上一些额外的控制,例如锁,或者我们直接使用Java提供的可在并发环境中使用的集合类,这是一个简便而且高效的方法.那么我们下面就来了解下Java提供了哪些“神器”可以让我们安全的使用集合. 正文 非阻塞式安全列表 - ConcurrentLinkedDeque

Java多线程同步的方法

一 synchronized关键字 1.synchronized实现原理: ---基于对象监视器(锁) java中所有对象都自动含有单一的锁,JVM负责跟踪对象被加锁的次数.如果一个对象被解锁,其计数变为0.在任务(线程)第一次给对象加锁的时候, 计数变为1.每当这个相同的任务(线程)在此对象上获得锁时,计数会递增.只有首先获得锁的任务(线程)才能继续获取该对象上的多个锁.每当任务离开时,计数递减,当计数为0的时候,锁被完全释放. Java中每个对象或者类都有一把锁与之相关联,对于对象来说,监视

Java多线程---同步与锁

一,线程的同步是为了防止多个线程访问一个数据对象时,对数据造成的破坏. 二.同步和锁定 1.锁的原理 Java中每个对象都有一个内置锁. 当程序运行到非静态的synchronized同步方法上时,自动获得与正在执行代码类的当前实例(this实例)有关的锁.获得一个对象的锁也称为获取锁.锁定对象.在对象上锁定或在对象上同步. 当程序运行到synchronized同步方法或代码块时该对象锁才起作用. 一个对象只有一个锁.所以,如果一个线程获得该锁,就没有其他线程可以获得锁,直到第一个线程释放(或返回