合规国际互联网加速 OSASE为企业客户提供高速稳定SD-WAN国际加速解决方案。 广告
## Java专题十三(1):线程 [TOC] ### 13.1.1.线程的定义 **进程是资源分配的最小单位,线程是CPU调度的最小单位** 电脑上开一个QQ应用程序就是一个进程,你用QQ与**甲**打电话的同时跟与**乙**文字聊天,打电话和文字聊天都是一个线程,这里使用2个线程来完成用户不同的操作,一个QQ进程包管理着多个线程 **线程生命周期**: 1. 新建(New)状态,当程序使用new关键字创建了一个线程之后,该线程就处于新建状态,此时仅由JVM为其分配内存,并初始化其成员变量的值 2. 就绪(Runnable)状态,当线程对象调用了start()方法之后,该线程处于就绪状态。Java虚拟机会为其创建方法调用栈和程序计数器,等待调度运行 3. 运行(Running)状态,如果处于就绪状态的线程获得了CPU,开始执行run()方法的线程执行体,则该线程处于运行状态 4. 阻塞(Blocked)状态,当处于运行状态的线程失去所占用资源之后,便进入阻塞状态 5. 死亡(Dead)状态,线程出错或者线程正常结束 ![](https://img.kancloud.cn/e8/6f/e86f0c4600a837599d181caabdeda37c_632x485.png) ### 13.1.2.线程的创建 #### 13.1.2.1,实现Runnable接口 ``` class PrimeRun implements Runnable { long minPrime; PrimeRun(long minPrime) { this.minPrime = minPrime; } public void run() { // compute primes larger than minPrime } } PrimeRun p = new PrimeRun(143); new Thread(p).start(); ``` #### 13.1.2.2.继承Thread类 ``` class PrimeThread extends Thread { long minPrime; PrimeThread(long minPrime) { this.minPrime = minPrime; } public void run() { // compute primes larger than minPrime } } PrimeThread p = new PrimeThread(143); p.start(); ``` #### 13.1.2.3.使用Callable和Future ``` class PrimeThread implements Callable<Long> { long minPrime; PrimeThread(long minPrime) { this.minPrime = minPrime; } public Long call() throws Exception { return this.minPrime; } } PrimeThread p = new PrimeThread(143); FutureTask task = new FutureTask(p); new Thread(task).start(); System.out.println(task.get()); ``` ### 13.1.3.线程池(ThreadPoolExecutor类) - **线程池的参数:** ~~~ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) ~~~ **corePoolSize**:保存在线程池的线程数量,即使它们是空闲的 **maximumPoolSize**:允许加入到池中的最大线程数量 **keepAliveTime**:当线程数量比corePoolSize多时,为多余空闲线程终止之前等待新任务的最长时间 **workQueue**:线程池工作队列 **threadFactory**:创建线程的工厂类 - **ThreadPoolExecutor其它参数** **workers**:工人集合 **workerCount**:有效线程的数量 **runState**:线程池的状态,running, shutting down等 1. RUNNING :接收新任务,处理已经入队的任务 2. SHUTDOWN : 不接收新任务,但处理已经入队的任务 3. STOP : 不接收新任务,不处理已经入队的任务,并且中断正在进行中的任务 4. TIDYING : 所有任务都完成 ,workerCount为零,该状态下的线程会调用terminated()方法 5. TERMINATED : 完成方法terminated()的执行 ![](https://img.kancloud.cn/53/8f/538f575e9a263821217136d63cfd9b13_577x533.png) 为了更好理解线程池,先定义以下几个概念: > 产品:实现了Runnable接口的线程类,用户提交的Task任务 > 工人:ThreadPoolExecutor#Worker类,负责新产品的包装工作和待包装产品线的产品包装工作 > 包装:线程Runnable接口的实现类中run()方法实际执行的代码 > 待包装产品线:workQueue队列,Task任务等待被执行 > 进口包装机:核心线程池,数量为corePoolSize > 所有包装机:最大线程池,数量为maximumPoolSize **=>线程池是怎么工作的呢?** 当有新的产品来时,首先查看进口包装机是否都被占着(`workerCountOf(c) < corePoolSize`),未被占着,就安排工人包装新产品,都被占着的话,接着看待包装产品线是否已满(`workQueue.offer(command)`),未满的话加入新产品,等待包装,否则,再看是否所有包装机都被用完了(`addWorker(command, false)`),没有,则安排工人包装新产品(`addWorker()`),否则不包装该产品(`reject(command)`)。 ![](https://img.kancloud.cn/c6/b2/c6b21944bb870cb13a4823b8c7204747_1490x659.png) ```java // ThreadPoolExecutor#execute public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); } ``` **=>工人是怎么工作的?** 首先在`ThreadPoolExecutor#addWorker`方法添加一个`Worker`对象(实现了`Runnable`接口),添加成功的话,启动`w.thread`线程,`Worker`内部调用`runWorker`方法,在`runWorker`方法,一方面做新产品包装工作(`task != null`->`task.run()`),另一方面做待包装产品线的产品包装工作(`(task = getTask()) != null`->`task.run()`) ```java // ThreadPoolExecutor#addWorker private boolean addWorker(Runnable firstTask, boolean core) { boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { // ... if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; } // ThreadPoolExecutor#Worker private final class Worker extends AbstractQueuedSynchronizer implements Runnable { public void run() { runWorker(this); } final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; // ... while (task != null || (task = getTask()) != null) { // ... task.run(); // ... } } } // ThreadPoolExecutor#getTask private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } } ```