scala中java并发编程

Runnable/Callable

Runnable只有一个没有返回值的方法


1

2

3

trait Runnable {

  def run(): Unit

}

Callable的方法和run类似,只不过它有一个返回值


1

2

3

trait Callable[V] {

  def call(): V

}

线程

Scala的并发是建立在Java的并发模型上的。

在Sun的JVM上,对于一个IO密集型的任务,我们可以在单机上运行成千上万的线程。

Thread是通过Runnable构造的。要运行一个Runnable的run方法,你需要调用对应线程的start方法。


1

2

3

4

5

6

7

8

9

scala> val hello = new Thread(new Runnable {

  def run() {

    println("hello world")

  }

})

hello: java.lang.Thread = Thread[Thread-3,5,main]

scala> hello.start

hello world

当你看见一个实现Runnable的类,你应该明白它会被放到一个线程里去执行的。

一段单线程的代码

下面是一段代码片段,它可以运行,但是会有问题。


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

import java.net.{Socket, ServerSocket}

import java.util.concurrent.{Executors, ExecutorService}

import java.util.Date

class NetworkService(port: Int, poolSize: Int) extends Runnable {

  val serverSocket = new ServerSocket(port)

  def run() {

    while (true) {

      // 这里会阻塞直到有连接进来

      val socket = serverSocket.accept()

      (new Handler(socket)).run()

    }

  }

}

class Handler(socket: Socket) extends Runnable {

  def message = (Thread.currentThread.getName() + "\n").getBytes

  def run() {

    socket.getOutputStream.write(message)

    socket.getOutputStream.close()

  }

}

(new NetworkService(2020, 2)).run

每个请求都会把当前线程的名称main作为响应。

这段代码最大的问题在于一次只能够响应一个请求!

你可以对每个请求都单独用一个线程来响应。只需要把


1

(new Handler(socket)).run()

改成


1

(new Thread(new Handler(socket))).start()

但是如果你想要复用线程或者对于线程的行为要做一些其他的控制呢?

Executors

随着Java 5的发布,对于线程的管理需要一个更加抽象的接口。

你可以通过Executors对象的静态方法来取得一个ExecutorService对象。这些方法可以让你使用各种不同的策略来配置一个ExecutorService,例如线程池。

下面是我们之前的阻塞式网络服务器,现在改写成可以支持并发请求。


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

import java.net.{Socket, ServerSocket}

import java.util.concurrent.{Executors, ExecutorService}

import java.util.Date

class NetworkService(port: Int, poolSize: Int) extends Runnable {

  val serverSocket = new ServerSocket(port)

  val pool: ExecutorService = Executors.newFixedThreadPool(poolSize)

  def run() {

    try {

      while (true) {

        // This will block until a connection comes in.

        val socket = serverSocket.accept()

        pool.execute(new Handler(socket))

      }

    } finally {

      pool.shutdown()

    }

  }

}

class Handler(socket: Socket) extends Runnable {

  def message = (Thread.currentThread.getName() + "\n").getBytes

  def run() {

    socket.getOutputStream.write(message)

    socket.getOutputStream.close()

  }

}

(new NetworkService(2020, 2)).run

从下面的示例中,我们可以大致了解内部的线程是怎么进行复用的。


1

2

3

4

5

6

7

8

9

10

11

$ nc localhost 2020

pool-1-thread-1

$ nc localhost 2020

pool-1-thread-2

$ nc localhost 2020

pool-1-thread-1

$ nc localhost 2020

pool-1-thread-2

Futures

一个Future代表一次异步计算的操作。你可以把你的操作包装在一个Future里,当你需要结果的时候,你只需要简单调用一个阻塞的get()方法就好了。一个Executor返回一个Future。如果你使用Finagle RPC的话,你可以使用Future的实例来保存还没有到达的结果。

FutureTask是一个可运行的任务,并且被设计成由Executor进行运行。


1

2

3

4

5

val future = new FutureTask[String](new Callable[String]() {

  def call(): String = {

    searcher.search(target);

}})

executor.execute(future)

现在我需要结果,那就只能阻塞到直到结果返回。


1

val blockingResult = future.get()

参考 Scala School中关于Finagle的章节有大量使用Future的示例,也有一些组合使用的例子。Effective Scala中也有关于Futures的内容。

线程安全问题


1

2

3

4

5

class Person(var name: String) {

  def set(changedName: String) {

    name = changedName

  }

}

这个程序在多线程的环境下是不安全的。如果两个线程都有同一个Person示例的引用,并且都调用set方法,你没法预料在两个调用都结束的时候name会是什么。

