现在的位置: 首页 > 综合 > 正文

scala学习十一 并发编程

2019年05月24日 ⁄ 综合 ⁄ 共 4937字 ⁄ 字号 评论关闭

首先来一个生产者消费者的例子:

Producer:

  class Producer(drop : Drop) 
    extends Runnable 
  { 
    val importantInfo : Array[String] = Array( 
      "Mares eat oats", 
      "Does eat oats", 
      "Little lambs eat ivy", 
      "A kid will eat ivy too"
    ); 
  
    override def run() : Unit = 
    { 
      importantInfo.foreach((msg) => drop.put(msg)) 
      drop.put("DONE") 
    } 
  } 

Consumer:

  class Consumer(drop : Drop) 
    extends Runnable 
  { 
    override def run() : Unit = 
    { 
      var message = drop.take() 
      while (message != "DONE") 
      { 
        System.out.format("MESSAGE RECEIVED: %s%n", message) 
        message = drop.take() 
      } 
    } 
  } 

Drop:

 class Drop 
  { 
    var message : String = ""
    var empty : Boolean = true 
    var lock : AnyRef = new Object() 
  
    def put(x: String) : Unit = 
      lock.synchronized 
      { 
        // Wait until message has been retrieved 
        await (empty == true) 
        // Toggle status 
        empty = false 
        // Store message 
        message = x 
        // Notify consumer that status has changed 
        lock.notifyAll() 
      } 

    def take() : String = 
      lock.synchronized 
      { 
        // Wait until message is available. 
        await (empty == false) 
        // Toggle status 
        empty=true 
        // Notify producer that staus has changed 
        lock.notifyAll() 
        // Return the message 
        message 
      } 

    private def await(cond: => Boolean) = 
      while (!cond) { lock.wait() } 
  } 

主Object:

object ProdConSample 
 { 
  def main(args : Array[String]) : Unit = 
  { 
    // Create Drop 
    val drop = new Drop(); 
  
    // Spawn Producer 
    new Thread(new Producer(drop)).start(); 
    
    // Spawn Consumer 
    new Thread(new Consumer(drop)).start(); 
  } 
 }

Producer和 Consumer类几乎与它们的
Java 同类相同,再一次扩展(实现)了 Runnable接口并覆盖了 run()方法,并且
—对于 Producer的情况
—分别使用了内置迭代方法来遍历 importantInfo数组的内容。(实际上,为了让它更像
Scala,importantInfo可能应该是一个 List而不是 Array,但在第一次尝试时,我希望尽可能保证它们与原始
Java 代码一致。)

Drop类同样类似于它的
Java 版本。但 Scala 中有一些例外,“synchronized” 并不是关键字,它是针对 AnyRef类定义的一个方法,即
Scala “所有引用类型的根”。这意味着,要同步某个特定的对象,您只需要对该对象调用同步方法;在本例中,对 Drop上的
lock 字段中所保存的对象调用同步方法

意,我们在 await()方法定义的 Drop类中还利用了一种
Scala 机制:cond参数是等待计算的代码块,而不是在传递给该方法之前进行计算。在
Scala 中,这被称作 “call-by-name”;此处,它是一种实用的方法,可以捕获需要在 Java 版本中表示两次的条件等待逻辑(分别用于 puttake)。

最后,在 main()中,创建 Drop实例,实例化两个线程,使用 start()启动它们,然后在 main()的结束部分退出,相信
JVM 会在 main()结束之前启动这两个线程。(在生产代码中,可能无法保证这种情况,但对于这样的简单的例子,99.99
% 没有问题。)

但是,已经说过,仍然存在相同的基本问题:程序员仍然需要过分担心两个线程之间的通信和协调问题。虽然一些 Scala 机制可以简化语法,但这目前为止并没有相当大的吸引力

Scala 并发性 v2

Scala Library Reference 中有一个有趣的包:scala.concurrency。这个包包含许多不同的并发性结构,包括我们即将利用的 MailBox类。

