I had always wondered what magic lies behind Future[T] in Scala. In the end I found no magic at all: everything falls back on the familiar Java concurrency machinery. Every callback function eventually becomes a java.lang.Runnable, managed by some java.util.concurrent.Executor.

Internally, Future[T] in Scala is implemented with Promise[T]. The terms future and promise are often used interchangeably, but they differ in usage: a future is a read-only placeholder view of a variable, while a promise is a writable, single-assignment container that sets the value of the future.
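To make the read-only versus writable distinction concrete, here is a small sketch using the standard scala.concurrent API. The object name PromiseVsFuture and its fields are only there for illustration.

```scala
import scala.concurrent.{Future, Promise}
import scala.util.Try

object PromiseVsFuture {
  // The promise is the writable side; `future` is its read-only view.
  val promise: Promise[Int] = Promise[Int]()
  val future: Future[Int] = promise.future

  // Nothing has been written yet, so the future has no value.
  val before: Option[Try[Int]] = future.value

  // Writing through the promise makes the value visible through the future.
  promise.success(42)
  val after: Option[Try[Int]] = future.value
}
```

The future itself offers no method to set the value; only the promise does.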

To describe the implementation in one sentence: the body of the future and all of its callbacks are essentially Runnables, which use promises to store the result of the expression.

What follows is a simplified version of Future[T] and Promise[T], starting with Future[T].

package concurrent

import scala.Option
import scala.util.{Try, Success, Failure}

trait Future[+T] {
  private def internalExecutor = Future.InternalCallbackExecutor
  def onSuccess[U](pf: PartialFunction[T, U])(implicit executor: ExecutionContext): Unit = onComplete {
    case Success(v) =>
      pf.applyOrElse[T, Any](v, Predef.conforms[T])
    case _ =>
  }
  def onFailure[U](callback: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Unit = onComplete {
    case Failure(t) =>
      callback.applyOrElse[Throwable, Any](t, Predef.conforms[Throwable])
    case _ =>
  }
  def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit
  def isCompleted: Boolean
  def value: Option[Try[T]]
  // other monadic methods
}

object Future {
  private[concurrent] object InternalCallbackExecutor extends ExecutionContext with java.util.concurrent.Executor {}
  def apply[T](body: =>T)(implicit execctx: ExecutionContext): Future[T] = impl.Future(body)
}

The heart of Future[T] is the onComplete method; everything else is built on it.

package concurrent.impl

import scala.concurrent.ExecutionContext
import scala.util.control.NonFatal
import scala.util.{Try, Success, Failure}

private[concurrent] object Future {
  class PromiseCompletingRunnable[T](body: => T) extends Runnable {
    val promise = new Promise.DefaultPromise[T]()

    override def run() = {
      promise complete {
        try Success(body) catch { case NonFatal(e) => Failure(e) }
      }
    }
  }

  def apply[T](body: =>T)(implicit executor: ExecutionContext): scala.concurrent.Future[T] = {
    val runnable = new PromiseCompletingRunnable(body)
    executor.prepare.execute(runnable)
    runnable.promise.future
  }
}

Every time we create a Future, we also create a Promise, which stores the result of the expression when its complete method is called. The evaluation of the expression is wrapped in a Runnable that runs on some thread managed by the executor, so Future.apply returns immediately.
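The same idea can be sketched against a plain java.util.concurrent.Executor. This is not the library code: TinyFuture, spawn and demo are hypothetical names of mine, and I use the public Promise API instead of DefaultPromise.

```scala
import java.util.concurrent.{Executor, Executors, TimeUnit}
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration.Duration
import scala.util.Try

object TinyFuture {
  // Hypothetical helper: wraps `body` in a Runnable, hands it to the
  // executor, and returns the read-only future right away.
  def spawn[T](body: => T)(executor: Executor): Future[T] = {
    val promise = Promise[T]()
    executor.execute(new Runnable {
      // Try(...) turns non-fatal exceptions into a Failure,
      // similar to the try/catch around `body` shown above.
      override def run(): Unit = promise.complete(Try(body))
    })
    promise.future
  }

  // Runs one computation on a single-thread pool and waits for its result.
  def demo(): Int = {
    val pool = Executors.newSingleThreadExecutor()
    try Await.result(spawn(21 * 2)(pool), Duration(5, TimeUnit.SECONDS))
    finally pool.shutdown()
  }
}
```

The caller only blocks in demo because of Await.result; spawn itself returns as soon as the Runnable has been handed over.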

Now, let’s look at the simplified implementation of Promise[T].

package concurrent

import scala.util.{ Try, Success, Failure }

trait Promise[T] {
  private implicit def internalExecutor: ExecutionContext = Future.InternalCallbackExecutor
  def future: Future[T]
  def isCompleted: Boolean
  def complete(result: Try[T]): this.type =
    if (tryComplete(result)) this else throw new IllegalStateException("Promise already completed.")
  def tryComplete(result: Try[T]): Boolean
}

object Promise {
  def apply[T](): Promise[T] = new impl.Promise.DefaultPromise[T]()
}

tryComplete returns false when the Promise has already been written. The complete method can therefore succeed only once; any later call throws an IllegalStateException, which guarantees the single-assignment property.
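A quick way to see the single-assignment behaviour from the outside, using the standard Promise API. SingleAssignment and demo are illustrative names only.

```scala
import scala.concurrent.Promise
import scala.util.{Success, Try}

object SingleAssignment {
  def demo(): (Boolean, Boolean, Boolean, Option[Try[Int]]) = {
    val p = Promise[Int]()
    val first = p.tryComplete(Success(1))  // first write wins
    val second = p.tryComplete(Success(2)) // already completed: rejected
    // complete() turns a rejected write into an exception.
    val threw =
      try { p.complete(Success(3)); false }
      catch { case _: IllegalStateException => true }
    (first, second, threw, p.future.value)
  }
}
```

Only the first write is kept; the later attempts leave the stored value untouched.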

Finally, we get to the base of everything. Let’s go through the simplified implementation step by step.

package concurrent.impl

private[concurrent] trait Promise[T] extends concurrent.Promise[T] with concurrent.Future[T] {
  def future: this.type = this
}

This trait is used below. It also shows why Future and Promise are sometimes interchangeable: the implementation is both at once, so its future is simply itself.

private class CallbackRunnable[T](val executor: ExecutionContext, val onComplete: Try[T] => Any) extends Runnable {
  // must be filled in before running it
  var value: Try[T] = null

  override def run() = {
    require(value ne null) // must set value to non-null before running!
    try onComplete(value) catch { case NonFatal(e) => executor reportFailure e }
  }

  def executeWithValue(v: Try[T]): Unit = {
    require(value eq null) // can't complete it twice
    value = v
    // Note that we cannot prepare the ExecutionContext at this point, since we might
    // already be running on a different thread!
    try executor.execute(this) catch { case NonFatal(t) => executor reportFailure t }
  }
}

This is the Runnable that encapsulates the callback function.

private[concurrent] object Promise {
  /** A DefaultPromise has three possible states. It can be:
   *
   *  1. Incomplete, with an associated list of callbacks waiting on completion.
   *  2. Complete, with a result.
   *  3. Linked to another DefaultPromise.
   */
  class DefaultPromise[T] extends Promise[T] { self =>

The implementation! A Promise has three states: incomplete, complete, or linked to another Promise. The third state arises when futures and promises are chained.

    // a simplified version of the state; the original code updates it
    // with a CAS
    @volatile var state: Any = Nil
    
    // an abstraction: the original has a compressedRoot method that makes
    // every linked Promise point directly at the last Promise in the chain
    //
    // related to this, a link method is used in flatMap to update the state
    // to another promise. flatMap is a place where memory leaks can
    // arise
    private def root: DefaultPromise[T] = ???

Compressing the root is necessary to prevent memory leaks. It also benefits subsequent calls, since they have a shorter chain to follow.

    def value: Option[Try[T]] = value0
    @tailrec
    private def value0: Option[Try[T]] = state match {
      case c: Try[_] => Some(c.asInstanceOf[Try[T]])
      case _: DefaultPromise[_] => root.value0
      case _ => None
    }
    
    override def isCompleted: Boolean = isCompleted0
    @tailrec
    private def isCompleted0: Boolean = state match {
      case _: Try[_] => true
      case _: DefaultPromise[_] => root.isCompleted0
      case _ => false
    }
    
    def tryComplete(value: Try[T]): Boolean = {
      tryCompleteAndGetListeners(value) match {
        case null             => false
        case rs if rs.isEmpty => true
        case rs               => rs.foreach(r => r.executeWithValue(value)); true
      }
    }

When a Promise has already been completed, tryCompleteAndGetListeners returns null; otherwise it returns the list of callbacks. If that list is not empty, tryComplete schedules the callbacks.

    @tailrec
    private def tryCompleteAndGetListeners(v: Try[T]): List[CallbackRunnable[T]] = {
      state match {
        case raw: List[_] =>
          val callbacks = raw.asInstanceOf[List[CallbackRunnable[T]]]
          state = v // store the result; the original code does this with a CAS
          callbacks
        case _: DefaultPromise[_] => root.tryCompleteAndGetListeners(v)
        case _ => null
      }
    }

If the state is a list of callbacks, the Promise has not been completed yet, so we complete it and return the list of callbacks. A linked Promise delegates to its root. Otherwise we return null.

    def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = {
      val preparedEC = executor.prepare
      val runnable = new CallbackRunnable[T](preparedEC, func)
      dispatchOrAddCallback(runnable)
    }
    @tailrec
    private def dispatchOrAddCallback(runnable: CallbackRunnable[T]): Unit = {
      state match {
        case r: Try[_]          => runnable.executeWithValue(r.asInstanceOf[Try[T]])
        case _: DefaultPromise[_] => root.dispatchOrAddCallback(runnable)
        case listeners: List[_] => state = runnable :: listeners
      }
    }
  }

When adding a callback to a Promise, if the Promise has already been completed, we schedule the callback right away; otherwise, we add it to the callback list.
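Both paths can be observed with the standard API. To keep the order deterministic, this sketch assumes a same-thread ExecutionContext that I define myself (sameThread); with a real thread pool the callbacks would run asynchronously. CallbackTiming and demo are illustrative names.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{ExecutionContext, Promise}

object CallbackTiming {
  // Assumption for this demo only: run every callback on the calling thread.
  private val sameThread: ExecutionContext = new ExecutionContext {
    def execute(runnable: Runnable): Unit = runnable.run()
    def reportFailure(cause: Throwable): Unit = cause.printStackTrace()
  }

  def demo(): List[String] = {
    val log = ListBuffer.empty[String]
    val p = Promise[Int]()
    // Registered before completion: stored in the callback list.
    p.future.onComplete(r => log += s"early: $r")(sameThread)
    log += "completing"
    p.success(7)
    // Registered after completion: dispatched immediately.
    p.future.onComplete(r => log += s"late: $r")(sameThread)
    log.toList
  }
}
```

The early callback does not run until the promise is completed, while the late one runs as soon as it is registered.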

  final class KeptPromise[T](suppliedValue: Try[T]) extends Promise[T] {

    val value = Some(suppliedValue)

    override def isCompleted: Boolean = true

    def tryComplete(value: Try[T]): Boolean = false

    def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = {
      val completedAs = value.get
      val preparedEC = executor.prepare
      (new CallbackRunnable(preparedEC, func)).executeWithValue(completedAs)
    }
  }

This is for an already completed Future, which is useful when composing and chaining futures.

}
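From the caller's side, an already completed future is what the standard Future.successful returns. Whether it is backed by KeptPromise depends on the library version, so treat that connection as my assumption; the observable behaviour is what this sketch shows. AlreadyCompleted is an illustrative name.

```scala
import scala.concurrent.Future
import scala.util.Try

object AlreadyCompleted {
  // Complete from the moment it is created; no Runnable is needed
  // to produce the value.
  val ready: Future[Int] = Future.successful(42)

  def demo(): (Boolean, Option[Try[Int]]) = (ready.isCompleted, ready.value)
}
```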

Now things become pretty clear.
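To convince myself, I find it helpful to boil the state machine down to a toy. The sketch below is not the library code: MiniPromise is a hypothetical class with only two of the three states (callback list or result, no linking), it updates the state with a CAS on an AtomicReference, and it runs callbacks on the calling thread instead of an executor.

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
import scala.util.{Success, Try}

final class MiniPromise[T] {
  // Either a List of callbacks (incomplete) or a Try[T] (complete).
  private val state = new AtomicReference[AnyRef](Nil)

  def value: Option[Try[T]] = state.get match {
    case t: Try[_] => Some(t.asInstanceOf[Try[T]])
    case _         => None
  }

  @tailrec
  def tryComplete(result: Try[T]): Boolean = state.get match {
    case cbs: List[_] =>
      if (state.compareAndSet(cbs, result)) {
        // We won the race: run every stored callback with the result.
        cbs.asInstanceOf[List[Try[T] => Unit]].foreach(cb => cb(result))
        true
      } else tryComplete(result) // state changed meanwhile: re-read and retry
    case _ => false // already completed
  }

  @tailrec
  def onComplete(cb: Try[T] => Unit): Unit = state.get match {
    case t: Try[_] => cb(t.asInstanceOf[Try[T]]) // completed: run right away
    case cbs: List[_] =>
      // Incomplete: prepend the callback, retrying if the CAS fails.
      if (!state.compareAndSet(cbs, cb :: cbs)) onComplete(cb)
  }
}

object MiniPromiseDemo {
  def demo(): (Boolean, Boolean, List[Int]) = {
    val p = new MiniPromise[Int]
    var seen = List.empty[Int]
    p.onComplete(t => seen = t.get :: seen)        // stored, not run yet
    val first = p.tryComplete(Success(1))          // runs the stored callback
    val second = p.tryComplete(Success(2))         // rejected
    p.onComplete(t => seen = (t.get + 10) :: seen) // runs immediately
    (first, second, seen)
  }
}
```

The retry loops are what the CAS buys: a thread that loses a race simply re-reads the state and decides again, so no lock is needed.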
