并发编程是很困难的,特别是在你没有很好的设计与抽像你的功能层次时。传统的并发解决方案是采用多线程和共享变量,这使得随着代码的增加你很难找到错误根源。
Scala中采用了更好的方案,它不是只基于更低层次的线程的。Scala为用户提供了更高级的抽象:Futures和Promises(Akka还提供了基于actor模式的编程范式,是一种更高层次的并发编程抽象。本文主要讲解Futures和Promises,这里提供一些进一步学习的参考)。
Future
Future是持有某个值的对象,它完成一些计算并允许在“将来”的某一个时刻获取结果。它也可以是其它计算结果的结果(简单点说就是多个Future可以嵌套)。创建一个Future最简单的方式就调用它的apply方法,它将直接开始一个异步计算,并返回一个Future对象,它将包含计算结果。
1 | import scala.concurrent.ExecutionContext.Implicits.global |
第一行导入了ExecutionContext.Implicits.global作为当前环境的一个默认执行上下文。现在先暂时不管它的具体含意,只要知道它会提供一个线程池,所有任务最终都会被提交给它来异步执行就可以了。在这个示例中先定义computation函数,并在Future { ... }代码块中调用。程序会使用上下文中的找到的线程池(由ExecutionContext.Implicits.global导入)并马上开始异步执行。
访问Future的结果
前面说了,Future返回值有两种类型:Success和Failure。而Future在执行后提供了3个回调函数来让你访问结果,它们分别是:
- def onSuccess[U](pf: PartialFunction[T, U]): 传入一个偏函数,可以使用模式匹配来处理你想要的结果
- def onFailure[U](pf: PartialFunction[Throwable, U]): 传入一个偏函数,可以使用模式匹配来处理你想要的异常
- def onComplete[U](f: Try[T] => U): 传一个接受
Try[T](http://www.yangbajing.me/2013/02/16/Option-Either-Try/)类型的函数。
上面3个回调函数都要求返回一个类型为U的返回值,这也得益于Scala的类型自动推导功能,你可以减少很多的样版代码。
当Future完成后,我们注册的回调函数将收到值。一个常用的注册回调函数是onComplete,它期待传入一个偏函数,并处理Success[T]和Failure[E]两种情况。(编函数将另文介绍)
1 | theFuture.onComplate { |
可以看到,在Scala中写多线程代码是非常轻松惬意的。但是,你以为使用Future只是简化了new Thead或new Runnable的代码量而以,那就大错特错了。Scala的Future不只这些功能……
合并多个Future的结果
实际工作中,我们经常遇到需要向多个来源同时异步请求数据的时候。这时我们就需要等所以来源数据都返回后将结果集处理后再返回。使用Future.sequence方法,接收一个包含Future的列表来将一系列Future的结果汇总到一个List单一结果里输出。完整代码在:http://git.oschina.net/yangbajing/codes/08pacy2lubgqnkmv1xojd
1 | val f1 = Future { |
代码f1、f2、f3字义了3个异步操作并马上执行。f4将3个异步操作的结果合并到一个List里返回,同时f4也是一个异步操作。除了采用Future.sequence提供的方便函数,我们还可以使用for comprehension特性来更灵活的合并多个Future的结果。
我们把f4的操作改成使用for推导式形式:
1 | val f4: Future[(String, String, Int)] = |
可以看到for推导式也可以使用在Future上,r2 <- f2代码的含意是在f2这个Future执行完后将结果赋值给变量r2。与Future.sequence将多个线程的返回值合并到一个List不同。使用for推导式,在yield语句部分你可以对每个线程的运算结果做更自由的处理,并返回自己想要的类型(这得益于Scala强大的类型推导功能,不需要你显示的声明变量值类型)。
异常处理
前文代码,当Future代码块内有异常抛出时使用Future.sequence和for comprehension也会抛出异常,你将不能正确的获得结果。这时,可以使用Future提供的recover方法处理异常,并把异常恢复成一个正确值返回。
1 | def recover[U >: T](pf: PartialFunction[Throwable, U]) |
recover常用使用方式如下:
1 | Future (6 / 0) recover { case e: ArithmeticException => 0 } // result: 0 |
接下来看看怎样使用recover来将处理异常并可使返回值可正确应用于Future.sequence和for comprehension中。以之前的f2举例,修改代码如下:
1 | val f2 = Future { |
采用上面的步骤将异常转换成一个值返回,f2就可以正确的应用到合并代码里了。
Promise
Promise是一个承若,它是一个可修改的对象。一个Promise可以在未来成功的完成一个任务(使用p.success来完成),也可能用来完成一个失败(通过返回一个异常,使用p.failure)。失败了的Promise,可以通过f.recover来处理故障。考虑一个把com.google.common.util.concurrent.FutureCallback<V>封装成Scala的Future的例子,看看Promise是怎样使用的。
1 | val promise = Promise[R]() |
总结
Scala使用Future和Promise对并发编程提供了快捷的支持,同时对多个Future结果的合并和Future的异常恢复也提供了优雅的解决方案。