在Java的内存模型里,每个处理器都允许在它的L1或者L2 cache里缓存变量,所以两个在不同处理器上运行的线程对于相同的数据有种不同的视图。

下面我们来讨论一下可以强制线程的数据视图保持一致的工具。

三个工具

同步

互斥量(Mutex)提供了锁定资源的语法。当你进入一个互斥量的时候,你会获得它。在JVM里使用互斥量最常用的方式就是在一个对象上进行同步访问。在这里,我们会在Person上进行同步访问。

在JVM里,你可以对任何非null的对象进行同步访问。


1

2

3

4

5

6

7

class Person(var name: String) {

  def set(changedName: String) {

    this.synchronized {

      name = changedName

    }

  }

}

volatile

随着Java 5对于内存模型的改变,volatile和synchronized的作用基本相同,除了一点,volatile也可以用在null上。

synchronized提供了更加细粒度的加锁控制。而volatile直接是对每次访问进行控制。


1

2

3

4

5

class Person(@volatile var name: String) {

  def set(changedName: String) {

    name = changedName

  }

}

AtomaticReference

同样的,在Java 5中新增了一系列底层的并发原语。AtomicReference类就是其中一个。


1

2

3

4

5

6

7

import java.util.concurrent.atomic.AtomicReference

class Person(val name: AtomicReference[String]) {

  def set(changedName: String) {

    name.set(changedName)

  }

}

它们都有额外的消耗吗?

AutomicReference是这两种方式中最耗性能的,因为如果你要取得对应的值,则需要经过方法分派(method dispatch)的过程。

volatilesynchronized都是通过Java内置的monitor来实现的。在没有竞争的情况下,monitor对性能的影响非常小。由于synchronized允许你对代码进行更加细粒度的加锁控制,这样就可以减小加锁区,进而减小竞争,因此synchronized应该是最佳的选择。

当你进入同步块,访问volatile引用,或者引用AtomicReference,Java会强制要求处理器刷新它们的缓存流水线,从而保证数据的一致性。

如果我这里说错了,请指正出来。这是一个很复杂的主题,对于这个主题肯定需要花费大量的时间来进行讨论。

其他来自Java 5的优秀工具

之前提到了AtomicReference,除了它之外,Java 5还提供了很多其他有用的工具。

CountDownLatch

CountDownLatch是供多个进程进行通信的一个简单机制。


1

2

3

4

5

6

val doneSignal = new CountDownLatch(2)

doAsyncWork(1)

doAsyncWork(2)

doneSignal.await()

println("both workers finished!")

除此之外,它对于单元测试也是很有用的。假设你在做一些异步的工作,并且你想要保证所有的功能都完成了。你只需要让你的函数都对latch进行countDown操作,然后在你的测试代码里进行await

AtomicInteger/Long

由于对于Int和Long的自增操作比较常见,所以就增加了AtomicIntegerAtomicLong

AtomicBoolean

我想我没有必要来解释这个的作用了。

读写锁(ReadWriteLock)

ReadWriteLock可以实现读写锁,读操作只会在写者加锁的时候进行阻塞。

我们来构建一个非线程安全的搜索引擎

这是一个简单的非线程安全的倒排索引。我们这个反向排索引把名字的一部分映射到指定的用户。

下面是原生的假设只有单线程访问的写法。

注意这里的使用mutable.HashMap的另一个构造函数this()


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

import scala.collection.mutable

case class User(name: String, id: Int)

class InvertedIndex(val userMap: mutable.Map[String, User]) {

  def this() = this(new mutable.HashMap[String, User])

  def tokenizeName(name: String): Seq[String] = {

    name.split(" ").map(_.toLowerCase)

  }

  def add(term: String, user: User) {

    userMap += term -> user

  }

  def add(user: User) {

    tokenizeName(user.name).foreach { term =>

      add(term, user)

    }

  }

}

我把具体怎么根据索引获取用户的方法暂时省略掉了,我们后面会来进行补充。

我们来让它变得安全

在上面的倒排索引的示例里,userMap是没法保证线程安全的。多个客户端可以同时尝试去添加元素,这样会产生和之前Person示例里相似的问题。

因为userMap本身不是线程安全的,那么我们怎么能够保证每次只有一个线程对它进行修改呢?

你需要在添加元素的时候给userMap加锁。


1

2

3

4

5

6

7

def add(user: User) {

  userMap.synchronized {

    tokenizeName(user.name).foreach { term =>

      add(term, user)

    }

  }

}

