余子越的博客
Toggle navigation
余子越的博客
主页
计算机网络
大数据分析
系统与工具
编程之路
容器引擎
作者
归档
标签
JAVA线程基础
2020-12-20 17:34:58
16
0
0
yuziyue
[TOC] # 一. 创建线程 - 一个线程可以等待另一个线程直到其运行结束。例如,main线程在启动t线程后,可以通过`t.join()`等待t线程结束后再继续运行。如果去掉 `t.join()` 后,main主线程和新创建的子线程就是同步运行的。 - 守护线程 - 守护线程是指为其他线程服务的线程。在JVM中,所有非守护线程都执行完毕后,无论有没有守护线程,虚拟机都会自动退出。 - 在守护线程中,编写代码要注意:守护线程不能持有任何需要关闭的资源,例如打开文件等,因为虚拟机退出时,守护线程没有任何机会来关闭文件,这会导致数据丢失。 - 使用 `t.setDaemon(true)` 设置线程为守护线程。 ## 1.1 继承自Thread类 ``` public class Main { public static void main(String[] args) { Thread t = new MyThread(); t.start(); t.join(); } } class MyThread extends Thread { @Override public void run() { System.out.println("start new thread!"); } } ``` ## 1.2 实现Runnable接口 - 标准写法 ``` public class Main { public static void main(String[] args) { Thread t = new Thread(new MyRunnable()); t.start(); t.join(); } } class MyRunnable implements Runnable { @Override public void run() { System.out.println("start new thread!"); } } ``` - 简化写法 `Runnable`本身是接口,接口是不能实例化的,所以这里实际上是定义了一个实现了`Runnable`接口的匿名类,并且通过new实例化该匿名类,然后转型为`Runnable` ``` public class Main { public static void main(String[] args) { Thread t = new Thread(new Runnable(){ @Override public void run() { System.out.println("start new thread!"); } }); t.start(); t.join(); } } ``` ## 1.3 使用Lambda表达式 ``` public class Main { public static void main(String[] args) { Thread t = new Thread(() -> { System.out.println("start new thread!"); }); t.start(); t.join(); } } ``` ## 1.4 线程的优先级 可以对线程设定优先级,优先级高的线程被操作系统调度的优先级较高,操作系统对高优先级线程可能调度更频繁,但决不能通过设置优先级来确保高优先级的线程一定会先执行。 ``` Thread.setPriority(int n) // 1~10, 默认值5 ``` # 二. 线程状态 在Java程序中,一个线程对象只能调用一次start()方法启动新线程,线程启动以后存在着如下状态。 - `NEW`:新创建的线程,尚未执行; - `RUNNABLE`:运行中的线程,正在执行run()方法的Java代码; - `BLOCKED`:运行中的线程,因为某些操作被阻塞而挂起; - `WAITING`:运行中的线程,因为某些操作在等待中; - `TIMED_WAITING`:运行中的线程,因为执行sleep()方法正在计时等待; - `TERMINATED`:线程已终止,因为run()方法执行完毕。 <br> 运行中的线程可能会终止,线程终止的原因有: - 线程正常终止:run()方法执行到return语句返回; - 线程意外终止:run()方法因为未捕获的异常导致线程终止; - 对某个线程的Thread实例调用stop()方法强制终止(强烈不推荐使用)。 # 三. 线程中断 - 中断线程就是其他线程给该线程发一个信号,该线程收到信号后结束执行run()方法,使得自身线程能立刻结束运行。 - 捕获 InterruptedException 判断是否被中断 ``` public class Main { public static void main(String[] args) throws Exception { Thread t = new MyThread(); t.start(); Thread.sleep(5000); t.interrupt(); t.join(); System.out.println("main end"); } } class MyThread extends Thread { public void run() { try { while (true) { Thread.sleep(10); } } catch (InterruptedException e) { } } } ``` - 使用 isInterrupted 判断是否被中断 ``` public class Main { public static void main(String[] args) throws Exception { Thread t = new MyThread(); t.start(); Thread.sleep(5000); t.interrupt(); t.join(); System.out.println("main end"); } } class MyThread extends Thread { public void run() { while (!isInterrupted()) { } } } ``` - 使用running标志位判断是否中断 - 标志位boolean running是一个线程间共享的变量。线程间共享变量需要使用volatile关键字标记,确保每个线程都能读取到更新后的变量值。 - volatile关键字的目的是告诉虚拟机: - 每次访问变量时,总是获取主内存的最新值; - 每次修改变量后,立刻回写到主内存。 ``` public class Main { public static void main(String[] args) throws InterruptedException { HelloThread t = new HelloThread(); t.start(); Thread.sleep(10); t.running = false; } } class HelloThread extends Thread { public volatile boolean running = true; public void run() { int n = 0; while (running) { n ++; System.out.println(n + " hello!"); } System.out.println("end!"); } } ``` # 四. 线程同步 ## 4.1 使用synchronized锁 多线程对共享变量的操作需要加锁,否则变量的值是不确定的。使用 `synchronized(lockObject) { ... }` 加锁,缺点是性能下降。 ### 4.1.1 在业务逻辑里面使用锁 ``` public class Main { public static void main(String[] args) throws Exception { var add = new AddThread(); var dec = new DecThread(); add.start(); dec.start(); add.join(); dec.join(); System.out.println(Counter.count); } } class Counter { public static final Object lock = new Object(); public static int count = 0; } class AddThread extends Thread { public void run() { for (int i=0; i<10000; i++) { synchronized(Counter.lock) { Counter.count += 1; } } } } class DecThread extends Thread { public void run() { for (int i=0; i<10000; i++) { synchronized(Counter.lock) { Counter.count -= 1; } } } } ``` ### 4.1.2 将锁封装到方法里面去 将锁封装到方法里面,好处是避免了业务逻辑混乱,避免了自己选择锁的对象(`Counter.lock`)的问题。这种将锁封装的方式,叫做线程安全的。比如Java标准库的java.lang.StringBuffer也是线程安全的。 - 下面的示例中需要关注的是 - synchronized (this) 表示对当前实例加锁,也就是说多实例的情况是并行的。 ``` public class Main { public static void main(String[] args) throws Exception { var c1 = new Counter(); var c2 = new Counter(); var t1 = new Thread(() -> { var i = 0; for (; i < 1000000; i++) { c1.add(1); c2.add(1); } }); var t2 = new Thread(() -> { var i = 0; for (; i < 1000000; i++) { c1.dec(1); c2.dec(1); } }); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println(c1.get() + " and " + c2.get()); } } class Counter { private int count = 0; public void add(int n) { synchronized (this) { count += n; } } public void dec(int n) { synchronized (this) { count -= n; } } public int get() { return count; } } ``` <br> - 如果整个方法需要加锁,则可以这样简写。它表示整个方法都必须用this实例加锁。 ``` public synchronized void add(int n) { // 锁住this count += n; } // 解锁 ``` - 如果对一个静态方法添加synchronized修饰符,它锁住的将是这个类,相当于`synchronized(Counter.class)` ``` public synchronized static void test(int n) { ... } ``` <br> ### 4.1.3 如何避免死锁 - JVM允许同一个线程重复获取同一个锁,这种能被同一个线程反复获取的锁,就叫做可重入锁。 - 可重入锁的问题是:可能造成死锁。 - 解决死锁:如果多个地方需要获取相同的两个锁,那么这多个地方对两个锁的加锁顺序必须保持一致,否则就有可能持有对方的锁而造成死锁。 - 下面的示例中 `add` 方法和 `dec` 方法都需要获取 lockA 和 lockB,所以加锁的顺序必须一致。 ``` public void add(int m) { synchronized(lockA) { // 获得lockA的锁 this.value += m; synchronized(lockB) { // 获得lockB的锁 this.another += m; } // 释放lockB的锁 } // 释放lockA的锁 } public void dec(int m) { synchronized(lockA) { // 获得lockA的锁 this.value -= m; synchronized(lockB) { // 获得lockB的锁 this.another -= m; } // 释放lockB的锁 } // 释放lockA的锁 } ``` ### 4.1.4 使用wait和notify wait 把当前线程获取的锁释放,进入等待状态,notifyAl唤醒所有的正在待定锁的线程。 <br> - 如果获取到锁,但是正在等待其他资源,此时可以使用wait将锁释放,该线程进入了等待状态。这样其他等待该锁的线程就会争抢该锁,但是只有一个线程会争抢成功。 - 然后争抢成功的线程执行任务,执行完后,使用该锁的`notifyAll()`方法告知其他的所有等待该锁的线程可以争抢锁了(唤醒),就这循环下去。 ``` import java.util.ArrayList; import java.util.LinkedList; import java.util.Queue; public class Main { public static void main(String[] args) throws InterruptedException { var q = new TaskQueue(); var tal = new ArrayList<Thread>(); for (int i = 0; i < 5; i++) { var t = new Thread(() -> { while (true) { try { String s = q.getTask(); System.out.println("execute task: " + s); } catch (InterruptedException e) { return; } } }); t.start(); tal.add(t); } var add = new Thread(() -> { for (int i = 0; i < 10; i++) { String s = "t-" + Math.random(); System.out.println("add task: " + s); q.addTask(s); try { Thread.sleep(100); } catch (InterruptedException e) { } } }); add.start(); add.join(); Thread.sleep(100); for (var t : tal) { t.interrupt(); } } } class TaskQueue { Queue<String> queue = new LinkedList<>(); public synchronized void addTask(String s) { this.queue.add(s); this.notifyAll(); } public synchronized String getTask() throws InterruptedException { while (queue.isEmpty()) { this.wait(); // 释放 this 锁,线程进入等待状态 } return queue.remove(); } } ``` <br> ## 4.2 使用ReentrantLock锁 ### 4.2.1 基本用法 - ReentrantLock用于替代synchronized加锁,如果是synchronized,那么在 synchronized(lock) 时,就会一直等着,直到获取锁为止。但是ReentrantLock不一样,它可以尝试去获取锁,并设置一个获取锁的超时时间,超时后可以去干点别的事,然后在重新获取。 - 下面的示例只是为了表达ReentrantLock的用法 ``` import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; class Counter { private static final Lock lock = new ReentrantLock(); public static int count; public static void add(int n) { // 在此循环里面 1 秒钟之内没有获取到锁,就去干点别的事,然后在来获取锁 while (true) { try { if (lock.tryLock(1, TimeUnit.SECONDS)) { try { count += n; break; } finally { lock.unlock(); } } else { System.out.println("do other thing..."); } } catch (InterruptedException e) { } } } } ``` ### 4.2.2 使用Condition - synchronized可以配合wait和notify实现线程在条件不满足时等待,条件满足时唤醒。 - 使用ReentrantLock时,使用Condition对象来实现wait和notify的功能。 <br> - 使用Condition时,引用的Condition对象必须从Lock实例的newCondition()返回,这样才能获得一个绑定了Lock实例的Condition实例。 - Condition提供的await()、signal()、signalAll()原理和synchronized锁对象的wait()、notify()、notifyAll()是一致的,并且其行为也是一样的。 ``` class TaskQueue { private final Lock lock = new ReentrantLock(); private final Condition condition = lock.newCondition(); private Queue<String> queue = new LinkedList<>(); public void addTask(String s) { lock.lock(); try { queue.add(s); condition.signalAll(); } finally { lock.unlock(); } } public String getTask() { lock.lock(); try { while (queue.isEmpty()) { condition.await(); } return queue.remove(); } finally { lock.unlock(); } } } ``` ## 4.3 使用ReadWriteLock ReadWriteLock可以解决多线程同时读,但只有一个线程能写的问题。 - 场景:允许多个线程同时读,但只要有一个线程在写,其他线程就必须等待。 - 使用ReadWriteLock可以解决这个问题,它保证: - 只允许一个线程写入(其他线程既不能写入也不能读取); - 没有写入时,多个线程允许同时读(提高性能)。 - 在没有使用 ReadWriteLock 时,多线程读取是要获取独占锁的,注意的是:读锁的目的不是读的数据是错的,是保证连续读逻辑上一致的,假设obj的x,y是[0,1],某个写线程修改成[2,3],你读到的要么是[0,1],要么是[2,3],但是没有锁,你读到的可能是[0,3] ``` import java.util.Arrays; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; class Counter { private final Lock lock = new ReentrantLock(); private int[] counts = new int[10]; public void inc(int index) { lock.lock(); try { counts[index] += 1; } finally { lock.unlock(); } } public int[] get() { lock.lock(); try { return Arrays.copyOf(counts, counts.length); } finally { lock.unlock(); } } } ``` - 添加读锁以后,写线程在没有写入的时候,所有的线程都是可以读取的。 ``` import java.util.Arrays; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; class Counter { private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); private final Lock rLock = rwLock.readLock(); private final Lock wLock = rwLock.writeLock(); private int[] counts = new int[10]; public void inc(int index) { wLock.lock(); // 加写锁 try { counts[index] += 1; } finally { wLock.unlock(); // 释放写锁 } } public int[] get() { rLock.lock(); // 加读锁 try { return Arrays.copyOf(counts, counts.length); } finally { rLock.unlock(); // 释放读锁 } } } ``` ## 4.4 StampedLock 深入分析ReadWriteLock,会发现它有个潜在的问题:如果有线程正在读,写线程需要等待读线程释放锁后才能获取写锁,即读的过程中不允许写,这是一种悲观的读锁。要进一步提升并发执行效率,Java 8引入了新的读写锁:StampedLock - StampedLock把读锁细分为乐观读和悲观读,能进一步提升并发效率。但这也是有代价的:一是代码更加复杂,二是StampedLock是不可重入锁,不能在一个线程中反复获取同一个锁。 - 通过validate()去验证版本号的变化来确定是否获取悲观锁。 ``` import java.util.concurrent.locks.StampedLock; public class Point { private final StampedLock stampedLock = new StampedLock(); private double x; private double y; public void move(double deltaX, double deltaY) { long stamp = stampedLock.writeLock(); // 获取写锁 try { x += deltaX; y += deltaY; } finally { stampedLock.unlockWrite(stamp); // 释放写锁 } } public double distanceFromOrigin() { long stamp = stampedLock.tryOptimisticRead(); // 获得一个乐观读锁 // 注意下面两行代码不是原子操作 // 假设x,y = (100,200) double currentX = x; // 此处已读取到x=100,但x,y可能被写线程修改为(300,400) double currentY = y; // 此处已读取到y,如果没有写入,读取是正确的(100,200) // 如果有写入,读取是错误的(100,400) if (!stampedLock.validate(stamp)) { // 检查乐观读锁后是否有其他写锁发生 stamp = stampedLock.readLock(); // 获取一个悲观读锁 try { currentX = x; currentY = y; } finally { stampedLock.unlockRead(stamp); // 释放悲观读锁 } } return Math.sqrt(currentX * currentX + currentY * currentY); } } ``` ## 4.5 使用Concurrent集合 使用Java标准库的java.util.concurrent包提供的线程安全的集合,使用这些并发集合与使用非线程安全的集合类完全相同。因为所有的同步和加锁的逻辑都在集合内部实现,对外部调用者来说,只需要正常按接口引用,其他代码和原来的非线程安全代码完全一样。 | interface | non\-thread\-safe | thread\-safe | |-----------|-------------------------|------------------------------------------| | List | ArrayList | CopyOnWriteArrayList | | Map | HashMap | ConcurrentHashMap | | Set | HashSet / TreeSet | CopyOnWriteArraySet | | Queue | ArrayDeque / LinkedList | ArrayBlockingQueue / LinkedBlockingQueue | | Deque | ArrayDeque / LinkedList | LinkedBlockingDeque | ## 4.6 使用Atomic Java的java.util.concurrent包除了提供底层锁、并发集合外,还提供了一组原子操作的封装类,它们位于java.util.concurrent.atomic包 - 利用AtomicLong可以编写一个多线程安全的全局唯一ID生成器 ``` class IdGenerator { static AtomicLong var = new AtomicLong(0); public static long getNextId() { return var.incrementAndGet(); } } ``` # 五. 线程池使用 ## 5.1 创建线程池 ``` import java.util.concurrent.Executors; ExecutorService executor = Executors.newFixedThreadPool(3); executor.submit(task1); executor.submit(task2); executor.submit(task3); executor.submit(task4); executor.submit(task5); ``` ExecutorService只是接口,Java标准库提供的几个常用实现类有: - FixedThreadPool:线程数固定的线程池; - CachedThreadPool:线程数根据任务动态调整的线程池; - SingleThreadExecutor:仅单线程执行的线程池。 <br> 线程池的几个方法 - shutdown()方法关闭线程池的时候,它会等待正在执行的任务先完成,然后再关闭。 - shutdownNow()会立刻停止正在执行的任务。 - awaitTermination()则会等待指定的时间让线程池关闭。 ## 5.2 其他线程池 ### 5.2.1 ThreadPoolExecutor - 线程池的大小限制一定的范围之内。 ``` import java.util.concurrent.ThreadPoolExecutor; int min = 4; int max = 10; ExecutorService es = new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); ``` ### 5.2.2 ScheduledThreadPool - 定期执行的任务 - 注意FixedRate和FixedDelay的区别 - FixedRate是指任务总是以固定时间间隔触发,不管任务执行多长时间 - 而FixedDelay是指,上一次任务执行完毕后,等待固定的时间间隔,再执行下一次任务 ``` import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public class Main { public static void main(String[] args) throws Exception { ScheduledExecutorService ses = Executors.newScheduledThreadPool(4); // 1秒后执行一次性任务 ses.schedule(new Task("one-time"), 1, TimeUnit.SECONDS); // 2秒后开始执行定时任务,每3秒执行 ses.scheduleAtFixedRate(new Task("fixed-rate"), 2, 3, TimeUnit.SECONDS); // 2秒后开始执行定时任务,以3秒为间隔执行 ses.scheduleWithFixedDelay(new Task("fixed-delay"), 2, 3, TimeUnit.SECONDS); } } class Task implements Runnable { private String name; public Task(String name) { this.name = name; } @Override public void run() { System.out.println("start task " + this.name); System.out.println("end task " + this.name); } } ``` - Java标准库还提供了一个java.util.Timer类,这个类也可以定期执行任务,但是,一个Timer会对应一个Thread,所以,一个Timer只能定期执行一个任务,多个定时任务必须启动多个Timer,而一个ScheduledThreadPool就可以调度多个定时任务,所以,我们完全可以用ScheduledThreadPool取代旧的Timer。 # 六. Future使用 ## 6.1 Future 在线程中,Runnable接口有个问题,它的方法没有返回值。所以,Java标准库还提供了一个Callable接口,和Runnable接口比,它多了一个返回值: ``` ExecutorService executor = Executors.newFixedThreadPool(4); // 定义任务: Callable<String> task = new Task(); // 提交任务并获得Future: Future<String> future = executor.submit(task); // 从Future获取异步执行返回的结果,可能阻塞 String result = future.get(); class Task implements Callable<String> { public String call() throws Exception { return longTimeCalculation(); } } ``` 当提交一个Callable任务后,会同时获得一个Future对象,然后,在主线程某个时刻调用Future对象的get()方法,就可以获得异步执行的结果。一个Future<V>接口表示一个未来可能会返回的结果,它定义的方法有: - `get()`:获取结果(可能会等待) - `get(long timeout, TimeUnit unit)`:获取结果,但只等待指定的时间; - `cancel(boolean mayInterruptIfRunning)`:取消当前任务; - `isDone()`:判断任务是否已完成。 <br> ## 6.2 CompletableFuture 使用Future获得异步执行结果时,要么调用阻塞方法get(),要么轮询看isDone()是否为true,这两种方法都不是很好,因为主线程也会被迫等待。 <br> CompletableFuture的优点是: - 异步任务结束时,会自动回调某个对象的方法; - 异步任务出错时,会自动回调某个对象的方法; - 主线程设置好回调后,不再关心异步任务的执行。 从Java 8开始引入了CompletableFuture,它针对Future做了改进,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。 ``` public class Main { public static void main(String[] args) throws Exception { // 创建异步执行任务: CompletableFuture<Double> cf = CompletableFuture.supplyAsync(Main::fetchPrice); // 如果执行成功: cf.thenAccept((result) -> { System.out.println("price: " + result); }); // 如果执行异常: cf.exceptionally((e) -> { e.printStackTrace(); return null; }); // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭: Thread.sleep(200); } static Double fetchPrice() { try { Thread.sleep(100); } catch (InterruptedException e) { } if (Math.random() < 0.3) { throw new RuntimeException("fetch price failed!"); } return 5 + Math.random() * 20; } } ``` # 七. 使用ForkJoin Java 7开始引入了一种新的Fork/Join线程池,它可以执行一种特殊的任务:把一个大任务拆成多个小任务并行执行。 # 八. 使用ThreadLocal 这种在一个线程中,横跨若干方法调用,需要传递的对象,我们通常称之为上下文(Context),它是一种状态,可以是用户身份、任务信息等。Java标准库提供了一个特殊的`ThreadLocal`,它可以在一个线程中传递同一个对象。 - ThreadLocal实例通常总是以静态字段初始化 ``` static ThreadLocal<User> threadLocalUser = new ThreadLocal<>(); // 设置 threadLocalUser.set(user); // 在当前线程内任何地方获取user实例 threadLocalUser.get(); ``` - 实际上,可以把ThreadLocal看成一个全局`Map<Thread, Object>:`每个线程获取ThreadLocal变量时,总是使用Thread自身作为key - 特别注意`ThreadLocal`一定要在`finally`中清除,这是因为当前线程执行完相关代码后,很可能会被重新放入线程池中,如果ThreadLocal没有被清除,该线程执行其他代码时,会把上一次的状态带进去。 - 方法1 ``` try { threadLocalUser.set(user); ... } finally { threadLocalUser.remove(); } ``` - 方法2 - 为了保证能释放ThreadLocal关联的实例,我们可以通过AutoCloseable接口配合try (resource) {...}结构,让编译器自动为我们关闭。 ``` // 使用的时候会自动remove try (var ctx = new UserContext("Bob")) { // 可任意调用UserContext.currentUser(): String currentUser = UserContext.currentUser(); } // 在此自动调用UserContext.close()方法释放ThreadLocal关联对象 // 定义实现AutoCloseable接口的上下文类 public class UserContext implements AutoCloseable { static final ThreadLocal<String> ctx = new ThreadLocal<>(); public UserContext(String user) { ctx.set(user); } public static String currentUser() { return ctx.get(); } @Override public void close() { ctx.remove(); } } ``` <br><br><br> - 参考资料 [廖雪峰的官方网站](https://www.liaoxuefeng.com/wiki/1252599548343744/1255943750561472) <br><br><br>
上一篇:
Java之Lambda表达式
下一篇:
JAVA中的编码算法与加密算法
0
赞
16 人读过
新浪微博
微信
腾讯微博
QQ空间
人人网
文档导航