Apr 052016

ADM

How to create thread pool in Java

  • 05 April 2016
  • ADM

 

 

 

How to create thread pool in Java - images/logos/java.jpg

 

A thread pool is a group of pre-instantiated, idle threads which stand ready to be given work. Is the same idea as for Object Pool and is usually used to ensure that only a specific number of threads are running on the same time.

JDK already provides classes in the java.util.concurrent package to handle the thread pool.

In the thread pool can be added any class that implements Runnable interface.

Example 1

First we need to define a runnable class that will be executed.

package com.admfactory.threads;

public class ThreadWorker implements Runnable {

    private String job;

    public ThreadWorker(String job) {
        this.job = job;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " - start.");
        doSomethingAwesome();
        System.out.println(Thread.currentThread().getName() + " - stop.");
    }

    /**
     * just wait 5 sec
     */
    private void doSomethingAwesome() {
        try {
            System.out.println("Executing job: " + this.job);
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public String toString() {
        return this.job;
    }

}

Here is the test program where we are creating the thread pool using Executors framework.

package com.admfactory.threads;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FixedThreadPool {
    public static void main(String[] args) {
        
        /** create a fixed thread pool with max 5 thread running simultaneous */
        ExecutorService executor = Executors.newFixedThreadPool(5);
        
        /** adding 10 threads to the pool */
        for (int i = 0; i < 10; i++) {
            Runnable worker = new ThreadWorker("Job" + i);
            /** add the thread to the pool and execute when possible */
            executor.execute(worker);
        }
        
        /** close the thread pool */
        executor.shutdown();
        
        /** wait until all thread are executed from the pool's queue. */
        while (!executor.isTerminated()) {
        }
        System.out.println("All threads finished successfully!");
    }
}

In above program, we are creating a fixed size thread pool of 5 worker threads. Then we are submitting 10 jobs to this pool. Since the pool size is 5, it will start working on 5 jobs and other jobs will be in wait state, as soon as one of the job is finished, another job from the wait queue will be picked up by worker thread and get's executed.

Output

pool-1-thread-1 - start.
pool-1-thread-3 - start.
pool-1-thread-4 - start.
Executing job: Job3
Executing job: Job2
pool-1-thread-2 - start.
Executing job: Job0
Executing job: Job1
pool-1-thread-5 - start.
Executing job: Job4
pool-1-thread-1 - stop.
pool-1-thread-4 - stop.
pool-1-thread-3 - stop.
pool-1-thread-2 - stop.
pool-1-thread-2 - start.
Executing job: Job8
pool-1-thread-5 - stop.
pool-1-thread-5 - start.
Executing job: Job9
pool-1-thread-3 - start.
Executing job: Job7
pool-1-thread-1 - start.
Executing job: Job5
pool-1-thread-4 - start.
Executing job: Job6
pool-1-thread-2 - stop.
pool-1-thread-3 - stop.
pool-1-thread-4 - stop.
pool-1-thread-1 - stop.
pool-1-thread-5 - stop.
All threads finished successfully!

Conclusions

If you check the output you will notice that first thread started the execution before all the threads being added in the pool. This is because the pool was empty and the pool started the first thread as soon as possible.

Exemple 2

Executors class provide simple implementation of ExecutorService using ThreadPoolExecutor. ThreadPoolExecutor class provides much more feature, like:

  • specify the number of threads that will be alive when we create ThreadPoolExecutor instance
  • limit the size of thread pool
  • create our own RejectedExecutionHandler implementation to handle the jobs that can't fit in the worker queue.

Here is the class to handle the rejected jobs.

package com.admfactory.threads;

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println(r.toString() + " is rejected");
    }

}

rejectecExecution method will be executed when we try to add a new thread to the queue and the queue is full.

ThreadPoolExecutor class also provides method to check the state of the pool: pool size, active thread count and task count.

So to monitor these metrics we need to create other thread that reads and prints these data on a specific interval (in our case every 2 sec).

Here is the monitor thread class.

package com.admfactory.threads;

