#JUC并发编程
1. 并发基础概念:
并发编程是指多个线程同时执行程序的情况。在并发编程中,由于多个线程可能同时访问共享资源,因此需要考虑线程同步、原子性、可见性等问题。
线程安全:
指在多线程环境下,对共享数据进行访问时,不会出现数据污染或不一致的问题。为了实现线程安全,可以使用锁机制或者其他并发控制手段。
代码语言:javascript复制public class Counter {<!-- -->
private int count;
public synchronized void increment() {<!-- -->
count ;
}
public int getCount() {<!-- -->
return count;
}
}
在这个示例中,我们定义了一个计数器类Counter,并使用synchronized关键字来实现线程安全。在increment()方法中,我们使用synchronized关键字来保证一次只有一个线程可以进入临界区执行操作,从而避免了多个线程同时访问count变量的问题。
原子性:
指一个操作要么完全执行成功,要么完全执行失败,不会出现部分执行的情况。为了实现原子性,可以使用原子类或者CAS(Compare-and-Swap)等机制。
代码语言:javascript复制public class AtomicIntegerDemo {<!-- -->
private static AtomicInteger counter = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {<!-- -->
Thread t1 = new Thread(() -> {<!-- -->
for (int i = 0; i < 10000; i ) {<!-- -->
counter.incrementAndGet();
}
});
Thread t2 = new Thread(() -> {<!-- -->
for (int i = 0; i < 10000; i ) {<!-- -->
counter.incrementAndGet();
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(counter.get());
}
}
在这个示例中,我们使用AtomicInteger类来实现原子操作。在线程t1和t2中,我们使用incrementAndGet()方法对计数器进行原子性自增操作,从而避免了多个线程同时访问count变量的问题。
可见性:
指当一个线程修改了共享数据后,其他线程能够立即看到该修改。为了实现可见性,可以使用volatile关键字或者synchronized关键字等机制。
代码语言:javascript复制public class VolatileDemo {<!-- -->
private static volatile boolean flag = false;
public static void main(String[] args) throws InterruptedException {<!-- -->
Thread readerThread = new Thread(() -> {<!-- -->
while (!flag) {<!-- -->
// do nothing
}
System.out.println("Flag is now true");
});
Thread writerThread = new Thread(() -> {<!-- -->
flag = true;
System.out.println("Flag is now true");
});
readerThread.start();
writerThread.start();
readerThread.join();
writerThread.join();
}
}
在这个示例中,我们使用volatile关键字来实现可见性。在readerThread中,我们不断地循环检查flag变量的值;在writerThread中,我们将flag变量设为true,并打印输出。由于flag变量是volatile类型的,因此一旦writerThread修改了该变量的值,readerThread就能够立即看到修改。
2. 线程池:
线程池是一种用于管理和复用线程的机制,可以提高系统资源利用率和响应速度,避免了频繁创建和销毁线程的开销。Java中提供了ThreadPoolExecutor类和Executors工具类来实现线程池。
ThreadPoolExecutor类:
是Java中线程池的核心实现类,通过参数配置可以自定义线程池的大小、任务队列、拒绝策略等属性。
代码语言:javascript复制public class ThreadPoolExecutorDemo {<!-- -->
public static void main(String[] args) {<!-- -->
ExecutorService executorService = new ThreadPoolExecutor(
2, // corePoolSize
5, // maximumPoolSize
60, // keepAliveTime
TimeUnit.SECONDS, // unit
new LinkedBlockingQueue<Runnable>(10), // workQueue
new ThreadPoolExecutor.CallerRunsPolicy() // handler
);
for (int i = 1; i <= 20; i ) {<!-- -->
final int taskId = i;
executorService.execute(() -> {<!-- -->
System.out.println("Task #" taskId " is running on thread " Thread.currentThread().getName());
try {<!-- -->
Thread.sleep(2000);
} catch (InterruptedException e) {<!-- -->
e.printStackTrace();
}
});
}
executorService.shutdown();
}
}
在这个示例中,我们使用ThreadPoolExecutor类来实现线程池,并设置corePoolSize为2、maximumPoolSize为5、keepAliveTime为60秒、workQueue为LinkedBlockingQueue(容量为10)、handler为CallerRunsPolicy。然后我们使用execute()方法向线程池中提交20个任务,每个任务都会打印输出当前的线程名并休眠2秒钟。最后调用shutdown()方法关闭线程池。
Executors工具类:
是Java中线程池的辅助类,提供了一些静态方法来创建常用类型的线程池。
代码语言:javascript复制public class ExecutorsDemo {<!-- -->
public static void main(String[] args) {<!-- -->
ExecutorService executorService = Executors.newFixedThreadPool(5);
for (int i = 1; i <= 20; i ) {<!-- -->
final int taskId = i;
executorService.execute(() -> {<!-- -->
System.out.println("Task #" taskId " is running on thread " Thread.currentThread().getName());
try {<!-- -->
Thread.sleep(2000);
} catch (InterruptedException e) {<!-- -->
e.printStackTrace();
}
});
}
executorService.shutdown();
}
}
在这个示例中,我们使用Executors工具类中的newFixedThreadPool()方法来创建固定大小为5的线程池,并使用execute()方法向线程池中提交20个任务,每个任务都会打印输出当前的线程名并休眠2秒钟。最后调用shutdown()方法关闭线程池。
总之,线程池是一种非常重要的并发编程机制,可以提高系统资源利用率和响应速度。通过ThreadPoolExecutor类和Executors工具类,我们可以方便地创建、配置和管理线程池,以及执行任务。
3. Lock接口:
Lock接口是Java中提供的一种显式锁机制,可以实现更细粒度的控制和管理。Lock接口提供了加锁和释放锁的方法,具有可重入性、公平性等特点,常用的实现类包括ReentrantLock、ReentrantReadWriteLock、StampedLock等。
ReentrantLock:
是Lock接口的一种实现方式,具有可重入性、公平性、可中断性等特点。
代码语言:javascript复制public class ReentrantLockDemo {<!-- -->
private static final Lock lock = new ReentrantLock();
private static int count = 0;
public static void main(String[] args) throws InterruptedException {<!-- -->
Thread t1 = new Thread(() -> {<!-- -->
for (int i = 0; i < 10000; i ) {<!-- -->
increment();
}
});
Thread t2 = new Thread(() -> {<!-- -->
for (int i = 0; i < 10000; i ) {<!-- -->
increment();
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("Count: " count);
}
private static void increment() {<!-- -->
lock.lock();
try {<!-- -->
count ;
} finally {<!-- -->
lock.unlock();
}
}
}
在这个示例中,我们使用ReentrantLock类来实现锁机制,并定义了一个计数器变量count。在increment()方法中,我们首先调用lock()方法获取锁;然后对计数器变量进行自增操作;最后调用unlock()方法释放锁。在main()方法中,我们创建两个线程t1和t2来分别执行increment()方法10000次,最终打印输出计数器变量count的值。
ReentrantReadWriteLock:
是Lock接口的另一种实现方式,具有读写分离、公平性等特点。适用于读操作远多于写操作的场景。
代码语言:javascript复制public class ReentrantReadWriteLockDemo {<!-- -->
private static final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private static final Lock readLock = lock.readLock();
private static final Lock writeLock = lock.writeLock();
private static int count = 0;
public static void main(String[] args) throws InterruptedException {<!-- -->
Thread t1 = new Thread(() -> {<!-- -->
for (int i = 0; i < 10000; i ) {<!-- -->
increment();
}
});
Thread t2 = new Thread(() -> {<!-- -->
for (int i = 0; i < 10000; i ) {<!-- -->
get();
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("Count: " count);
}
private static void increment() {<!-- -->
writeLock.lock();
try {<!-- -->
count ;
} finally {<!-- -->
writeLock.unlock();
}
}
private static int get() {<!-- -->
readLock.lock();
try {<!-- -->
return count;
} finally {<!-- -->
readLock.unlock();
}
}
}
在这个示例中,我们使用ReentrantReadWriteLock类来实现读写锁机制,并定义了一个计数器变量count。在increment()方法中,我们首先调用writeLock()方法获取写锁;然后对计数器变量进行自增操作;最后调用writeUnlock()方法释放写锁。在get()方法中,我们首先调用readLock()方法获取读锁;然后返回计数器变量的值;最后调用readUnlock()方法释放读锁。在main()方法中,我们创建两个线程t1和t2来分别执行increment()和get()方法10000次,最终打印输出计数器变量count的值。
StampedLock:
是Lock接口的另一种实现方式,具有乐观锁和悲观锁等特点。适用于读操作频繁而写操作较少的场景。
代码语言:javascript复制public class StampedLockDemo {<!-- -->
private static final StampedLock lock = new StampedLock();
private static int count = 0;
public static void main(String[] args){<!-- -->
Thread t1 = new Thread(() -> {<!-- -->
for (int i = 0; i < 10000; i ) {<!-- -->
increment();
}
});
Thread t2 = new Thread(() -> {<!-- -->
for (int i = 0; i < 10000; i ) {<!-- -->
get();
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("Count: " count);
}
private static void increment() {<!-- -->
long stamp = lock.writeLock();
try {<!-- -->
count ;
} finally {<!-- -->
lock.unlockWrite(stamp);
}
}
private static int get() {<!-- -->
long stamp = lock.tryOptimisticRead();
int c = count;
if (!lock.validate(stamp)) {<!-- -->
stamp = lock.readLock();
try {<!-- -->
c = count;
} finally {<!-- -->
lock.unlockRead(stamp);
}
}
return c;
}
}
在这个示例中,我们使用StampedLock类来实现乐观锁机制,并定义了一个计数器变量count。在increment()方法中,我们首先调用writeLock()方法获取写锁;然后对计数器变量进行自增操作;最后调用unlockWrite()方法释放写锁。在get()方法中,我们首先调用tryOptimisticRead()方法获取乐观读锁,并记录当前的版本号stamp和计数器变量的值c;然后判断版本号是否有效,如果无效则调用readLock()方法获取悲观读锁,并重新读取计数器变量的值;最后调用unlockRead()方法释放悲观读锁。在main()方法中,我们创建两个线程t1和t2来分别执行increment()和get()方法10000次,最终打印输出计数器变量count的值。
4. Condition接口:
Condition接口是与Lock接口配合使用的一种线程协作机制。它可以实现更细粒度的线程等待和通知,并且可以支持多个条件变量,比如对于生产者-消费者模型中的缓冲区,可以分别使用一个notFull和notEmpty条件变量来进行生产者和消费者之间的协作。
代码语言:javascript复制public class ConditionDemo {<!-- -->
private static final Lock lock = new ReentrantLock();
private static final Condition notEmpty = lock.newCondition();
private static final Condition notFull = lock.newCondition();
private static final int CAPACITY = 10;
private static final Queue<Integer> queue = new LinkedList<>();
public static void main(String[] args) throws InterruptedException {<!-- -->
Thread t1 = new Thread(() -> {<!-- -->
for (int i = 0; i < 20; i ) {<!-- -->
try {<!-- -->
produce(i);
} catch (InterruptedException e) {<!-- -->
e.printStackTrace();
}
}
});
Thread t2 = new Thread(() -> {<!-- -->
for (int i = 0; i < 20; i ) {<!-- -->
try {<!-- -->
consume();
} catch (InterruptedException e) {<!-- -->
e.printStackTrace();
}
}
});
t1.start();
t2.start();
t1.join();
t2.join();
}
private static void produce(int value) throws InterruptedException {<!-- -->
lock.lock();
try {<!-- -->
while (queue.size() == CAPACITY) {<!-- -->
notFull.await();
}
queue.offer(value);
System.out.println("Produced: " value);
notEmpty.signalAll();
} finally {<!-- -->
lock.unlock();
}
}
private static void consume() throws InterruptedException {<!-- -->
lock.lock();
try {<!-- -->
while (queue.isEmpty()) {<!-- -->
notEmpty.await();
}
int value = queue.poll();
System.out.println("Consumed: " value);
notFull.signalAll();
} finally {<!-- -->
lock.unlock();
}
}
}
在这个示例中,我们使用Lock接口的实现类ReentrantLock来提供锁机制,并使用newCondition()方法创建了两个条件变量notEmpty和notFull。然后定义了一个容量为10的队列queue,并编写了produce()方法和consume()方法来分别实现生产者和消费者的功能。在produce()方法中,首先调用lock()方法获取锁;然后判断队列是否已满,如果是则调用notFull.await()方法等待notFull条件变量的信号;否则将数据加入到队列中,打印输出生产的数据,并通过notEmpty.signalAll()方法通知等待notEmpty条件变量的其他线程。在consume()方法中,首先调用lock()方法获取锁;然后判断队列是否为空,如果是则调用notEmpty.await()方法等待notEmpty条件变量的信号;否则从队列中取出数据,打印输出消费的数据,并通过notFull.signalAll()方法通知等待notFull条件变量的其他线程。在main()方法中,我们创建两个线程t1和t2来分别执行生产者和消费者的功能,最终演示缓冲区中的数据生产和消费过程。
5. CAS(Compare-And-Swap)操作:
CAS操作是一种基于硬件指令级别的原子操作,可以实现非阻塞算法。它可以在多线程并发执行时保证数据的一致性和正确性。
CAS操作涉及到三个参数:需要更新的内存位置V、期望值A和新值B。当且仅当预期值A与内存位置V中的当前值相同时,才会将内存位置V中的值更新为新值B;否则,不进行任何操作。通过不断重试直至成功,从而保证了并发情况下的数据原子性和一致性。
代码语言:javascript复制public class CASDemo {<!-- -->
private static final AtomicReference<String> atomicStr = new AtomicReference<>("Hello, World!");
public static void main(String[] args) throws InterruptedException {<!-- -->
Thread t1 = new Thread(() -> {<!-- -->
boolean swapped = false;
while (!swapped) {<!-- -->
String prev = atomicStr.get();
String next = prev.replace("World", "John");
swapped = atomicStr.compareAndSet(prev, next);
}
});
Thread t2 = new Thread(() -> {<!-- -->
boolean swapped = false;
while (!swapped) {<!-- -->
String prev = atomicStr.get();
String next = prev.replace("World", "Mary");
swapped = atomicStr.compareAndSet(prev, next);
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(atomicStr.get());
}
}
在这个示例中,我们使用AtomicReference类来实现CAS操作,并定义了一个初始字符串"Hello, World!“。在t1线程中,我们不断尝试将字符串中的"World"替换成"John”;在t2线程中,我们不断尝试将字符串中的"World"替换成"Mary"。通过compareAndSet()方法比较并更新字符串的值,直至成功,最终输出更新后的字符串。
6. 原子类:
原子类是Java中提供的一种线程安全机制,它封装了常见的原子操作,并保证了这些操作的原子性和可见性。原子类包括AtomicBoolean、AtomicInteger、AtomicLong等,可以用于实现非阻塞算法、锁机制等多种并发编程场景。
代码语言:javascript复制public class AtomicDemo {<!-- -->
private static final AtomicInteger atomicInt = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {<!-- -->
Thread t1 = new Thread(() -> {<!-- -->
for (int i = 0; i < 10000; i ) {<!-- -->
atomicInt.incrementAndGet();
}
});
Thread t2 = new Thread(() -> {<!-- -->
for (int i = 0; i < 10000; i ) {<!-- -->
atomicInt.addAndGet(2);
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("Count: " atomicInt.get());
}
}
在这个示例中,我们使用AtomicInteger类来实现原子操作,并定义了一个初始值为0的计数器atomicInt。在t1线程中,我们调用incrementAndGet()方法对计数器进行自增;在t2线程中,我们调用addAndGet()方法对计数器进行自增2。最终输出计数器的值。
除了AtomicInteger,Java还提供了其他原子类如AtomicBoolean、AtomicLong等,并且可以通过自定义的方式实现自己的原子类。这些原子类可以用于实现非阻塞算法、锁机制等多种并发编程场景,是Java中常见的一种线程安全机制。
7. 并发容器:
并发容器是Java中提供的一种线程安全机制,它封装了常见的容器类,并保证了这些容器类的线程安全性。并发容器包括ConcurrentHashMap、CopyOnWriteArrayList等,可以用于实现多线程并发访问数据的场景。
代码语言:javascript复制public class ConcurrentHashMapDemo {<!-- -->
public static void main(String[] args) throws InterruptedException {<!-- -->
Map<String, Integer> map = new ConcurrentHashMap<>();
Thread t1 = new Thread(() -> {<!-- -->
for (int i = 0; i < 10000; i ) {<!-- -->
map.put("Key" i, i);
}
});
Thread t2 = new Thread(() -> {<!-- -->
for (int i = 0; i < 10000; i ) {<!-- -->
map.remove("Key" i);
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("Size: " map.size());
}
}
在这个示例中,我们使用ConcurrentHashMap类来实现并发容器,并定义了一个空的map对象。在t1线程中,我们利用put()方法向map中添加10000个键值对;在t2线程中,我们利用remove()方法从map中移除10000个键值对。最终输出map的大小。
除了ConcurrentHashMap,Java还提供了其他并发容器如CopyOnWriteArrayList、ConcurrentLinkedQueue等,并且可以通过自定义的方式实现自己的并发容器。这些并发容器可以用于实现多线程并发访问数据的场景,是Java中常见的一种线程安全机制。
8. CountDownLatch:
CountDownLatch是Java中提供的一种线程同步机制,用来控制线程的执行顺序和同步。它通过计数器来实现,当计数器的值减为0时,所有等待线程会被释放,继续执行后续操作。
代码语言:javascript复制public class CountDownLatchDemo {<!-- -->
private static final int THREAD_COUNT = 5;
private static final CountDownLatch startLatch = new CountDownLatch(1);
private static final CountDownLatch endLatch = new CountDownLatch(THREAD_COUNT);
public static void main(String[] args) throws InterruptedException {<!-- -->
for (int i = 0; i < THREAD_COUNT; i ) {<!-- -->
Thread t = new Thread(() -> {<!-- -->
try {<!-- -->
System.out.println(Thread.currentThread().getName() " waiting to start...");
startLatch.await();
System.out.println(Thread.currentThread().getName() " started");
Thread.sleep((long) (Math.random() * 10000));
System.out.println(Thread.currentThread().getName() " finished");
} catch (InterruptedException e) {<!-- -->
e.printStackTrace();
} finally {<!-- -->
endLatch.countDown();
}
}, "Thread-" i);
t.start();
}
Thread.sleep(3000);
System.out.println("All threads ready, start now!");
startLatch.countDown();
endLatch.await();
System.out.println("All threads finished!");
}
}
在这个示例中,我们使用CountDownLatch类来实现线程同步,并定义了两个计数器startLatch和endLatch。在main方法中,我们创建了5个线程,每个线程都会等待startLatch的计数器值减为0后才开始执行;执行完毕后,将endLatch的计数器值减1。在主线程中,我们等待3秒钟后,将startLatch的计数器值减为0,从而使5个等待的线程开始执行;然后等待所有线程的执行结束,即当endLatch的计数器值减为0时输出"All threads finished!"。
9. CyclicBarrier:
CyclicBarrier是Java中提供的一种线程同步机制,它也用来控制线程的执行顺序和同步。与CountDownLatch不同的是,CyclicBarrier可以重复使用,即在计数器值减为0后可以自动重置计数器,从而继续等待下一轮任务的到来。
代码语言:javascript复制public class CyclicBarrierDemo {<!-- -->
private static final int THREAD_COUNT = 5;
private static final CyclicBarrier barrier = new CyclicBarrier(THREAD_COUNT, () -> {<!-- -->
System.out.println("All threads arrived at barrier!");
});
public static void main(String[] args) throws InterruptedException {<!-- -->
for (int i = 0; i < THREAD_COUNT; i ) {<!-- -->
Thread t = new Thread(() -> {<!-- -->
try {<!-- -->
System.out.println(Thread.currentThread().getName() " working...");
Thread.sleep((long) (Math.random() * 10000));
System.out.println(Thread.currentThread().getName() " arrived at barrier!");
barrier.await();
System.out.println(Thread.currentThread().getName() " continue working...");
Thread.sleep((long) (Math.random() * 10000));
System.out.println(Thread.currentThread().getName() " finished");
} catch (InterruptedException | BrokenBarrierException e) {<!-- -->
e.printStackTrace();
}
}, "Thread-" i);
t.start();
}
}
}
在这个示例中,我们使用CyclicBarrier类来实现线程同步,并定义了一个计数器barrier。在main方法中,我们创建了5个线程,每个线程都会先工作一段时间,然后等待其他线程到达barrier;当所有线程都到达barrier时,会执行barrier的回调函数,并将计数器值重置为初始值。之后,每个线程继续工作一段时间,最终输出"finished"。
10. Semaphore:
Semaphore是Java中提供的一种线程同步机制,用来控制资源的访问数量。它通过内部维护的计数器来实现,当计数器的值大于0时,允许访问资源;否则,需要等待其他线程释放资源后才能访问。
代码语言:javascript复制public class SemaphoreDemo {<!-- -->
private static final int THREAD_COUNT = 10;
private static final Semaphore semaphore = new Semaphore(5);
public static void main(String[] args) throws InterruptedException {<!-- -->
for (int i = 0; i < THREAD_COUNT; i ) {<!-- -->
Thread t = new Thread(() -> {<!-- -->
try {<!-- -->
semaphore.acquire();
System.out.println(Thread.currentThread().getName() " acquiring resource...");
Thread.sleep((long) (Math.random() * 10000));
System.out.println(Thread.currentThread().getName() " releasing resource...");
semaphore.release();
} catch (InterruptedException e) {<!-- -->
e.printStackTrace();
}
}, "Thread-" i);
t.start();
}
}
}
在这个示例中,我们使用Semaphore类来实现线程同步,并定义了一个计数器semaphore。在main方法中,我们创建了10个线程,每个线程都会尝试获取semaphore的许可证;当许可证已被占用时,线程会被阻塞,直到有其他线程释放许可证。然后,每个线程会工作一段时间,最终释放许可证。
11. Future接口和CompletableFuture类:
Future接口是Java中提供的一种异步编程模型,它可以对异步任务进行处理,并在任务完成后获取结果。Future接口提供了一系列方法用于查询任务是否完成、等待任务完成以及获取任务执行结果。
CompletableFuture类是Java 8中新增的一个类,它继承自Future接口,并提供了更加简洁易用的异步编程方式。CompletableFuture类支持链式调用、组合多个子任务、异常处理等特性,可以方便地实现复杂的异步编程逻辑。
代码语言:javascript复制public class CompletableFutureDemo {<!-- -->
public static void main(String[] args) throws InterruptedException, ExecutionException {<!-- -->
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {<!-- -->
System.out.println("Task started.");
try {<!-- -->
Thread.sleep(5000);
} catch (InterruptedException e) {<!-- -->
e.printStackTrace();
}
System.out.println("Task finished.");
return "Hello, World!";
});
System.out.println("Waiting for the result...");
String result = future.get();
System.out.println("Result: " result);
}
}
在这个示例中,我们使用CompletableFuture类来实现异步编程,并定义了一个supplyAsync()方法,它会在另一个线程中执行任务并返回计算结果。在main方法中,我们创建了一个CompletableFuture对象future,然后调用get()方法等待任务完成,并获取计算结果。最终输出结果"Hello, World!"。
除了supplyAsync()方法,CompletableFuture类还提供了一系列方法用于处理异步任务,例如thenApply()、thenAccept()、thenRun()、thenCompose()等。这些方法都支持链式调用,可以方便地实现异步编程逻辑。