24

Suppose I have a function, which invokes a blocking interruptible operation. I would like to run it asynchronously with a timeout. That is, I would like to interrupt the function when the timeout is expired.

So I am trying to do something like that:

import scala.util.Try
import scala.concurrent.Future

def launch(f: () => Unit, timeout: Int): Future[Try[Unit]] = {

  val aref = new java.util.concurrent.atomic.AtomicReference[Thread]() 

  import ExecutionContext.Implicits.global
  Future {Thread.sleep(timeout); aref.get().interrupt} // 1
  Future {aref.set(Thread.currentThread); Try(f())}    // 2
}

The problem is that aref in (1) can be null because (2) has not set it to the current thread yet. In this case I would like to wait until aref is set. What is the best way to do that ?

6
  • I assume so, but is there a reason not to do the set in a single thread and then split off two futures only after it's set? Commented May 17, 2013 at 19:21
  • 5
    Future[Unit] will already give you a Try value, no need to nest an additional Try. Commented May 17, 2013 at 23:09
  • @0__: agreed, but for some weird reason in the Functional Reactive Programming course on Coursera by M. Odersky and Erik Meijer, in one of the lecture videos Meijer makes one of his example functions "more robust" by making it return a Future[Try[T]] instead of a Future[T]. Commented Apr 1, 2014 at 15:53
  • @0__ thank you, I was wondering exactly that. Commented Feb 16, 2015 at 14:49
  • @ErikAllik do you know about any example where we would see a difference? / could you elaborate more on that? I tried to see some different behaviour setting timeouts and dividing by zero, but I couldn't. I've added a Try at the Await (I think that really makes a difference there, by capturing the timeout exceptions). Commented Feb 16, 2015 at 14:52

5 Answers 5

17

You can go for a slightly easier approach using Await. The Await.result method takes timeout duration as a second parameter and throws a TimeoutException on timeout.

try {
  import scala.concurrent.duration._
  Await.result(aref, 10 seconds);
} catch {
    case e: TimeoutException => // whatever you want to do.
}
Sign up to request clarification or add additional context in comments.

8 Comments

I guess, It does not interrupt the function, which is running asynchronously.
In order to interrupt the function I need a mechanism similar to aref to store the current thread. Thus we return to the original question :)
In my code aref is AtomicReference rather than a Future.
You should never use Await.result in Your production code. It's a blocking function
@PeterPerháč import scala.concurrent.duration._
|
6

I needed the same behavior as well, so this is how I solved it. I basically created an object that creates a timer and fails the promise with a TimeoutException if the future hasn't completed in the specified duration.

package mypackage

import scala.concurrent.{Promise, Future}
import scala.concurrent.duration.FiniteDuration
import akka.actor.ActorSystem
import scala.concurrent.ExecutionContext.Implicits.global

object TimeoutFuture {

  val actorSystem = ActorSystem("myActorSystem")
  def apply[A](timeout: FiniteDuration)(block: => A): Future[A] = {
    val promise = Promise[A]()
    actorSystem.scheduler.scheduleOnce(timeout) {
      promise tryFailure new java.util.concurrent.TimeoutException
    }

    Future {
      try {
        promise success block
      }
      catch {
        case e:Throwable => promise failure e
      } 
    }

    promise.future
  }
}

2 Comments

where does PackagerGlobal come from? I get a compile error... not found: value PackagerGlobal
Sorry about that, PackagerGlobal was an object class in my code which I forgot to remove. I've fixed the sample above so it's clear what object you need, namely an instance of the ActorSystem
5

If you add a CountDownLatch you can achieve the behavior you want. (Note that blocking (i.e. getting stuck at await) in lots and lots of Futures may lead to starvation of thread pools.)

import scala.util.Try
import scala.concurrent.Future

def launch(f: () => Unit, timeout: Int): Future[Try[Unit]] = {

  val aref = new java.util.concurrent.atomic.AtomicReference[Thread]()
  val cdl = new java.util.concurrent.CountDownLatch(1)

  import ExecutionContext.Implicits.global
  Future {Thread.sleep(timeout); cdl.await(); aref.get().interrupt}   // 1
  Future {aref.set(Thread.currentThread); cdl.countDown(); Try(f())}  // 2
}

1 Comment

Then if f is completed much faster, the underlying Thread will still be interrupted?
3

Although you already got some answers on how to achieve it with blocking the additional thread to handle the timeout, I would suggest you to try a different way, for the reason Rex Kerr already gave. I don't exactly know, what you are doing in f(), but if it is I/O bound, I would suggest you to just use an asynchronous I/O library instead. If it is some kind of loop, you could pass the timeout value directly into that function and throw a TimeoutException there, if it exceeds the timeout. Example:

import scala.concurrent.duration._
import java.util.concurrent.TimeoutException

def doSth(timeout: Deadline) = {
  for {
    i <- 0 to 10
  } yield {
    Thread.sleep(1000)
    if (timeout.isOverdue)
      throw new TimeoutException("Operation timed out.")

    i
  }
}

scala> future { doSth(12.seconds.fromNow) }
res3: scala.concurrent.Future[scala.collection.immutable.IndexedSeq[Int]] = 
  scala.concurrent.impl.Promise$DefaultPromise@3d104456

scala> Await.result(res3, Duration.Inf)
res6: scala.collection.immutable.IndexedSeq[Int] =
  Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> future { doSth(2.seconds.fromNow) }
res7: scala.concurrent.Future[scala.collection.immutable.IndexedSeq[Int]] = 
  scala.concurrent.impl.Promise$DefaultPromise@f7dd680

scala> Await.result(res7, Duration.Inf)
java.util.concurrent.TimeoutException: Operation timed out.
    at $anonfun$doSth$1.apply$mcII$sp(<console>:17)
    at $anonfun$doSth$1.apply(<console>:13)
    at $anonfun$doSth$1.apply(<console>:13)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    ...

scala> res7.value
res10: Option[scala.util.Try[scala.collection.immutable.IndexedSeq[Int]]] =
  Some(Failure(java.util.concurrent.TimeoutException: Operation timed out.))

This will only use only 1 thread, that will be terminated after timeout + execution time of a single step.

1 Comment

I suppose here, that I cannot use any asynchronous I/O my function f performs long blocking but interruptible call.
1

You could also try using a CountDownLatch like this:

def launch(f: () => Unit, timeout: Int): Future[Try[Unit]] = {

  val aref = new java.util.concurrent.atomic.AtomicReference[Thread]() 

  import ExecutionContext.Implicits.global
  val latch = new CountDownLatch(1)
  Future {
    latch.await()
    aref.get().interrupt
  }

  Future {
    aref.set(Thread.currentThread) 
    latch.countDown()
    Try(f())
  }
}

Now I'm waiting forever with my call to latch.await(), but you could certainly change that to:

latch.await(1, TimeUnit.SECONDS)

and then wrap it with a Try to handle if when/if it times out.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.