import java.util.concurrent.ThreadPoolExecutor;

public class MonitorThread implements Runnable {
    private ThreadPoolExecutor executor;

    private boolean run = true;

    public MonitorThread(ThreadPoolExecutor executor) {
        this.executor = executor;
    }

    public void shutdown() {
        this.run = false;
    }

    @Override
    public void run() {
        while (run) {
            /** check the content of the pool every 1 second */
            System.out.println(String.format("[monitor thread] [%d/%d] Active: %d, Completed: %d, Task Count: %d",
                    this.executor.getPoolSize(), this.executor.getCorePoolSize(), this.executor.getActiveCount(),
                    this.executor.getCompletedTaskCount(), this.executor.getTaskCount()));
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }
}

After we define our auxiliary classes is time to put all at work.

package com.admfactory.threads;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class AdvancedThreadPool {
    public static void main(String[] args) throws InterruptedException {
        /** Create a RejectedExecutionHandler object */
        RejectedExecutionHandlerImpl rejectionHandler = new RejectedExecutionHandlerImpl();

        /** Get the ThreadFactory implementation to use */
        ThreadFactory threadFactory = Executors.defaultThreadFactory();

        /** creating the ThreadPoolExecutor */
        ThreadPoolExecutor executorPool = new ThreadPoolExecutor(2, 4, 10, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2), threadFactory, rejectionHandler);
        
        /** start the monitoring thread */
        MonitorThread monitor = new MonitorThread(executorPool);
        Thread monitorThread = new Thread(monitor);
        monitorThread.start();

        /** add 10 jobs to the thread pool */
        for (int i = 0; i < 10; i++) {
            executorPool.execute(new ThreadWorker("job" + i));
        }

        /** wait 15 seconds before closing the pool. */
        Thread.sleep(15000);
        
        /** shut down the pool */
        executorPool.shutdown();

        /** wait 4 seconds and shut down the monitor thread */
        Thread.sleep(4000);
        monitor.shutdown();

        System.out.println("All threads finished successfully!");
    }
}

In above program, we are creating a pool with initial 2 threads, extendible to maximum 4 threads, with a queue of maximum 2 threads. The third parameter is the interval to check if there is any idle threads.

Output

pool-1-thread-1 - start.
Executing job: job0
pool-1-thread-2 - start.
Executing job: job1
pool-1-thread-3 - start.
Executing job: job4
job6 is rejected
pool-1-thread-4 - start.
Executing job: job5
job7 is rejected
job8 is rejected
job9 is rejected
[monitor thread] [0/2] Active: 0, Completed: 0, Task Count: 0
[monitor thread] [4/2] Active: 4, Completed: 0, Task Count: 6
[monitor thread] [4/2] Active: 4, Completed: 0, Task Count: 6
pool-1-thread-4 - stop.
pool-1-thread-3 - stop.
pool-1-thread-4 - start.
pool-1-thread-1 - stop.
pool-1-thread-2 - stop.
Executing job: job2
pool-1-thread-3 - start.
Executing job: job3
[monitor thread] [4/2] Active: 2, Completed: 4, Task Count: 6
[monitor thread] [4/2] Active: 2, Completed: 4, Task Count: 6
pool-1-thread-4 - stop.
pool-1-thread-3 - stop.
[monitor thread] [4/2] Active: 0, Completed: 6, Task Count: 6
[monitor thread] [4/2] Active: 0, Completed: 6, Task Count: 6
[monitor thread] [4/2] Active: 0, Completed: 6, Task Count: 6
[monitor thread] [0/2] Active: 0, Completed: 6, Task Count: 6
[monitor thread] [0/2] Active: 0, Completed: 6, Task Count: 6
All threads finished successfully!

Conclusions

Checking the output you can notice the monitor thread output, also you can see that some of the jobs were rejected. Now you may wonder why four jobs were rejected on two size queue? Remember, the pool was created with initial two threads with the possibility to be extended to four threads, it means that from ten thread that we are trying to add to the queue four of them are running and two are in the queue, resulting four threads being rejected.

 

 

 

References