计算机 · 2021年12月19日 0

Java多线程

Java中创建线程的两种方式

  • 继承Thread
  • 实现Runnable接口

Thread类的几个方法:

  • Thread#start()
  • Thread#currentThread()
  • Thread#sleep()
  • Thread#setName()

线程优先级

Thread#setPriority()

  • Thread.MIN_PRIORITY
  • Thread.NORM_PRIORITY
  • Thread.MAX_PRIORITY

线程yield()

Daemon Threads

  • JVM在结束主线程之前会等待非守护线程,但是不会等待守护线程
  • 由守护线程创建的线程仍然是守护线程,由非守护线程创建的线程仍然是非守护线程。调用main入口方法的线程是非守护线程
  • 可以通过apiThread#setDaemon(true/false)设置线程是否是守护线程
  • System.exit(0)会直接结束JVM而不论是否还有守护/非守护线程在运行

等待线程结束

  • Thread#join()
  • Thread#join(long millis)
  • Thread#join(long millis, int nanos)

线程中断(interrupt)

Java线程中断的两种情况:

  • 线程通过Object.wait(..),Thread.join(..),Thread.sleep(..)方法处于阻塞状态时,如果有另外的线程调用了此线程的interrupt方法,那么此线程就会收到一个InterruptedException,知道自己不要再呆呆地阻塞,形势有变化了
  • 另一种是线程正在执行自己的代码,另外有线程调用了这个线程的interrupt方法,这个时候如果此线程代码在中断事件发生后有调用isInterrupted()方法检查自己的中断状态,那么这个线程就可以知道有中断事件发生,否则这个中断状态就一直保存在那里等待这个线程的发现

避免多个线程同时执行一个方法

synchronized关键字

隐式锁(Intrinsic locks)与synchronized关键字

synchronized关键字的实现原理其实是每个synchronized关键字关联了一个对象,而这些对象都有一个隐式的锁。

  • Intrinsic lock是和对象关联在一起的而不是方法
    如果几个synchronized方法关联的是同一个对象,那么这些方法就有可能因为一个方法在被调用而阻塞。
public class MultipleSyncMethodsDemo {

    public static void main (String[] args) throws InterruptedException {
        MultipleSyncMethodsDemo demo = new MultipleSyncMethodsDemo();
        Thread thread1 = new Thread(() -> {
            System.out.println("thread1 before call "+ LocalDateTime.now());
            demo.syncMethod1("from thread1");
            System.out.println("thread1 after call "+LocalDateTime.now());
        });
        Thread thread2 = new Thread(() -> {
            System.out.println("thread2 before call "+LocalDateTime.now());
            demo.syncMethod2("from thread2");
            System.out.println("thread2 after call "+LocalDateTime.now());
        });

        thread1.start();
        thread2.start();
    }

    private synchronized void syncMethod1 (String msg) {
        System.out.println("in the syncMethod1 "+msg+" "+LocalDateTime.now());
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private synchronized void syncMethod2 (String msg) {
        System.out.println("in the syncMethod2 "+msg+" "+LocalDateTime.now());
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
  • Intrinsic lock是可重入锁Intrinsic lock are acquired on a per-thread basis rather than per-method call basis. Once a thread has acquired the lock it can internally call other methods without reacquiring the lock. The Lock will only be release when the thread is done with the entry method invocation.
public class ReentrantDemo {

    public static void main (String[] args) throws InterruptedException {
        ReentrantDemo demo = new ReentrantDemo();
        Thread thread1 = new Thread(() -> {
            System.out.println("thread1 before call "+ LocalDateTime.now());
            demo.syncMethod1("from thread1");
            System.out.println("thread1 after call "+LocalDateTime.now());
        });
        Thread thread2 = new Thread(() -> {
            System.out.println("thread2 before call "+LocalDateTime.now());
            demo.syncMethod2("from thread2");
            System.out.println("thread2 after call "+LocalDateTime.now());
        });

        thread1.start();
        thread2.start();
    }

    private synchronized void syncMethod1 (String msg) {
        System.out.println("in the syncMethod1 "+msg+" "+LocalDateTime.now());
        syncMethod2("from method syncMethod1, reentered call");
    }

    private synchronized void syncMethod2 (String msg) {
        System.out.println("in the syncMethod2 "+msg+" "+LocalDateTime.now());
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
  • 对于static方法的synchronized关键字,关联的是代表这个类的对象而不是一个类实例。
  • synchronized关键字也可以用于代码块,只是此时需要显示地指明一个非空的用于同步的对象。这种做法有两个好处:
    • 更精细地控制需要同步的代码块;
    • 可以自己选择使用哪个对象的intrinsic lock进行同步
  • Generally speaking we should always use objects locks which our code maintains instead of relying on JVM managed objects.
public class SyncBlockStringLock {
    private Map<String, Object> locks = new HashMap<>();

    private static final File rootFolder = new File("d:\\test");

    static {
        if (!rootFolder.exists()) {
            rootFolder.mkdir();
        }
    }

    public static void main (String[] args) {
        SyncBlockStringLock obj = new SyncBlockStringLock();
        Thread thread1 = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                String path = rootFolder.getAbsolutePath() + File.separatorChar + i;
                obj.writeData(path, " thread1 data " + i);
                obj.readData(path);
            }
        });

        Thread thread2 = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                String path = rootFolder.getAbsolutePath() + File.separatorChar + i;
                obj.writeData(path, " thread2 data " + i);
                obj.readData(path);
            }
        });

