Future-基础

介绍:Java 与 Scala 中的并发

Java 通过内存共享和锁来提供并发支持。Scala 中通过不可变状态的转换来实现:Future。虽然 Java 中也提供了 Future,但与 Scala 中的不同。

二者都是通过异步计算来表示结果,但是 Java 中需要使用阻塞的get方法来访问结果,同时可以在调用get之前使用isDone来检查结果是否完成来避免阻塞,但是仍然需要等待结果完成以支持后续使用该结果的计算。

在 Scala 中,无论Future是否完成,都可以对他指定转换过程。每一个转换过程的结果都是一个新的Future,这个新的Future表示通过函数对原始Future转换后得到的结果。计算执行的线程通过一个隐式的*execution context(执行上下文)*来决定。以不可变状态串行转换的方式来描述异步计算,避免共享内存和锁带来的额外开销。

锁机制的弊端

Java 平台中,每个对象都与一个逻辑监视器关联,以控制多线程对数据的访问。使用这种模式时需要指定哪些数据会被多线程共享并将被访问的、控制访问的和共享数据的代码段都标记为synchronized。Java 运行时使用的机制来确保同一时间只有一个线程能够进入被锁保护的代码段。以此协调你能够通过多线程来访问数据。

为了兼容性,Scala 提供了 Java 的并发原语。可以在 Scala 中调用方法wait/notify/notifyAll,并且意义与 Java 一致。但是并不提供关键字synchronized,但是预定义了一个方法:

var counter = 0
synchronized {
  // 这里同时只能有一个线程
  counter = counter + 1
}

但是这种模式难于编写可靠的多线程应用。死锁、竟态…

使用 Try 处理异步中的异常

当你调用一个 Scala 方法时,它会在你等待返回结果时执行一个计算,如果结果是一个Future,它表示另一个异步化执行的计算,通常会被一个完全不同的线程执行。在Future上执行的操作都需要一个excution context来提供异步执行的策略,通常可以使用由 Scala 自身提供的全局执行上下文,在 JVM 上,它使用一个线程池

引入全局执行上下文:

import scala.concurrent.ExecutionContext.Implicits.global
val future = Future { Thread.sleep(10000); 21 + 21 }

当一个Future未完成时,可以调用两个方法:

future.isComplated		// false
future.value			// Option[scala.util.Try[Int]] = None

完成后:

future.isComplated		// true
future.value			// Option[scala.util.Try[Int]] = Some(Success(42))

value方法返回的Option包含一个Try,成功时包含一个类型为 T 的值,失败时包含一个异常,java.lang.Throwable的实例。

Try支持在尝试异步计算前进行同步计算,同时支持一个可能包含异常的计算。

同步计算时可以使用try/catch来确保新城调用方法并捕捉、处理方法抛出的异常。但是异步计算中,发起计算的线程常会移动到其他任务上,然后当计算中抛出异常时,原始的线程不再能通过catch子句来处理异常。因此使用Future进行异步操作时使用Try来处理可能的失败并生成一个值,而不是直接抛出异常。

scala> val fut = Future { Thread.sleep(10000); 21 / 0 } 
fut: scala.concurrent.Future[Int] = ...

scala> fut.value 
res4: Option[scala.util.Try[Int]] = None

// 10s later
scala> fut.value 
res5: Option[scala.util.Try[Int]] = Some(Failure(java.lang.ArithmeticException: / by zero))

Try的定义:

object Try {
  /** 通过传名参数构造一个 Try。
   * 捕获所有 non-fatal 错误并返回一个 `Failure` 对象。
   */
  def apply[T](r: => T): Try[T] =
    try Success(r) catch {				// 常规的 try、catch 调用
      case NonFatal(e) => Failure(e)
    }
}
sealed abstract class Try[+T]
final case class Failure[+T](exception: Throwable) extends Try[T]
final case class Success[+T](value: T) extends Try[T]

Future 操作

map

将传递给map方法的函数作用到原始Future的结果并生成一个新的Future

val result = fut.map(x => x + 1)

原始Futuremap转换可能在两个不同的线程上执行。

for

因为Future声明了一个flatMap方法,因此可以使用for表达式来转换。

val fut1 = Future { Thread.sleep(10000); 21 + 21 }	// Future[Int]
val fut2 = Future { Thread.sleep(10000); 23 + 23 }	// Future[Int]
for { x <- fut1; y <- fut2 } yield x + y			// Future[Int]

