We want to study the java.util.Timer
scheduler and strip it to its bare fundamentals. We want to know:
- How does it work?
- What concurrency mechanisms does it use?
- Could you roll your own implementation, or is it some sort of native concept that ships with the Java language itself and cannot be replicated in user code?
Yes, you can definitely roll your own! Like all things concurrency, you shouldn’t use your own implementation in production, but developing one from scratch is an enlightening experience.
Here you will learn low-level concurrency primitives and concepts: the building blocks used by the libraries that provide us with higher-level abstractions for concurrency, like timers and task schedulers.
TLDR
- Every Timer that is instantiated starts a thread. A single new thread.
  - The thread is essentially a while(true) loop over a queue of tasks.
  - The thread is responsible for both running the tasks, and deciding when to run the next task.
- The native method wait(<timeout>), present on every Java object, is the mechanism by which the thread can sleep until the next task is due.
  - Correspondingly, the native method notify() is the mechanism by which the thread can be “woken up” if, during its sleep, a new task is submitted that ought to run sooner.
The MVP of a Timer
The purpose of a timer is to receive and manage tasks with deferred and/or periodic execution. Given an instance, you want to give it a task “to run 1 minute from now”, or “every 30 seconds.”
These tasks need to run on threads, these being the basic unit to “run things”. It also doesn’t make much sense to run these tasks on the same thread that submits the task. Or else the thread would have to block to process that task, which doesn’t seem useful. The question is then if we should have a single thread, multiple threads, or some kind of thread pool. Regardless, this is enough to realize that we need some kind of data structure by means of which the thread that submits the task, can actually pass the task to the thread(s) that run it.
The most basic form of a timer is therefore a lonely thread whose sole purpose is to loop over some data structure that contains tasks to run. Evidently, that data structure is shared with the other “producer” threads which have a reference to the timer and that submit the tasks:
It turns out the Timer class is very well represented by the above diagram. In particular, a Timer is associated with a SINGLE thread. Every time you create a new instance, a new thread is created. That thread goes into an infinite loop. On each iteration it checks whether there is a task that it should run. If there is, it executes it, and then proceeds to the next iteration. This is essentially it.
Note
Since Java 5.0 (2004), Timer went out of favor with the introduction of java.util.concurrent.ScheduledThreadPoolExecutor. The latter is more versatile and also better designed, although Timer is still adequate for simple cases. For example, ScheduledThreadPoolExecutor implements the ScheduledExecutorService interface, so client code can depend on an interface rather than a concrete class, with all the advantages that brings.
Notice also that you can have many “clients” of a Timer
. That is, many parts of your code base with a reference to the same object. Each may submit tasks, and be running on a different thread. This demands the “shared data structure” be thread-safe.
The API
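Java’s Timer class has several public methods. We will look at the following four (the overloads that take a java.util.Date instead of a delay, and purge(), are left out).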
public class Timer {
public void schedule(TimerTask task, long delay) {/* impl */}
public void schedule(TimerTask task, long delay, long period) {/* impl */}
public void scheduleAtFixedRate(TimerTask task, long delay, long period) {/* impl */}
public void cancel() {/* impl */}
}
These methods are what the client threads are allowed to call.
The two methods with the period
argument are for periodic tasks. They are meant to run “an infinite number of times” at a frequency of period
. The difference between the two is briefly addressed towards the end of the article.
We will focus on the first method. It’s the simplest, but it still requires us to address the fundamental challenge that underpins timers: how to schedule the tasks. It’s here where we lift the veil of magic. The other two scheduling methods introduce no fundamentally new challenge and can be regarded as afterthoughts, which we will overlook. cancel() is also important.
In the listing above, TimerTask is an abstract class which exists for the purpose of Timer. It is defined in the same java.util package. It implements, and is essentially, a Runnable. It’s not overly relevant to the core understanding, so we will just use Runnable in our case.
Keep in mind that whenever we refer to Timer
we mean the actual Java class java.util.Timer
.
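Before building our own, it may help to see the real API in action from Scala. The snippet below is a small sketch, not part of the original article: the object name TimerApiDemo, the latches and the delays are arbitrary choices made for illustration. It schedules one one-shot task and one periodic task on a java.util.Timer, waits until both have fired, and then cancels the timer.

```scala
import java.util.{Timer, TimerTask}
import java.util.concurrent.{CountDownLatch, TimeUnit}

object TimerApiDemo {
  // Returns true if the one-shot task ran once and the periodic task ran
  // at least three times within the (generous) time limits.
  def runDemo(): Boolean = {
    val timer    = new Timer("ApiDemoTimer")
    val oneShot  = new CountDownLatch(1)
    val periodic = new CountDownLatch(3)

    // schedule(task, delay): run once, 50 ms from now.
    timer.schedule(new TimerTask { def run(): Unit = oneShot.countDown() }, 50L)
    // schedule(task, delay, period): first run after 10 ms, then every 20 ms.
    timer.schedule(new TimerTask { def run(): Unit = periodic.countDown() }, 10L, 20L)

    val fired =
      oneShot.await(5, TimeUnit.SECONDS) && periodic.await(5, TimeUnit.SECONDS)

    // Without cancel(), the timer's non-daemon thread would keep the JVM alive.
    timer.cancel()
    fired
  }
}
```

Calling TimerApiDemo.runDemo() from a main method should return true after a fraction of a second.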
Naive Approach
Warning
In the code snippets that follow, we will use Scala. Scala syntax is slightly different from Java, but because all JVM languages operate within the same “concurrency model”, any code and discussion is 100% “interoperable”.
We need the following:
- Thread safe data structure.
- One thread looping over that data structure endlessly.
We don’t want to have to initialize the timer. Once an object is created, we want it to be immediately ready to receive tasks. This means the timer’s thread will have to be created and started on the constructor call. We should pause to consider the significance of this. Even though from the point of view of the client constructing a timer looks just like any other call, underneath it is a point where the code becomes concurrent. After that point, two threads, with their own stack, are active doing their “own thing” and being scheduled on the actual physical processor. They communicate only via main memory.
Naive approach
import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}
class CustomTimer private (queue: BlockingQueue[(Runnable, Long)]) {
def schedule(runnable: Runnable, delay: Long): Unit = {
val task = (runnable, System.currentTimeMillis() + delay)
queue.put(task)
}
}
object CustomTimer {
def apply(threadName: String): CustomTimer = {
val queue = new LinkedBlockingQueue[(Runnable, Long)]()
def mainLoop(): Unit =
while (true) {
val tuple @ (runnableTask, whenToRun) = queue.take()
if (System.currentTimeMillis() > whenToRun) runnableTask.run()
else queue.put(tuple)
}
val timerThread = new Thread(new Runnable { def run(): Unit = mainLoop() })
timerThread.setName(threadName)
timerThread.setDaemon(false)
timerThread.start()
new CustomTimer(queue)
}
}
The first approach above leverages the existing LinkedBlockingQueue class. Developing a thread-safe data structure from scratch is very educational with regard to concurrency (which is what we want to learn), but here we delegate that particular job so that we can focus on the other concurrency intricacies of building a custom timer/scheduler. I will still consider the custom timer we are building as being “from scratch”.
The code is quite straightforward. The timer thread loops around forever. On each iteration it takes one task from the shared queue, and if that task’s time is up, it runs it; otherwise it puts the task back. The client threads’ code path (i.e., the schedule method) is even simpler. It is just a thin wrapper for inserting tasks into the same shared queue.
Are you surprised by the use of System.currentTimeMillis() to help schedule the tasks? Don’t be. Timer uses the exact same mechanism to determine when a task needs to run.
This actually makes both Timer
and our custom timer sensitive to the user changing system time. That is, if you schedule a task to run 1 hour from now, and then you change the clock back 1 year, the task will take 1 year and 1 hour to start running. Some consider this a “tiny” bug, others a feature. In contrast, the ScheduledThreadPoolExecutor
uses System.nanoTime()
that does not suffer from this.
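To make the difference concrete, a deadline on the monotonic clock could be computed as below. This is only a sketch: MonotonicDeadline and its two methods are names made up for this example, and the code is not taken from the source of ScheduledThreadPoolExecutor. The key property is that System.nanoTime() is unrelated to wall-clock time, so changing the system clock does not move the deadline.

```scala
import java.util.concurrent.TimeUnit

object MonotonicDeadline {
  // Absolute deadline on the monotonic clock. The value is only meaningful
  // when compared with other System.nanoTime() readings in the same JVM.
  def deadlineAfter(delayMillis: Long): Long =
    System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis)

  // Remaining time in milliseconds; zero or negative means the task is due.
  // Subtracting (instead of comparing the two readings with <) stays correct
  // even if the nanoTime counter overflows.
  def remainingMillis(deadlineNanos: Long): Long =
    TimeUnit.NANOSECONDS.toMillis(deadlineNanos - System.nanoTime())
}
```

In our custom timer, the second element of each queued tuple could hold such a deadline instead of a System.currentTimeMillis() timestamp.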
For all its cons, an advantage of this approach is that the thread will not waste CPU time when there are no tasks submitted. This is provided by the use of a blocking queue, which puts the thread calling queue.take
on a “waiting” state until a timer client pushes a new element/task to the queue.
We can see that by profiling the thread state:
spin method. Useful for tests.
import scala.concurrent.duration._
def spin(duration: Duration): Unit = {
val stopSpinning = System.currentTimeMillis() + duration.toMillis
while (System.currentTimeMillis() < stopSpinning) {}
}
Thread sleeps when queue is empty
import scala.concurrent.duration._
object SomeTests extends App {
// When the constructor returns, the timer thread is already up and running.
val customTimer = CustomTimer("FooBarThread")
// Note, this call happens on the main thread, not the timer thread.
spin(1.minute)
customTimer.schedule(
new Runnable { def run(): Unit = spin(20.seconds) },
30.second.toMillis
)
}
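The same observation can be made without a profiler by asking the thread for its state. The sketch below is an illustration under assumptions (the object name ThreadStateDemo and the polling loop are mine): it starts a thread that blocks on an empty LinkedBlockingQueue and reports the state in which the thread settles.

```scala
import java.util.concurrent.LinkedBlockingQueue

object ThreadStateDemo {
  // Starts a thread that blocks on an empty queue and returns the state
  // observed once it has settled (or after giving up, roughly 5 seconds).
  def stateWhileBlockedOnTake(): Thread.State = {
    val queue = new LinkedBlockingQueue[Runnable]()
    val consumer = new Thread(new Runnable {
      def run(): Unit =
        try queue.take().run()
        catch { case _: InterruptedException => () } // used only to end the demo
    })
    consumer.setDaemon(true)
    consumer.start()

    // Poll until the thread has parked inside take().
    val giveUpAt = System.nanoTime() + 5000000000L
    while (consumer.getState != Thread.State.WAITING && System.nanoTime() < giveUpAt)
      Thread.sleep(1)

    val observed = consumer.getState
    consumer.interrupt() // let the helper thread finish
    observed
  }
}
```

A thread in the WAITING state is not scheduled on a CPU, which is the behaviour we want while the queue is empty.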
The biggest disadvantage of this attempt is that when there are pending tasks (i.e. the queue is not empty) the timer thread is needlessly active. The thread will be assigned CPU time, although nothing of value is running until the next task has to be run: If you submit a single task to run 30 minutes from now, for the next 30 minutes, the thread will be continuously looping around.
There is an implicit requirement:
The timer thread should not waste CPU cycles UNTIL the time for the next task to run expires.
This is indeed what Timer
does. The diagram shows what the timer thread is doing for the following listing.
Scheduling of two tasks
import java.util.{Timer, TimerTask}
import scala.concurrent.duration._
object SomeTests extends App {
val javaTimer = new Timer("FooBarTimer")
javaTimer.schedule(
new TimerTask { def run() = spin(20.second) },
30.second.toMillis
)
javaTimer.schedule(
new TimerTask { def run() = spin(20.second) },
90.second.toMillis
)
}
The timer (and therefore the associated thread) is created at around time 0, and immediately afterwards two tasks are submitted: Task 1, scheduled to run 30 seconds from now, and Task 2, scheduled to run 90 seconds from now. Both tasks are just a while loop that spins for 20 seconds.
Importantly, note:
- For the first 30 seconds, there is nothing to do. Appropriately, the thread sleeps and does not waste CPU. “It knows” exactly when to wake back up.
- After it runs Task 1, it goes back to idle until Task 2 is meant to run.
- After running Task 2, as there are no more tasks submitted, it correctly goes back to a “Waiting state”.
Attempt 2
We could try to improve our custom Naive approach, by calling Thread.sleep(<duration>)
, where duration
would be the time interval until the next task is meant to run. That is, on each iteration we take the task which should run next, and then put the thread to sleep until that task should run; when it wakes, the thread then runs it, and moves on to the next iteration.
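The loop just described could be sketched as follows. SleepingTimer is a hypothetical class written for this illustration, not a listing from the original article. It uses a PriorityBlockingQueue so that take() returns the task with the earliest due time, and marks its thread as daemon purely so that the sketch cannot keep a JVM alive.

```scala
import java.util.Comparator
import java.util.concurrent.PriorityBlockingQueue

// Hypothetical sleep-based variant, for illustration only.
class SleepingTimer {
  private val byDueTime = new Comparator[(Runnable, Long)] {
    def compare(a: (Runnable, Long), b: (Runnable, Long)): Int =
      java.lang.Long.compare(a._2, b._2)
  }
  private val queue = new PriorityBlockingQueue[(Runnable, Long)](10, byDueTime)

  def schedule(runnable: Runnable, delay: Long): Unit =
    queue.put((runnable, System.currentTimeMillis() + delay))

  private val thread = new Thread(new Runnable {
    def run(): Unit =
      while (true) {
        val (task, whenToRun) = queue.take()
        val timeToSleep = whenToRun - System.currentTimeMillis()
        // Nothing shortens this sleep: a task submitted in the meantime
        // goes unnoticed until the sleep ends and the current task has run.
        if (timeToSleep > 0) Thread.sleep(timeToSleep)
        task.run()
      }
  })
  thread.setDaemon(true)
  thread.start()
}
```

The comment inside the loop marks the weak spot of this design.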
The problem is we would mis-schedule tasks that clients submit after the timer thread goes to sleep. In other words, the question is:
Relative to the previous scenario, what would happen if some client, on a different thread, submitted a task at 60 seconds, while the timer thread was waiting to run Task 2, with a delay of 10 seconds (meaning it should run at time 70 seconds)?
Sadly, in our proposed approach with Thread.sleep(), the timer would run this Task 3 at 90 seconds at best, once it awakens.
By contrast, the Timer
correctly wakes up prematurely, and is able to run Task 3 before Task 2:
Submitting a task due sooner
import java.util.{Timer, TimerTask}
import scala.concurrent.duration._
object SomeTests extends App {
val javaTimer = new Timer("FooBarTimer")
javaTimer.schedule(
new TimerTask { def run() = spin(20.second) },
30.second.toMillis
)
javaTimer.schedule(
new TimerTask { def run() = spin(20.second) },
90.second.toMillis
)
// Note that this is the main thread,
// the only "client" of the Timer instance.
spin(60.seconds)
javaTimer.schedule(
new TimerTask { def run() = spin(10.second) },
10.second.toMillis
)
}
Furthermore, notice the dashed line at 60 seconds, when Task 3 is introduced? If we were to zoom in, we would notice that the thread wakes up for a very brief period of time, on the order of microseconds. This is necessary to check when the new Task 3 has to run and take appropriate action.
This seems rather magical. The timer thread, which is effectively not running when Task 3 is submitted, wakes up as a result of the submission and then 1) processes the new task, 2) understands it should run earlier than Task 2, and 3) goes back to sleep until its new due date.
This is of course what you would intuitively expect. We should augment the previous requirement.
The timer thread should not waste CPU time UNTIL the time for the next task to run expires, OR UNTIL a new task is submitted in the meantime, such that it has to run before the former.
Solving this problem requires us to use low-level concepts, the same ones used by external libraries that provide higher-level abstractions for concurrency, such as the Akka project.
And this is in fact the point. We want to explore the most basic concurrency building blocks in Java. The building blocks that all other libraries, no matter how esoteric, must ultimately use.
It is here that the interface between our source code (be it Java/Scala/Kotlin) and the underlying system (JVM, OS, Hardware) occurs.
The same way you ultimately need the JVM to have native code to handle the file system or network calls for us, here we must also rely on the JVM to deal with thread scheduling.
In turn, the JVM will call the underlying OS for this, given that threads are primarily an OS abstraction.
What is exposed by the JVM are the native methods wait()
, wait(<timeout>)
, and notify()
. These methods work alongside, and are strictly associated with, intrinsic locks, also called Java monitors: the locks that synchronized blocks acquire.
Most likely, some readers are already familiar with these. They exist on every Java object, and are one of the pillars of all things concurrency in Java. The other pillar being atomics.
We won’t be exploring these methods in detail. That requires an article of its own, but a summary is important.
The most important keyword is synchronized. It is the syntax for using Java monitors (or intrinsic locks), essentially the Java version of a mutex. Every Java object, including instances of your own classes, has an intrinsic lock. Code within a synchronized block is guaranteed to be executed by at most one thread at a time for a given monitor object. Meaning, no two threads can be “inside” blocks synchronized on the same object simultaneously. We say that only one thread can hold the monitor of a given object at a given time.
Equally important, the wait() and notify() methods above exist within the context of a particular monitor. Not outside. Meaning, those methods can only be called with the monitor held. Otherwise they throw an IllegalMonitorStateException.
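When the thread holding the monitor of an object calls wait() (or wait(<timeout>)), the thread releases the monitor and goes to a waiting state. In this state, the OS will not give it CPU cycles. The thread is only awakened when another thread calls notify() (or notifyAll()) on the same object, when the timeout expires, or when it is interrupted. (The specification also permits rare spurious wakeups.) Before it continues, the awakened thread must reacquire the monitor.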
Similarly, calling notify() can only be done when the thread is holding the monitor of the same object. This method wakes up one thread that is waiting on that monitor. If there are many, the choice of which one is arbitrary. Note that notify() does not itself release the monitor: the awakened thread can only continue once the notifying thread leaves its synchronized block and the awakened thread reacquires the monitor.
Again, for both methods, the thread must have already acquired the monitor. Meaning those calls are normally inside synchronized blocks.
With them, we are able to meet the requirement above.
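The mechanics summarised above can be tried out in isolation. The following is a minimal sketch, assuming nothing beyond the JDK; WaitNotifyDemo and its method names are invented for this example. One thread waits on a monitor with a long timeout while a helper thread notifies it early, and a second method shows what happens when notify() is called without holding the monitor.

```scala
object WaitNotifyDemo {
  private val lock = new Object

  // Waits on `lock` for up to `timeoutMillis` while a helper thread calls
  // notify() after `notifyAfterMillis`. Returns how long the wait lasted (ms).
  def timedWait(timeoutMillis: Long, notifyAfterMillis: Long): Long = {
    val notifier = new Thread(new Runnable {
      def run(): Unit = {
        Thread.sleep(notifyAfterMillis)
        lock.synchronized { lock.notify() } // the monitor must be held to notify
      }
    })
    notifier.setDaemon(true)

    val start = System.nanoTime()
    lock.synchronized {
      // Started while we hold the monitor, so the notifier cannot enter its
      // synchronized block (and notify) before wait() has released the monitor.
      notifier.start()
      lock.wait(timeoutMillis)
    }
    (System.nanoTime() - start) / 1000000L
  }

  // Calling notify() without holding the monitor is rejected.
  def notifyWithoutMonitorFails(): Boolean =
    try { lock.notify(); false }
    catch { case _: IllegalMonitorStateException => true }
}
```

For instance, WaitNotifyDemo.timedWait(5000L, 50L) should return after roughly 50 milliseconds rather than 5 seconds, which is precisely the “wake up early” behaviour the timer needs.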
A serious solution
import java.util.Comparator
import java.util.concurrent.{BlockingQueue, PriorityBlockingQueue}
class CustomTimer private (lock: Object, queue: BlockingQueue[(Runnable, Long)]) {
def schedule(runnable: Runnable, delay: Long): Unit = {
val thisTask = (runnable, System.currentTimeMillis() + delay)
queue.put(thisTask)
if (queue.peek() == thisTask) {
lock.synchronized {
lock.notify() // notify and wait calls "dance" together.
}
}
}
}
object CustomTimer {
private def comparator: Comparator[(Runnable, Long)] =
new Comparator[(Runnable, Long)] {
def compare(o1: (Runnable, Long), o2: (Runnable, Long)): Int =
// Long.compare avoids the overflow of truncating a Long difference to Int.
java.lang.Long.compare(o1._2, o2._2)
}
def apply(threadName: String): CustomTimer = {
val lock: Object = new Object()
val queue = new PriorityBlockingQueue[(Runnable, Long)](10, comparator)
def mainLoop(): Unit =
while (true) {
val tuple @ (runnableTask, whenToRun) = queue.take()
val timeToWait = whenToRun - System.currentTimeMillis()
if (timeToWait <= 0) runnableTask.run()
else {
lock.synchronized {
lock.wait(timeToWait) // notify and wait calls "dance" together.
}
queue.offer(tuple)
}
}
val timerThread = new Thread(new Runnable { def run(): Unit = mainLoop() })
timerThread.setName(threadName)
timerThread.setDaemon(false)
timerThread.start()
new CustomTimer(lock, queue)
}
}
Above, the lock
object serves only the purpose of being the monitor for the synchronized
/wait
/notify
calls. Obviously the actual name of the variable is not important. Since these methods are available on any and all Java objects, the most appropriate object is simply new Object()
.
As in the naive approach, we delegate the responsibility of maintaining state in a thread-safe way to an external queue implementation. The PriorityBlockingQueue used here has the extra advantage of ordering the tasks that we provide by priority, where priority is defined by the Comparator object. So when we call queue.take(), we are guaranteed to take the task that should run next.
Developing such a queue would also offer a lot of insight into concurrency concepts. But again, here we are focusing on other aspects.
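The ordering guarantee is easy to check on its own. The sketch below is illustrative only: EarliestFirstDemo is a made-up name, and String labels stand in for the Runnable tasks. It inserts three entries out of order and drains them with take().

```scala
import java.util.Comparator
import java.util.concurrent.PriorityBlockingQueue

object EarliestFirstDemo {
  // Orders entries by their due time, the second element of the tuple.
  private val byDueTime = new Comparator[(String, Long)] {
    def compare(a: (String, Long), b: (String, Long)): Int =
      java.lang.Long.compare(a._2, b._2)
  }

  // Inserts three entries out of order and returns the labels in take() order.
  def drainOrder(): List[String] = {
    val queue = new PriorityBlockingQueue[(String, Long)](10, byDueTime)
    queue.put(("task due at 90s", 90000L))
    queue.put(("task due at 30s", 30000L))
    queue.put(("task due at 70s", 70000L))
    List(queue.take()._1, queue.take()._1, queue.take()._1)
  }
}
```

The labels come back ordered by due time (30s, 70s, 90s), regardless of insertion order.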
Timer
also leverages the same essential mechanism and the synchronized
/wait
/notify
triad to schedule tasks. However, it doesn’t use a separate object to act as the monitor. Instead, it uses the queue itself. In other words it doesn’t strictly separate the responsibility of the data structure from the scheduling part. This works for Timer, as it uses a queue specially made for the timer. That queue lives in the same file in the java.util package and was designed with that in mind. In our case, as we use an external queue, we don’t want to synchronize on it. That would make things harder to understand, and more importantly, it would increase contention on the queue. In other words, why use an external queue to which we delegate thread safety, if we then end up using explicit external locking (via synchronized)?
In the listing above, the lock.notify() and lock.wait(<timeout>) calls should be analyzed together. They perform a sort of dance. Let’s see how they solve the scenario from the listing “Submitting a task due sooner”.
The starting point shall be when the timer thread is already executing Task 1. At that moment, Task 2 is already on the queue, but Task 3 is not.
Once the timer thread finishes Task 1, it begins another iteration of the loop. It removes Task 2 (at that point the only task left) and falls into the else branch as its time is not up yet. It enters the synchronized block and calls the wait method with timeout 90s-50s=40s. At this point the timer thread goes into the waiting state. As discussed earlier, this means that the JVM/OS will a) prevent that thread from obtaining CPU scheduling time, and b) “take note” that the thread should wake up once the monitor on which it is waiting gets notified, or the timeout expires. Whichever comes first.
At some point later, but before those 40 seconds go by, the timer thread is still asleep when a client, on another thread, submits a new task. Because this new task is the next task to run (it being the only one on the queue), our schedule method will enter the if block. There, it acquires the lock. This is possible because the timer thread relinquished the lock earlier, when it called wait(40s). With the monitor lock held, it calls lock.notify(). This has the power to wake up the timer thread. Recall that by the semantics of Object.wait(), the thread wakes up when the timeout expires or when some other thread calls notify.
After the timer thread awakens, it continues where it left off. After exiting the synchronized block, it inserts Task 2 back into the shared queue, and then starts another loop iteration. At the point it takes an element from the queue, both Task 2 and Task 3 will be present there. The one taken is the one that ought to run first. Recall that the responsibility of sorting the queue according to schedule time was delegated to java.util.concurrent.PriorityBlockingQueue (for convenience only; as said, there is nothing magical about this). Because of this, Task 3 will be the one removed from the queue. Still, it is not yet time for the task to run. So we go again into the else branch and call lock.wait(10 seconds). Because in this example there are no more clients submitting tasks, there are no more calls to lock.notify() through the schedule method, and the only way for the timer thread to wake up is for the timeout to expire.
After it wakes up, it runs the task. Well, not immediately. In this implementation we first offer it back to the queue. That’s fine, because the thread now begins another iteration of the loop, and it will take the same Task 3 again. By then timeToWait
will be negative and finally Task 3 is run. After Task 3 completes, we are back again on a new iteration. By now, only Task 2 remains on the queue, and the process repeats itself.
Lastly, notice that if instead, clients insert new tasks which are not scheduled to run before the next one, then the timer thread will not be awakened (in case it was waiting).
There you have it. A bare-bones implementation of a timer using the same scheduling mechanism as the actual Timer
class. For the purposes of simplicity we took some short-cuts. For example, offering the task back to the queue feels odd, and could probably be avoided. More importantly however, there is a small bug with this implementation.
A bug
The previous solution has a race condition. The solution is still very decent, but you probably wouldn’t want to use it in production. But that is already an assumption on everything we have been doing.
Do you want to try to identify the race condition? The tip is: Consider the listing A serious solution, and what would happen if two tasks are scheduled within a very small time-interval, and such that the second task should run before the first task. The discussion is below.
Without loss of generality, imagine Task 1 is inserted first, and should run 1 hour from now. Task 2 is inserted second (but very, very shortly after), and it should run 5 minutes from now. From the previous discussion, we expect Task 2 to run first and start running after 5 minutes.
This might or might not be the case. It depends on the time interval between the insertion of the two tasks, and on the OS scheduling of the timer thread relative to the client threads. In other words, there is a race condition.
The failing scenario is as follows. Imagine the queue is empty and the timer thread is blocked at queue.take(). Task 1 is introduced and the timer thread awakens and proceeds to the next line of code. Critically, before the timer thread reaches and acquires the intrinsic lock, another client thread (potentially even the same one) inserts Task 2. Task 2 is meant to run sooner. In the schedule method, the client thread enters the synchronized block and calls notify. However, that is pointless. At that point in time, the timer thread is not waiting on the monitor. It hasn’t even called lock.wait(<timeout>) yet. When the timer thread enters the synchronized block, Task 2 has already been inserted, and the thread has missed the notification to wake up and re-compute the timeout to 5 minutes. Hence, the timer thread will wait for 1 full hour, at which point Task 2 is 55 minutes late. Both tasks will still run, and in fact Task 2 will even run before Task 1, but not when it should have.
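One way to close this window is to let a single monitor guard both the queue and the wait/notify handshake, so that “inspect the queue” and “start waiting” happen without releasing the lock in between. The sketch below is a hypothetical variant written for illustration (RaceFreeTimer and nextDueTask are invented names). It is not the article's listing and not Timer's actual source, although it follows the idea, mentioned earlier, of Timer using its queue as the monitor. The thread is marked daemon only so that the sketch cannot block JVM exit.

```scala
import java.util.{Comparator, PriorityQueue}

// Hypothetical variant: one monitor guards the queue AND the wait/notify calls.
class RaceFreeTimer(threadName: String) {
  private val lock = new Object
  // A plain (non-thread-safe) queue is enough: it is only touched under `lock`.
  private val queue = new PriorityQueue[(Runnable, Long)](
    10,
    new Comparator[(Runnable, Long)] {
      def compare(a: (Runnable, Long), b: (Runnable, Long)): Int =
        java.lang.Long.compare(a._2, b._2)
    }
  )

  def schedule(runnable: Runnable, delay: Long): Unit =
    lock.synchronized {
      queue.add((runnable, System.currentTimeMillis() + delay))
      lock.notify() // the timer thread re-evaluates the head of the queue
    }

  // Blocks until the earliest task is due, then removes and returns it.
  private def nextDueTask(): Runnable =
    lock.synchronized {
      var due: Runnable = null
      while (due == null) {
        val head = queue.peek()
        if (head == null) lock.wait() // empty queue: wait for a submission
        else {
          val timeToWait = head._2 - System.currentTimeMillis()
          if (timeToWait <= 0) due = queue.poll()._1
          else lock.wait(timeToWait) // woken early by schedule(), or by timeout
        }
      }
      due
    }

  private val thread = new Thread(new Runnable {
    // Tasks run outside the lock, so clients can keep scheduling meanwhile.
    def run(): Unit = while (true) nextDueTask().run()
  })
  thread.setName(threadName)
  thread.setDaemon(true)
  thread.start()
}
```

Because schedule() can only run while the timer thread is either waiting or outside nextDueTask(), a notification can no longer be lost, and the loop re-checks the head of the queue after every wake-up. The price is that the timer now manages its own locking instead of delegating thread safety to a concurrent queue.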
Cancellation
Most of us nowadays work in the context of existing web server applications which are meant to run forever. Because we are less exposed to this issue of shutting a system down, its importance is overlooked. However, cancellation and end-of-lifecycle behaviour for a standalone timer (or any executor service) is a critical feature.
What do we mean by cancellation? We want the timer to stop running tasks, and for the associated resources to be released, which will mean at least the timer thread terminates. We also want the cancellation to be reliable, predictable, safe. Well, that goes without saying. What those things mean in this context however, is still to be determined.
When a client asks for cancellation, the system might be in four major states.
- One task is running, but no more tasks are scheduled.
- One is running, and additional tasks are scheduled.
- No task running, and no more tasks scheduled.
- No task running, but some tasks scheduled.
This raises questions about what to exactly do when cancellation is requested.
- Do we try to cancel an actively running task?
- Is it even possible to cancel an actively running task?
- If yes to above, do we also then run the additional tasks that were already on the queue prior to cancellation?
- If yes to above, do we then also allow new tasks to be submitted if they are scheduled to run before the last one? (The last one being the latest task at the time of the cancellation request)
Starting in reverse, for the last question, the answer is no. For Timer and also for ScheduledExecutorService. Once cancellation is requested, no more tasks can be added, regardless of their scheduled time to run. Any further submissions result in an exception. We will follow that as well.
For the third question (whether tasks already on the queue should still run), that is up for debate. What would you, as a user, expect?
It turns out Timer will not run any scheduled task that is not already running. They are removed from the queue immediately: when cancel returns, the queue is empty. In contrast, a ScheduledExecutorService has two methods: shutdown() and shutdownNow(). The former runs all queued tasks. If a queued task is scheduled to run in 10 hours, then at least for those 10 hours the service will be running. The latter will discard them. We will follow Timer’s approach.
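Timer's side of this behaviour can be checked with a few lines. This is a small sketch with invented names (TimerCancelDemo and its method): it queues a task far in the future, cancels the timer, and then tries to schedule again.

```scala
import java.util.{Timer, TimerTask}

object TimerCancelDemo {
  // Returns true if scheduling on a cancelled Timer is rejected
  // with an IllegalStateException.
  def scheduleAfterCancelIsRejected(): Boolean = {
    val timer = new Timer("CancelDemoTimer")
    // Queued one hour into the future; cancel() discards it without running it.
    timer.schedule(new TimerTask { def run(): Unit = () }, 3600000L)
    timer.cancel()
    try {
      timer.schedule(new TimerTask { def run(): Unit = () }, 10L)
      false
    } catch {
      case _: IllegalStateException => true
    }
  }
}
```

Note also that the method returns promptly: the task queued for one hour from now does not keep the timer thread around.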
Lastly, regarding any task that may be running at the time of cancellation, Timer will not interfere. Meaning it will not even try to cancel it. The same is true for ScheduledExecutorService#shutdown(). In comparison, shutdownNow() will indeed try to cancel the task, typically by interrupting the thread that runs it.
The wording try to cancel is appropriate. There is no mechanism in the Java/JVM world to force a running task to stop. There are only cooperative mechanisms. Ultimately it is up to the task (e.g., the Runnable
instance) itself to comply.
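What “cooperative” means can be shown with thread interruption, the standard JVM mechanism for asking a thread to stop. The sketch below is illustrative (CooperativeCancelDemo is a made-up name): the task stops only because it polls its own interrupt flag. A task that never checks the flag, and never calls a blocking method that reacts to interruption, would simply keep running.

```scala
import java.util.concurrent.CountDownLatch

object CooperativeCancelDemo {
  // Returns true if the worker thread stopped after being interrupted.
  def interruptStopsCooperativeTask(): Boolean = {
    val started = new CountDownLatch(1)
    val worker = new Thread(new Runnable {
      def run(): Unit = {
        started.countDown()
        // The task itself decides to stop, by polling the interrupt flag.
        while (!Thread.currentThread().isInterrupted) {
          // ... one small unit of work per iteration ...
        }
      }
    })
    worker.setDaemon(true)
    worker.start()
    started.await()
    worker.interrupt()  // a request, not a command
    worker.join(5000L)  // wait up to 5 seconds for the worker to comply
    !worker.isAlive
  }
}
```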
Getting on with it
The most straightforward approach is to use a flag, set by the client thread and read by the timer thread. It must account for the fact that it will be accessed by multiple threads (at least one of the accesses being a write). In this situation, marking a boolean as volatile would be the least powerful construct that does the job. However, because of the way I set things up, the mainLoop() method is not in the same scope as the cancel() method, which makes this awkward. I use AtomicBoolean instead; effectively a more powerful construct than needed.
Cancellation – First try. Builds on top of previous listing
import java.util.Comparator
+ import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.{BlockingQueue, PriorityBlockingQueue}
class CustomTimer private (
+ shutdown: AtomicBoolean,
lock: Object,
queue: BlockingQueue[(Runnable, Long)]
) {
+ def cancel(): Unit = shutdown.set(true)
def schedule(runnable: Runnable, delay: Long): Unit = {
+ if (shutdown.get()) throw new IllegalArgumentException("Timer already cancelled.")
val thisTask = (runnable, System.currentTimeMillis() + delay)
queue.put(thisTask)
if (queue.peek() == thisTask) {
lock.synchronized {
lock.notify()
}
}
}
}
object CustomTimer {
private def comparator: Comparator[(Runnable, Long)] =
new Comparator[(Runnable, Long)] {
def compare(o1: (Runnable, Long), o2: (Runnable, Long)): Int =
// Long.compare avoids the overflow of truncating a Long difference to Int.
java.lang.Long.compare(o1._2, o2._2)
}
def apply(threadName: String): CustomTimer = {
+ val shutdown = new AtomicBoolean(false)
val lock: Object = new Object()
val queue = new PriorityBlockingQueue[(Runnable, Long)](10, comparator)
def mainLoop(): Unit =
- while (true) {
+ while (!shutdown.get()) {
val tuple @ (runnableTask, whenToRun) = queue.take()
val timeToWait = whenToRun - System.currentTimeMillis()
if (timeToWait <= 0) runnableTask.run()
else {
lock.synchronized {
lock.wait(timeToWait)
}
queue.offer(tuple)
}
}
val timerThread = new Thread(new Runnable { def run(): Unit = mainLoop() })
timerThread.setName(threadName)
timerThread.setDaemon(false)
timerThread.start()
- new CustomTimer(lock, queue)
+ new CustomTimer(shutdown, lock, queue)
}
}
This solution is decent enough, but a slight problem makes it not really fit for production.
Problem 1: Program runs forever
object SomeTests extends App {
val javaTimer = CustomTimer("FooBarTimer")
// This hack tries to ensure the timer thread enters the loop
// before cancellation is requested. Not core part of the example.
Thread.sleep(1)
javaTimer.cancel()
} // Thread main exits here. What about the other thread of the program?
The program above would never terminate.
The problem is that if cancellation is requested when the queue is empty (which will definitely happen, because a client has no way of knowing the state of the queue), then the timer thread is blocked at queue.take(). The thread is in a waiting state, from which it will not wake up unless a client submits a new task, via the mechanism discussed earlier. Because the JVM does not shut down until all non-daemon threads have finished, and because our timer thread is non-daemon and is still alive (despite being blocked), the program never terminates. This behaviour would cause enormous confusion even in a small code base.
Problem 2: Program runs for 10 hours and then does not run the task.
import scala.concurrent.duration._
object SomeTests extends App {
val javaTimer = CustomTimer("FooBarTimer")
javaTimer.schedule(
new Runnable { def run() = spin(10.second) },
10.hours.toMillis
)
javaTimer.cancel()
}
Similarly, for Problem 2, if there are still tasks to run when cancellation is requested, then the timer thread might be blocked at lock.wait(timeToWait), waiting to run. In this scenario, the thread only has a chance to see the updated shutdown flag after timeToWait expires, at which point it exits the loop and the thread terminates. If timeToWait is 10 hours, then the JVM program will exit after 10 hours.
This is similar to what happens for the equivalent method ScheduledThreadPoolExecutor#shutdown() (but not for shutdownNow()). Of course in that case, the thread would wait 10 hours but actually run the associated task, which is the documented behaviour for that method; whilst here we would wait 10 hours and never run it.
Thread interrupts
The underlying issue in both problems is that the call to a blocking method prevents the flag from being read. We could solve the second problem (when the thread is held at lock.wait(timeToWait)) with relative ease. The issue there is that after cancel() is called the timer thread will never wake prematurely. Therefore, we could make cancel() act the same way as schedule(): enter the lock, and then notify the timer thread.
Fixing Problem 2
def cancel(): Unit = {
shutdown.set(true)
lock.synchronized {
lock.notify()
}
}
Because we now enter the same lock that the timer thread is waiting on, we are able to wake up the thread prematurely, and it will then be able to read the updated flag. This is exactly the approach that Timer takes.
In our case the solution is not fully water-proof though. It only works if the timer thread is already blocked at lock.wait(timeWait)
. If the timer thread is already inside a loop iteration but has not entered the lock yet, then the lock.notify()
from the client thread is issued before the timer thread starts waiting, so the notification is lost and we go back to the same problem. This is a race condition. The time window for this to occur is very small, but it exists. It can be fixed by making a few careful changes to mainLoop()
. These changes are present in Timer
, of course.
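One common way to close this kind of window is to set the flag and read the flag under the same lock, so that the check and the `wait()` cannot be separated by a `cancel()`. The sketch below is not the `Timer` code; `CancellableSleeper` and `awaitCancel` are hypothetical names used only to illustrate the pattern.

```scala
final class CancellableSleeper {
  private val lock = new Object
  private var cancelled = false // only read or written while holding `lock`

  /** Blocks for at most `millis`; returns true if cancellation was observed. */
  def awaitCancel(millis: Long): Boolean = lock.synchronized {
    // The flag is checked while holding the lock, so a cancel() cannot
    // slip in between this check and the call to wait().
    if (!cancelled && millis > 0) lock.wait(millis)
    cancelled
  }

  def cancel(): Unit = lock.synchronized {
    cancelled = true
    lock.notifyAll() // wakes the waiter if it is already blocked
  }
}
```

If `cancel()` runs first, `awaitCancel` sees the flag and never waits; if it runs second, the waiter has already released the lock inside `wait()` and receives the notification.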
In any case, this doesn’t solve the first problem, when the queue is empty and the timer thread is blocked at queue.take()
.
Poison pills
A solution to that problem is to use poison pills.
This concept is a common mechanism for cancellation, in the Java world and elsewhere. It is not used by Timer
though. It consists of an agreement between the clients and the timer thread that a special message signals the cancellation. This message is the Poison Pill
. In environments where there are several producers and consumers, it is harder to implement, but for our custom timer, with just one consumer thread (the timer thread), it is quite straightforward.
This approach works because by inserting the poison pill onto the queue upon cancel()
, if the timer thread is blocked at queue.take()
, it will wake up. This is, of course, the same mechanism discussed earlier in the section Attempt 2, about scheduling earlier tasks. Additionally, if it isn’t blocked there, then the thread will exit by the other mechanism anyway.
Cancellation – Fixed
+ import com.cmhteixeira.concurrent.timer.CustomTimer.PoisonPill
import java.util.Comparator
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.{BlockingQueue, PriorityBlockingQueue}
class CustomTimer private (
shutdown: AtomicBoolean,
lock: Object,
queue: BlockingQueue[(Runnable, Long)]
) {
def cancel(): Unit = {
shutdown.set(true)
+ queue.put((PoisonPill, 0))
+ lock.synchronized {
+ lock.notify()
+ }
}
def schedule(runnable: Runnable, delay: Long): Unit = {
if (shutdown.get())
throw new IllegalArgumentException("Timer already cancelled.")
val thisTask = (runnable, System.currentTimeMillis() + delay)
queue.put(thisTask)
if (queue.peek() == thisTask) {
lock.synchronized {
lock.notify()
}
}
}
}
object CustomTimer {
+ private object PoisonPill extends Runnable {
+ override def run(): Unit =
+ throw new RuntimeException("This will never happen.")
+ }
private def comparator: Comparator[(Runnable, Long)] =
new Comparator[(Runnable, Long)] {
def compare(o1: (Runnable, Long), o2: (Runnable, Long)): Int =
// Long.compare avoids the overflow of (o1._2 - o2._2).toInt for large differences
java.lang.Long.compare(o1._2, o2._2)
}
def apply(threadName: String): CustomTimer = {
val shutdown = new AtomicBoolean(false)
val lock: Object = new Object()
val queue = new PriorityBlockingQueue[(Runnable, Long)](10, comparator)
def mainLoop(): Unit =
while (!shutdown.get()) {
val tuple @ (runnableTask, whenToRun) = queue.take()
+ if (runnableTask.isInstanceOf[PoisonPill.type]) ()
+ else {
val timeToWait = whenToRun - System.currentTimeMillis()
if (timeToWait <= 0) runnableTask.run()
else {
lock.synchronized {
lock.wait(timeToWait)
}
queue.offer(tuple)
}
+ }
}
val timerThread = new Thread(new Runnable { def run(): Unit = mainLoop() })
timerThread.setName(threadName)
timerThread.setDaemon(false)
timerThread.start()
new CustomTimer(shutdown, lock, queue)
}
}
Timer
does not use poison pills. As it uses a custom-made queue (namely java.util.TaskQueue
on the same file as java.util.Timer
), which it locks explicitly the same way we lock our lock
object, it is able to solve both problems in one go.
With the listing above, we achieve a respectable timer/scheduled-service. It is rough around the edges, with at least one race condition, but still fundamentally sound. All this in a fraction of the lines of code.
Thread interruption
There is an alternative solution to the problem: Thread interruption.
Thread interruption is a common way to implement cancellation on the JVM. Timer
does not use it but the ScheduledExecutorService
(and related java.util.concurrent.ThreadPoolExecutor
) do use it to implement the shutdownNow()
functionality.
In a way, this approach is more interesting than poison pills. Poison pills are language agnostic and do not leverage any peculiarities of the JVM itself. With thread interrupts we have an excuse to study a relatively little-known behaviour of Java, which is of course important for a Java/Scala/JVM developer studying concurrency, as it can be regarded as a primitive.
Java threads have three interrupt-related methods. The first two are called on a Thread
object (often referred to as a thread handle); the third is static and acts on the calling thread:
public class Thread {
public void interrupt() {/* impl */}
public boolean isInterrupted() {/* impl */}
public static boolean interrupted() {/* impl */}
}
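The difference between the two query methods is easy to miss: `isInterrupted` only reads the flag, whereas the static `interrupted()` reads it and clears it. A minimal sketch, in which the object name `InterruptFlagDemo` and the method `observe` are invented for illustration:

```scala
object InterruptFlagDemo {
  // Returns (flag after interrupt(), value returned by interrupted(), flag afterwards).
  def observe(): (Boolean, Boolean, Boolean) = {
    val current = Thread.currentThread()
    current.interrupt()                        // sets the flag on this thread
    val afterInterrupt = current.isInterrupted // reads the flag, leaves it set
    val returned = Thread.interrupted()        // reads the flag and clears it
    val afterClear = current.isInterrupted     // the flag is no longer set
    (afterInterrupt, returned, afterClear)
  }
}
```

Because `interrupted()` clears the flag, the calling thread is left in a clean state after `observe()` returns.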
Here is the important part. The documented behaviour is that when a thread is blocked in certain blocking methods, such as Object#wait()
, and another thread calls interrupt()
on it, the JVM wakes up the blocked thread, and the blocking method throws an InterruptedException
.
This behaviour solves both problems at once.
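A small experiment makes the behaviour concrete. This is only a sketch: `InterruptWaitDemo` and `interruptedWhileWaiting` are hypothetical names, and the one-minute timeout is arbitrary.

```scala
import java.util.concurrent.atomic.AtomicBoolean

object InterruptWaitDemo {
  // Starts a thread that waits on a lock, interrupts it, and reports
  // whether the thread observed an InterruptedException.
  def interruptedWhileWaiting(): Boolean = {
    val lock = new Object
    val sawInterrupt = new AtomicBoolean(false)
    val waiter = new Thread(new Runnable {
      def run(): Unit =
        try lock.synchronized { lock.wait(60000L) } // would otherwise block for a minute
        catch { case _: InterruptedException => sawInterrupt.set(true) }
    })
    waiter.start()
    // Works whether the thread is already waiting or only about to wait:
    // wait() throws immediately if the interrupt flag is already set.
    waiter.interrupt()
    waiter.join(5000L)
    sawInterrupt.get()
  }
}
```

The waiting thread returns long before its timeout expires, which is exactly what cancellation needs.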
For Problem 2, the timer thread is blocked at lock.wait(<timeWaiting>)
. Therefore, if another thread gets hold of the timer thread’s handle and calls interrupt()
, we can expect the timer thread to wake up and throw an exception. We can catch that exception and proceed with cancellation.
For Problem 1, the timer thread is blocked at queue.take()
. This is not one of the native methods documented for interrupts. However, the documentation of the blocking queue interface that our priority queue implements reads:
Java docs for method take() of java.util.concurrent.BlockingQueue<E>.
/*
* Retrieves and removes the head of this queue, waiting if necessary
* until an element becomes available.
*
* @return the head of this queue
* @throws InterruptedException if interrupted while waiting
*/
E take() throws InterruptedException;
We can be sure that in this scenario the timer thread will also wake up and throw an exception. If we were to explore how PriorityBlockingQueue
is able to guarantee that the InterruptedException
is thrown when the thread is interrupted, we would find that it eventually calls a low-level blocking primitive of the JVM (in OpenJDK, LockSupport.park() rather than Object#wait()
) and therefore ultimately relies on the very same interruption mechanism. This highlights the importance of studying low-level mechanisms: all higher-level libraries must ultimately rely on them.
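The same kind of experiment can be repeated for the queue. In this sketch, `InterruptTakeDemo` and `interruptedWhileTaking` are made-up names, and the queue is deliberately left empty so that `take()` has to block.

```scala
import java.util.concurrent.PriorityBlockingQueue
import java.util.concurrent.atomic.AtomicBoolean

object InterruptTakeDemo {
  // Starts a consumer blocked on an empty queue, interrupts it, and
  // reports whether take() threw an InterruptedException.
  def interruptedWhileTaking(): Boolean = {
    val queue = new PriorityBlockingQueue[String]()
    val sawInterrupt = new AtomicBoolean(false)
    val consumer = new Thread(new Runnable {
      def run(): Unit =
        try { queue.take(); () } // never returns normally: nothing is ever added
        catch { case _: InterruptedException => sawInterrupt.set(true) }
    })
    consumer.start()
    consumer.interrupt()
    consumer.join(5000L)
    sawInterrupt.get()
  }
}
```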
Cancellation – Fixed with thread interrupts
import java.util.Comparator
- import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.{BlockingQueue, PriorityBlockingQueue}
class CustomTimer private (
- shutdown: AtomicBoolean,
+ threadHandle: Thread,
lock: Object,
queue: BlockingQueue[(Runnable, Long)]
) {
- def cancel(): Unit = shutdown.set(true)
+ def cancel(): Unit = threadHandle.interrupt()
def schedule(runnable: Runnable, delay: Long): Unit = {
- if (shutdown.get())
+ if (threadHandle.isInterrupted || !threadHandle.isAlive)
throw new IllegalArgumentException("Timer already cancelled.")
val thisTask = (runnable, System.currentTimeMillis() + delay)
queue.put(thisTask)
if (queue.peek() == thisTask) {
lock.synchronized {
lock.notify()
}
}
}
}
object CustomTimer {
private def comparator: Comparator[(Runnable, Long)] =
new Comparator[(Runnable, Long)] {
def compare(o1: (Runnable, Long), o2: (Runnable, Long)): Int =
// Long.compare avoids the overflow of (o1._2 - o2._2).toInt for large differences
java.lang.Long.compare(o1._2, o2._2)
}
def apply(threadName: String): CustomTimer = {
- val shutdown = new AtomicBoolean(false)
val lock: Object = new Object()
val queue = new PriorityBlockingQueue[(Runnable, Long)](10, comparator)
def mainLoop(): Unit = {
+ try {
- while (!shutdown.get()) {
+ while (!Thread.currentThread().isInterrupted) {
val tuple @ (runnableTask, whenToRun) = queue.take()
val timeToWait = whenToRun - System.currentTimeMillis()
if (timeToWait <= 0) runnableTask.run()
else {
lock.synchronized {
lock.wait(timeToWait)
}
queue.offer(tuple)
}
}
+ } catch {
+ case _: InterruptedException => ()
+ } finally {
+ queue.clear()
+ }
}
val timerThread = new Thread(new Runnable { def run(): Unit = mainLoop() })
timerThread.setName(threadName)
timerThread.setDaemon(false)
timerThread.start()
- new CustomTimer(shutdown, lock, queue)
+ new CustomTimer(timerThread, lock, queue)
}
}
Note
On the listing above, schedule()
checks threadHandle.isInterrupted || !threadHandle.isAlive
. Why is the first condition not sufficient? It turns out that the JVM specification does not explicitly require isInterrupted()
to return true once a thread is no longer running, and some implementations (namely OpenJDK and Oracle) return false. So that extra check might or might not be needed. It is definitely not needed from Java 14 onwards, where the behaviour was finally corrected.
Common Issues
Delayed Tasks
Because Timer
and our custom timer only have one thread, if a task takes too long, the remaining tasks will not execute when they should. This should be self-evident from the previous detailed analysis.
Delay of execution of a task
import java.util.{Timer, TimerTask}
import scala.concurrent.duration._
object SomeTests extends App {
val javaTimer = new Timer("FooBarTimer")
javaTimer.schedule(
new TimerTask { def run() = spin(70.second) },
10.second.toMillis
)
javaTimer.schedule(
new TimerTask { def run() = spin(20.second) },
30.second.toMillis
)
}
The ScheduledThreadPoolExecutor
lessens this problem by having multiple threads that can pick up tasks. Lessens is the appropriate word, as the problem is not fully solved.
If the clients submit more tasks than there are threads, and each of these takes longer to run than the next is due, some task will start running later than intended.
This shortcoming is why Timer
(and similarly for ScheduledThreadPoolExecutor
) has two similar methods for scheduling periodic tasks:
schedule(<task>, <delay>, <period>)
scheduleAtFixedRate(<task>, <delay>, <period>)
.
For the first, the period
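represents the delay between executions, measured from the time the previous execution actually ran rather than from the time it was due. (For ScheduledThreadPoolExecutor#scheduleWithFixedDelay(), the delay is measured from the end of the previous execution.)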
For the second, the period
represents the frequency (i.e., rate) at which the task should run.
These two concepts are not equivalent when the thread is busy running other tasks and starts running our task later than it should. For example, if we schedule a task to run every minute, and the thread is currently busy running a task that takes 1 hour, then after that hour, in the first case, our task runs only once (and every minute thereafter), whilst in the second case it runs 60 times in quick succession to catch up.
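The arithmetic behind that example can be sketched in a few lines. This is a simplified model rather than the actual scheduling code: `PeriodicScheduling` and `backlog` are hypothetical names, times are in milliseconds, and each run of the periodic task is assumed to take negligible time.

```scala
object PeriodicScheduling {
  /** Counts how many runs of a periodic task are due at once when the
    * timer thread only becomes free at `freeAt`.
    */
  def backlog(firstDue: Long, period: Long, freeAt: Long, fixedRate: Boolean): Int = {
    var due = firstDue
    var runs = 0
    while (due <= freeAt) {
      runs += 1
      due =
        if (fixedRate) due + period // next run is relative to when this one was due
        else freeAt + period        // next run is relative to when this one actually ran
    }
    runs
  }
}
```

For a task first due after 1 minute, with a period of 1 minute and a thread that is busy for 1 hour, `backlog(60000L, 60000L, 3600000L, fixedRate = true)` gives 60, and the same call with `fixedRate = false` gives 1.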
While it is useful to know this, it is mostly irrelevant to the current analysis. We only care about the concurrency fundamentals of Timer
. Once that is known, the remaining behaviour is a matter of time, and thinking a bit harder about corner cases.
Exceptions destroy the timer
When a task being executed by Timer
throws an exception, the thread dies, and the Timer
itself behaves as if it had been cancelled; meaning no more tasks can be scheduled. In contrast, ScheduledThreadPoolExecutor
takes a more defensive approach, and is able to recover and continue running normally.
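The behaviour of `Timer` can be reproduced with a short experiment. The sketch below uses invented names (`TimerExceptionDemo`, `scheduleAfterFailure`) and a crude 500 ms pause to give the timer thread time to die; the timer is created as a daemon only so that the experiment can never keep the JVM alive.

```scala
import java.util.{Timer, TimerTask}
import scala.util.Try

object TimerExceptionDemo {
  // Runs a task that throws, then tries to schedule a second task on the
  // same Timer and returns the outcome of that second schedule() call.
  def scheduleAfterFailure(): Try[Unit] = {
    val timer = new Timer("FailingTimer", true)
    timer.schedule(
      new TimerTask { def run(): Unit = throw new RuntimeException("boom") },
      0L
    )
    Thread.sleep(500L) // crude: let the timer thread run the task and terminate
    Try(timer.schedule(new TimerTask { def run(): Unit = () }, 0L))
  }
}
```

The second `schedule()` call fails with an `IllegalStateException`, the same exception thrown after an explicit `cancel()`.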
On our current solution, given that there is no try/catch
block in sight, it’s clear that our thread will exit as well. Our offense is that we have no mechanism to stop any further task submissions. Less concerning, but also bad, is the fact that we don’t remove the tasks on the queue; these will never be run, and are obsolete references.
Final comparison
Feature | java.util.Timer | Our custom timer |
---|---|---|
Number of threads | 1 | 1 |
Thread created upon instantiation | ✓ | ✓ |
Thread started upon instantiation | ✓ | ✓ |
Shared data structure | Bespoke priority queue (binary heap) w/ locking | Existing blocking priority queue |
Relies on Object#wait(<timeout>) for scheduling? | ✓ | ✓ |
Object used for locking | Uses the queue directly. | Specific object, created with that objective |
Relies on System.currentTimeMillis() ? | ✓ | ✓ |
Timer
and the timer we have developed