        thread1.start();
        thread2.start();
    }

    private void writeData (String path, String data) {
        synchronized (getLock(path)) {
            try {
                Files.write(Paths.get(path), data.getBytes());
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void readData (String path) {
        synchronized (getLock(path)) {
            String s = null;
            try {
                s = new String(Files.readAllBytes(Paths.get(path)));
            } catch (IOException e) {
                e.printStackTrace();
            }
            System.out.println(s);
        }
    }

    private Object getLock (String path) {
        if (!locks.containsKey(path)) {
            locks.put(path, new Object());
        }

        return locks.get(path);
    }
}

死锁

wait/notify/notifyAll

对于Intrinsic lock,wait/notify/notifyAll这写api是与之相关的。

  • 调用wait/notify/notifyAll时必须已经获得了对象的Intrinsic lock,否则会报错。所以这些操作通常都在synchronized方法/代码块中被调用
  • 调用wait后线程会自动释放相应的intrinsic lock
  • notify只唤醒一个阻塞的线程,notifyAll是唤醒所有阻塞的线程

饥饿与公平性

活锁

A livelock is a recursive situation where two or more threads would keep repeating a particular code logic. The intended logic is typically giving opportunity to the other threads to proceed in favor of ‘this’ thread.

A real-world example of livelock occurs when two people meet in a narrow corridor, and each tries to be polite by moving aside to let the other pass, but they end up swaying from side to side without making any progress because they both repeatedly move the same way at the same time.

Java线程的状态

public Thread.State getState()

  • NEW
    Thread has not yet started
  • RUNNABLE
    Thread is currently running without blocking/waiting in its run method
  • BLOCKED
    Thread is blocked from entering a synchronized block/method, waiting for the monitor lock to be released by the other thread
  • WAITING
    Thread is waiting due to one of these calls:Object.wait(),Thread.join(),LockSupport.park()
  • TIMED_WAITING
    Thread is waiting due to one of these timeout based method calls:Thread.sleep(long millis), Object.wait(long millis), Thread.join(long millis), LockSupport.parkNanos(Object blocker, long nanos), LockSupport.parkUntil(Object blocker, long nanos)
  • TERMINATED
    A thread has exited from its run() method

happens-before relation

要想多线程的Java能够正常工作,确定happens-before关系是非常重要的:

Happens-before relationship is a guarantee that action performed by one thread is visible to another action in different thread.
Happens-before defines a partial ordering on all actions within the program. To guarantee that the thread executing action Y can see the results of action X (whether or not X and Y occur in different threads), there must be a happens-before relationship between X and Y. In the absence of a happens-before ordering between two operations, the JVM is free to reorder them as it wants (JIT compiler optimization).

这个happens-before不仅意味着时间上的先后,更是意味着后一个行为是在前一个行为的结果基础上(比如内存修改)进行的。

建立happens-before关系的几种方法

  1. Single thread rule:Each action in a single thread happens-before every action in that thread that comes later in the program order.
  2. Monitor lock rule:An unlock on a monitor lock (exiting synchronized method/block) happens-before every subsequent acquiring on the same monitor lock.
  3. Volatile variable rule:A write to a volatile field happens-before every subsequent read of that same field. Writes and reads of volatile fields have similar memory consistency effects as entering and exiting monitors (synchronized block around reads and writes), but without actually aquiring monitors/locks.
  4. Thread start rule:A call to Thread.start() on a thread happens-before every action in the started thread. Say thread A spawns a new thread B by calling threadA.start(). All actions performed in thread B’s run method will see thread A’s calling threadA.start() method and before that (only in thread A) happened before them.
  5. Thread join rule:All actions in a thread happen-before any other thread successfully returns from a join on that thread. Say thread A spawns a new thread B by calling threadA.start() then calls threadA.join(). Thread A will wait at join() call until thread B’s run method finishes. After join method returns, all subsequent actions in thread A will see all actions performed in thread B’s run method happened before them.
  6. Transitivity:If A happens-before B, and B happens-before C, then A happens-before C.

记住这几条规则的方法:首先先来后到和传递性是两个很自然的规则,其次对于线程来说线程开始的操作肯定在线程的run()方法之前,线程的run()方法肯定在另外的线程join()这个线程之前,最后Java为了让开发者能够方便实现多线程程序的同步,引入了锁和volatile。于是这6个规则就这样记住了。

Java Memory Model

由于真实硬件的内存结构、处理器优化和JIT编译器的优化实在太复杂,Java Memory Model(JSR-133)被设计出来隐藏这些真实情况的细节,Java开发者们只需要在这一层抽象上进行思考即可。

需要一个Java Memory Model的理由:

  • Variable Visibility Problem
    在多核/多处理器的硬件环境下,内存往往是具有多层次缓存结构的。为了性能上的优化,对于一个共享变量,一个核/处理器往往读到/写到的是相应的缓存。如果显示添加指令更新/同步这个共享变量,那么一个核/处理器对这个变量值做的修改其他核/处理器可能就不能及时的看到。
  • Code reordering Problem
    Java允许Javac编译器或者即时编译编译对代码做优化,这些优化可能会重新排列开发者的代码执行顺序;而内存层次结构的存在也可能使得代码看起来像重新排列过一样。对于单线程程序来讲这种重新排列代码的优化没有问题,但是对于多线程程序却可能使程序产生异常行为。
  • Sequential Consistency

Java Memory Model之上正确实现多线程同步:

  1. 和同步、共享变量有关的操作都具有happens-before关系。
  2. 没有数据竞争(Data race)。数据竞争应该用Intrinsic Locks避免。
    数据竞争(Data race):一个程序中存在两处没有happens-before关系的冲突访问。When a program contains two conflicting accesses that are not ordered by a happens-before relationship, it is said to contain a data race.冲突访问(conflicting access):两个操作访问同一个共享变量,且其中至少有一个是写操作。Two actions using the same shared field or array variable are said to be conflicting if at least one of the accesses is a write.
  • 用synchronized同步的代码块
    Intrinsic Locks做了两件事情:
    1. 保证了对于共享变量/代码块的互斥访问;
    2. 获得和释放Intrinsic Locks的操作锁定和刷新了与各线程相对应的本地处理器缓存,解决了可见性问题(visibility problems)。
  • volatile关键字
    用volatile关键字修饰共享变量时起到了和synchronized类似的作用,但是:
    1. volatile不能保证一个组合型操作或者多个操作的原子性;
      譬如,下面的代码是有问题的:
package com.logicbig.example;

public class CompositeActionWithVolatile {
    static volatile int c;

    public static void main (String[] args) throws InterruptedException {

        for (int t = 0; t < 10; t++) {
            c = 0;

            Thread thread1 = new Thread(() -> {
                for (int i = 0; i < 1000; i++) {
                    c++;
                }
            });

            Thread thread2 = new Thread(() -> {
                for (int i = 0; i < 1000; i++) {
                    c++;
                }
            });

            thread1.start();
            thread2.start();

            thread1.join();
            thread2.join();

            System.out.println("counter value = " + c);
        }

    }
}

像这种c++的操作不是原子操作,是分读取值,加一,写回值三个步骤执行的,volatile关键字不能保证变量c的正确同步(估计volatile需要在完成加一再写会后才会触发同步)。

对于这种情况就应该使用synchronized关键字或者使用java.util.concurrent.atomic package里的操作具有原子性的类。

  1. volatile关键字不解决代码可能被重新排列的问题;
public class ReorderExample {
    private int a = 1;
    private boolean flg = true;

    public void method1 () {
        flg = false;
        a = 2;
    }

    public void method2 () {
        if (flg) {
            System.out.println("a = " + a);
        }
    }

    public static void main (String[] args) {

        for (int i = 0; i < 100; i++) {

            ReorderExample reorderExample = new ReorderExample();

            Thread thread1 = new Thread(() -> {
                reorderExample.method1();
            });

            Thread thread2 = new Thread(() -> {
                reorderExample.method2();
            });

            thread1.start();
            thread2.start();
        }
    }
}
 像这种问题用volatile是解决不了的,因为volatile只是解决的可见性问题(visibility problem),让一个线程对变量做的改动对另一个线程可见。这种情况的正确处理方式是加锁:  
public class ReorderExampleWithSynchronized {
    private int a = 1;
    private boolean flg = true;

    public synchronized void method1 () {
        flg = false;
        a = 2;
    }

    public synchronized void method2 () {
        if (flg) {
            System.out.println("a = " + a);
        }
    }

    public static void main (String[] args) {

        for (int i = 0; i < 100; i++) {

            ReorderExampleWithSynchronized reorderExample =
                                             new ReorderExampleWithSynchronized();

            Thread thread1 = new Thread(() -> {
                reorderExample.method1();
            });

            Thread thread2 = new Thread(() -> {
                reorderExample.method2();
            });

            thread1.start();
            thread2.start();
        }
    }
}

用volatile时可以让代码更简洁,相当于让JVM区管理同步而不是我们自己;如果一段代码或者某个方法里频繁访问某个volatile变量是可能导致程序性能下降的,倒不如使用synchronized修饰这个代码快或者方法。

  • Final域
    由于final域通常在被初始化后就不会改变值,因此final域没有可见性和重排代码的问题。
    一个变量不能同时用final和volatile两个关键字修饰。

使用java.util.Timer调度(安排)任务

可以使用线程安全的java.util.Timer类来安排任务在指定延迟时间后执行,或者定时重复。

ThreadLocal

对于ThreadLocal类的对象,每个线程对ThreadLocal对象的引用都会变成对该ThreadLocal对象的一个与该线程相关联的拷贝的应用(因此,ThreadLocal对象一般是private static, 用来保存和线程相关的状态)。

JSR166:Concurrency Utilities

  • java.util.concurrent
  • java.util.concurrent.atomic
  • java.util.concurrent.locks

Executor、Trhead Pool和ForkJoin Pool

Executor Framework

java.util.concurrent包里和executor相关的接口:

java.util.concurrent包里和executor相关的类:

Executor interface

package java.util.concurrent;
    public interface Executor {
    void execute(Runnable command);
    }

Executor#executor(command)实际上等效于(new Thread(command)).start();

ExecutorService interface

作为Executor的子类,ExecutorService类添加了一些用于管理Executor本身和要执行的任务的生命周期的接口:

package java.util.concurrent;

import java.util.List;
import java.util.Collection;

public interface ExecutorService extends Executor {

    void shutdown();

    List<Runnable> shutdownNow();

    boolean isShutdown();

    boolean isTerminated();

    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

    <T> Future<T> submit(Callable<T> task);

    <T> Future<T> submit(Runnable task, T result);

    Future<?> submit(Runnable task);

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
  • ExecutorService之submit方法
    类似于Executor#execute方法,ExecutorService提供了更灵活的submit方法
  • ExecutorService之Callable接口
    submit方法不仅接受Runnable类型参数,也接受Callable类型参数:
package java.util.concurrent;

public interface Callable<V> {
    V call() throws Exception;
}
  • ExecutorService之Future方法
    submit方法返回的是Future类型的值,用于获取Callable的返回值和管理Callable/Runable任务的状态
package java.util.concurrent;

public interface Future<V> {

    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();

    V get() throws InterruptedException, ExecutionException;

    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
  • ExecutorService之批量执行任务的接口
    关于Bulk Submission的方法就是ExecutorService定义中和Collection有关的那些接口
  • ExecutorService之生命周期管理的接口
    看方法名字就知道哪些接口是用来干这个的了

Executors类

基于工厂模式,Executors类提供了用于创建ExecutorService类实例的静态方法(一般不用new创建ExecutorService类实例)。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SingleThreadExecutorExample {

    public static void main (String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future<?> future = executorService.submit(new Runnable() {
            @Override
            public void run () {
                System.out.println("task running");
            }
        });

        executorService.shutdown();
    }
}

Executors类提供的创建ExecutorService类实例的方法:

Executors methodDescription
newCachedThreadPool(…)Creates a cached thread pool’s ExecutorService that can create new threads as needed or reused cached thread.
newFixedThreadPool(…)Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue.
newScheduledThreadPool(…)Creates a thread pool that can schedule commands to run after a given delay, or to execute periodically.
newSingleThreadExecutor(…)Creates an Executor that uses a single worker thread operating off an unbounded queue. Above, We saw an example already.
newSingleThreadScheduledExecutor(..)Creates a single-threaded executor that can schedule commands to run after a given delay, or to execute periodically.
newWorkStealingPool(..)Creates a work-stealing thread pool using all available processors as its target parallelism level. May use multiple queues to reduce contention.

Thread Pool

  • 线程池通过用提前实例化的线程来执行应用程序级别的任务,减少了创建线程时的额外开销。
  • 另一方面,线程池限制了同时执行任务的线程数(超过线程池线程数的任务被放入队列中等待执行,而不是每来一个任务就立马创建一个新线程执行)从而避免了系统资源被用尽的情况出现。

关于线程池的一些概念

  • Number of threads(N)
    线程池中正在执行任务的线程数与idle线程数总和
    getPoolSize()的返回值
  • Core pool size(C)
    线程池在把新任务放入队列中等待执行之前最多可以同时存在的线程数。
    在线程池里同时存在的线程数达到C之前,对于新来的任务,线程池会新建线程来执行任务,而不是让idle threads来执行这些任务。只有在线程池的线程数达到C时,才会重用idle threads。
    getCorePoolSize()的返回值
  • Maximum pool size(M)
    对于有限队列(bounded queues),在等待任务数填满了队列时,线程池可以再进一步扩充可以同时存在的线程数达到M。在线程池已经将同时存在的线程数提升到了M之后,等待队列仍然是满的话,那么对于新加进来的任务线程池只有拒绝(reject)了。
  • keepAliveTime
    idle threads在空闲超过指定时间后会被结束。也可以用allowCoreThreadTimeOut(boolean)选择让空闲线程一直存在。
  • workQueue
    必须是一个BlockingQueue。有界队列,ArrayBlockingQueue;无界队列,LinkedBlockingQueue。或者可以用hand-offs queue, SynchronousQueue,用SynchronousQueue时会尝试立即执行加入的任务,如果线程池线程数还没有达到M,就增加线程池线程数,如果线程池线程数达到了M,就拒绝新任务。对于这种SynchronousQueue,CorePoolSize就显得没有意义了。

Java Thread Pool的实现

ThreadPoolExecutor类的构造函数:

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) 

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) 

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) 

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
  • handler
    在线程池拒绝加入新的任务时,线程池会调用handler进行相应处理:
package java.util.concurrent;

public interface RejectedExecutionHandler {
   void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
  • ThreadPoolExecutor.AbortPolicy
    public static class AbortPolicy implements RejectedExecutionHandler {

      public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
      }
    }
  • ThreadPoolExecutor.CallerRunsPolicy
public static class CallerRunsPolicy implements RejectedExecutionHandler {

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}
  • ThreadPoolExecutor.DiscardPolicy
    public static class DiscardPolicy implements RejectedExecutionHandler {

      public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

      }
    }
  • ThreadPoolExecutor.DiscardOldestPolicy
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {

       public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
         if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
       }
      }
    }
  • threadFactory
