多线程学习笔记(3)

in 四月桐花 with 0 comment

线程池

Java线程池有两个比较重要的概念:

  1. 线程数量:指当前线程池中,可以同时工作的线程数
  2. 任务队列:该队列可以接收实现Runnable接口的对象,并通过线程池自动管理执行
    线程池的创建通过Executors工厂类提供,JAVA默认提供了四种快速创建线程池的方法:

Executors.newSingleThreadExecutor()

线程数量为一
线程池中只有一个线程工作,只有当这个线程完成后,才会有下一个线程进入,可以更安全的替代直接使用Thread类.

Executors.newFixedThreadPool(int threads)

线程数量可指定
固定数量的线程池,较为常用,但由于队列没有限制长度,可能导致队列无线增大.

Executors.newCachedThreadPool()

线程数量不确定
这种线程池不会限制线程池中线程的数量,当有新任务进入线程池中时,如果当前线程池中的线程都繁忙,则会添加一个新线程处理这个任务
当可执行的任务小于当前线程池中的线程数时,会回收空闲的线程(keepAliveTime参数,默认60秒不执行任务)
这种类型的线程池,常用于CPU占用小的任务,如Netty的NIO任务
由于没有限制线程数目,这种线程池存在瞬时提交大量异步任务,从而耗尽系统资源的问题

Executors.newScheduledThreadPool()

计划型线程池,可以配置定时调度或延迟调度任务的线程池,这种线程池主要用来处理特定类型的业务.

对于以上四种快速构建线程池的方式,都可以通过标准构造器ThreadPoolExecutor创建.

ThreadPoolExecutor

// 使用标准构造器构造一个普通的线程池
public ThreadPoolExecutor(int corePoolSize, // 核心线程数,即使线程空闲(Idle),也不会回收
                          int maximumPoolSize, // 线程数的上限
                          long keepAliveTime, // 线程最大空闲(Idle)时长
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue, // 任务的排队队列
                          ThreadFactory threadFactory, // 新线程的产生方式
                          RejectedExecutionHandler handler) // 拒绝策略

ScheduledThreadPoolExecutor

public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
          new DelayedWorkQueue(), threadFactory, handler);
}

可以看出计划型线程池的实现,是通过DelayedWorkQueue实现.

submit&execute

具体区别同笔记(1),execute入参是Runnable,且没有返回值;submit入参是Callable,可以通过返回的Future类实例控制当前线程.

线程调度

为了避免任务队列长度无线导致的资源耗尽问题,通常会设置一个指定长度的队列,此时线程调度的核心流程如下:

  1. 如果当前工作线程数量小于核心线程数量(corePoolSize),执行器总是优先创建一个任务线程,而不是从线程队列中获取一个空闲线程。
  2. 如果线程池中总的任务数量大于核心线程(corePoolSize)数量,新接收的任务将被加入阻塞队列中,一直到阻塞队列已满。在核心线程数量已经用完、阻塞队列没有满的场景下,线程池不会为新任务创建一个新线程
  3. 当完成一个任务的执行时,执行器总是优先从阻塞队列中获取下一个任务,并开始执行,一直到阻塞队列为空,其中所有的缓存任务被取光。
  4. 在核心线程都用完、阻塞队列已满的情况下,一直会创建新线程去执行新任务,直到池内的线程总数超出maximumPoolSize。如果线程池的线程总数超过maximumPoolSize,线程池就会拒绝接收任务,当新任务过来时,会为新任务执行拒绝策略。

对于上面的流程描述,需要注意,如果线程池参数配置不合理,会导致严重的队列等待问题.

线程工厂ThreadFactory

/**
 * An object that creates new threads on demand.  Using thread factories
 * removes hardwiring of calls to {@link Thread#Thread(Runnable) new Thread},
 * enabling applications to use special thread subclasses, priorities, etc.
 *
 * <p>
 * The simplest implementation of this interface is just:
 * <pre> {@code
 * class SimpleThreadFactory implements ThreadFactory {
 *   public Thread newThread(Runnable r) {
 *     return new Thread(r);
 *   }
 * }}</pre>
 *
 * The {@link Executors#defaultThreadFactory} method provides a more
 * useful simple implementation, that sets the created thread context
 * to known values before returning it.
 * @since 1.5
 * @author Doug Lea
 */
public interface ThreadFactory {

    /**
     * Constructs a new {@code Thread}.  Implementations may also initialize
     * priority, name, daemon status, {@code ThreadGroup}, etc.
     *
     * @param r a runnable to be executed by new thread instance
     * @return constructed thread, or {@code null} if the request to
     *         create a thread is rejected
     */
    Thread newThread(Runnable r);
}

线程工厂是一个接口,源码的注释中已经写了一个最简单的实现.

任务阻塞队列