不幸的是,上面的做法有点太粗糙了。能在互斥量(mutex)外面做的工作尽量都放在外面做。记住我之前说过,如果没有竞争的话,加锁的代价是非常小的。如果你在临界区尽量少做操作,那么竞争就会非常少。


1

2

3

4

5

6

7

8

9

10

11

def add(user: User) {

  // tokenizeName was measured to be the most expensive operation.

  // tokenizeName 这个操作是最耗时的。

  val tokens = tokenizeName(user.name)

  tokens.foreach { term =>

    userMap.synchronized {

      add(term, user)

    }

  }

}

SynchronizedMap

我们可以通过使用SynchronizedMap trait来使得一个可变的(mutable)HashMap具有同步机制。

我们可以扩展之前的InvertedIndex,给用户提供一种构建同步索引的简单方法。


1

2

3

4

5

import scala.collection.mutable.SynchronizedMap

class SynchronizedInvertedIndex(userMap: mutable.Map[String, User]) extends InvertedIndex(userMap) {

  def this() = this(new mutable.HashMap[String, User] with SynchronizedMap[String, User])

}

如果你去看具体的实现的话,你会发现SynchronizedMap只是在每个方法上都加上了同步访问,因此它的安全是以牺牲性能为代价的。

Java ConcurrentHashMap

Java里有一个很不错的线程安全的ConcurrentHashMap。幸运的是,JavaConverter可以使得我们通过Scala的语法来使用它。

实际上,我们可以无缝地把我们新的,线程安全的InvertedIndex作为老的非线程安全的一个扩展。


1

2

3

4

5

6

7

8

import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._

class ConcurrentInvertedIndex(userMap: collection.mutable.ConcurrentMap[String, User])

    extends InvertedIndex(userMap) {

  def this() = this(new ConcurrentHashMap[String, User] asScala)

}

现在来加载我们的InvertedIndex

最原始的方法


1

2

3

4

5

6

7

8

9

10

11

12

13

trait UserMaker {

  def makeUser(line: String) = line.split(",") match {

    case Array(name, userid) => User(name, userid.trim().toInt)

  }

}

class FileRecordProducer(path: String) extends UserMaker {

  def run() {

    Source.fromFile(path, "utf-8").getLines.foreach { line =>

      index.add(makeUser(line))

    }

  }

}

对于文件里的每一行字符串,我们通过调用makeUser来生成一个User,然后通过add添加到InvertedIndex里。如果我们并发访问一个InvertedIndex,我们可以并行调用add方法,因为makeUser方法没有副作用,它本身就是线程安全的。

我们不能并行读取一个文件,但是我们可以并行构造User,并且并行将它添加到索引里。

解决方案:生产者/消费者

实现非同步计算的,通常采用的方法就是将生产者同消费者分开,并让它们通过队列(queue)来进行通信。让我们用下面的例子来说明我们是怎么实现搜索引擎的索引的。


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}

// Concrete producer

class Producer[T](path: String, queue: BlockingQueue[T]) extends Runnable {

  def run() {

    Source.fromFile(path, "utf-8").getLines.foreach { line =>

      queue.put(line)

    }

  }

}

// 抽象的消费者

abstract class Consumer[T](queue: BlockingQueue[T]) extends Runnable {

  def run() {

    while (true) {

      val item = queue.take()

      consume(item)

    }

  }

  def consume(x: T)

}

val queue = new LinkedBlockingQueue[String]()

//一个生产者线程

val producer = new Producer[String]("users.txt", q)

new Thread(producer).start()

trait UserMaker {

  def makeUser(line: String) = line.split(",") match {

    case Array(name, userid) => User(name, userid.trim().toInt)

  }

}

class IndexerConsumer(index: InvertedIndex, queue: BlockingQueue[String]) extends Consumer[String](queue) with UserMaker {

  def consume(t: String) = index.add(makeUser(t))

}

// 假设我们的机器有8个核

val cores = 8

val pool = Executors.newFixedThreadPool(cores)

// 每个核设置一个消费者

for (i <- i to cores) {

  pool.submit(new IndexerConsumer[String](index, q))

}

原文链接: Scala School 翻译: ImportNew.com朱伟杰
译文链接: http://www.importnew.com/4750.html

时间: 2024-10-12 18:11:46

scala中java并发编程的相关文章

《Java并发编程实战》第三章 对象的共享 读书笔记