package java.util.concurrent;

public interface ThreadFactory {
  Thread newThread(Runnable var1);
}

ThreadPoolExecutor里的其他有用方法:

  • int prestartAllCoreThreads()
    提前创建线程而不是默认的在需要时创建线程。返回实际创建的线程数。
    *boolean prestartCoreThread()*只会创建一个线程。如果所有的线程都已创建完毕,那么返回false。

手动使用ThreadPoolExecutor类的构造函数的例子:

public class ThreadPoolExecutorExample {

    public static void main (String[] args) {
        createAndRunPoolForQueue(new ArrayBlockingQueue<Runnable>(3), "Bounded");
        createAndRunPoolForQueue(new LinkedBlockingDeque<>(), "Unbounded");
        createAndRunPoolForQueue(new SynchronousQueue<Runnable>(), "Direct hand-off");
    }

    private static void createAndRunPoolForQueue (BlockingQueue<Runnable> queue,
                                                                      String msg) {
        System.out.println("---- " + msg + " queue instance = " +
                                                  queue.getClass()+ " -------------");

        ThreadPoolExecutor e = new ThreadPoolExecutor(2, 5, Long.MAX_VALUE,
                                 TimeUnit.NANOSECONDS, queue);

        for (int i = 0; i < 10; i++) {
            try {
                e.execute(new Task());
            } catch (RejectedExecutionException ex) {
                System.out.println("Task rejected = " + (i + 1));
            }
            printStatus(i + 1, e);
        }

        e.shutdownNow();

        System.out.println("--------------------\n");
    }