因为for表达式是对转换的串行化,如果没有在for之前创建Future并不能达到并行的目的。

for { 
	x <- Future { Thread.sleep(10000); 21 + 21 } 
	y <- Future { Thread.sleep(10000); 23 + 23 } 
} yield x + y		// 需要最少 20s 的时间完成计算

for { x <- fut1; y <- fut2 } yield x + y实际会被转化为fut1.flatMap(x => fut2.map(y => x + y))

flatMap的定义:将一个函数作用到Future成功时的结果并生成一个新的Future,如果原Future失败,新的Future将会包含同样的异常。

创建 Future

上面的例子是通过Futureapply方法来创建:

def apply[T](body: =>T)(implicit @deprecatedName('execctx) executor: ExecutionContext): Future[T] = impl.Future(body)

body是需要执行的异步计算。

创建一个成功Future

Future.successful { 21 + 21 }
// def successful[T](result: T): Future[T] = Promise.successful(result).future
// result 为 Future 的结果

创建一个失败Future

Future.failed(new Exception("bummer!"))
// def failed[T](exception: Throwable): Future[T] = Promise.failed(exception).future
// exception 为指定的异常

通过Try创建一个已完成的Future

import scala.util.{Success,Failure}
Future.fromTry(Success { 21 + 21 })
Future.fromTry(Failure(new Exception("bummer!")))
// def fromTry[T](result: Try[T]): Future[T] = Promise.fromTry(result).future

常用的方式是通过Promise来创建,得到一个被这个Promise控制的Future,当这个Promise完成时对应的Future才会完成:

val pro = Promise[Int]			// Promise[Int]
val fut = pro.future			// Future[Int]
fut.value						// None
pro.success(42)					// 或者 pro.failure(exception)/pro.complete(result: Try[T])
fut.value						// Try[Int]] = Some(Success(42))

或者调用completeWith方法并传入一个新的Future,新的Future一旦完成则用值赋予给这个Priomise

filter & collect

filter用户验证Future的值,如果满足则保留这个值,如果不满足则会抛出一个NoSuchElementException异常:

val fut = Future { 42 }
val valid = fut.filter(res => res > 0)
valid.value		// Some(Success(42))
val invalid = fut.filter(res => res < 0)
invalid.value	// Some(Failure(java.util.NoSuchElementException: Future.filter predicate is not satisfied))

同时提供了一个withFilter方法,因此可以在for表达式中执行相同的操作:

val valid = for (res <- fut if res > 0) yield res
val invalid = for (res <- fut if res < 0) yield res

collect方法对Future的值进行验证并通过一个操作将其转换。如果传递给collect偏函数符合Future的值,该Future会返回经过偏函数转换后的值,否则会抛出NoSuchElementException异常:

val valid = fut collect { case res if res > 0 => res + 46 }		// Some(Success(88))
val invalid = fut collect { case res if res < 0 => res + 46 }	// NoSuchElementException

错误处理:failed、fallBackTo、recover、recoverWith

failed

failed方法将一个任何类型的、错误的Future转换为一个成功的Future[Throwable],这个Throwable即引起错误的异常。

val failure = Future { 42 / 0 }
failure.value			// Some(Failure(java.lang.ArithmeticException: / by zero))
val expectedFailure = failure.failed
expectedFailure.value	// Some(Success(java.lang.ArithmeticException: / by zero))

如果调用failed方法的Future最终是成功的,而调用failed方法返回的Future会以一个NoSuchElementException异常失败。因此,只有当你需要Future失败时,调用failed方法才是适当的:

val success = Future { 42 / 1 }
success.value			// Some(Success(42)), 原本是一个成功的 Future
val unexpectedSuccess = success.failed
unexpectedSuccess.value	// NoSuchElementException, 称为一个失败的 Future

fallBackTo

fallBackTo方法用于提供一个可替换的Future,以便调用该方法的Future失败时作为备用。

val fallback = failure.fallbackTo(success)
fallback.value

如果调用fallBackTo方法的原始Future执行失败,传递给fallBackTo的错误本质上会被忽略。但是如果调用fallBackTo提供的Future也失败了,则会返回最初的错误,即原始Future中的错误:

val failedFallback = failure.fallbackTo( 
	Future { val res = 42; require(res < 0); res } // 这里实际是一个 require 异常
)
failedFallback.value	// Some(Failure(java.lang.ArithmeticException: / by zero)),仍然返回了原始 Future 中的除零异常

recover

recover允许将一个失败的Future转换为一个成功的Future,或者原始Future成功时则不作处理。

val recovered = failedFallback recover { case ex: ArithmeticException => -1 }
recovered.value		// Some(Success(-1)), 捕捉异常并设置成功值,返回新的 Future

如果原始Future成功,recover部分会以相同的值完成:

val unrecovered = fallback recover { case ex: ArithmeticException => -1 }
unrecovered.value	// Some(Success(42))

同时,如果传递给recover的偏函数并不包含原始Future的错误类型,新的Future仍然会以原始Future中的失败完成:

val alsoUnrecovered = failedFallback recover { case ex: IllegalArgumentException => -2 }
alsoUnrecovered.value	// Some(Failure(java.lang.ArithmeticException: / by zero))

recoverWith

recoverWithrecover类似,但是使用的是一个Future值。

val alsoRecovered = failedFallback recoverWith { 
	case ex: ArithmeticException => Future { 42 + 46 } 	// 这是一个 Future
}

其他方面的处理则于recover一致。

transform:对可能性的映射

transfor接收两个转换Future的函数:一个处理原始Future成功的请求,一个处理失败的情况。

val first = success.transform( 
	res => res * -1, 						// 成功
	ex => new Exception("see cause", ex) 	// 失败
)

**注意:**现有的transform并不能将一个成功的Future转换为一个失败的Future,或者反向。只能对成功时的结果进行转换或失败时的异常类型进行转换。

Scala 2.12 版本中提供了一种替代的方式,接收Try => Try的函数:

val firstCase = success.transform { 		// 处理成功的 Future
	case Success(res) => Success(res * -1) 
	case Failure(ex) => Failure(new Exception("see cause", ex)) 
}

val secondCase = failure.transform { 		// 处理失败的 Future
	case Success(res) => Success(res * -1) 
	case Failure(ex) => Failure(new Exception("see cause", ex)) 
}

val nonNegative = failure.transform { 		// 将失败转换为成功
	case Success(res) => Success(res.abs + 1) 
	case Failure(_) => Success(0) 
}

组合 Future:zip、fold、reduce、sequence、traverse

zip

zip方法将两个成功的Future转换为一个新的Future,其值两个Future值的元组。

val zippedSuccess = success zip recovered		// scala.concurrent.Future[(Int, Int)]
zippedSuccess.value								// Some(Success((42,-1)))

如果其中一个失败,zip方法的值会以同样的异常失败:

val zippedFailure = success zip failure
zippedFailure.value		// Some(Failure(java.lang.ArithmeticException: / by zero))

如果两个都失败,结果值会包含最初的异常,即调用zip方法的那个Future的异常。

fold

trait TraversableOnce[+A] extends GenTraversableOnce[A]

可以被贯穿一次或多次的集合的模板特质。它的存在主要用于消除IteratorTraversable之间的重复代码。包含一系列抽象方法并在IteratorTraversable..中实现,这些方法贯穿集合中的部分或全部元素并返回根据操作生成的值。

fold方法通过穿过一个TraversableOnceFuture集合来累积值,生成一个Future结果。如果集合中的所有Future都成功了,结果Future会以累积值成功。如果集合中任何一个失败,结果Future就会失败。如果多个Future失败,结果中会包含第一个失败的错误。

val fortyTwo = Future { 21 + 21 }
val fortySix = Future { 23 + 23 }
val futureNums = List(fortyTwo, fortySix)
val folded = Future.fold(futureNums)(0) { 	// (0), 提供一个累积值的初始值
	(acc, num) => acc + num 
}
folded.value								// Some(Success(88))

reduce

reduce方法与fold类似,但是不需要提供初始的默认值,它使用最初的Future的结果作为开始值。

val reduced = Future.reduce(futureNums) { 
	(acc, num) => acc + num 
}
reduced.value	// Some(Success(88))

如果给reduce方法传入一个空的集合,则会以NoSuchElementException异常失败,因为没有初始值。

sequence

sequence方法将一个TraversableOnceFuture集合转换为一个包含TraversableOnce值的Future。比如List[Future[Int]] => Future[List[Int]]:

val futureList = Future.sequence(futureNums)
futureList.value	// Some(Success(List(42, 46)))

traverse

traverse方法将一个包含任意元素类型的TraversableOnce转换为一个TraversableOnceFuture,并且这个序列转换为一个TraversableOnce值的Future。比如,List[Int] => Future[List[Int]]

val traversed =Future.traverse(List(1, 2, 3)) { i => Future(i) }	// .Future[List[Int]]
traversed.value		// Some(Success(List(1, 2, 3)))

执行副作用:foreach、onComplete、andThen

有时需要在Future完成时执行一些副作用,而不是通过Future生成一个、一些值。

foreach

最基本的foreach方法会在Future成功完成时执行一些副作用。失败时将不会执行:

failure.foreach(ex => println(ex))		// 不会执行
success.foreach(res => println(res))	// 42

因为不带yieldfor表达式会被重写为一个foreach执行,因此也可以使用for表达式来实现:

for (res <- failure) println(res)
for (res <- success) println(res)

onComplete

这是Future的一种回调函数,无论Future最终成功或失败,onComplete方法都会执行。它需要被传入一个TrySuccess用于处理成功的情况,Failure用于处理失败的情况:

success onComplete { 
	case Success(res) => println(res) 
	case Failure(ex) => println(ex) 
}

andThen

Future并不会保证通过onComplete注册的回调函数的执行顺序。如果需要保证回调函数的执行顺序,可以使用andThen方法代替,它是Future的两一个回调函数。

andThen方法返回一个对原始Future映射(即与原始 Future 同样的方式成功或失败)的新Future,但是当回调完全执行后才会完成。它的功能是,既不影响原始 Future 的结果,又能在原始 Future 完成时执行一些回调。

val newFuture = success andThen { 
	case Success(res) => println(res) 
	case Failure(ex) => println(ex) 
}
42					// 在回调中打印 结果
newFuture.value		// Some(Success(42)), 同时仍然保持了原始 Future 的值

但是需要注意的是,如果传递给andThen的函数如果在执行时引发异常,该异常会传递给后续的回调或者通过结果Future呈现。

2.12 中的新方法

flatten

flatten方法将一个嵌套的Future转换为一个单层的Future,即Future[Future[Int]] =>Future[Int]

val nestedFuture = Future { Future { 42 } }		// Future[Future[Int]]
val flattened = nestedFuture.flatten			// Future[Int]

zipWith

zipWith方法实质上是对两个Future执行zip方法,并将结果元组执行一个map调用:

val futNum = Future { 21 + 21 }
val futStr = Future { "ans" + "wer" }
val zipped = futNum zip futStr
val mapped = zipped map { case (num, str) => s"$num is the $str" }

使用zipWith只需要一步:

val fut = futNum.zipWith(futStr) { // Scala 2.12 
	case (num, str) => s"$num is the $str" 
}

transformWith

transformWith支持通过一个Try => Future的函数来转换Future

val flipped: Future[Int] = success.transformWith { // Scala 2.12 
	case Success(res) => Future { throw new Exception(res.toString) } 
	case Failure(ex) => Future { 21 + 21 } 
}

该方法实质上是对transform方法的重写,它支持生成一个Future而不是生成一个Try

测试 Future

Future 的作用在于避免阻塞。在很多 JVM 实现上,创建上千个线程之后,线程间的上下文切换对性能的影响达到不能接受的程度。通过避免阻塞,可以繁忙时维持有限的线程数。不过,Scala 支持在需要的时候阻塞Future的结果,通过Await

val fut = Future { Thread.sleep(10000); 21 + 21 }
val x:Int = Await.result(fut, 15.seconds) 		// <= blocks

然后就可以对其结果进行测试:

import org.scalatest.Matchers._
x should be (42)

或者直接通过特质ScalaFutures提供的阻塞结构来测试。比如futureValue方法,它会阻塞直到Future完成,如果Future失败,则会抛出TestFailedException异常。

import org.scalatest.concurrent.ScalaFutures._
val fut = Future { Thread.sleep(10000); 21 + 21 }
fut.futureValue should be (42)			// <= futureValue 阻塞

或者使用 ScalaTest 3.0 提供的异步测试风格:

import org.scalatest.AsyncFunSpec 
import scala.concurrent.Future

class AddSpec extends AsyncFunSpec {

	def addSoon(addends: Int * ): Future[Int] = Future { addends.sum }

	describe("addSoon") { 
		it("will eventually compute a sum of passed Ints") { 
			val futureSum: Future[Int] = addSoon(1, 2) 
			// You can map assertions onto a Future, then return 
			// the resulting Future[Assertion] to ScalaTest: 
			futureSum map { sum => assert(sum == 3) } 
		}
	}
}