This code walkthrough is neither sequential nor atomic (per method).
We will pick and choose the code that gets us a working version of an executor.
In each subsequent step, we will add one feature or address a problem.
Thus more complicated aspects like locking are discussed in the latter half.
Instead of starting from the constructors, let's start from the heart of an executor, i.e. running a submitted task.
The basic idea: submitted tasks are added to a queue, and worker threads keep picking tasks from that queue and executing them.
Based on the configuration and current number of threads, this process is slightly tweaked.
Adding tasks and new threads
A submitted task is handled in one of 3 ways
If thread pool count < core pool size, create a new worker thread and assign the task to it.
If thread pool count >= core pool size, add the task to the queue (a worker thread will retrieve it later).
If the task queue is bounded and full, create a new worker thread (up to the maximum pool size) and assign the task to it.
The second argument to the addWorker method just indicates which bound applies (true = corePoolSize, false = maximumPoolSize).
If the number of threads has already reached that bound, no new worker thread is added and the method returns false; in the third case the task is then rejected.
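The three paths can be observed from the outside with a deliberately tiny pool. This is a minimal sketch, not the JDK code itself: the sizing (corePoolSize 1, maximumPoolSize 2, queue capacity 1) and the class name ExecutePathsDemo are assumptions chosen for illustration.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecutePathsDemo {
    /** Returns {poolSize, queueSize} after each of 3 submissions, then 1 if a 4th was rejected. */
    static int[] observe() {
        // Illustrative sizing: corePoolSize = 1, maximumPoolSize = 2, queue capacity = 1.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();               // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int[] seen = new int[7];
        try {
            pool.execute(blocker);             // below core size: new core worker
            seen[0] = pool.getPoolSize();
            seen[1] = pool.getQueue().size();
            pool.execute(blocker);             // core is full: task is queued
            seen[2] = pool.getPoolSize();
            seen[3] = pool.getQueue().size();
            pool.execute(blocker);             // queue is full: new non-core worker
            seen[4] = pool.getPoolSize();
            seen[5] = pool.getQueue().size();
            try {
                pool.execute(blocker);         // queue full and pool at max: rejected
            } catch (RejectedExecutionException e) {
                seen[6] = 1;
            }
        } finally {
            release.countDown();               // let all blocked tasks finish
            pool.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(observe()));
    }
}
```

With this sizing the pool grows to 1 thread, then queues 1 task, then grows to 2 threads, and only then rejects.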
Worker threads creation
Books and tutorials on Java threads usually create a new Thread instance by passing it a Runnable. In this class it is the other way around: the Runnable (Worker) creates its own thread from the thread factory (see its constructor) and holds a reference to it.
Once the worker is created, it is added to the set of workers that represents the thread pool. The worker's thread is then started,
and whether it started is returned to the caller.
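A stripped-down sketch of that arrangement is shown here. It is an approximation written for this walkthrough, not the JDK's Worker: the class WorkerCreationSketch, the ranOn field and the allRanOnOwnThread helper are hypothetical, and locking, run-state checks and failure rollback are left out.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

class WorkerCreationSketch {
    /** Simplified stand-in for the Worker inner class: a Runnable that owns its thread. */
    static final class Worker implements Runnable {
        final Thread thread;        // created by the factory, wraps this Worker
        final Runnable firstTask;
        volatile Thread ranOn;      // hypothetical field: records which thread ran the task

        Worker(Runnable firstTask, ThreadFactory factory) {
            this.firstTask = firstTask;
            this.thread = factory.newThread(this); // the Runnable asks for its own thread
        }

        @Override
        public void run() {
            ranOn = Thread.currentThread();
            firstTask.run();
        }
    }

    // Stands in for the pool's worker set.
    static final Set<Worker> workers = new HashSet<>();

    /** Creates a worker, registers it, starts its thread, and reports whether it was started. */
    static boolean addWorker(Runnable firstTask) {
        // Assumption: a fresh default factory per call; a real pool reuses one factory.
        Worker w = new Worker(firstTask, Executors.defaultThreadFactory());
        if (w.thread == null) {
            return false;           // a factory is allowed to return null
        }
        synchronized (workers) {
            workers.add(w);
        }
        w.thread.start();
        return true;
    }

    /** Waits for every worker and reports whether each task ran on its worker's own thread. */
    static boolean allRanOnOwnThread() {
        synchronized (workers) {
            for (Worker w : workers) {
                try {
                    w.thread.join();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
                if (w.ranOn != w.thread) {
                    return false;
                }
            }
        }
        return true;
    }
}
```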
Worker Threads executing tasks
As seen in the code earlier, the run method of a Worker calls the runWorker method.
This method runs the worker's first task (if any), then keeps taking tasks from the queue, calling run() on each task directly in the worker's own thread.
Note: The process of getting tasks from the queue involves more checks, which we will see later.
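The shape of that loop can be sketched as follows. This is a simplification under stated assumptions: a non-blocking poll() stands in for getTask(), so the loop ends as soon as the queue is empty, and the locking, interrupt handling and beforeExecute/afterExecute hooks are omitted. The class name and the demo method are hypothetical.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

class RunWorkerSketch {
    /** Runs firstTask, then drains the queue, calling run() on the current thread. */
    static void runWorker(Runnable firstTask, BlockingQueue<Runnable> queue) {
        Runnable task = firstTask;
        // Simplification: poll() returns null on an empty queue, which ends the loop.
        while (task != null || (task = queue.poll()) != null) {
            try {
                task.run();     // plain method call: no new thread per task
            } finally {
                task = null;    // forget the task so the next iteration polls the queue
            }
        }
    }

    /** Runs three tasks through one worker and returns the thread name each ran on. */
    static List<String> demo() {
        List<String> ranOn = new CopyOnWriteArrayList<>();
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        Runnable record = () -> ranOn.add(Thread.currentThread().getName());
        queue.add(record);
        queue.add(record);
        Thread worker = new Thread(() -> runWorker(record, queue), "worker-1");
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ranOn;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

All three tasks report the same thread name, because the worker calls run() itself instead of starting a thread per task.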
Reduce or Maintain pool size post-task-completion
When the worker runs out of tasks to execute (queue is empty), it calls processWorkerExit,
which either lets the pool shrink or creates a replacement worker thread.
Whether a replacement is created depends on a few conditions
If core threads are allowed to time out and there are no pending tasks, the worker is not replaced.
If the current count of workers is at least the minimum required (corePoolSize, or 1 when core threads may time out but tasks are still pending), the worker is not replaced.
Otherwise, a new worker is created.
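The replacement decision can be written as a small pure function. This is a sketch that mirrors the conditions above, not the JDK method: the parameter names are illustrative, workerCount is taken to be the count after the exiting worker has been removed, and the completedAbruptly flag (a worker that died because its task threw) is an addition from my reading of the JDK source rather than something covered above.

```java
class WorkerExitSketch {
    /**
     * Decides whether an exiting worker should be replaced, assuming the pool
     * has not reached the STOP state. Parameter names are illustrative.
     */
    static boolean needsReplacement(boolean completedAbruptly,
                                    boolean allowCoreThreadTimeOut,
                                    int corePoolSize,
                                    boolean queueEmpty,
                                    int workerCount) {
        if (completedAbruptly) {
            return true;        // the worker died from a task exception: always replace
        }
        int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
        if (min == 0 && !queueEmpty) {
            min = 1;            // keep one worker around for the pending tasks
        }
        return workerCount < min;
    }

    public static void main(String[] args) {
        // Core threads may time out, but a task is still queued and no worker is left.
        System.out.println(needsReplacement(false, true, 2, false, 0));
    }
}
```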
Reduce or Maintain pool size pre-task-acceptance
While getting a new task to execute, the worker may instead need to stop, based on a few more conditions.
If shutdown has been requested for the executor and the queue is empty (or the executor is stopping)
If the wait for a task timed out and either the queue is empty or at least 1 other thread remains to execute tasks submitted later
If there are more worker threads than the maximum allowed
Polling for a new task is either blocking or timeout-based. It is timeout-based when the thread count exceeds the core pool size, or when allowCoreThreadTimeOut is set (core threads are allowed to time out and thus reduce in number).
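The effect of the two polling modes is visible through the public API. In this hedged demo the 50 ms keep-alive, the 2-second wait and the class name CoreTimeoutDemo are arbitrary choices made for illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CoreTimeoutDemo {
    /** Runs two trivial tasks on a 2-thread pool and returns the pool size after it has idled. */
    static int poolSizeAfterIdle(boolean allowCoreTimeout) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 50L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        // true: idle core workers poll with a timeout; false: they block indefinitely.
        pool.allowCoreThreadTimeOut(allowCoreTimeout);
        pool.execute(() -> { });
        pool.execute(() -> { });
        try {
            // Wait up to 2 seconds for the workers to time out and leave the pool.
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(2);
            while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
                TimeUnit.MILLISECONDS.sleep(10);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        int size = pool.getPoolSize();
        pool.shutdown();
        return size;
    }

    public static void main(String[] args) {
        System.out.println("timeout-based polling: " + poolSizeAfterIdle(true));
        System.out.println("blocking polling:      " + poolSizeAfterIdle(false));
    }
}
```

With the timeout enabled the pool shrinks to 0 workers; without it, both core workers stay parked on the queue.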
Tasks that are rejected (because the queue is full and the pool is at its maximum size, or because the executor has been shut down) are passed to a RejectedExecutionHandler. The default, Abort Policy, throws a RejectedExecutionException; the other built-in policies are
Caller Runs Policy - runs the task directly in the calling thread (i.e. the thread that called execute)
Discard Policy - does nothing (swallows the fact that it was unable to run the task)
Discard Oldest Policy - removes the first task from the queue (the oldest) and retries the new task
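The difference between these policies shows up in where (and whether) a rejected task runs. This is a small demo under assumed sizing (1 thread, 1 queue slot); the class and method names are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class RejectionDemo {
    /**
     * Saturates a 1-thread, 1-slot pool, then submits one extra task under the
     * given handler. Returns the thread that ran the extra task, or null if it never ran.
     */
    static Thread threadThatRanExtraTask(RejectedExecutionHandler handler) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1), handler);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        AtomicReference<Thread> ranOn = new AtomicReference<>();
        try {
            pool.execute(blocker);      // occupies the only worker
            pool.execute(blocker);      // occupies the only queue slot
            pool.execute(() -> ranOn.set(Thread.currentThread())); // handed to the handler
        } finally {
            release.countDown();
            pool.shutdown();            // queued tasks still run before termination
        }
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ranOn.get();
    }

    public static void main(String[] args) {
        Thread t = threadThatRanExtraTask(new ThreadPoolExecutor.CallerRunsPolicy());
        System.out.println("ran on caller: " + (t == Thread.currentThread()));
    }
}
```

Passing a DiscardPolicy yields null (the task never runs), while DiscardOldestPolicy evicts the queued blocker and lets the extra task run later on the pool's worker thread.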
Using a common main lock
Most of the methods in this class use a ReentrantLock called mainLock to guard access
to the shared state (such as the set of workers).
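The lock/try/finally idiom used around that shared state looks roughly like this. This is a sketch only: MainLockSketch and its register/unregister methods are hypothetical names, and the guarded fields are a reduced version of what a real pool tracks.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;

class MainLockSketch {
    private final ReentrantLock mainLock = new ReentrantLock();
    private final Set<Thread> workers = new HashSet<>(); // only touched while holding mainLock
    private int largestPoolSize;                          // likewise guarded by mainLock

    void register(Thread worker) {
        final ReentrantLock lock = this.mainLock;
        lock.lock();
        try {
            workers.add(worker);
            largestPoolSize = Math.max(largestPoolSize, workers.size());
        } finally {
            lock.unlock();      // always release, even if the body throws
        }
    }

    void unregister(Thread worker) {
        mainLock.lock();
        try {
            workers.remove(worker);
        } finally {
            mainLock.unlock();
        }
    }

    int poolSize() {
        mainLock.lock();
        try {
            return workers.size();
        } finally {
            mainLock.unlock();
        }
    }

    int largestPoolSize() {
        mainLock.lock();
        try {
            return largestPoolSize;
        } finally {
            mainLock.unlock();
        }
    }
}
```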
Using the ctl field
This class uses an AtomicInteger (ctl) to maintain the combined state of 2 fields
Number of worker threads (low 29 bits)
Run state of the executor (high 3 bits)
Updates to the worker thread count and the state of the executor are then performed using compareAndSet operations.
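The packing itself is only a few bit operations. The constants and helper names here follow my reading of the JDK source (which keeps the run state in the 3 high-order bits, leaving 29 bits for the count), but treat this as a sketch: the wrapper class CtlSketch and the retry loop in incrementWorkerCount are simplified for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CtlSketch {
    static final int COUNT_BITS = Integer.SIZE - 3;          // 29 bits for the worker count
    static final int COUNT_MASK = (1 << COUNT_BITS) - 1;

    // Run states live in the 3 high-order bits and are ordered numerically.
    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;
    static final int TIDYING    =  2 << COUNT_BITS;
    static final int TERMINATED =  3 << COUNT_BITS;

    static int runStateOf(int c)     { return c & ~COUNT_MASK; }
    static int workerCountOf(int c)  { return c & COUNT_MASK; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

    /** Increments the worker count with a CAS retry loop; the run state bits are untouched. */
    void incrementWorkerCount() {
        int c;
        do {
            c = ctl.get();
        } while (!ctl.compareAndSet(c, c + 1));
    }

    public static void main(String[] args) {
        int c = ctlOf(STOP, 5);
        System.out.println(workerCountOf(c) + " workers, stopping = " + (runStateOf(c) == STOP));
    }
}
```

Because both fields share one int, a single compareAndSet can check the run state and change the count atomically, without taking a lock.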
The Worker class extends AbstractQueuedSynchronizer to gain a simple, non-reentrant locking mechanism.
Many of the JDK concurrency utilities, like Semaphore, ReentrantLock and CountDownLatch, are built on the aforementioned class (through an internal subclass).
The Worker class needs to override only a few methods to get the desired locking (for its state).
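Those few overrides amount to a tiny non-reentrant mutex. The sketch here shows only the locking part; the standalone class name WorkerLockSketch is hypothetical, and the task-running code that the real Worker also carries is left out.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class WorkerLockSketch extends AbstractQueuedSynchronizer {
    // State 0 = unlocked (idle), state 1 = locked (running a task).

    @Override
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    @Override
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;   // no owner check, so the lock is deliberately non-reentrant
    }

    @Override
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }
}
```

Being non-reentrant matters: a failed tryLock() reliably means the worker is busy running a task, even when the check comes from code running on that same worker thread.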
Executor shutdown (with states)
When a shutdown is requested for the executor, it
changes its state to SHUTDOWN (shutdown) or STOP (shutdownNow) to stop accepting new tasks
tries to interrupt its worker threads (on a best-effort basis)
if shutdownNow is called, drains the queue and returns the list of pending tasks
changes its state to TIDYING once no workers and no queued tasks remain
changes its state to TERMINATED after the terminated() hook has run
When the executor is shut down, it changes its state and interrupts its worker threads.
Note that a worker thread holds its own lock while running a task, so shutdown() cannot interrupt such threads.
It can only interrupt threads which are idle (waiting for a task); shutdownNow() skips the lock check and interrupts every started worker.
Before starting to execute a task, a worker thread checks the state of the executor,
and interrupts itself if the executor is stopping.
Thus stopping the executor is on a best-effort basis. Any threads which are running
long-running tasks might take a while to respond to the interrupt (or to complete the task).
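This demo shows shutdownNow() from the caller's side: the running task is interrupted and the queued tasks are handed back. The 30-second sleep, the pool sizing and the class name ShutdownNowDemo are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ShutdownNowDemo {
    /** Returns {pending tasks handed back, 1 if the running task was interrupted, 1 if terminated}. */
    static int[] observe() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean();
        pool.execute(() -> {
            started.countDown();
            try {
                TimeUnit.SECONDS.sleep(30);     // a long-running task that honours interrupts
            } catch (InterruptedException e) {
                interrupted.set(true);
            }
        });
        pool.execute(() -> { });                // these two wait in the queue
        pool.execute(() -> { });
        try {
            started.await();                    // make sure the first task is running
            List<Runnable> pending = pool.shutdownNow(); // interrupts workers, drains the queue
            boolean terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
            return new int[] {pending.size(), interrupted.get() ? 1 : 0, terminated ? 1 : 0};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(observe()));
    }
}
```

A task that ignores interrupts (for example, a busy loop that never checks the flag) would keep the pool from terminating until it finished on its own, which is the best-effort caveat described above.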
Default thread factory (Executors)
The default thread factory creates non-daemon threads with normal priority (Thread.NORM_PRIORITY), regardless of the calling thread's daemon status or priority,
and names them pool-N-thread-M to distinguish between threads of different pools.
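These properties are easy to check on a thread obtained from Executors.defaultThreadFactory(). A minimal sketch; the class name DefaultFactoryDemo is hypothetical and the exact pool number in the name depends on how many factories the JVM has created so far.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

class DefaultFactoryDemo {
    /** Asks a fresh default factory for one thread; the thread is created but not started. */
    static Thread newPoolThread() {
        ThreadFactory factory = Executors.defaultThreadFactory();
        return factory.newThread(() -> { });
    }

    public static void main(String[] args) {
        Thread t = newPoolThread();
        System.out.println(t.getName()
                + " daemon=" + t.isDaemon()
                + " priority=" + t.getPriority());
    }
}
```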
Types of executors
Now that we understand how the task queue, keepAlive time, thread factory, corePoolSize and maximumPoolSize are used by the executor, creating various types of executors is easy.
Fixed number of worker threads (Executors.newFixedThreadPool)
Single worker thread (Executors.newSingleThreadExecutor)
Unlimited max threads (Executors.newCachedThreadPool)
For scheduled tasks (Executors.newScheduledThreadPool)
There is also a work-stealing pool (Executors.newWorkStealingPool) which uses ForkJoinPool; that is a separate class not covered in this post.
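The configuration behind some of these factory methods can be read back from the returned pool. This is a hedged sketch: the describe helper is hypothetical, and the cast only works for factories that return a ThreadPoolExecutor (or a subclass) directly. The single-thread executor is wrapped in a delegating class, so it is not passed to this helper.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecutorTypesDemo {
    /** Describes (and then shuts down) an executor that is backed by ThreadPoolExecutor. */
    static String describe(ExecutorService service) {
        ThreadPoolExecutor pool = (ThreadPoolExecutor) service;
        String description = "core=" + pool.getCorePoolSize()
                + " max=" + pool.getMaximumPoolSize()
                + " keepAliveMs=" + pool.getKeepAliveTime(TimeUnit.MILLISECONDS)
                + " queue=" + pool.getQueue().getClass().getSimpleName();
        pool.shutdown();
        return description;
    }

    public static void main(String[] args) {
        System.out.println("fixed:     " + describe(Executors.newFixedThreadPool(4)));
        System.out.println("cached:    " + describe(Executors.newCachedThreadPool()));
        System.out.println("scheduled: " + describe(Executors.newScheduledThreadPool(2)));
    }
}
```

The fixed pool pairs equal core and maximum sizes with an unbounded queue, while the cached pool pairs zero core threads and an effectively unlimited maximum with a SynchronousQueue, so every submission either reuses an idle worker or creates a new one.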
The ~2000 lines of code in the ThreadPoolExecutor class look overwhelming at first.
However, if we start with only the essentials and
keep adding one layer at a time, it all starts to make sense. Dare I say, the code looks quite straightforward.
There is a sense of elegance and beauty in its simplicity.