    private static void printStatus (int taskSubmitted, ThreadPoolExecutor e) {
        StringBuilder s = new StringBuilder();
        s.append("poolSize = ")
         .append(e.getPoolSize())
         .append(", corePoolSize = ")
         .append(e.getCorePoolSize())
         .append(", queueSize = ")
         .append(e.getQueue()
                  .size())
         .append(", queueRemainingCapacity = ")
         .append(e.getQueue()
                  .remainingCapacity())
         .append(", maximumPoolSize = ")
         .append(e.getMaximumPoolSize())
         .append(", totalTasksSubmitted = ")
         .append(taskSubmitted);

        System.out.println(s.toString());
    }

    private static class Task implements Runnable {

        @Override
        public void run () {
            while (true) {
                try {
                    Thread.sleep(1000000);
                } catch (InterruptedException e) {
                    break;
                }
            }
        }
    }
}

不过我们一般不直接调用这些ThreadPoolExecutor的构造函数,而是调用Executors的工厂方法:

Executors methodThreadPoolExecutor的等价调用
newCachedThreadPool()new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
newFixedThreadPool(int nThreads)new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
newSingleThreadExecutor()new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());

Scheduled Thread Pools

注意scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);的delay是跟在每个Task后面的Delay。
ScheduledExecutorService接口里的方法返回的都是ScheduledFuture
接口类型,该接口实现了FutureDelayComparable三个接口。

