Chapter 13: Programming with Actors

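When writing complex, time-consuming applications, we often turn to multithreading and concurrency to reduce response time or improve performance. Unfortunately, traditional concurrency solutions bring problems of their own: thread-safety issues, race conditions, deadlocks, livelocks, and code that is hard to understand and error-prone. Shared mutability is the culprit.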

13.1 A Sequential Time-Consuming Problem

ProgrammingActors/CountFilesSequential.scala

import java.io.File

def getChildren(file: File) = {
  val children = file.listFiles()
  if (children != null) children.toList else List()
}

val start = System.nanoTime
val exploreFrom = new File(args(0))

var count = 0L
var filesToVisit = List(exploreFrom)

while (filesToVisit.nonEmpty) {
  val head = filesToVisit.head
  filesToVisit = filesToVisit.tail

  val children = getChildren(head)
  count = count + children.count { !_.isDirectory }
  filesToVisit = filesToVisit ::: children.filter { _.isDirectory }
}

val end = System.nanoTime
println(s"Number of files found: $count")
println(s"Time taken: ${(end - start) / 1.0e9} seconds")

Command

scala CountFilesSequential.scala /Users/venkats/agility

Output

Number of files found: 479758
Time taken: 66.524453436 seconds
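
The loop above keeps its work list in a var and appends with :::, which copies the left-hand list on every iteration. As a hedged alternative, here is a minimal tail-recursive sketch of the same traversal. The object name CountFilesRecursive is hypothetical, and the sketch visits directories depth-first rather than in the original order, which should not change the total.

```scala
import java.io.File
import scala.annotation.tailrec

object CountFilesRecursive {
  // Children of a directory, or Nil if `file` is not a readable directory
  def getChildren(file: File): List[File] =
    Option(file.listFiles()).map(_.toList).getOrElse(Nil)

  // Tail-recursive traversal: `toVisit` is the work list, `count` the running total
  @tailrec
  def countFiles(toVisit: List[File], count: Long = 0L): Long = toVisit match {
    case Nil => count
    case head :: rest =>
      val (dirs, files) = getChildren(head).partition(_.isDirectory)
      // Prepending only copies the (usually short) list of new directories
      countFiles(dirs ::: rest, count + files.size)
  }
}
```

A call such as CountFilesRecursive.countFiles(List(new File(args(0)))) would take the place of the while loop. Timing is not shown here because it depends on the machine and the directory tree.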

13.3 Creating Actors

ProgrammingActors/HollywoodActor.scala

import akka.actor._

class HollywoodActor() extends Actor {
  def receive: Receive = {
    case message => println(s"playing the role of $message")
  }
}

Command: compile

scalac -d classes HollywoodActor.scala CreateActors.scala

Command: run

scala -classpath classes CreateActors

ProgrammingActors/CreateActors.scala

import akka.actor._

import scala.concurrent.Await
import scala.concurrent.duration.Duration

object CreateActors extends App {
  val system = ActorSystem("sample")

  val depp = system.actorOf(Props[HollywoodActor])

  depp ! "Wonka"

  val terminateFuture = system.terminate()
  Await.ready(terminateFuture, Duration.Inf)
}

Output

playing the role of Wonka
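
In classic Akka, Receive is an alias for PartialFunction[Any, Unit], and a message that matches no case is passed to the actor's unhandled method instead of your handler. The sketch below imitates that dispatch decision without Akka. ReceiveSketch, played, and deliver are hypothetical names, and the handler is narrowed to String messages (unlike the catch-all case in HollywoodActor) so that an unmatched message can be shown.

```scala
import scala.collection.mutable.ListBuffer

object ReceiveSketch {
  // Same shape as akka.actor.Actor.Receive
  type Receive = PartialFunction[Any, Unit]

  // Collects what the handler would have printed, so the result can be inspected
  val played: ListBuffer[String] = ListBuffer.empty[String]

  // Only String messages are handled here
  val receive: Receive = {
    case role: String => played += s"playing the role of $role"
  }

  // Runs the handler only if some case matches; returns whether it was handled
  def deliver(message: Any): Boolean =
    if (receive.isDefinedAt(message)) { receive(message); true }
    else false
}
```

With this sketch, ReceiveSketch.deliver("Wonka") is handled, while ReceiveSketch.deliver(42) is not, because no case matches an Int.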

ProgrammingActors/HollywoodActor2.scala

case message => println(s"$message - ${Thread.currentThread}")

ProgrammingActors/CreateActors2.scala

val depp = system.actorOf(Props[HollywoodActor])
val hanks = system.actorOf(Props[HollywoodActor])

depp ! "Wonka"
hanks ! "Gump"

depp ! "Sparrow"
hanks ! "Phillips"
println(s"Calling from ${Thread.currentThread}")

Output

Wonka - Thread[sample-akka.actor.default-dispatcher-2,5,main]
Gump - Thread[sample-akka.actor.default-dispatcher-3,5,main]
Calling from Thread[main,5,main]
Phillips - Thread[sample-akka.actor.default-dispatcher-3,5,main]
Sparrow - Thread[sample-akka.actor.default-dispatcher-2,5,main]

13.4 Actors and Threads

ProgrammingActors/CreateActors3.scala

depp ! "Wonka"
hanks ! "Gump"

Thread.sleep(100)

depp ! "Sparrow"
hanks ! "Phillips"

Output

Wonka - Thread[sample-akka.actor.default-dispatcher-3,5,main]
Gump - Thread[sample-akka.actor.default-dispatcher-4,5,main]
Sparrow - Thread[sample-akka.actor.default-dispatcher-4,5,main]
Phillips - Thread[sample-akka.actor.default-dispatcher-3,5,main]
Calling from Thread[main,5,main]
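
In the output above, the same actor handles successive messages on different dispatcher threads, yet each actor still processes one message at a time. The following is a conceptual sketch of how a mailbox on a shared thread pool can give that behaviour. It is not Akka's implementation. PooledActor and PooledActorDemo are hypothetical names, and only java.util.concurrent is used.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, Executor, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable

// Hypothetical mini-actor: any pool thread may run it, but only one at a time
class PooledActor(pool: Executor, handler: String => Unit) {
  private val mailbox = new ConcurrentLinkedQueue[String]()
  private val running = new AtomicBoolean(false)

  // Queue the message, then make sure some pool thread will process it
  def !(message: String): Unit = {
    mailbox.add(message)
    schedule()
  }

  // Claim the actor with a compare-and-set so at most one task is in flight
  private def schedule(): Unit =
    if (!mailbox.isEmpty && running.compareAndSet(false, true))
      pool.execute(new Runnable { def run(): Unit = processOne() })

  private def processOne(): Unit =
    try {
      val message = mailbox.poll()
      if (message != null) handler(message)
    } finally {
      running.set(false) // release the actor before rescheduling
      schedule()         // pick up anything that arrived in the meantime
    }
}

object PooledActorDemo {
  // Sends `n` numbered messages and returns them in the order they were handled
  def run(n: Int): List[String] = {
    val pool = Executors.newFixedThreadPool(4)
    val done = new CountDownLatch(n)
    val handled = mutable.ListBuffer.empty[String] // deliberately unsynchronized
    val actor = new PooledActor(pool, { message =>
      handled += message
      done.countDown()
    })
    (1 to n).foreach(i => actor ! s"msg-$i")
    done.await(10, TimeUnit.SECONDS)
    pool.shutdown()
    handled.toList
  }
}
```

The handler appends to a plain ListBuffer without locking. That is safe in this sketch only because the running flag lets a single task touch the actor's state at any moment, whichever pool thread happens to execute it.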

13.5 Isolated Mutability

ProgrammingActors/HollywoodActor4.scala

import akka.actor._
import scala.collection._

case class Play(role: String)
case class ReportCount(role: String)

class HollywoodActor() extends Actor {
  val messagesCount: mutable.Map[String, Int] = mutable.Map()

  def receive: Receive = {
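    // Record one more performance of the given role
    case Play(role) =>
      val currentCount = messagesCount.getOrElse(role, 0)
      messagesCount.update(role, currentCount + 1)
      println(s"Playing $role")

    // Reply to whoever asked with the number of times the role was played
    case ReportCount(role) =>
      sender() ! messagesCount.getOrElse(role, 0)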
  }
}

ProgrammingActors/UseActor.scala

import akka.actor._
import akka.pattern.ask
import akka.util.Timeout

import scala.concurrent.Await
import scala.concurrent.duration._

object UseActor extends App {
  // Play and ReportCount are the top-level case classes from HollywoodActor4.scala.
  // Redefining them inside this object would create different types that the
  // actor's receive would not match, and redefining them at the top level of
  // this file would clash with the definitions in HollywoodActor4.scala.
  val system = ActorSystem("sample")

  val depp = system.actorOf(Props[HollywoodActor])
  val hanks = system.actorOf(Props[HollywoodActor])

  depp ! Play("Wonka")
  hanks ! Play("Gump")

  depp ! Play("Wonka")
  depp ! Play("Sparrow")

  println("Sent roles to play")

  // The ask pattern (?) needs an implicit timeout and returns a Future
  implicit val timeout: Timeout = Timeout(2.seconds)
  val wonkaFuture = depp ? ReportCount("Wonka")
  val sparrowFuture = depp ? ReportCount("Sparrow")
  val gumpFuture = hanks ? ReportCount("Gump")

  val wonkaCount = Await.result(wonkaFuture, timeout.duration)
  val sparrowCount = Await.result(sparrowFuture, timeout.duration)
  val gumpCount = Await.result(gumpFuture, timeout.duration)

  println(s"Depp played Wonka $wonkaCount time(s)")
  println(s"Depp played Sparrow $sparrowCount time(s)")
  println(s"Hanks played Gump $gumpCount time(s)")

  val terminateFuture = system.terminate()
  Await.ready(terminateFuture, Duration.Inf)
}

Output

Sent roles to play
Playing Wonka
Playing Gump
Playing Wonka
Playing Sparrow
Depp played Wonka 2 time(s)
Depp played Sparrow 1 time(s)
Hanks played Gump 1 time(s)
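
The listing above combines two ideas: fire-and-forget updates to state that only the actor touches, and a request whose reply arrives as a Future. The sketch below shows the same pairing with standard-library pieces only, assuming a single-thread executor as the stand-in for the actor. RoleCounter, play, and reportCount are hypothetical names and not part of Akka.

```scala
import java.util.concurrent.Executors
import scala.collection.mutable
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// Hypothetical stand-in for HollywoodActor: the map is only touched by one thread
class RoleCounter {
  private val executor = Executors.newSingleThreadExecutor()
  private val counts = mutable.Map.empty[String, Int]

  // Roughly like `actor ! Play(role)`: queue the update and return immediately
  def play(role: String): Unit =
    executor.execute(new Runnable {
      def run(): Unit = counts.update(role, counts.getOrElse(role, 0) + 1)
    })

  // Roughly like `actor ? ReportCount(role)`: queue the query, hand back a Future
  def reportCount(role: String): Future[Int] = {
    val reply = Promise[Int]()
    executor.execute(new Runnable {
      def run(): Unit = reply.success(counts.getOrElse(role, 0))
    })
    reply.future
  }

  def shutdown(): Unit = executor.shutdown()
}

object RoleCounterDemo {
  // Plays Wonka twice and Sparrow once, then asks for both counts
  def run(): (Int, Int) = {
    val depp = new RoleCounter
    depp.play("Wonka")
    depp.play("Wonka")
    depp.play("Sparrow")
    val wonka = Await.result(depp.reportCount("Wonka"), 2.seconds)
    val sparrow = Await.result(depp.reportCount("Sparrow"), 2.seconds)
    depp.shutdown()
    (wonka, sparrow)
  }
}
```

Because the executor runs its tasks in submission order, each query sees every update that was queued before it, which mirrors why the counts in the output above are 2 and 1.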

13.6 Concurrency with Actors

ProgrammingActors/FileExplorer.scala

import akka.actor._
import java.io._

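class FileExplorer extends Actor {
  def receive: Receive = {
    case dirName: String =>
      val file = new File(dirName)
      val children = file.listFiles() // null if dirName is not a readable directory
      var filesCount = 0

      if (children != null) {
        // Send each subdirectory back first, so the sender learns about it
        // before it receives this directory's file count
        children.filter { _.isDirectory }
          .foreach { sender() ! _.getAbsolutePath }
        filesCount = children.count { !_.isDirectory }
      }

      // Always reply with a count, even for an unreadable directory
      sender() ! filesCount
  }
}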

ProgrammingActors/FilesCounter.scala

import akka.actor._
import akka.routing._

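class FilesCounter extends Actor {
  val start: Long = System.nanoTime
  var filesCount = 0L
  var pending = 0 // directories sent out for exploration but not yet reported

  // Pool of 100 FileExplorer routees; messages are handed out round-robin
  val fileExplorers: ActorRef =
    context.actorOf(RoundRobinPool(100).props(Props[FileExplorer]))

  def receive: Receive = {
    // A directory to explore: the initial one, or a subdirectory found by an explorer
    case dirName: String =>
      pending = pending + 1
      fileExplorers ! dirName

    // An explorer finished one directory and reports its file count
    case count: Int =>
      filesCount = filesCount + count
      pending = pending - 1

      // No outstanding directories left: report and shut down
      if (pending == 0) {
        val end = System.nanoTime
        println(s"Files count: $filesCount")
        println(s"Time taken: ${(end - start) / 1.0e9} seconds")
        context.system.terminate()
      }
  }
}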

ProgrammingActors/CountFiles.scala

import akka.actor._

object CountFiles extends App {
  val system = ActorSystem("sample")

  val filesCounter = system.actorOf(Props[FilesCounter])

  filesCounter ! args(0)
}

Command

scalac -d classes FilesCounter.scala FileExplorer.scala CountFiles.scala
scala -classpath classes CountFiles /Users/venkats/agility

Output

Files count: 479758
Time taken: 5.609851764 seconds
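
FilesCounter knows it is finished when pending returns to zero. That works because each explorer sends its subdirectories before its file count, so every new directory is registered before its parent is marked done. The single-threaded sketch below replays that protocol on a hypothetical in-memory tree. Dir, Explore, and Counted are illustrative names, and the one FIFO inbox models only one of the many interleavings a real run could produce.

```scala
import scala.collection.immutable.Queue

object PendingCounterSketch {
  // Hypothetical in-memory directory: a number of files plus subdirectories
  final case class Dir(files: Int, subdirs: List[Dir] = Nil)

  // Messages the counter receives, mirroring the String and Int cases
  sealed trait Msg
  final case class Explore(dir: Dir) extends Msg
  final case class Counted(files: Int) extends Msg

  // Replays the protocol and returns the total once pending drops back to 0
  def countFiles(root: Dir): Long = {
    var inbox = Queue[Msg](Explore(root))
    var pending = 0
    var total = 0L
    var done = false
    while (!done) {
      val (msg, rest) = inbox.dequeue
      inbox = rest
      msg match {
        case Explore(dir) =>
          pending += 1
          // The explorer replies with each subdirectory first, then the count
          inbox = (inbox ++ dir.subdirs.map(Explore(_))) :+ Counted(dir.files)
        case Counted(n) =>
          total += n
          pending -= 1
          if (pending == 0) done = true
      }
    }
    total
  }
}
```

For example, a root with 2 files, one subdirectory holding 1 file, and another subdirectory that holds only a nested directory with 3 files yields a total of 6.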