java并发中的同步器使用方式
目录
- 同步器
- 1、Semaphore(信号量)
- Semaphore 基本用法
- Semaphore 适用场景
- 2、CountDownLatch
- CountDownLatch 基本用法
- CountDownLatch 适用场景
- 3、CyclicBarrier
- CyclicBarrier 基本用法
- CyclicBarrier 计数器重置用法
- CyclicBarrier 适用场景
- 4、Phaser
- Phaser 基本用法
- Phaser 适用场景
- 5、ReentrantLock
- ReentrantLock 基本用法
- ReentrantLock 中断获取锁用法
- ReentrantLock 适用场景:
- 6、ReadwriteLock
- ReadWriteLock 基本用法
- ReadWriteLock 适用场景
- 7、 Condition
- Condition 基本用法
- Condition 实现阻塞队列
- 8、blockingQueue
- BlockingQueue 基本用法
- 9、BlockingDeque
- BlockingDeque基本用法
- 10、LockSupport
- LockSupport基本用法
- 11、Exchanger
- Exchanger基本用法
- 总结
同步器
Java 并发包中的同步器是一些用于协调多个线程执行的工具,用于实现线程之间的同步和互斥操作。这些同步器提供了不同的机制来控制线程的访问和执行顺序,以实现线程安全和并发控制。
1、Semaphore(信号量)
- Semaphore 是 Java 并发包中的同步器之一,用于控制对临界区资源的访问数量。它允许多个线程同时访问临界区资源,但限制了同一时间内可以访问资源的线程数量。
- Semaphore 维护一个许可证计数,线程可以获取和释放这些许可证。当许可证数量为零时,线程需要等待,直到其他线程释放许可证。
Semaphore 基本用法
import java.util.concurrent.Semaphore; public class SemaphoreExample { public static void main(String[] args) { Semaphore semaphore = new Semaphore(3); // 初始化信号量,允许同时访问的线程数量为3 // 创建多个线程来模拟访问临界区资源 for (int i = 1; i <= 5; i++) { int threadId = i; Thread thread = new Thread(() -> { try { semaphore.acquire(); // 获取许可证,如果没有许可证则阻塞 System.out.println("Thread " + threadId + " acquired a permit and is Accessing the resource."); Thread.sleep(2000); // 模拟访问临界区资源的耗时操作 } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(); // 释放许可证 System.out.println("Thread " + threadId + " released the permit."); } }); thread.start(); } } }
运行结果:
Thread 1 acquired a permit and is accessing the resource.
Thread 3 acquired a permit and is accessing the resource.Thread 2 acquired a permit and is accessing the resource.Thread 2 released the permit.Thread 4 acquired a permit and is accessing the resource.Thread 5 acquired a permit and is accessing the resource.Thread 3 released the permit.Thread 1 released the permit.Thread 4 released the permit.Thread 5 released the permit.
在上述示例中,我们创建了一个 Semaphore 实例,并初始化许可证数量为 3。然后创建了多个线程,每个线程在获取许可证后访问临界区资源,模拟耗时操作后释放许可证。由于许可证数量有限,只有一部分线程能够同时访问资源,其他线程需要等待。
Semaphore 适用场景
- 有限资源的并发访问,如数据库连接池、线程池等。
- 控制对某个资源的同时访问数量,以避免资源竞争和过度消耗。
2、CountDownLatch
CountDownLatch 是 Java 并发包中的同步器之一,用于实现一种等待机制,允许一个或多个线程等待其他线程完成一组操作后再继续执行。
它通过维护一个计数器来实现等待和通知的机制。
在创建 CountDownLatch 时,需要指定初始计数值,每次调用 countDown() 方法会减少计数值,当计数值达到零时,等待的线程会被唤醒继续执行。
CountDownLatch 基本用法
import java.util.concurrent.CountDownLatch; public class CountDownLatchExample { public static void main(String[] args) { int numberOfTasks = 3; CountDownLatch latch = new CountDownLatch(numberOfTasks); // 创建多个线程来模拟完成任务 for (int i = 1; i <= numberOfTasks; i++) { int taskId = i; Thread thread = new Thread(() -> { try { System.out.println("Task " + taskId + " is executing..."); Thread.sleep(2000); // 模拟任务执行耗时 System.out.println("Task " + taskId + " is completed."); } catch (InterruptedException e) { e.printStackTrace(); } finally { latch.countDown(); // 完成任务后减少计数 } }); thread.start(); } try { System.out.println("Main thread is waiting for tasks to complete..."); latch.await(); // 等待所有任务完成 System.out.println("All tasks are completed. Main thread continues."); } catch (InterruptedException e) { e.printStackTrace(); } } }
运行结果:
Task 1 is executing...
Main thread is waiting for tasks to complete...Task 2 is executing...Task 3 is executing...Task 1 is completed.Task 2 is completed.Task 3 is completed.All tasks are completed. Main thread continues.
在上述示例中,我们创建了一个 CountDownLatch 实例,并初始化计数值为 3。然后创建了多个线程来模拟完成任务,每个线程执行完任务后调用 countDown() 方法减少计数。主线程在执行 latch.await() 时等待计数值为零,等待所有任务完成后继续执行。
使用 CountDownLatch 可以实现多个线程之间的协调,确保某些操作在其他操作完成后再继续执行。
CountDownLatch 适用场景
- 主线程等待多个子线程完成任务后再继续执行。
- 等待多个线程完成初始化工作后再开始并行操作。
3、CyclicBarrier
CyclicBarrier 是 Java 并发包中的同步器之一,用于实现一组线程在达到一个共同点之前等待彼此,并在达到共同点后继续执行。它可以被重置并重新使用,适用于需要多个线程协同工作的场景。
CyclicBarrier 维护一个计数器和一个栅栏动作(barrier action)。当线程调用 await() 方法时,计数器减少,当计数器达到零时,所有等待的线程会被唤醒并继续执行,同时会执行栅栏动作。计数器可以被重置,并且可以设置栅栏动作,在达到共同点后执行。
CyclicBarrier 基本用法
import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; public class CyclicBarrierExample { public static void main(String[] args) { int numberOfThreads = 3; Runnable barrierAction = () -> System.out.println("All threads reached the barrier!"); CyclicBarrier barrier = new CyclicBarrier(numberOfThreads, barrierAction); // 创建多个线程来模拟并行执行任务 for (int i = 1; i <= numberOfThreads; i++) { int threadId = i; Thread thread = new Thread(() -> { try { System.out.println("Thread " + threadId + " is performing its task."); Thread.sleep(2000); // 模拟任务执行耗时 System.out.println("Thread " + threadId + " has reached the barrier."); barrier.await(); // 等待其他线程达到栅栏点 System.out.println("Thread " + threadId + " continues after the barrier."); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }); thread.start(); } } }
运行结果:
Thread 1 is performing its task.
Thread 3 is performing its task.Thread 2 is performing its task.Thread 2 has reached the barrier.Thread 3 has reached the barrier.Thread 1 has reached the barrier.All threads reached the barrier!Thread 1 continues after the barrier.Thread 2 continues after the barrier.Thread 3 continues after the barrier.
在上述示例中,我们创建了一个 CyclicBarrier 实例,初始化等待的线程数量为 3,并设置了栅栏动作。
然后创建多个线程,每个线程模拟执行任务后等待其他线程达到栅栏点,当所有线程都达到栅栏点时,栅栏动作会被执行。
使用 CyclicBarrier 可以实现多线程协同工作的场景,确保所有线程在某个共同点之前等待彼此,并在达到共同点后继续执行。
CyclicBarrier 计数器重置用法
package com.lf.java.basic.concurrent; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; public class MultipleCyclicBarrierExample { public static void main(String[] args) { int numberOfThreads = 3; int numberOfRounds = 3; Runnable barrierAction = () -> System.out.println("All threads reached the barrier!");php for (int round = 1; round <= numberOfRounds; round++) { CyclicBarrier barrier = new CyclicBarrier(numberOfThreads, barrierAction); System.out.println("Round " + round + ": Starting tasks"); // 创建多个线程来模拟并行执行任务 for (int i = 1; i <= numberOfThreads; i++) { int threadId = i; int finalRound = round; Thread thread = new Thread(() -> { try { System.out.println("Thread " + threadId + " is performing its task for Round " + finalRound); Thread.sleep(2000); // 模拟任务执行耗时 System.out.println("Thread " + threadId + " has reached the barrier for Round " + finalRound); barrier.await(); // 等待其他线程达到栅栏点 System.out.println("Thread " + threadId + " continues after the barrier for Round " + finalRound); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }); thread.start(); } // 等待所有线程完成当前轮次的任务 try { Thread.sleep(3000); // 等待一段时间以观察效果 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Round " + round + ": All tasks completed\n"); // 让当前轮次的所有线程都离开栅栏点,以便重新使用 barrier.reset(); } } }
运行结果:
Round 1: Starting tasks
Thread 1 is performing its task for Round 1Thread 2 is performing its task for Round 1Thread 3 is performing its task for Round 1Thread 3 has reached the barrier for Round 1Thread 2 has reached the barrier for Round 1Thread 1 has reached the barrier for Round 1All threads reached the barrier!Thread 2 continues after the barrier for Round 1Thread 1 continues after the barrier for Round 1Thread 3 continues after the barrier for Round 1Round 1: All tasks completedRound 2: Starting tasks
Thread 1 is performing its task for Round 2Thread 2 is performing its task for Round 2Thread 3 is performing its task for Round 2Thread 3 has reached the barrier for Round 2Thread 2 has reached the barrier for Round 2Thread 1 has reached the barrier for Round 2All threads reached the barrier!Thread 1 continues after the barrier for Round 2Thread 3 continues after the barrier for Round 2Thread 2 continues after the barrier for Round 2Round 2: All tasks completedRound 3: Starting tasks
Thread 1 is performing its task for Round 3Thread 2 is performing its task for Round 3Thread 3 is performing its task for Round 3Thread 1 has reached the barrier for Round 3Thread 2 has reached the barrier for Round 3Thread 3 has reached the barrier for Round 3All threads reached the barrier!Thread 3 continues after the barrier for Round 3Thread 1 continues after the barrier for Round 3Thread 2 continues after the barrier for Round 3Round 3: All tasks completed
在上述示例中,我们模拟了多轮任务协同。每一轮都创建一个新的 CyclicBarrier 实例,用于协调线程的等待和通知。在每一轮的任务完成后,我们使用 barrier.reset() 来重置计数器,以便进行下一轮的任务协同。
运行这个示例可以看到多轮任务协同的效果,每一轮的任务都会等待所有线程完成后再继续,然后重置计数器以准备下一轮。
CyclicBarrier 适用场景
- 将多个线程分成阶段进行,每个阶段需要等待其他线程完成后再继续。
- 并行计算中的分治操作,等待所有线程完成分治任务后进行合并计算。
4、Phaser
Phaser 是 Java 并发包中的同步器之一,它提供了更灵活的多阶段线程协调机制,适用于需要分阶段进行多个任务的并行执行和协调的场景。Phaser 可以用于更复杂的同步需求,例如循环的多阶段任务协同。
Phaser 维护了一个计数器和多个阶段(phase)。在每个阶段,线程可以注册、等待和注销,以及在某个阶段到达时执行特定的操作。
Phaser 基本用法
import java.util.concurrent.Phaser; public class PhaserExample { public static void main(String[] args) { int numberOfThreads = 3; int numberOfPhases = 3; Phaser phaser = new Phaser(numberOfThreads) { @Override protected boolean onAdvance(int phase, int registeredParties) { System.out.println("Phase " + phase + " completed."); return phase == numberOfPhases - 1 || registeredParties == 0; } }; // 创建多个线程来模拟并行执行任务 for (int i = 0; i < numberOfThreads; i++) { int threadId = i; Thread thread = new Thread(() -> { for (int phase = 0; phase < numberOfPhases; phase++) { System.out.println("Thread " + threadId + " is in Phase " + phase); phaser.arriveAndAwaitAdvance(); // 等待其他线程到达当前阶段 } }); thread.start(); } } }
运行结果:
Thread 0 is in Phase 0
Thread 1 is in Phase 0Thread 2 is in Phase 0Phase 0 completed.Thread 2 is in Phase 1Thread 1 is in Phase 1Thread 0 is in Phase 1Phase 1 completed.Thread 1 is in Phase 2Thread 2 is in Phase 2Thread 0 is in Phase 2Phase 2 completed.
在上述示例中,我们创建了一个 Phaser 实例,设置初始注册线程数量为 3。然后,我们创建多个线程来模拟并行执行任务,每个线程都会在每个阶段调用 phaser.arriveAndAwaitAdvance() 等待其他线程到达当前阶段。当所有线程都到达后,onAdvance() 方法会被调用,用于执行阶段结束后的操作。
Phaser 提供了更灵活的多阶段协同机制,适用于需要多个阶段的任务协同和并行执行的场景。它还支持动态添加或删除等待线程,使其更适用于动态变化的并发需求。
Phaser 适用场景
- 需要分阶段执行的任务,每个阶段可以有不同的线程数。
- 需要动态添加或删除等待线程的场景。
5、ReentrantLock
ReentrantLock 是 Java 并发包中的同步器之一,它是一个可重入的互斥锁,提供了与 synchronized 关键字相似的功能,但更为灵活。
与 synchronized 不同,ReentrantLock 具有更多的控制选项和功能,例如公平性、可中断性、超时等待等。
ReentrantLock 基本用法
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ReentrantLockExample { public static void main(String[] args) { Lock lock = new ReentrantLock(); // 创建多个线程来模拟使用锁 for (int i = 1; i <= 5; i++) { int threadId = i; Thread thread = new Thread(() -> { try { lock.lock(); // 获取锁 System.out.println("Thread " + threadId + " acquired the lock."); Thread.sleep(2000); // 模拟临界区操作耗时 } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); // 释放锁 System.out.println("Thread " + threadId + " released the lock."); } }); thread.start(); } } }
运行结果:
Thread 1 acquired the lock.
Thread 1 released the lock.Thread 2 acquired the lock.Thread 2 released the lock.Thread 3 acquired the lock.Thread 3 released the lock.Thread 4 acquired the lock.Thread 4 released the lock.Thread 5 acquired the lock.Thread 5 released the lock.
在上述示例中,我们创建了一个 ReentrantLock 实例,并在多个线程中使用它来模拟对共享资源的访问。每个线程在访问资源前调用 lock.lock() 来获取锁,访问资源后调用 lock.unlock() 来释放锁。
需要注意的是,为了避免死锁,应该在 finally 块中释放锁,以确保无论是否发生异常,锁都会被释放。
ReentrantLock 还提供了其他方法,如 tryLock()(尝试获取锁,如果锁可用则获取,否则返回 false)、lockInterruptibly()(可中断的获取锁,可响应线程中断)等,使其更加灵活和强大。
ReentrantLock 中断获取锁用法
package com.lf.java.basic.concurrent; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class InterruptibleLockExample { public static void main(String[] args) { ReentrantLock lock = new ReentrantLock(); // 创建线程尝试获取锁 Thread thread = new Thread(() -> { try { lock.lockInterruptibly(); // 可中断获取锁 System.out.println("Thread acquired the lock."); Thread.sleep(5000); // 模拟临界区操作耗时 } catch (InterruptedException e) { //中断唤醒线程 System.out.println("Thread interrupted while waiting for the lock."); } finally { if (lock.isHeldByCurrentThread()) { lock.unlock(); // 释放锁 System.out.println("Thread released the lock."); } } }); // 启动线程 thread.start(); // 主线程等待一段时间后尝试中断线程 try { Thread.sleep(2000); System.out.println("Thread interrupt before"); thread.interrupt(); // 中断线程的等待 System.out.println("Thread interrupt after"); } catch (InterruptedException e) { System.out.println("InterruptedException catch"); e.printStackTrace(); } } }
运行结果:
Thread acquired the lock.
Thread interrupt beforeThread interrupt afterThread interrupted while waiting for the lock.Thread released the lock.
在上述示例中,创建了一个线程尝试获取锁,但是主线程在启动线程后等待了一段时间后中断了该线程的等待。
由于我们使用了 lock.lockInterruptibly() 来获取锁,线程在等待锁的过程中可以响应中断,一旦被中断,它会抛出 InterruptedException,从而可以捕获中断事件并做出相应处理。
ReentrantLock 适用场景:
- 需要更精细的同步控制,例如在某些情况下需要手动释放锁。
- 需要可中断或超时等待的线程。
6、ReadWriteLock
ReadWriteLock 是 Java 并发包中的同步器之一,用于实现读写分离的锁机制,提供了更高效的并发访问控制。
它允许多个线程同时读取共享资源,但在写入资源时只允许一个线程进行,从而提高了并发性能。
ReadWriteLock 包含两种锁:读锁和写锁。
- 读锁(ReadLock):多个线程可以同时获取读锁,只要没有线程持有写锁。在没有写锁的情况下,多个线程可以并发读取共享资源,从而提高并发性能。
- 写锁(Write Lock):写锁是独占的,只有一个线程可以持有写锁。在一个线程持有写锁时,其他线程无法获取读锁或写锁,从而确保对共享资源的写操作是互斥的。
ReadWriteLock 基本用法
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; public class ReadWriteLockExample { public static void main(String[] args) { ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); // 创建多个读线程 for (int i = 1; i <= 5; i++) { int threadId = i; Thread thread = new Thread(() -> { readWriteLock.readLock().lock(); // 获取读锁 try { System.out.println("Thread " + threadId + " is reading."); Thread.sleep(2000); // 模拟读取操作 } catch (InterruptedException e) { e.printStackTrace(); } finally { readWriteLock.readLock().unlock(); // 释放读锁 } }); thread.start(); } // 创建一个写线程 Thread writeThread = new Thread(() -> { readWriteLock.writeLock().lock(); // 获取写锁 try { System.out.println("Write thread is writing."); python Thread.sleep(2000); // 模拟写入操作 } catch (InterruptedException e) { e.printStackTrace(); } finally { readWriteLock.writeLock().unlock(); // 释放写锁 } }); writeThread.start(); } }
运行结果:
Thread 1 is reading.
Thread 2 is reading.Thread 4 is reading.Thread 3 is reading.Thread 5 is reading.Write thread is writing
在上述示例中,我们创建了一个 ReadWriteLock 实例,然后创建多个读线程和一个写线程来模拟读写操作。
读线程在执行时调用 readWriteLock.readLock().lock() 来获取读锁,写线程在执行时调用 readWriteLock.writeLock().lock() 来获取写锁。
使用 ReadWriteLock 可以提高对共享资源的并发访问性能,适用于读操作频繁,写操作较少的场景。
ReadWriteLock 适用场景
- 读操作频繁,写操作较少的情况,以提高并发性能。
- 允许多个线程同时读取资源,但在写入资源时确保互斥。
7、 Condition
Condition 是 Java 并发包中的同步器之一,它提供了更灵活的线程等待和通知机制,用于在多线程环境下实现精细的线程协调。
Condition 是与 Lock 结合使用的,它可以替代传统的 wait() 和 notify() 方法,提供更多的控制选项和功能。
通过 Condition,我们可以实现更精确的等待和通知,以及更灵活的线程唤醒机制。
Condition 基本用法
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ConditionExample { public static void main(String[] args) { Lock lock = new ReentrantLock(); Condition condition = lock.newCondition(); // 创建一个等待线程 Thread waitingThread = new Thread(() -> { lock.lock(); try { System.out.println("Waiting thread is waiting..."); condition.await(); // 等待条件满足 System.out.println("Waiting thread is awake."); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }); // 创建一个唤醒线程 Thread signalingThread = new Thread(() -> { lock.lock(); try { Thread.sleep(2000); // 模拟等待一段时间 System.out.println("Signaling thread is awake."); condition.signal(); // 唤醒等待线程 } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }); // 启动线程 waitingThread.start(); signalingThread.start(); } }
运行结果:
Waiting thread is waiting...
Signaling thread is awake.Waiting thread is awake.
在上述示例中,我们创建了一个 ReentrantLock 实例和一个与之关联的 Condition,然后创建了一个等待线程和一个唤醒线程。
等待线程在调用 condition.await() 后进入等待状态,直到唤醒线程调用 condition.signal() 来唤醒它。
通过使用 Condition,我们可以更加精确地控制线程的等待和通知,使线程协调更加灵活。
Condition 实现阻塞队列
import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class BlockingQueueWithCondition<T> { private final Queue<T> queue = new LinkedList<>(); private final int capacity; private final Lock lock = new ReentrantLock(); private final Condition notFull = lock.newCondition(); private final phpCondition notEmpty = lock.newCondition(); public BlockingQueueWithCondition(int capacity) { this.capacity = capacity; } public void put(T item) throws InterruptedException { lock.lock(); try { while (queue.size() == capacity) { notFull.await(); } queue.offer(item); notEmpty.signal(); } finally { lock.unlock(); } } public T take() throws InterruptedException { lock.lock(); try { while (queue.isEmpty()) { notEmpty.await(); } T item = queue.poll(); notFull.signal(); return item; } finally { lock.unlock(); } } public int size() { lock.lock(); try { return queue.size(); } finally { lock.unlock(); } } public static void main(String[] args) { BlockingQueueWithCondition<Integer> queue = new BlockingQueueWithCondition<>(5); Thread producerThread = new Thread(() -> { try { for (int i = 1; i <= 10; i++) { queue.put(i); System.out.println("Produced: " + i); Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } }); Thread consumerThread = new Thread(() -> { try { for (int i = 1; i <= 10; i++) { int item = queue.take(); System.out.println("Consumed: " + item); Thread.sleep(1500); } } catch (InterruptedException e) { e.printStackTrace(); } }); producerThread.start(); consumerThread.start(); } }
运行结果:
Produced: 1
Consumed: 1Produced: 2Consumed: 2Produced: 3Consumed: 3Produced: 4Produced: 5Consumed: 4Produced: 6Consumed: 5Produced: 7Produced: 8Consumed: 6Produced: 9Consumed: 7Produced: 10Consumed: 8Consumed: 9Consumed: 10
在上述示例中,我们使用 Condition 来实现了一个阻塞队列,其中 put() 方法用于向队列中放入元素,take() 方法用于从队列中取出元素。
当队列满时,生产者线程会等待 notFull 条件,当队列为空时,消费者线程会等待 notEmpty 条件。
这个示例展示了如何使用 Condition 来实现线程http://www.devze.com之间的协调,以及如何实现一个简单的阻塞队列。注意,这个示例并没有处理所有的边界情况和异常情况,实际使用时需要考虑更多细节。
8、BlockingQueue
BlockingQueue 是 Java 并发包中的一个接口,它提供了一种线程安全的队列实现,用于在多线程环境下进行数据的生产和消费。
BlockingQueue 支持阻塞操作,当队列满或空时,线程会被阻塞,直到条件满足。
BlockingQueue 提供了多种实现,包括:
- ArrayBlockingQueue:基于数组的有界阻塞队列。
- LinkedBlockingQueue:基于链表的可选有界阻塞队列。
- PriorityBlockingQueue:基于优先级的无界阻塞队列。
- DelayQueue:基于延迟时间的无界阻塞队列。
- SynchronousQueue:不存储元素的阻塞队列,用于直接传递数据。
- LinkedTransferQueue:基于链表的无界阻塞队列,结合了 LinkedBlockingQueue 和SynchronousQueue 特性。
- LinkedBlockingDeque:基于链表的双端阻塞队列。
BlockingQueue 基本用法
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class BlockingQueueExample { public static void main(String[] args) { BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5); Thread producerThread = new Thread(() -> { try { for (int i = 1; i <= 10; i++) { queue.put(i); System.out.println("Produced: " + i); Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } }); Thread consumerThread = new Thread(() -> { try { for (int i = 1; i <= 10; i++) { int item = queue.take(); System.out.println("Consumed: " + item); Thread.sleep(1500); } } catch (InterruptedException e) { e.printStackTrace(); } }); producerThread.start(); consumerThread.start(); } }
运行结果:
Consumed: 1
Produced: 1Produced: 2Consumed: 2Produced: 3Consumed: 3Produced: 4Produced: 5Consumed: 4Produced: 6Consumed: 5Produced: 7Produced: 8Consumed: 6Produced: 9Consumed: 7Produced: 10Consumed: 8Consumed: 9Consumed: 10
在上述示例中,我们使用了 ArrayBlockingQueue 来实现阻塞队列,其中生产者线程使用 put() 方法向队列中放入元素,消费者线程使用 take() 方法从队列中取出元素。
当队列满或空时,线程会被阻塞,直到条件满足。
BlockingQueue 是实现线程安全的生产者-消费者模式的常用工具,它简化了线程之间的协调和通信。
9、BlockingDeque
BlockingDeque(阻塞双端队列)是 Java 并发包中的一个接口,它是 BlockingQueue 接口的扩展,提供了双端队列的功能,并支持阻塞操作。BlockingDeque 可以在队列的两端插入和删除元素,同时支持阻塞操作,使得在多线程环境下更容易实现数据的生产和消费。
BlockingDeque 接口的实现类包括:
- LinkedBlockingDeque:基于链表的阻塞双端队列,可选有界或无界。
- LinkedBlockingDeque:基于链表的双端阻塞队列编程,无界。
BlockingDeque基本用法
import java.util.concurrent.BlockingDeque; import java.util.concurrent.LinkedBlockingDeque; public class BlockingDequeExample { public static void main(String[] args) { BlockingDeque<Integer> deque = new LinkedBlockingDeque<>(5); Thread producerThread = new Thread(() -> { try { for (int i = 1; i <= 10; i++) { deque.put(i); System.out.println("Produced: " + i); Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } }); Thread consumerThread = new Thread(() -> { try { for (int i = 1; i <= 10; i++) { int item = deque.take(); System.out.println("Consumed: " + item); Thread.sleep(1500); } } catch (InterruptedException e) { e.printStackTrace(); } }); producerThread.start(); consumerThread.start(); } }
运行结果:
Produced: 1
Consumed: 1Produced: 2Consumed: 2Produced: 3Consumed: 3Produced: 4Produced: 5Consumed: 4Produced: 6Consumed: 5Produced: 7Produced: 8Consumed: 6Produced: 9Consumed: 7Produced: 10Consumed: 8Consumed: 9Consumed: 10
在上述示例中,我们使用了 LinkedBlockingDeque 来实现阻塞双端队列,生产者线程使用 put() 方法向队列中放入元素,消费者线程使用 take() 方法从队列中取出元素。
与 BlockingQueue 类似,当队列满或空时,线程会被阻塞,直到条件满足。
BlockingDeque 可以更灵活地实现在队列两端插入和删除元素,适用于更多种类的场景,例如双向数据传输和窗口滑动等。
以下是一些常用的在队列两端插入和删除元素的方法:
- 在队列头部插入元素:
void addFirst(E e): 将元素添加到队列的头部,如果队列已满,则抛出异常。
boolean offerFirst(E e): 将元素添加到队列的头部,如果队列已满,则返回 false。
void putFirst(E e): 将元素添加到队列的头部,如果队列已满,则阻塞等待直到有空间。
- 在队列尾部插入元素:
void addLast(E e):将元素添加到队列的尾部,如果队列已满,则抛出异常。
boolean offerLast(E e):将元素添加到队列的尾部,如果队列已满,则返回 false。
void putLast(E e):将元素添加到队列的尾部,如果队列已满,则阻塞等待直到有空间。
- 从队列头部删除元素:
E removeFirst(): 移除并返回队列头部的元素,如果队列为空,则抛出异常。
E pollFirst(): 移除并返回队列头部的元素,如果队列为空,则返回 null。
E takeFirst(): 移除并返回队列头部的元素,如果队列为空,则阻塞等待直到有元素。
- 从队列尾部删除元素:
E removeLast():移除并返回队列尾部的元素,如果队列为空,则抛出异常。
E pollLast(): 移除并返回队列尾部的元素,如果队列为空,则返回 null。
E takeLast(): 移除并返回队列尾部的元素,如果队列为空,则阻塞等待直到有元素。
这些方法使得你可以在双端队列的头部和尾部执行插入和删除操作,根据具体的需求选择合适的方法来实现线程安全的双端队列操作。
10、LockSupport
LockSupport 是 Java 并发包中提供的工具类,用于线程的阻塞和唤醒操作。
它提供了一种基于许可(permit)的方式来控制线程的阻塞和唤醒,相对于传统的 wait() 和 notify() 方法,LockSupport 更加灵活和可靠。
主要的方法包括:
- void park():阻塞当前线程,直到获得许可。
- void park(Object blocker):阻塞当前线程,并将 blocker 关联到当前线程,用于监控和诊断工具。
- void parkNanos(long nanos):阻塞当前线程,最多等待指定的纳秒数,直到获得许可。
- void parkNanos(Object blocker, long nanos):阻塞当前线程,并将 blocker关联到当前线程,最多等待指定的纳秒数。
- void parkUntil(long deadline):阻塞当前线程,直到指定的时间戳,直到获得许可。
- void parkUntil(Object blocker, long deadline):阻塞当前线程,并将 blocker关联到当前线程,直到指定的时间戳。
- void unpark(Thread thread):唤醒指定的线程,如果线程被阻塞,则解除阻塞状态。
LockSupport基本用法
import java.util.concurrent.locks.LockSupport; public class LockSupportExample { public static void main(String[] args) { Thread thread = new Thread(() -> { System.out.println("Thread is going to be parked."); LockSupport.park(); // 阻塞当前线程 System.out.println("Thread is unparked."); }); thread.start(); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Main thread is unparking the parked thread."); LockSupport.unpark(thread); // 唤醒被阻塞的线程 } }
运行结果:
Thread is going to be parked.
Main thread is unparking the parked thread.Thread is unparked.
在上述示例中,我们创建了一个新线程,调用了 LockSupport.park() 来阻塞该线程。
然后,主线程等待 2 秒后,调用了 LockSupport.unpark(thread) 来唤醒被阻塞的线程。与传统的 wait() 和 notify() 方法不同,LockSupport 是基于许可的,不需要获取某个特定对象的锁来进行阻塞和唤醒操作。
LockSupport 提供了一种更直接、灵活和可控的线程阻塞和唤醒机制,适用于各种多线程协调的场景。
11、Exchanger
Exchanger 是 Java 并发包中的同步器之一,用于实现两个线程之间交换数据。
它提供了一个同步点,当两个线程都到达这个同步点时,它们可以交换数据。Exchanger 可以用于实现线程间的数据传递和协作
Exchanger 提供了两个线程之间交换数据的功能,但仅限于两个线程。当两个线程都到达 Exchanger 同步点时,它们可以通过 exchange() 方法交换数据,然后各自继续执行。
Exchanger基本用法
import java.util.concurrent.Exchanger; public class ExchangerExample { public static void main(String[] args) { Exchanger<String> exchanger = new Exchanger<>(); // 创建一个线程来发送数据 Thread senderThread = new Thread(() -> { try { String dataToSend = "Hello from Sender"; System.out.println("Sender is sending: " + dataToSend); exchanger.exchange(dataToSend); // 发送数据并等待接收数据 } catch (InterruptedException e) { e.printStackTrace(); } }); // 创建一个线程来接收数据 Thread receiverThread = new Thread(() -> { try { String receivedData = exchanger.exchange(null); // 等待接收数据并发送数据 System.out.println("Receiver received: " + receivedData); } catch (InterruptedException e) { e.printStackTrace(); } }); // 启动线程 senderThread.start(); receiverThread.start(); } }
运行结果
Sender is sending: Hello from Sender
Receiver received: Hello from Sender
在上述示例中,我们创建了一个 Exchanger 实例,然后创建了一个发送数据的线程和一个接收数据的线程。
当发送数据的线程调用 exchange() 方法时,它会发送数据并等待接收数据;而接收数据的线程调用 exchange() 方法时,它会等待接收数据并发送数据。当两个线程都到达 Exchanger 同步点时,它们会交换数据,并继续执行。
需要注意的是,Exchanger 只适用于两个线程之间的数据交换。如果需要更多线程之间的数据交换,可能需要组合使用多个 Exchanger。
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程客栈(www.devze.com)。
精彩评论