ScheduledThreadPoolExecutor类

ScheduledThreadPoolExecutor(int corePoolSize) 

ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) 

ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) 

ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) 

与ThreadPoolExecutor的构造函数相比,ScheduledThreadPoolExecutor少了很多构造时参数,比如没有要求指定队列,因为固定使用了unbounded queue。

While this class inherits from ThreadPoolExecutor, a few of the inherited tuning methods are not useful for it. In particular, because it acts as a fixed-sized pool using corePoolSize threads and an unbounded queue, adjustments to maximumPoolSize have no useful effect. Additionally, it is almost never a good idea to set corePoolSize to zero or use allowCoreThreadTimeOut because this may leave the pool without threads to handle tasks once they become eligible to run.

Fork/join framework


因为ForkJoinPool不是继承自ThreadPoolExecutor,所以对于ForkJoinPool而言没有前面讲的那些core pool size, queuing, maximum pool size等概念。

获取ForkJoinPool实例的方法:

  • 静态方法
    public static ForkJoinPool commonPool()
  • 构造函数
ForkJoinPool()  
ForkJoinPool(int parallelism)  
ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, boolean asyncMode)

提交任务的方法:

  • invoke(ForkJoinTask),同步
  • execute(ForkJoinTask),异步
  • submit(ForkJoinTask),异步

关于ForkJoinTask类:
一个代表可以分而治之的任务的抽象类。

  • ForkJoinTask#fork()方法:
    将此任务提交到ForkJoinPool进行异步计算。
  • ForkJoinTask#join()方法: 阻塞等待计算结果。

ForkJoinTask的三个子类:

  • RecursiveAction
    protected abstract void compute()
  • RecursiveTask
    protected abstract V compute()
  • *CountedCompleter`(since Java 8)

几种同步机制

  • Semaphore
    Semaphore#acquire()和Semaphore#release()
  • CountDownLatch
  • CyclicBarrier
    与CountDownLatch有些类似,只是在CyclicBarrier的值变为0之后,又会变回初始值。这样CyclicBarrier就可以用来对几个线程进行隔一段时间的同步工作。