@Slf4j
public class AppTest {
    @Test
    public void testPriorityBlockingQueue() {
        //Usage-1
        User u1 = new User(1);
        User u2 = new User(2);
        User u3 = new User(3);
        User u4 = new User(4);
        PriorityBlockingQueue<User> priorityBlockingQueue = new PriorityBlockingQueue();
        // 乱序加入队列
        priorityBlockingQueue.offer(u1);
        priorityBlockingQueue.offer(u3);
        priorityBlockingQueue.offer(u4);
        priorityBlockingQueue.offer(u2);
        while (!priorityBlockingQueue.isEmpty()) {
            User u = priorityBlockingQueue.poll();
            System.out.println(u.getIndex());
        }
        //Usage-2
        People p = new People();
        PriorityBlockingQueue<People> priorityBlockingQueueP = new PriorityBlockingQueue();
        // 没有实现Comparable接口的类,加入队列会抛出下面异常
        // java.lang.ClassCastException: cannot be cast to class java.lang.Comparable
        priorityBlockingQueueP.offer(p);
    }

    /**
     * 需要实现Comparable接口
     */
    class User implements Comparable<User> {
        @Getter
        int index;

        public User(int index) {
            this.index = index;
        }

        @Override
        public int compareTo(User o) {
            return this.index - o.index;
        }
    }

    class People {
        @Getter
        int index;
    }
}

输出

1
2
3
4

java.lang.ClassCastException: class com.joshua.test.AppTest$People cannot be cast to class java.lang.Comparable...
@Test
public void testSynchronousQueue() {
    Object o = new Object();
    final SynchronousQueue synchronousQueue = new SynchronousQueue();
    System.out.println("start");
    try {
        Thread thread = new Thread(() -> {
            try {
                System.out.println("put obj");
                synchronousQueue.put(o); //同步队列put后会阻塞,等待主线程(也可以是其他线程)take请求之后,继续向下执行.
                System.out.println("thread end");
            } catch (Exception e) {
                log.error("send error", e);
            }
        });
        thread.start(); // 启动线程
        Thread.sleep(500);
        System.out.println("take obj");
        synchronousQueue.take();
        System.out.println("end");
        Thread.sleep(200); // 确保thread类执行完毕
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}

输出:

start
put obj
take obj
end
thread end

钩子方法

ThreadPoolExecutor有三个钩子方法,这三个方法可以在任务(Runnable)执行的前,后,任务结束前对当前的任务线程进行操作(通常情况是操作ThreadLocal类).

// Runnable实例执行前
protected void beforeExecute(Thread t, Runnable r) { }
// Runnable实例执行后,抛出异常时
protected void afterExecute(Runnable r, Throwable t) { }
// Runnable实例的线程结束前
protected void terminated() { }

源码调用位置

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                try {
                    task.run();
                    afterExecute(task, null);
                } catch (Throwable ex) {
                    afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

terminated的调用:

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateLessThan(c, STOP) && ! workQueue.isEmpty()))
            return;
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { // 如果信号量是TIDYING
                try {
                    terminated(); // 调用钩子方法
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0)); // set TERMINATED信号量
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

拒绝策略

以下两种情况下会触发拒绝策略:

  1. 线程池已经被关闭
  2. 工作队列已满且maximumPoolSize已满
    ThreadPoolExecutor提供了四种默认拒绝策略:

AbortPolicy:(默认策略)

若任务无法进入任务队列,会抛出RejectedExecutionException异常

DiscardPolicy:

直接抛出无法进入队列的任务,不会抛出异常

DiscardOldestPolicy:

抛弃队列中最老的任务(也就是最早进入队列的任务)

CallerRunsPolicy:

若任务无法进入任务队列,将使用提交任务的线程自己执行该任务,不会使用线程池中的线程执行新任务

自定义策略:

通过实现RejectedExecutionHandler接口的rejectedExecution(Runnable r, ThreadPoolExecutor executor)方法,自定义其操作

新建ThreadPool例子

public void testNewThreadPool() {
    int corePoolSize = 4; // 核心线程数
    int maximumPoolSize = 4; // 最大线程
    long keepAliveTime = 30; // 超时
    ExecutorService executorService = new ThreadPoolExecutor(
            corePoolSize,
            maximumPoolSize,
            keepAliveTime,
            TimeUnit.SECONDS, // 秒级(超时)
            new LinkedBlockingQueue<>(100), // 长度100的任务队列
            new ThreadFactory() { // 自定义工厂
                static AtomicInteger atomicInteger = new AtomicInteger(1);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "joshua-" + atomicInteger.getAndIncrement());
                }
            },
            new RejectedExecutionHandler() { // 自定义拒绝策略
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    log.error("{} rejectedExecution error,{}", r, executor);
                }
            }
    );
}