Java并发编程深度解析:从理论到实践 🧵 并发编程核心要点
深入理解Java内存模型和happens-before原则 掌握synchronized、volatile、Lock等同步机制 熟练运用线程池和并发工具类 了解常见并发问题的原因和解决方案 学习高性能并发编程的最佳实践 🎯 为什么需要并发编程? 在现代计算机系统中,CPU通常拥有多个核心,为了充分利用硬件资源,提升程序性能,我们需要让程序能够同时执行多个任务。这就是并发编程的核心价值。
并发 vs 并行 graph LR
subgraph "并发(Concurrency)"
A[Task1] -.-> B[Task2]
B -.-> C[Task3]
C -.-> A
end
subgraph "并行(Parallelism)"
D[Task1]
E[Task2]
F[Task3]
end
A --> G[单核CPU时间片轮转]
D --> H[多核CPU同时执行] 🧠 Java内存模型 (JMM) 内存模型基础 graph TB
subgraph "JVM内存结构"
A[主内存 Main Memory]
subgraph "线程1"
B[工作内存1]
C[本地变量副本]
end
subgraph "线程2"
D[工作内存2]
E[本地变量副本]
end
B <--> A
D <--> A
C -.-> B
E -.-> D
end Happens-Before规则 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class HappensBeforeExample { private int x = 0 ; private volatile boolean flag = false ; public void writer () { x = 42 ; flag = true ; } public void reader () { if (flag) { int y = x; } } }
分析 :由于flag是volatile变量,操作2 happens-before 操作3,而操作1在操作2之前,操作4在操作3之后,所以操作1 happens-before 操作4。
🔐 同步机制详解 1. synchronized关键字 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public class SynchronizedExample { private int count = 0 ; private final Object lock = new Object (); public synchronized void increment1 () { count++; } public void increment2 () { synchronized (this ) { count++; } } public void increment3 () { synchronized (lock) { count++; } } public static synchronized void staticMethod () { } }
synchronized底层实现 1 2 3 4 5 6 public void increment () { count++; }
2. volatile关键字 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public class VolatileExample { private volatile boolean running = true ; private volatile int counter = 0 ; public void stop () { running = false ; } public void run () { while (running) { counter++; } } private volatile int volatileCount = 0 ; public void wrongIncrement () { volatileCount++; } private final AtomicInteger atomicCount = new AtomicInteger (0 ); public void correctIncrement () { atomicCount.incrementAndGet(); } }
3. Lock接口和ReentrantLock 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public class LockExample { private final ReentrantLock lock = new ReentrantLock (); private final Condition condition = lock.newCondition(); private int count = 0 ; public void increment () { lock.lock(); try { count++; condition.signalAll(); } finally { lock.unlock(); } } public void waitForCondition () throws InterruptedException { lock.lock(); try { while (count < 10 ) { condition.await(); } } finally { lock.unlock(); } } public void interruptibleLock () throws InterruptedException { if (lock.tryLock(5 , TimeUnit.SECONDS)) { try { } finally { lock.unlock(); } } else { throw new RuntimeException ("无法获取锁" ); } } }
ReentrantLock vs synchronized 特性 synchronized ReentrantLock 使用简便性 🟢 简单 🟡 需要手动管理 功能丰富性 🟡 基础功能 🟢 功能丰富 性能 🟢 JVM优化 🟡 相当 可中断性 ❌ 不支持 ✅ 支持 公平锁 ❌ 不支持 ✅ 支持 条件变量 ❌ 单一wait/notify ✅ 多个Condition
🧵 线程池详解 ThreadPoolExecutor参数解析 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public class ThreadPoolExample { public static void main (String[] args) { ThreadPoolExecutor executor = new ThreadPoolExecutor ( 5 , 10 , 60L , TimeUnit.SECONDS, new ArrayBlockingQueue <>(100 ), new ThreadPoolExecutor .CallerRunsPolicy() ); for (int i = 0 ; i < 20 ; i++) { final int taskNum = i; executor.submit(() -> { System.out.println("执行任务 " + taskNum + " - 线程: " + Thread.currentThread().getName()); try { Thread.sleep(2000 ); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); } executor.shutdown(); try { if (!executor.awaitTermination(60 , TimeUnit.SECONDS)) { executor.shutdownNow(); } } catch (InterruptedException e) { executor.shutdownNow(); } } }
线程池执行流程 flowchart TD
A[提交任务] --> B{核心线程池是否已满?}
B -->|否| C[创建核心线程执行]
B -->|是| D{任务队列是否已满?}
D -->|否| E[任务加入队列等待]
D -->|是| F{最大线程池是否已满?}
F -->|否| G[创建非核心线程执行]
F -->|是| H[执行拒绝策略]
C --> I[任务执行完成]
E --> I
G --> I 常用拒绝策略 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 public class RejectionPolicyExample { ThreadPoolExecutor executor1 = new ThreadPoolExecutor ( 2 , 4 , 60L , TimeUnit.SECONDS, new ArrayBlockingQueue <>(2 ), new ThreadPoolExecutor .CallerRunsPolicy() ); ThreadPoolExecutor executor2 = new ThreadPoolExecutor ( 2 , 4 , 60L , TimeUnit.SECONDS, new ArrayBlockingQueue <>(2 ), new ThreadPoolExecutor .AbortPolicy() ); ThreadPoolExecutor executor3 = new ThreadPoolExecutor ( 2 , 4 , 60L , TimeUnit.SECONDS, new ArrayBlockingQueue <>(2 ), new ThreadPoolExecutor .DiscardPolicy() ); ThreadPoolExecutor executor4 = new ThreadPoolExecutor ( 2 , 4 , 60L , TimeUnit.SECONDS, new ArrayBlockingQueue <>(2 ), new ThreadPoolExecutor .DiscardOldestPolicy() ); ThreadPoolExecutor executor5 = new ThreadPoolExecutor ( 2 , 4 , 60L , TimeUnit.SECONDS, new ArrayBlockingQueue <>(2 ), (task, executor) -> { System.err.println("任务被拒绝: " + task.toString()); } ); }
🛠️ 并发工具类 1. CountDownLatch - 倒计时门闩 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public class CountDownLatchExample { public static void main (String[] args) throws InterruptedException { int numTasks = 3 ; CountDownLatch latch = new CountDownLatch (numTasks); for (int i = 0 ; i < numTasks; i++) { final int taskId = i; new Thread (() -> { try { System.out.println("任务 " + taskId + " 开始执行" ); Thread.sleep(1000 + taskId * 500 ); System.out.println("任务 " + taskId + " 执行完成" ); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { latch.countDown(); } }).start(); } System.out.println("等待所有任务完成..." ); latch.await(); System.out.println("所有任务已完成!" ); } }
2. CyclicBarrier - 循环栅栏 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public class CyclicBarrierExample { public static void main (String[] args) { int numThreads = 3 ; CyclicBarrier barrier = new CyclicBarrier (numThreads, () -> { System.out.println("所有线程都到达栅栏,开始执行汇总操作!" ); }); for (int i = 0 ; i < numThreads; i++) { final int threadId = i; new Thread (() -> { try { System.out.println("线程 " + threadId + " 完成第一阶段工作" ); Thread.sleep(1000 + threadId * 200 ); barrier.await(); System.out.println("线程 " + threadId + " 开始第二阶段工作" ); Thread.sleep(500 ); System.out.println("线程 " + threadId + " 完成第二阶段工作" ); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }).start(); } } }
3. Semaphore - 信号量 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class SemaphoreExample { private static final Semaphore semaphore = new Semaphore (3 ); public static void main (String[] args) { for (int i = 0 ; i < 10 ; i++) { final int taskId = i; new Thread (() -> accessResource(taskId)).start(); } } private static void accessResource (int taskId) { try { System.out.println("任务 " + taskId + " 等待获取许可证..." ); semaphore.acquire(); System.out.println("任务 " + taskId + " 获得许可证,开始执行" ); Thread.sleep(2000 ); System.out.println("任务 " + taskId + " 执行完成,释放许可证" ); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { semaphore.release(); } } }
4. 原子类家族 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 public class AtomicExample { private final AtomicInteger counter = new AtomicInteger (0 ); private final AtomicLong longCounter = new AtomicLong (0 ); private final AtomicBoolean flag = new AtomicBoolean (false ); private final AtomicIntegerArray array = new AtomicIntegerArray (10 ); private final AtomicReference<User> userRef = new AtomicReference <>(); public void basicAtomicOperations () { int oldValue = counter.get(); int newValue = counter.incrementAndGet(); boolean success = counter.compareAndSet(1 , 2 ); array.set(0 , 100 ); int arrayValue = array.getAndIncrement(0 ); counter.updateAndGet(current -> current * 2 ); counter.accumulateAndGet(10 , Integer::sum); } public void customAtomicOperation () { AtomicReference<Node> head = new AtomicReference <>(); Node newNode = new Node ("data" ); Node currentHead; do { currentHead = head.get(); newNode.next = currentHead; } while (!head.compareAndSet(currentHead, newNode)); } static class User { String name; int age; User(String name, int age) { this .name = name; this .age = age; } } static class Node { String data; Node next; Node(String data) { this .data = data; } } }
🚨 常见并发问题和解决方案 1. 死锁问题 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 public class DeadlockExample { private final Object lock1 = new Object (); private final Object lock2 = new Object (); public void method1 () { synchronized (lock1) { System.out.println("方法1获得lock1" ); try { Thread.sleep(100 ); } catch (InterruptedException e) {} synchronized (lock2) { System.out.println("方法1获得lock2" ); } } } public void method2 () { synchronized (lock2) { System.out.println("方法2获得lock2" ); try { Thread.sleep(100 ); } catch (InterruptedException e) {} synchronized (lock1) { System.out.println("方法2获得lock1" ); } } } public void safeMethod1 () { synchronized (lock1) { synchronized (lock2) { } } } public void safeMethod2 () { synchronized (lock1) { synchronized (lock2) { } } } private final ReentrantLock reentrantLock1 = new ReentrantLock (); private final ReentrantLock reentrantLock2 = new ReentrantLock (); public void timeoutMethod () { boolean lock1Acquired = false ; boolean lock2Acquired = false ; try { lock1Acquired = reentrantLock1.tryLock(1 , TimeUnit.SECONDS); if (lock1Acquired) { lock2Acquired = reentrantLock2.tryLock(1 , TimeUnit.SECONDS); if (lock2Acquired) { } } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { if (lock2Acquired) reentrantLock2.unlock(); if (lock1Acquired) reentrantLock1.unlock(); } } }
2. 生产者-消费者问题 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 public class ProducerConsumerExample { private final BlockingQueue<String> queue = new ArrayBlockingQueue <>(10 ); private final AtomicBoolean running = new AtomicBoolean (true ); class Producer implements Runnable { @Override public void run () { int count = 0 ; while (running.get()) { try { String item = "Item-" + count++; queue.put(item); System.out.println("生产: " + item); Thread.sleep(100 ); } catch (InterruptedException e) { Thread.currentThread().interrupt(); break ; } } } } class Consumer implements Runnable { private final String name; Consumer(String name) { this .name = name; } @Override public void run () { while (running.get() || !queue.isEmpty()) { try { String item = queue.poll(1 , TimeUnit.SECONDS); if (item != null ) { System.out.println(name + " 消费: " + item); Thread.sleep(150 ); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); break ; } } } } public void start () { new Thread (new Producer ()).start(); new Thread (new Consumer ("消费者1" )).start(); new Thread (new Consumer ("消费者2" )).start(); new Thread (() -> { try { Thread.sleep(10000 ); running.set(false ); System.out.println("停止生产消费" ); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }).start(); } }
3. 缓存一致性问题 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 public class CacheConsistencyExample { private final ConcurrentHashMap<String, String> cache = new ConcurrentHashMap <>(); private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock (); public String getValue (String key) { String value = cache.get(key); if (value == null ) { value = expensiveOperation(key); cache.put(key, value); } return value; } public String getValueWithRWLock (String key) { rwLock.readLock().lock(); try { String value = cache.get(key); if (value != null ) { return value; } } finally { rwLock.readLock().unlock(); } rwLock.writeLock().lock(); try { String value = cache.get(key); if (value == null ) { value = expensiveOperation(key); cache.put(key, value); } return value; } finally { rwLock.writeLock().unlock(); } } public String getValueWithPutIfAbsent (String key) { return cache.computeIfAbsent(key, this ::expensiveOperation); } private final ConcurrentHashMap<String, Future<String>> futureCache = new ConcurrentHashMap <>(); public String getValueWithFuture (String key) throws ExecutionException, InterruptedException { Future<String> future = futureCache.get(key); if (future == null ) { Callable<String> callable = () -> expensiveOperation(key); FutureTask<String> futureTask = new FutureTask <>(callable); future = futureCache.putIfAbsent(key, futureTask); if (future == null ) { future = futureTask; futureTask.run(); } } return future.get(); } private String expensiveOperation (String key) { try { Thread.sleep(1000 ); return "Value for " + key; } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException (e); } } }
🎯 性能优化最佳实践 1. 减少锁的粒度 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public class LockGranularityExample { private final Object lock = new Object (); private int counter1 = 0 ; private int counter2 = 0 ; public void coarseGrainedLock () { synchronized (lock) { counter1++; counter2++; } } private final Object lock1 = new Object (); private final Object lock2 = new Object (); public void fineGrainedLock1 () { synchronized (lock1) { counter1++; } } public void fineGrainedLock2 () { synchronized (lock2) { counter2++; } } }
2. 使用无锁数据结构 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public class LockFreeExample { private final ConcurrentLinkedQueue<String> lockFreeQueue = new ConcurrentLinkedQueue <>(); private final ConcurrentHashMap<String, String> lockFreeMap = new ConcurrentHashMap <>(); private final AtomicLong lockFreeCounter = new AtomicLong (0 ); public void lockFreeOperations () { lockFreeQueue.offer("item" ); String item = lockFreeQueue.poll(); lockFreeMap.put("key" , "value" ); String value = lockFreeMap.get("key" ); long count = lockFreeCounter.incrementAndGet(); } }
3. 合理使用ThreadLocal 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public class ThreadLocalExample { private static final ThreadLocal<UserSession> userSessionThreadLocal = new ThreadLocal <>(); private static final ThreadLocal<SimpleDateFormat> dateFormatThreadLocal = ThreadLocal.withInitial(() -> new SimpleDateFormat ("yyyy-MM-dd HH:mm:ss" )); public void setUserSession (UserSession session) { userSessionThreadLocal.set(session); } public UserSession getCurrentUser () { return userSessionThreadLocal.get(); } public String formatDate (Date date) { return dateFormatThreadLocal.get().format(date); } public void cleanup () { userSessionThreadLocal.remove(); dateFormatThreadLocal.remove(); } static class UserSession { String userId; String userName; } }
🔍 并发调试技巧 1. 线程转储分析 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public class ThreadDumpExample { public static void printThreadDump () { ThreadMXBean threadBean = ManagementFactory.getThreadMXBean(); ThreadInfo[] threadInfos = threadBean.dumpAllThreads(true , true ); for (ThreadInfo threadInfo : threadInfos) { System.out.println("线程名: " + threadInfo.getThreadName()); System.out.println("线程状态: " + threadInfo.getThreadState()); if (threadInfo.getLockName() != null ) { System.out.println("等待锁: " + threadInfo.getLockName()); } StackTraceElement[] stackTrace = threadInfo.getStackTrace(); for (StackTraceElement element : stackTrace) { System.out.println("\t" + element); } System.out.println(); } } }
2. 并发测试工具 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public class ConcurrencyTestExample { private final AtomicInteger counter = new AtomicInteger (0 ); @Test public void testConcurrentIncrement () throws InterruptedException { int threadCount = 100 ; int operationsPerThread = 1000 ; CountDownLatch latch = new CountDownLatch (threadCount); for (int i = 0 ; i < threadCount; i++) { new Thread (() -> { try { for (int j = 0 ; j < operationsPerThread; j++) { counter.incrementAndGet(); } } finally { latch.countDown(); } }).start(); } latch.await(); int expected = threadCount * operationsPerThread; assertEquals(expected, counter.get()); } }
🚀 总结与最佳实践 并发编程金律 优先使用并发工具类 :ConcurrentHashMap > synchronized HashMap减少锁的范围 :只锁必要的代码块避免嵌套锁 :防止死锁发生使用不可变对象 :天然线程安全合理使用线程池 :避免频繁创建销毁线程性能优化checklist ✅ 使用合适的并发工具类 ✅ 减少锁的竞争和粒度 ✅ 避免不必要的同步 ✅ 合理设置线程池参数 ✅ 及时释放资源(如ThreadLocal) 常见误区 ❌ 过度使用synchronized ❌ 忽略volatile的非原子性 ❌ ThreadLocal内存泄漏 ❌ 不合理的线程池配置 💡 记住 :并发编程是一门艺术,需要在正确性、性能和复杂度之间找到平衡点。
📖 延伸阅读 如果这篇文章对你有帮助,欢迎点赞分享! ⭐