一.可见性 什么是可见性? Java线程安全须要防止某个线程正在使用对象状态而还有一个线程在同一时候改动该状态,并且须要确保当一个线程改动了对象的状态后,其它线程能够看到发生的状态变化. 后者就是可见性的描写叙述即多线程能够实时获取其它线程改动后的状态. *** 待补充   两个工人同一时候记录生产产品总数问题 1. 失效数据 可见性出现故障就是其它线程没有获取到改动后的状态,更直观的描写叙述就是其它线程获取到的数据是失效数据. 2. 非原子64位操作 3. 加锁与可见性 比如在一个变量的读取与

Java并发编程之线程创建和启动(Thread、Runnable、Callable和Future)

这一系列的文章暂不涉及Java多线程开发中的底层原理以及JMM.JVM部分的解析(将另文总结),主要关注实际编码中Java并发编程的核心知识点和应知应会部分. 说在前面,Java并发编程的实质,是线程对象调用start方法启动多线程,而线程对象则必须是Thread类或其子类实现.Runnable和Callable的作用类似于Comparable.Serializable,是用于被并发的类实现的接口,从而使得Thread类可以在初始化时传入这个被并发的类.此是大前提.本文从多线程实现和启动出发,对

读Java并发编程实践中,向已有线程安全类添加功能--客户端加锁实现示例

在Java并发编程实践中4.4中提到向客户端加锁的方法.此为验证示例,写的不好,但可以看出结果来. package com.blackbread.test; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public

转: 【Java并发编程】之十八:第五篇中volatile意外问题的正确分析解答(含代码)

转载请注明出处:http://blog.csdn.net/ns_code/article/details/17382679 在<Java并发编程学习笔记之五:volatile变量修饰符-意料之外的问题>一文中遗留了一个问题,就是volatile只修饰了missedIt变量,而没修饰value变量,但是在线程读取value的值的时候,也读到的是最新的数据.但是在网上查了很多资料都无果,看来很多人对volatile的规则并不是太清晰,或者说只停留在很表面的层次,一知半解. 这两天看<深入Ja

java并发编程中CountDownLatch和CyclicBarrier的使用

转自:http://blog.csdn.net/hbzyaxiu520/article/details/6183714 在多线程程序设计中,经常会遇到一个线程等待一个或多个线程的场景,遇到这样的场景应该如何解决? 如果是一个线程等待一个线程,则可以通过await()和notify()来实现: 如果是一个线程等待多个线程,则就可以使用CountDownLatch和CyclicBarrier来实现比较好的控制. 下面来详细描述下CountDownLatch的应用场景: 例如:百米赛跑:8名运动员同时

Java并发编程(8):多线程环境中安全使用集合API(含代码)

Java并发编程(8):多线程环境中安全使用集合API(含代码)JAVA大数据中高级架构 2018-11-09 14:44:47在集合API中,最初设计的Vector和Hashtable是多线程安全的.例如:对于Vector来说,用来添加和删除元素的方法是同步的.如果只有一个线程与Vector的实例交互,那么,要求获取和释放对象锁便是一种浪费,另外在不必要的时候如果滥用同步化,也有可能会带来死锁.因此,对于更改集合内容的方法,没有一个是同步化的.集合本质上是非多线程安全的,当多个线程与集合交互时

Java并发编程学习路线

一年前由于工作需要从微软技术栈入坑Java,并陆陆续续做了一个Java后台项目,目前在搞Scala+Java混合的后台开发,一直觉得并发编程是所有后台工程师的基本功,所以也学习了小一年Java的并发工具,对整体的并发理解乃至分布式都有一定的提高,所以想和大家分享一下. 我的学习路线 首先说说学习路线,我一开始是直接上手JCIP(Java Concurrency in Practice),发现不是很好懂,把握不了那本书的主线,所以思索着从国内的作者开始先,所以便读了下方腾飞的<Java并发编程的艺

基于JVM原理JMM模型和CPU缓存模型深入理解Java并发编程

许多以Java多线程开发为主题的技术书籍,都会把对Java虚拟机和Java内存模型的讲解,作为讲授Java并发编程开发的主要内容,有的还深入到计算机系统的内存.CPU.缓存等予以说明.实际上,在实际的Java开发工作中,仅仅了解并发编程的创建.启动.管理和通信等基本知识还是不够的.一方面,如果要开发出高效.安全的并发程序,就必须深入Java内存模型和Java虚拟机的工作原理,从底层了解并发编程的实质:更进一步地,在现今大数据的时代,要开发出高并发.高可用.考可靠的分布式应用及各种中间件,更需要深

Java并发编程:Concurrent锁机制解析

.title { text-align: center } .todo { font-family: monospace; color: red } .done { color: green } .tag { background-color: #eee; font-family: monospace; padding: 2px; font-size: 80%; font-weight: normal } .timestamp { color: #bebebe } .timestamp-kwd