首先来一个生产者消费者的例子:
Producer:
class Producer(drop : Drop) extends Runnable { val importantInfo : Array[String] = Array( "Mares eat oats", "Does eat oats", "Little lambs eat ivy", "A kid will eat ivy too" ); override def run() : Unit = { importantInfo.foreach((msg) => drop.put(msg)) drop.put("DONE") } }
Consumer:
class Consumer(drop : Drop) extends Runnable { override def run() : Unit = { var message = drop.take() while (message != "DONE") { System.out.format("MESSAGE RECEIVED: %s%n", message) message = drop.take() } } }
Drop:
class Drop { var message : String = "" var empty : Boolean = true var lock : AnyRef = new Object() def put(x: String) : Unit = lock.synchronized { // Wait until message has been retrieved await (empty == true) // Toggle status empty = false // Store message message = x // Notify consumer that status has changed lock.notifyAll() } def take() : String = lock.synchronized { // Wait until message is available. await (empty == false) // Toggle status empty=true // Notify producer that staus has changed lock.notifyAll() // Return the message message } private def await(cond: => Boolean) = while (!cond) { lock.wait() } }
主Object:
object ProdConSample { def main(args : Array[String]) : Unit = { // Create Drop val drop = new Drop(); // Spawn Producer new Thread(new Producer(drop)).start(); // Spawn Consumer new Thread(new Consumer(drop)).start(); } }
Producer
和 Consumer
类几乎与它们的
Java 同类相同,再一次扩展(实现)了 Runnable
接口并覆盖了 run()
方法,并且
—对于 Producer
的情况
—分别使用了内置迭代方法来遍历 importantInfo
数组的内容。(实际上,为了让它更像
Scala,importantInfo
可能应该是一个 List
而不是 Array
,但在第一次尝试时,我希望尽可能保证它们与原始
Java 代码一致。)
Drop
类同样类似于它的
Java 版本。但 Scala 中有一些例外,“synchronized” 并不是关键字,它是针对 AnyRef
类定义的一个方法,即
Scala “所有引用类型的根”。这意味着,要同步某个特定的对象,您只需要对该对象调用同步方法;在本例中,对 Drop
上的
lock 字段中所保存的对象调用同步方法。
注意,我们在 await()
方法定义的 Drop
类中还利用了一种
Scala 机制:cond
参数是等待计算的代码块,而不是在传递给该方法之前进行计算。在
Scala 中,这被称作 “call-by-name”;此处,它是一种实用的方法,可以捕获需要在 Java 版本中表示两次的条件等待逻辑(分别用于 put
和take
)。
最后,在 main()
中,创建 Drop
实例,实例化两个线程,使用 start()
启动它们,然后在 main()
的结束部分退出,相信
JVM 会在 main()
结束之前启动这两个线程。(在生产代码中,可能无法保证这种情况,但对于这样的简单的例子,99.99
% 没有问题。)
但是,已经说过,仍然存在相同的基本问题:程序员仍然需要过分担心两个线程之间的通信和协调问题。虽然一些 Scala 机制可以简化语法,但这目前为止并没有相当大的吸引力
Scala 并发性 v2
Scala Library Reference 中有一个有趣的包:scala.concurrency
。这个包包含许多不同的并发性结构,包括我们即将利用的 MailBox
类。
顾名思义,MailBox
从本质上说就是 Drop
,用于在检测之前保存数据块的单槽缓冲区。但是,MailBox
最大的优势在于它将发送和接收数据的细节完全封装到模式匹配和
case 类中,这使它比简单的 Drop
(或 Drop
的多槽数据保存类 java.util.concurrent.BoundedBuffer
)更加灵活。
package com.tedneward.scalaexamples.scala.V2 { import concurrent.{MailBox, ops} object ProdConSample { class Producer(drop : Drop) extends Runnable { val importantInfo : Array[String] = Array( "Mares eat oats", "Does eat oats", "Little lambs eat ivy", "A kid will eat ivy too" ); override def run() : Unit = { importantInfo.foreach((msg) => drop.put(msg)) drop.put("DONE") } } class Consumer(drop : Drop) extends Runnable { override def run() : Unit = { var message = drop.take() while (message != "DONE") { System.out.format("MESSAGE RECEIVED: %s%n", message) message = drop.take() } } } class Drop { private val m = new MailBox() private case class Empty() private case class Full(x : String) m send Empty() // initialization def put(msg : String) : Unit = { m receive { case Empty() => m send Full(msg) } } def take() : String = { m receive { case Full(msg) => m send Empty(); msg } } } def main(args : Array[String]) : Unit = { // Create Drop val drop = new Drop() // Spawn Producer new Thread(new Producer(drop)).start(); // Spawn Consumer new Thread(new Consumer(drop)).start(); } } }
此处,v2 和 v1 之间的惟一区别在于 Drop
的实现,它现在利用 MailBox
类处理传入以及从 Drop
中删除的消息的阻塞和信号事务。(我们可以重写 Producer
和 Consumer
,让它们直接使用 MailBox
,但考虑到简单性,我们假定希望保持所有示例中的 Drop
API
相一致。)使用 MailBox
与使用典型的 BoundedBuffer
(Drop
)稍有不同,因此我们来仔细看看其代码。
MailBox
有两个基本操作:send
和 receive
。receiveWithin
方法仅仅是基于超时的 receive
。MailBox
接收任何类型的消息。send()
方法将消息放置到邮箱中,并立即通知任何关心该类型消息的等待接收者,并将它附加到一个消息链表中以便稍后检索。receive()
方法将阻塞,直到接收到对于功能块合适的消息。
因此,在这种情况下,我们将创建两个 case 类,一个不包含任何内容(Empty
),这表示 MailBox
为空,另一个包含消息数据(Full
。
-
put
方法,由于它会将数据放置在Drop
中,对MailBox
调用receive()
以查找Empty
实例,因此会阻塞直到发送Empty
。此时,它发送一个Full
实例给包含新数据的MailBox
。 -
take
方法,由于它会从Drop
中删除数据,对MailBox
调用receive()
以查找Full
实例,提取消息(再次得益于模式匹配从
case 类内部提取值并将它们绑到本地变量的能力)并发送一个Empty
实例给MailBox
。
不需要明确的锁定,并且不需要考虑监控程序。
Scala 并发性 v3
事实上,我们可以显著缩短代码,只要 Producer
和 Consumer
不需要功能全面的类(此处便是如此)
—两者从本质上说都是 Runnable.run()
方法的瘦包装器,Scala
可以使用 scala.concurrent.ops
对象的 spawn
方法来实现
package com.tedneward.scalaexamples.scala.V3 { import concurrent.MailBox import concurrent.ops._ object ProdConSample { class Drop { private val m = new MailBox() private case class Empty() private case class Full(x : String) m send Empty() // initialization def put(msg : String) : Unit = { m receive { case Empty() => m send Full(msg) } } def take() : String = { m receive { case Full(msg) => m send Empty(); msg } } } def main(args : Array[String]) : Unit = { // Create Drop val drop = new Drop() // Spawn Producer spawn { val importantInfo : Array[String] = Array( "Mares eat oats", "Does eat oats", "Little lambs eat ivy", "A kid will eat ivy too" ); importantInfo.foreach((msg) => drop.put(msg)) drop.put("DONE") } // Spawn Consumer spawn { var message = drop.take() while (message != "DONE") { System.out.format("MESSAGE RECEIVED: %s%n", message) message = drop.take() } } } } }
spawn
方法(通过包块顶部的 ops
对象导入)接收一个代码块(另一个
by-name 参数示例)并将它包装在匿名构造的线程对象的 run()
方法内部。事实上,并不难理解 spawn
的定义在 ops
类的内部是什么样的:
<span style="background-color: rgb(153, 153, 153);"> def spawn(p: => Unit) = { val t = new Thread() { override def run() = p } t.start() }</span>
事实上,Scala 的并发性支持超越了 MailBox
和 ops
类;Scala
还支持一个类似的 “Actors” 概念,它使用了与 MailBox
所采用的方法相类似的消息传递方法,但应用更加全面并且灵活性也更好。