Java ThreadPoolExecutor example
Java 5 introduced a new concurrency API, the Executor framework, to simplify the design and development of multithreaded applications. It consists mainly of the Executor and ExecutorService interfaces and the ThreadPoolExecutor class, which implements both. ThreadPoolExecutor provides the thread pool implementation; we will look at it in more detail later in this tutorial.
Why do we need Executor framework?
When we write a simple multithreaded application, we create Runnable objects, construct Thread objects from them, and then have to create, start, and manage those threads ourselves, which can be difficult to get right. The Executor framework does this for you: it is responsible for creating, executing, and managing threads, and by reusing threads it can improve application performance too.
If you follow a thread-per-task policy, you create a new thread for every task. When the system is heavily loaded, this can exhaust memory (an OutOfMemoryError) and bring the system down. With ThreadPoolExecutor, you do not create a thread for each new task. Instead, you assign tasks to a limited number of threads; once a thread completes one task, it is given another.
The core interface of the Executor framework is Executor. It has a single method, execute.
|
public interface Executor {
    void execute(Runnable command);
}
|
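Because Executor has only one method, any object that can run a Runnable can act as one. The sketch below is illustrative only: the class name ExecutorInterfaceSketch and the helper runInCaller are made up for this example. It shows two tiny implementations, one that runs each task on the calling thread and one that follows the thread-per-task policy described earlier.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

class ExecutorInterfaceSketch {

    // Runs each task immediately on the calling thread.
    static final Executor DIRECT = Runnable::run;

    // Starts a new thread for every task (the thread-per-task policy).
    static final Executor THREAD_PER_TASK = task -> new Thread(task).start();

    // Hypothetical helper: hands three tasks to DIRECT and records the order they ran in.
    static List<String> runInCaller() {
        List<String> log = new ArrayList<>();
        for (int i = 1; i <= 3; i++) {
            String name = "task-" + i;
            DIRECT.execute(() -> log.add(name));
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println("Ran in order: " + runInCaller());
        THREAD_PER_TASK.execute(() ->
                System.out.println("Running on " + Thread.currentThread().getName()));
    }
}
```

The calling code only sees execute, so the way tasks are run can be swapped without changing the code that submits them.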
Another interface, ExecutorService, extends Executor. It is an Executor that provides methods to control termination and methods that produce a Future for tracking the progress of one or more asynchronous tasks. It has methods such as submit, shutdown, and shutdownNow.
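As a rough illustration of submit, Future, and shutdown working together, the sketch below submits a Callable and waits for its result. The class name SubmitSketch and the helper sumOnPool are hypothetical, and the single-thread executor is just a convenient choice for a small example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SubmitSketch {

    // Hypothetical helper: computes a sum on a pool thread and returns the result.
    static int sumOnPool(int a, int b) {
        ExecutorService service = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> task = () -> a + b;
            Future<Integer> future = service.submit(task);
            return future.get(); // blocks until the task has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the task", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Task failed", e.getCause());
        } finally {
            service.shutdown(); // stop accepting new tasks; submitted tasks still finish
        }
    }

    public static void main(String[] args) {
        System.out.println("Result: " + sumOnPool(2, 3));
    }
}
```

Unlike execute, submit returns a handle to the task, so the caller can wait for the result or see the exception the task threw.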
ThreadPoolExecutor is the actual thread pool implementation. It extends AbstractExecutorService, which implements the ExecutorService interface. You can create a ThreadPoolExecutor from the factory methods of the Executors class, which is the recommended way to get an instance.
There are four commonly used factory methods in the Executors class for obtaining a thread pool. Here we use Executors.newFixedThreadPool to get an instance of ThreadPoolExecutor.
Example:
|
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
|
Here are the four factory methods in the Executors class:
newFixedThreadPool: returns a thread pool executor whose maximum size (say, n threads) is fixed. If all n threads are busy and additional tasks are submitted, those tasks wait in the queue until a thread becomes available.
newCachedThreadPool: returns an unbounded thread pool. It has no maximum size, but it tears down threads that have been idle for 60 seconds (the keepAliveTime).
newSingleThreadExecutor: returns an executor that is guaranteed to use a single thread. Unlike the others, the returned executor is wrapped, so it cannot be cast to ThreadPoolExecutor.
newScheduledThreadPool: returns a fixed-size thread pool that can schedule commands to run after a given delay or to execute periodically.
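The four factory methods above can be sketched side by side as follows. This is an illustrative example rather than part of the tutorial's program: the class name FactoryMethodsSketch and its helper methods are made up, and the casts to ThreadPoolExecutor are used only to inspect the pool settings.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class FactoryMethodsSketch {

    // Hypothetical helper: reports the maximum size of a fixed pool of n threads.
    static int fixedPoolMaxSize(int n) {
        ThreadPoolExecutor fixed = (ThreadPoolExecutor) Executors.newFixedThreadPool(n);
        try {
            return fixed.getMaximumPoolSize();
        } finally {
            fixed.shutdown();
        }
    }

    // Hypothetical helper: reports how long a cached pool keeps an idle thread, in seconds.
    static long cachedKeepAliveSeconds() {
        ThreadPoolExecutor cached = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        try {
            return cached.getKeepAliveTime(TimeUnit.SECONDS);
        } finally {
            cached.shutdown();
        }
    }

    // Hypothetical helper: schedules a task after a delay and returns its result.
    static String runDelayed(long delayMillis) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        try {
            Callable<String> task = () -> "done";
            ScheduledFuture<String> future =
                    scheduler.schedule(task, delayMillis, TimeUnit.MILLISECONDS);
            return future.get(); // waits for the delay to pass and the task to run
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Scheduled task failed", e.getCause());
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Fixed pool max size: " + fixedPoolMaxSize(5));
        System.out.println("Cached pool keep-alive (s): " + cachedKeepAliveSeconds());
        System.out.println("Scheduled task returned: " + runDelayed(100));

        // The single-thread executor is used through the ExecutorService interface.
        ExecutorService single = Executors.newSingleThreadExecutor();
        single.execute(() ->
                System.out.println("Single executor runs on " + Thread.currentThread().getName()));
        single.shutdown();
    }
}
```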
Let’s create a basic example that uses newFixedThreadPool to create an instance of ThreadPoolExecutor.
Let’s create a task. Here the task is to read different files and process them.
|
package org.arpit.java2blog.bean;

public class FetchDataFromFile implements Runnable {

    private final String fileName;

    public FetchDataFromFile(String fileName) {
        this.fileName = fileName;
    }

    @Override
    public void run() {
        try {
            System.out.println("Fetching data from " + fileName + " by " + Thread.currentThread().getName());
            Thread.sleep(5000); // Simulates the time taken to read the file
            System.out.println("Read file successfully: " + fileName + " by " + Thread.currentThread().getName());
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the interrupt flag so the pool can see the interruption
            Thread.currentThread().interrupt();
        }
    }

    public String getFileName() {
        return fileName;
    }
}
|
Let’s create a ThreadPoolExecutor that will consume the above task and process it.
|
package org.arpit.java2blog;

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

import org.arpit.java2blog.bean.FetchDataFromFile;

public class ThreadPoolExecutorMain {
    public static void main(String[] args) {
        // Getting instance of ThreadPoolExecutor using Executors.newFixedThreadPool factory method
        ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
        for (int i = 0; i < 10; i++) {
            FetchDataFromFile fdff = new FetchDataFromFile("File " + i);
            System.out.println("A new file has been added to read : " + fdff.getFileName());
            // Submitting task to executor
            threadPoolExecutor.execute(fdff);
        }
        // Stop accepting new tasks; tasks already submitted still run to completion
        threadPoolExecutor.shutdown();
    }
}
|
When you run the above program, you will get output similar to the following (the exact interleaving varies between runs):
|
A new file has been added to read : File 0
A new file has been added to read : File 1
A new file has been added to read : File 2
Fetching data from File 0 by pool-1-thread-1
Fetching data from File 1 by pool-1-thread-2
A new file has been added to read : File 3
Fetching data from File 2 by pool-1-thread-3
A new file has been added to read : File 4
Fetching data from File 3 by pool-1-thread-4
A new file has been added to read : File 5
Fetching data from File 4 by pool-1-thread-5
A new file has been added to read : File 6
A new file has been added to read : File 7
A new file has been added to read : File 8
A new file has been added to read : File 9
Read file successfully: File 1 by pool-1-thread-2
Read file successfully: File 3 by pool-1-thread-4
Fetching data from File 5 by pool-1-thread-4
Read file successfully: File 4 by pool-1-thread-5
Read file successfully: File 2 by pool-1-thread-3
Read file successfully: File 0 by pool-1-thread-1
Fetching data from File 8 by pool-1-thread-3
Fetching data from File 7 by pool-1-thread-5
Fetching data from File 6 by pool-1-thread-2
Fetching data from File 9 by pool-1-thread-1
Read file successfully: File 5 by pool-1-thread-4
Read file successfully: File 7 by pool-1-thread-5
Read file successfully: File 6 by pool-1-thread-2
Read file successfully: File 8 by pool-1-thread-3
Read file successfully: File 9 by pool-1-thread-1
|
We used newFixedThreadPool, so when we submit 10 tasks, 5 threads are created and execute the first 5 tasks. The other 5 tasks wait in the queue. As soon as a thread completes its task, it picks up another task from the queue and executes it.
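The split between running and waiting tasks can be observed directly. The sketch below is illustrative only: the class name FixedPoolQueueSketch and the helper snapshot are made up, and the tasks block on a latch instead of sleeping so that the numbers are stable when they are read.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

class FixedPoolQueueSketch {

    // Hypothetical helper: submits `tasks` blocking tasks to a fixed pool of `threads`
    // threads and returns {current pool size, number of tasks waiting in the queue}.
    static int[] snapshot(int threads, int tasks) {
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(threads);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return new int[] { pool.getPoolSize(), pool.getQueue().size() };
        } finally {
            release.countDown(); // let all tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] result = snapshot(5, 10);
        System.out.println("Threads in pool: " + result[0] + ", tasks waiting: " + result[1]);
    }
}
```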
Using constructor of ThreadPoolExecutor:
If you want to customize how a ThreadPoolExecutor is created, you can use its constructors instead.
|
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler);
|
corePoolSize: the number of threads to keep in the pool, even if they are idle.
maximumPoolSize: the maximum number of threads to allow in the pool.
keepAliveTime: when the pool has more threads than corePoolSize, the maximum time an excess idle thread waits for a new task before terminating.
unit: the time unit for keepAliveTime.
workQueue: the BlockingQueue that holds tasks before they are executed.
threadFactory: the factory used to create new threads.
handler: the RejectedExecutionHandler invoked when a task cannot be accepted, either because the executor has been shut down or because both the pool and the queue are full.
Let’s create a RejectedExecutionHandler for handling rejected tasks.
|
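package org.arpit.java2blog.bean;

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

public class RejectTaskHandler implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // Tasks passed to execute() arrive here unchanged; guard the cast in case
        // a different kind of Runnable is ever rejected
        if (r instanceof FetchDataFromFile) {
            FetchDataFromFile ffdf = (FetchDataFromFile) r;
            System.out.println("Sorry!! We won't be able to read :" + ffdf.getFileName());
        } else {
            System.out.println("Sorry!! We won't be able to run :" + r);
        }
    }
}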
|
Let’s change ThreadPoolExecutorMain.java to the code below to make use of the ThreadPoolExecutor constructor.
|
package org.arpit.java2blog;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.arpit.java2blog.bean.FetchDataFromFile;
import org.arpit.java2blog.bean.RejectTaskHandler;

public class ThreadPoolExecutorMain {
    public static void main(String[] args) {
        // Wait queue is used to store waiting tasks (capacity of 4)
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(4);
        // Thread factory to create new threads
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        // Rejection handler in case a task gets rejected
        RejectTaskHandler rth = new RejectTaskHandler();
        // ThreadPoolExecutor constructor: 1 core thread, at most 2 threads, 10 ms keep-alive
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 2, 10L, TimeUnit.MILLISECONDS, queue,
                threadFactory, rth);
        for (int i = 1; i <= 10; i++) {
            FetchDataFromFile fdff = new FetchDataFromFile("File " + i);
            System.out.println("A new file has been added to read : " + fdff.getFileName());
            // Submitting task to executor
            threadPoolExecutor.execute(fdff);
        }
        threadPoolExecutor.shutdown();
    }
}
|
When you run the above program, you will get output similar to the following (the exact interleaving varies between runs):
|
A new file has been added to read : File 1
A new file has been added to read : File 2
A new file has been added to read : File 3
A new file has been added to read : File 4
Fetching data from File 1 by pool-1-thread-1
A new file has been added to read : File 5
A new file has been added to read : File 6
A new file has been added to read : File 7
Sorry!! We won't be able to read :File 7
A new file has been added to read : File 8
Sorry!! We won't be able to read :File 8
A new file has been added to read : File 9
Sorry!! We won't be able to read :File 9
A new file has been added to read : File 10
Sorry!! We won't be able to read :File 10
Fetching data from File 6 by pool-1-thread-2
Read file successfully: File 1 by pool-1-thread-1
Read file successfully: File 6 by pool-1-thread-2
Fetching data from File 2 by pool-1-thread-1
Fetching data from File 3 by pool-1-thread-2
Read file successfully: File 3 by pool-1-thread-2
Read file successfully: File 2 by pool-1-thread-1
Fetching data from File 4 by pool-1-thread-2
Fetching data from File 5 by pool-1-thread-1
|
Notice that File 7, File 8, File 9 and File 10 were rejected. Let’s understand why. The core pool size is 1, the maximum pool size is 2, and the LinkedBlockingQueue holds at most 4 tasks. The first task (File 1) started on the single core thread. The next 4 tasks (File 2 to File 5) were queued, which filled the queue. Because the queue was full, the sixth task (File 6) caused a second thread to be created, which reached the maximum pool size. With both threads busy and the queue full, the remaining tasks were rejected.
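The sequence described above (core thread first, then the queue, then extra threads up to the maximum, then rejection) can be checked with a small sketch. It is illustrative only: the class name SaturationSketch and the helper submitBlocked are made up, the handler just counts rejections, and the tasks block on a latch so the pool stays saturated while the tasks are submitted.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SaturationSketch {

    // Hypothetical helper: submits `tasks` blocking tasks to a pool with 1 core thread,
    // at most 2 threads and a queue of 4, and returns {pool size, queued tasks, rejected tasks}.
    static int[] submitBlocked(int tasks) {
        AtomicInteger rejected = new AtomicInteger();
        CountDownLatch release = new CountDownLatch(1);
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(4);
        // Counts rejections instead of printing a message
        RejectedExecutionHandler countRejections = (task, executor) -> rejected.incrementAndGet();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 10L, TimeUnit.MILLISECONDS, queue, countRejections);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return new int[] { pool.getPoolSize(), queue.size(), rejected.get() };
        } finally {
            release.countDown(); // let accepted tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] result = submitBlocked(10);
        System.out.println("Threads: " + result[0]
                + ", queued: " + result[1]
                + ", rejected: " + result[2]);
    }
}
```

With 10 tasks, 2 run, 4 wait, and 4 are rejected, which matches the four rejected files in the output above.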
How to decide the size of thread pool:
You should not hardcode the size of the thread pool. It should be provided by configuration or calculated from Runtime.getRuntime().availableProcessors().
The pool size should be neither too big nor too small. If the pool is too big, the threads compete for CPU and memory and can overload the system. If it is too small, processors sit idle while tasks wait in the queue, which hurts throughput and performance.
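One possible way to combine both suggestions is to read the size from configuration and fall back to the processor count. The sketch below is illustrative only: the class name PoolSizeSketch, the helper resolvePoolSize, and the system property name pool.size are all made up for this example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class PoolSizeSketch {

    // Hypothetical helper: uses the configured value if it is a positive integer,
    // otherwise falls back to the number of available processors.
    static int resolvePoolSize(String configured) {
        int fallback = Math.max(1, Runtime.getRuntime().availableProcessors());
        if (configured == null) {
            return fallback;
        }
        try {
            int size = Integer.parseInt(configured.trim());
            return size > 0 ? size : fallback;
        } catch (NumberFormatException e) {
            return fallback; // ignore values that are not numbers
        }
    }

    public static void main(String[] args) {
        // "pool.size" is a hypothetical property name, set for example with -Dpool.size=8
        int size = resolvePoolSize(System.getProperty("pool.size"));
        ExecutorService pool = Executors.newFixedThreadPool(size);
        System.out.println("Using a pool of " + size + " threads");
        pool.shutdown();
    }
}
```

One thread per processor is only a starting point that suits CPU-bound work; tasks that spend most of their time waiting on I/O usually benefit from a larger pool.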