顾名思义,MailBox从本质上说就是 Drop,用于在检测之前保存数据块的单槽缓冲区。但是,MailBox最大的优势在于它将发送和接收数据的细节完全封装到模式匹配和
case 类中,这使它比简单的 Drop(或 Drop的多槽数据保存类 java.util.concurrent.BoundedBuffer)更加灵活。

 package com.tedneward.scalaexamples.scala.V2 
 { 
  import concurrent.{MailBox, ops} 

  object ProdConSample 
  { 
    class Producer(drop : Drop) 
      extends Runnable 
    { 
      val importantInfo : Array[String] = Array( 
        "Mares eat oats", 
        "Does eat oats", 
        "Little lambs eat ivy", 
        "A kid will eat ivy too"
      ); 
    
      override def run() : Unit = 
      { 
        importantInfo.foreach((msg) => drop.put(msg)) 
        drop.put("DONE") 
      } 
    } 
    
    class Consumer(drop : Drop) 
      extends Runnable 
    { 
      override def run() : Unit = 
      { 
        var message = drop.take() 
        while (message != "DONE") 
        { 
          System.out.format("MESSAGE RECEIVED: %s%n", message) 
          message = drop.take() 
        } 
      } 
    } 

    class Drop 
    { 
      private val m = new MailBox() 
      
      private case class Empty() 
      private case class Full(x : String) 
      
      m send Empty()  // initialization 
      
      def put(msg : String) : Unit = 
      { 
        m receive 
        { 
          case Empty() => 
            m send Full(msg) 
        } 
      } 
      
      def take() : String = 
      { 
        m receive 
        { 
          case Full(msg) => 
            m send Empty(); msg 
        } 
      } 
    } 
  
    def main(args : Array[String]) : Unit = 
    { 
      // Create Drop 
      val drop = new Drop() 
      
      // Spawn Producer 
      new Thread(new Producer(drop)).start(); 
      
      // Spawn Consumer 
      new Thread(new Consumer(drop)).start(); 
    } 
  } 
 }

此处,v2 和 v1 之间的惟一区别在于 Drop的实现,它现在利用 MailBox类处理传入以及从 Drop中删除的消息的阻塞和信号事务。(我们可以重写 Producer和 Consumer,让它们直接使用 MailBox,但考虑到简单性,我们假定希望保持所有示例中的 DropAPI
相一致。)使用 MailBox与使用典型的 BoundedBufferDrop)稍有不同,因此我们来仔细看看其代码。

MailBox有两个基本操作:send和 receivereceiveWithin 方法仅仅是基于超时的 receiveMailBox接收任何类型的消息。send()方法将消息放置到邮箱中,并立即通知任何关心该类型消息的等待接收者,并将它附加到一个消息链表中以便稍后检索。receive()方法将阻塞,直到接收到对于功能块合适的消息。

因此,在这种情况下,我们将创建两个 case 类,一个不包含任何内容(Empty),这表示 MailBox为空,另一个包含消息数据(Full

  • put方法,由于它会将数据放置在 Drop中,对 MailBox调用 receive()以查找 Empty实例,因此会阻塞直到发送 Empty。此时,它发送一个Full实例给包含新数据的 MailBox
  • take方法,由于它会从 Drop中删除数据,对 MailBox调用 receive()以查找 Full实例,提取消息(再次得益于模式匹配从
    case 类内部提取值并将它们绑到本地变量的能力)并发送一个 Empty 实例给 MailBox

不需要明确的锁定,并且不需要考虑监控程序。

Scala 并发性 v3

事实上,我们可以显著缩短代码,只要 Producer 和 Consumer不需要功能全面的类(此处便是如此)
—两者从本质上说都是 Runnable.run()方法的瘦包装器,Scala
可以使用 scala.concurrent.ops对象的 spawn方法来实现

 package com.tedneward.scalaexamples.scala.V3 
 { 
  import concurrent.MailBox 
  import concurrent.ops._ 

  object ProdConSample 
  { 
    class Drop 
    { 
      private val m = new MailBox() 
      
      private case class Empty() 
      private case class Full(x : String) 
      
      m send Empty()  // initialization 
      
      def put(msg : String) : Unit = 
      { 
        m receive 
        { 
          case Empty() => 
            m send Full(msg) 
        } 
      } 
      
      def take() : String = 
      { 
        m receive 
        { 
          case Full(msg) => 
            m send Empty(); msg 
        } 
      } 
    } 
  
    def main(args : Array[String]) : Unit = 
    { 
      // Create Drop 
      val drop = new Drop() 
      
      // Spawn Producer 
      spawn 
      { 
        val importantInfo : Array[String] = Array( 
          "Mares eat oats", 
          "Does eat oats", 
          "Little lambs eat ivy", 
          "A kid will eat ivy too"
        ); 
        
        importantInfo.foreach((msg) => drop.put(msg)) 
        drop.put("DONE") 
      } 
      
      // Spawn Consumer 
      spawn 
      { 
        var message = drop.take() 
        while (message != "DONE") 
        { 
          System.out.format("MESSAGE RECEIVED: %s%n", message) 
          message = drop.take() 
        } 
      } 
    } 
  } 
 }

spawn方法(通过包块顶部的 ops对象导入)接收一个代码块(另一个
by-name 参数示例)并将它包装在匿名构造的线程对象的 
run()方法内部。事实上,并不难理解 spawn的定义在 ops类的内部是什么样的:

<span style="background-color: rgb(153, 153, 153);">  def spawn(p: => Unit) = { 
    val t = new Thread() { override def run() = p } 
    t.start() 
  }</span>

事实上,Scala 的并发性支持超越了 MailBox和 ops类;Scala
还支持一个类似的 “Actors” 概念,它使用了与 
MailBox所采用的方法相类似的消息传递方法,但应用更加全面并且灵活性也更好。

抱歉!评论已关闭.