合规国际互联网加速 OSASE为企业客户提供高速稳定SD-WAN国际加速解决方案。 广告
上一章介绍了 Future 类型,以及如何用它来编写高可读性、高组合性的异步执行代码。 Future 只是整个谜团的一部分:它是一个只读类型,允许你使用它计算得到的值,或者处理计算中出现的错误。但是在这之前,必须得有一种方法把这个值放进去。这一章里,你将会看到如何通过 Promise 类型来达到这个目的。 ### 类型 Promise 之前,我们把一段顺序执行的代码块传递给了 `scala.concurrent` 里的 `future` 方法,并且在作用域中给出了一个 `ExecutionContext`,它神奇地异步调用代码块,返回一个 Future 类型的结果。 虽然这种获得 Future 的方式很简单,但还有其他的方法来创建 Future 实例,并填充它,这就是 Promise。Promise 允许你在 Future 里放入一个值,不过只能做一次,Future 一旦完成,就不能更改了。 一个 Future 实例总是和一个(也只能是一个)Promise 实例关联在一起。如果你在 REPL 里调用 `future` 方法,你会发现返回的也是一个 Promise: ~~~ import concurrent.Future import concurrent.Future scala> import concurrent.future import concurrent.future scala> import concurrent.ExecutionContext.Implicits.global import concurrent.ExecutionContext.Implicits.global scala> val f: Future[String] = future { "Hello World!" } f: scala.concurrent.Future[String] = scala.concurrent.impl.Promise$DefaultPromise@2b509249 ~~~ 你得到的对象是一个 `DefaultPromise` ,它实现了 `Future` 和 `Promise` 接口,不过这就是具体的实现细节了(译注,有兴趣的读者可翻阅其实现的源码),使用者只需要知道代码实现把 Future 和对应的 Promise 之间的联系分的很清晰。 这个小例子说明了:除了通过 Promise,没有其他方法可以完成一个 Future,`future` 方法也只是一个辅助函数,隐藏了具体的实现机制。 现在,让我们动动手,看看怎样直接使用 Promise 类型。 #### 给出承诺 当我们谈论起承诺能否被兑现时,一个很熟知的例子是那些政客的竞选诺言。 假设被推选的政客给他的投票者一个减税的承诺。这可以用 `Promise[TaxCut]` 表示: ~~~ import concurrent.Promise case class TaxCut(reduction: Int) // either give the type as a type parameter to the factory method: val taxcut = Promise[TaxCut]() // or give the compiler a hint by specifying the type of your val: val taxcut2: Promise[TaxCut] = Promise() // taxcut: scala.concurrent.Promise[TaxCut] = scala.concurrent.impl.Promise$DefaultPromise@66ae2a84 // taxcut2: scala.concurrent.Promise[TaxCut] = scala.concurrent.impl.Promise$DefaultPromise@346974c6 ~~~ 一旦创建了这个 Promise,就可以在它上面调用 `future` 方法来获取承诺的未来: ~~~ val taxCutF: Future[TaxCut] = taxcut.future // `> scala.concurrent.Future[TaxCut] ` scala.concurrent.impl.Promise$DefaultPromise@66ae2a84 ~~~ 返回的 Future 可能并不和 Promise 一样,但在同一个 Promise 上调用 `future` 方法总是返回同一个对象,以确保 Promise 和 Future 之间一对一的关系。 #### 结束承诺 一旦给出了承诺,并告诉全世界会在不远的将来兑现它,那最好尽力去实现。在 Scala 中,可以结束一个 Promise,无论成功还是失败。 ##### 兑现承诺 为了成功结束一个 Promise,你可以调用它的 `success` 方法,并传递一个大家期许的结果: ~~~ taxcut.success(TaxCut(20)) ~~~ 这样做之后,Promise 就无法再写入其他值了,如果偏要再写,会产生异常。 此时,和 Promise 关联的 Future 也成功完成,注册的回调会开始执行,或者说对这个 Future 进行了映射,那这个时候,映射函数也该执行了。 一般来说,Promise 的完成和对返回的 Future 的处理发生在不同的线程。很可能你创建了 Promise,并立即返回和它关联的 Future 给调用者,而实际上,另外一个线程还在计算它。 为了说明这一点,我们拿减税来举个例子: ~~~ object Government { def redeemCampaignPledge(): Future[TaxCut] = { val p = Promise[TaxCut]() Future { println("Starting the new legislative period.") Thread.sleep(2000) p.success(TaxCut(20)) println("We reduced the taxes! You must reelect us!!!!1111") } p.future } } ~~~ 这个例子中使用了 Future 伴生对象,不过不要被它搞混淆了,这个例子的重点是:Promise 并不是在调用者的线程里完成的。 现在我们来兑现当初的竞选宣言,在 Future 上添加一个 `onComplete` 回调: ~~~ import scala.util.{Success, Failure} val taxCutF: Future[TaxCut] = Government.redeemCampaignPledge() println("Now that they're elected, let's see if they remember their promises...") taxCutF.onComplete { case Success(TaxCut(reduction)) => println(s"A miracle! They really cut our taxes by $reduction percentage points!") case Failure(ex) => println(s"They broke their promises! Again! Because of a ${ex.getMessage}") } ~~~ 多次运行这个例子,会发现显示屏输出的结果顺序是不确定的,而且,最终回调函数会执行,进入成功的那个 case 。 ##### 违背诺言 政客习惯违背诺言,Scala 程序员有时候也只能这样做。调用 `failure` 方法,传递一个异常,结束 Promise: ~~~ case class LameExcuse(msg: String) extends Exception(msg) object Government { def redeemCampaignPledge(): Future[TaxCut] = { val p = Promise[TaxCut]() Future { println("Starting the new legislative period.") Thread.sleep(2000) p.failure(LameExcuse("global economy crisis")) println("We didn't fulfill our promises, but surely they'll understand.") } p.future } } ~~~ 这个 `redeemCampaignPledge` 实现最终会违背承诺。一旦用 `failure` 结束这个 Promise,也无法再次写入了,正如 `success` 方法一样。相关联的 Future 也会以 `Failure` 收场。 如果已经有了一个 Try,那可以直接把它传递给 Promise 的 `complete` 方法,以此来结束这个它。如果这个 Try 是一个 Success,关联的 Future 会成功完成,否则,就失败。 ### 基于 Future 的编程实践 如果想使用基于 Future 的编程范式以增加应用的扩展性,那应用从下到上都必须被设计成非阻塞模式。这意味着,基本上应用层所有的函数都应该是异步的,并且返回 Future。 当下,一个可能的使用场景是开发 Web 应用。流行的 Scala Web 框架,允许你将响应作为 `Future[Response]` 返回,而不是等到你完成响应再返回。这个非常重要,因为它允许 Web 服务器用少量的线程处理更多的连接。通过赋予服务器 `Future[Response]` 的能力,你可以最大化服务器线程池的利用率。 而且,应用的服务可能需要多次调用数据库层以及(或者)某些外部服务,这时候可以获取多个 Future,用 for 语句将它们组合成新的 Future,简单可读!最终,Web 层再将这样的一个 Future 变成 `Future[Response]`。 但是该怎样在实践中实现这些呢?需要考虑三种不同的场景: #### 非阻塞IO 应用很可能涉及到大量的 IO 操作。比如,可能需要和数据库交互,还可能作为客户端去调用其他的 Web 服务。 如果是这样,可以使用一些基于 Java 非阻塞 IO 实现的库,也可以直接或通过 Netty 这样的库来使用 Java 的 NIO API。这样的库可以用定量的线程池处理大量的连接。 但如果是想开发这样的一个库,直接和 Promise 打交道更为合适。 #### 阻塞 IO 有时候,并没有基于 NIO 的库可用。比如,Java 世界里大多数的数据库驱动都是使用阻塞 IO。在 Web 应用中,如果用这样的驱动发起大量访问数据库的调用,要记得这些调用是发生在服务器线程里的。为了避免这个问题,可以将所有需要和数据库交互的代码都放入 `future` 代码块里,就像这样: ~~~ // get back a Future[ResultSet] or something similar: Future { queryDB(query) } ~~~ 到现在为止,我们都是使用隐式可用的全局 `ExecutionContext` 来执行这些代码块。通常,更好的方式是创建一个专用的 `ExecutionContext` 放在数据库层里。可以从 Java的 `ExecutorService` 来它,这也意味着,可以异步的调整线程池来执行数据库调用,应用的其他部分不受影响。 ~~~ import java.util.concurrent.Executors import concurrent.ExecutionContext val executorService = Executors.newFixedThreadPool(4) val executionContext = ExecutionContext.fromExecutorService(executorService) ~~~ #### 长时间运行的计算 取决于应用的本质特点,一个应用偶尔还会调用一些长时间运行的任务,它们完全不涉及 IO(CPU 密集的任务)。这些任务也不应该在服务器线程中执行,因此需要将它们变成 Future: ~~~ Future { longRunningComputation(data, moreData) } ~~~ 同样,最好有一些专属的 `ExecutionContext` 来处理这些 CPU 密集的计算。怎样调整这些线程池大小取决于应用的特征,这些已经超过了本文的范围。 ### 总结 这一章里,我们学习了 Promise - 基于 Future 的并发范式的可写组件,以及怎样用它来完成一个 Future;同时,还给出了一些在实践中使用它们的建议。 下一章会讨论 Scala 函数式编程是如何增加代码可用性(一个长久以来和面向对象编程相关联的概念)的。