线程池
Java线程池有两个比较重要的概念:
- 线程数量:指当前线程池中,可以同时工作的线程数
- 任务队列:该队列可以接收实现Runnable接口的对象,并通过线程池自动管理执行
线程池的创建通过Executors工厂类提供,JAVA默认提供了四种快速创建线程池的方法:
Executors.newSingleThreadExecutor()
线程数量为一
线程池中只有一个线程工作,只有当这个线程完成后,才会有下一个线程进入,可以更安全的替代直接使用Thread类.
Executors.newFixedThreadPool(int threads)
线程数量可指定
固定数量的线程池,较为常用,但由于队列没有限制长度,可能导致队列无线增大.
Executors.newCachedThreadPool()
线程数量不确定
这种线程池不会限制线程池中线程的数量,当有新任务进入线程池中时,如果当前线程池中的线程都繁忙,则会添加一个新线程处理这个任务
当可执行的任务小于当前线程池中的线程数时,会回收空闲的线程(keepAliveTime参数,默认60秒不执行任务)
这种类型的线程池,常用于CPU占用小的任务,如Netty的NIO任务
由于没有限制线程数目,这种线程池存在瞬时提交大量异步任务,从而耗尽系统资源的问题
Executors.newScheduledThreadPool()
计划型线程池,可以配置定时调度或延迟调度任务的线程池,这种线程池主要用来处理特定类型的业务.
对于以上四种快速构建线程池的方式,都可以通过标准构造器ThreadPoolExecutor创建.
ThreadPoolExecutor
// 使用标准构造器构造一个普通的线程池
public ThreadPoolExecutor(int corePoolSize, // 核心线程数,即使线程空闲(Idle),也不会回收
int maximumPoolSize, // 线程数的上限
long keepAliveTime, // 线程最大空闲(Idle)时长
TimeUnit unit,
BlockingQueue<Runnable> workQueue, // 任务的排队队列
ThreadFactory threadFactory, // 新线程的产生方式
RejectedExecutionHandler handler) // 拒绝策略
ScheduledThreadPoolExecutor
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
可以看出计划型线程池的实现,是通过DelayedWorkQueue实现.
submit&execute
具体区别同笔记(1),execute入参是Runnable,且没有返回值;submit入参是Callable,可以通过返回的Future类实例控制当前线程.
线程调度
为了避免任务队列长度无线导致的资源耗尽问题,通常会设置一个指定长度的队列,此时线程调度的核心流程如下:
- 如果当前工作线程数量小于核心线程数量(corePoolSize),执行器总是优先创建一个任务线程,而不是从线程队列中获取一个空闲线程。
- 如果线程池中总的任务数量大于核心线程(corePoolSize)数量,新接收的任务将被加入阻塞队列中,一直到阻塞队列已满。在核心线程数量已经用完、阻塞队列没有满的场景下,线程池不会为新任务创建一个新线程。
- 当完成一个任务的执行时,执行器总是优先从阻塞队列中获取下一个任务,并开始执行,一直到阻塞队列为空,其中所有的缓存任务被取光。
- 在核心线程都用完、阻塞队列已满的情况下,一直会创建新线程去执行新任务,直到池内的线程总数超出maximumPoolSize。如果线程池的线程总数超过maximumPoolSize,线程池就会拒绝接收任务,当新任务过来时,会为新任务执行拒绝策略。
对于上面的流程描述,需要注意,如果线程池参数配置不合理,会导致严重的队列等待问题.
线程工厂ThreadFactory
/**
* An object that creates new threads on demand. Using thread factories
* removes hardwiring of calls to {@link Thread#Thread(Runnable) new Thread},
* enabling applications to use special thread subclasses, priorities, etc.
*
* <p>
* The simplest implementation of this interface is just:
* <pre> {@code
* class SimpleThreadFactory implements ThreadFactory {
* public Thread newThread(Runnable r) {
* return new Thread(r);
* }
* }}</pre>
*
* The {@link Executors#defaultThreadFactory} method provides a more
* useful simple implementation, that sets the created thread context
* to known values before returning it.
* @since 1.5
* @author Doug Lea
*/
public interface ThreadFactory {
/**
* Constructs a new {@code Thread}. Implementations may also initialize
* priority, name, daemon status, {@code ThreadGroup}, etc.
*
* @param r a runnable to be executed by new thread instance
* @return constructed thread, or {@code null} if the request to
* create a thread is rejected
*/
Thread newThread(Runnable r);
}
线程工厂是一个接口,源码的注释中已经写了一个最简单的实现.
任务阻塞队列
- ArrayBlockingQueue:有界队列,必须要设置队列大小,数组实现
- LinkedBlockingQueue:链表实现,可以不设置大小(Integer.Max_VALUE,相当于无界队列),也可以设置队列长度
- PriorityBlockingQueue:支持优先级的无界阻塞队列,要求入参必须实现了Comparable接口,该队列根据compareTo方法判断元素的优先级.
@Slf4j
public class AppTest {
@Test
public void testPriorityBlockingQueue() {
//Usage-1
User u1 = new User(1);
User u2 = new User(2);
User u3 = new User(3);
User u4 = new User(4);
PriorityBlockingQueue<User> priorityBlockingQueue = new PriorityBlockingQueue();
// 乱序加入队列
priorityBlockingQueue.offer(u1);
priorityBlockingQueue.offer(u3);
priorityBlockingQueue.offer(u4);
priorityBlockingQueue.offer(u2);
while (!priorityBlockingQueue.isEmpty()) {
User u = priorityBlockingQueue.poll();
System.out.println(u.getIndex());
}
//Usage-2
People p = new People();
PriorityBlockingQueue<People> priorityBlockingQueueP = new PriorityBlockingQueue();
// 没有实现Comparable接口的类,加入队列会抛出下面异常
// java.lang.ClassCastException: cannot be cast to class java.lang.Comparable
priorityBlockingQueueP.offer(p);
}
/**
* 需要实现Comparable接口
*/
class User implements Comparable<User> {
@Getter
int index;
public User(int index) {
this.index = index;
}
@Override
public int compareTo(User o) {
return this.index - o.index;
}
}
class People {
@Getter
int index;
}
}
输出
1
2
3
4
java.lang.ClassCastException: class com.joshua.test.AppTest$People cannot be cast to class java.lang.Comparable...
- DelayQueue:Executors.newScheduledThreadPool计划型线程池实现用到的队列
- SynchronousQueue:Executors.newCachedThreadPool用到的任务队列,这个队列没有长度,当有任务进入时,队列会阻塞,直到其他线程将队列中任务取出(调用take方法).
@Test
public void testSynchronousQueue() {
Object o = new Object();
final SynchronousQueue synchronousQueue = new SynchronousQueue();
System.out.println("start");
try {
Thread thread = new Thread(() -> {
try {
System.out.println("put obj");
synchronousQueue.put(o); //同步队列put后会阻塞,等待主线程(也可以是其他线程)take请求之后,继续向下执行.
System.out.println("thread end");
} catch (Exception e) {
log.error("send error", e);
}
});
thread.start(); // 启动线程
Thread.sleep(500);
System.out.println("take obj");
synchronousQueue.take();
System.out.println("end");
Thread.sleep(200); // 确保thread类执行完毕
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
输出:
start
put obj
take obj
end
thread end
钩子方法
ThreadPoolExecutor有三个钩子方法,这三个方法可以在任务(Runnable)执行的前,后,任务结束前对当前的任务线程进行操作(通常情况是操作ThreadLocal类).
// Runnable实例执行前
protected void beforeExecute(Thread t, Runnable r) { }
// Runnable实例执行后,抛出异常时
protected void afterExecute(Runnable r, Throwable t) { }
// Runnable实例的线程结束前
protected void terminated() { }
源码调用位置
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
try {
task.run();
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
terminated的调用:
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateLessThan(c, STOP) && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { // 如果信号量是TIDYING
try {
terminated(); // 调用钩子方法
} finally {
ctl.set(ctlOf(TERMINATED, 0)); // set TERMINATED信号量
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
拒绝策略
以下两种情况下会触发拒绝策略:
- 线程池已经被关闭
- 工作队列已满且maximumPoolSize已满
ThreadPoolExecutor提供了四种默认拒绝策略:
AbortPolicy:(默认策略)
若任务无法进入任务队列,会抛出RejectedExecutionException异常
DiscardPolicy:
直接抛出无法进入队列的任务,不会抛出异常
DiscardOldestPolicy:
抛弃队列中最老的任务(也就是最早进入队列的任务)
CallerRunsPolicy:
若任务无法进入任务队列,将使用提交任务的线程自己执行该任务,不会使用线程池中的线程执行新任务
自定义策略:
通过实现RejectedExecutionHandler接口的rejectedExecution(Runnable r, ThreadPoolExecutor executor)方法,自定义其操作
新建ThreadPool例子
public void testNewThreadPool() {
int corePoolSize = 4; // 核心线程数
int maximumPoolSize = 4; // 最大线程
long keepAliveTime = 30; // 超时
ExecutorService executorService = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
TimeUnit.SECONDS, // 秒级(超时)
new LinkedBlockingQueue<>(100), // 长度100的任务队列
new ThreadFactory() { // 自定义工厂
static AtomicInteger atomicInteger = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "joshua-" + atomicInteger.getAndIncrement());
}
},
new RejectedExecutionHandler() { // 自定义拒绝策略
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
log.error("{} rejectedExecution error,{}", r, executor);
}
}
);
}
本文由 momoker 创作,采用 知识共享署名4.0
国际许可协议进行许可
本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名
最后编辑时间为: Apr 14,2023