java 对线程安全支持有哪些?

2024-02-21 11:24:38 浏览数 (2)

同步容器。它的原理是将状态封装起来,并对每个公有方法都实行同步,使得每次只有1个线程能够访问容器的状态。

  • Vector和HashTable
  • Collections.synchronizedXXX方法

同步容器的问题

  1. 这种方式使得对容器的访问都串行化,严重降低了并发性,如果多个线程来竞争容器的锁时,吞吐量严重降低
  2. 对容器的多个方法的复合操作,是线程不安全的,比如一个线程负责删除,另一个线程负责查询,有可能出现越界的异常

并发容器。java.util.concurrent包里面的一系列实现

  • Concurrent开头系列。以ConcurrentHashMap为例,它的实现原理为分段锁。默认情况下有16个,每个锁守护1/16的散列数据,这样保证了并发量能达到16

分段锁缺陷在于虽然一般情况下只要一个锁,但是遇到需要扩容等类似的事情,只能去获取所有的锁 ConcurrentHashMap一些问题

  1. 需要对整个容器中的内容进行计算的方法,比如size、isEmpty、contains等等。由于并发的存在,在计算的过程中可能已进过期了,它实际上就是个估计值,但是在并发的场景下,需要使用的场景是很少的。 以ConcurrentHashMap的size方法为例:

/** * Returns the number of key-value mappings in this map. If the * map contains more than <tt>Integer.MAX_VALUE</tt> elements, returns * <tt>Integer.MAX_VALUE</tt>. * * @return the number of key-value mappings in this map */ public int size() { //为了能够算准数量,会算2次,如果两次算的不准,就锁住再算 final Segment<K,V>[] segments = this.segments; int size; boolean overflow; // true if size overflows 32 bits long sum; // sum of modCounts long last = 0L; // previous sum int retries = -1; // 第一轮的计算总数不重试 try { for (;;) { if (retries == RETRIES_BEFORE_LOCK) { //RETRIES_BEFORE_LOCK 默认是2 for (int j = 0; j < segments.length; j) ensureSegment(j).lock(); // force creation } sum = 0L; size = 0; overflow = false; for (int j = 0; j < segments.length; j) { Segment<K,V> seg = segmentAt(segments, j); if (seg != null) { sum = seg.modCount; int c = seg.count; if (c < 0 || (size = c) < 0) overflow = true; } } //第一次计算的时候 if (sum == last) break; //如果前后两次数数一致,就认为已经算好了 last = sum; } } finally { if (retries > RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; j) segmentAt(segments, j).unlock(); } } return overflow ? Integer.MAX_VALUE : size; }

  1. 不能提供线程独占的功能
  • CopyOnWrite系列。以CopyOnWriteArrayList为例,只在每次修改的时候,进行加锁控制,修改会创建并重新发布一个新的容器副本,其它时候由于都是事实上不可变的,也就不会出现线程安全问题

CopyOnWrite的问题 每次修改都复制底层数组,存在开销,因此使用场景一般是迭代操作远多于修改操作 CopyOnWriteArrayList的读写示例 /** * Appends the specified element to the end of this list. * * @param e element to be appended to this list * @return <tt>true</tt> (as specified by {@link Collection#add}) */ public boolean add(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; Object[] newElements = Arrays.copyOf(elements, len 1); newElements[len] = e; setArray(newElements); return true; } finally { lock.unlock(); } } /** * {@inheritDoc} * * @throws IndexOutOfBoundsException {@inheritDoc} */ public E get(int index) { return get(getArray(), index); } /** * Gets the array. Non-private so as to also be accessible * from CopyOnWriteArraySet class. */ final Object[] getArray() { return array; } private E get(Object[] a, int index) { return (E) a[index]; }

java中的同步工具类

阻塞队列,BlockingQueue。它提供了put和take方法,在队列不满足各自条件时将产生阻塞

BlockingQueue使用示例,生产者-消费者 public static void main(String[] args) throws Exception { BlockingQueue queue = new ArrayBlockingQueue(1024); Producer producer = new Producer(queue); Consumer consumer = new Consumer(queue); new Thread(producer).start(); new Thread(consumer).start(); } } public class Producer implements Runnable{ protected BlockingQueue queue = null; public Producer(BlockingQueue queue) { this.queue = queue; } public void run() { try { queue.put("1"); Thread.sleep(1000); queue.put("2"); Thread.sleep(2000); queue.put("3"); } catch (InterruptedException e) { e.printStackTrace(); } } } public class Consumer implements Runnable{ protected BlockingQueue queue = null; public Consumer(BlockingQueue queue) { this.queue = queue; } public void run() { try { System.out.println(queue.take()); System.out.println("Wait 1 sec"); System.out.println(queue.take()); System.out.println("Wait 2 sec"); System.out.println(queue.take()); } catch (InterruptedException e) { e.printStackTrace(); } } } 输出为 1 Wait 1 sec 2 Wait 2 sec 3

闭锁

  • CountDownLatch。使多个线程等待一组事件发生,它包含一个计数器,表示需要等待的事件的数量,每发生一个事,就递减一次,当减为0时,所有事情发生,允许“通行”

CountDownLatch示例: public class TestHarness{ public long timeTasks(int nThreads,final Runnable task) throws InterruptedException { final CountDownLatch startGate = new CountDownLatch(1); final CountDownLatch endGate = new CountDownLatch(nThreads); for (int i=0;i<nThreads;i ){ Thread t = new Thread(){ public void run(){ try { startGate.await(); try { task.run(); }finally { endGate.countDown(); } } catch (InterruptedException e) { e.printStackTrace(); } } }; t.start(); } long start = System.nanoTime(); startGate.countDown(); endGate.await(); long end=System.nanoTime(); return end-start; } }

启动门使主线程能够同时释放所有的工作线程,结束门使得主线程能够等待最后一个线程执行完

  • FutureTask。Future.get的如果任务执行完成,则立即返回,否则将阻塞直到任务完结,再返回结果或者是抛出异常

信号量,Semaphore 。它管理着一组虚拟的许可,许可的数量可通过构造函数指定,在执行操作时首先获得许可,并在使用后释放许可,如果没有,那么accquire将阻塞直到有许可。

Semaphore示例 public class BoundedHashSet<T>{ private final Set<T> set; private final Semaphore sem; public BoundedHashSet(int bound) { this.set = Collections.synchronizedSet(new HashSet<T>()); this.sem = new Semaphore(bound); } public boolean add(T o) throws InterruptedException { sem.acquire(); boolean wasAdded = false; try { wasAdded = set.add(o); return wasAdded; }finally { if (!wasAdded){ sem.release(); } } } public boolean remove(Object o){ boolean wasRemoved = set.remove(o); if(wasRemoved){ sem.release(); } return wasRemoved; } }

栅栏。它能阻塞一组线程直到某个事件发生。 与闭锁的区别:

  • 所有线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件,而栅栏用于等待其它线程。
  • 闭锁一旦进入终止状态,就不能被重置,它是一次性对象,而栅栏可以重置
  • CyclicBarrier。可以使一定数量的参与方反复地在栅栏位置汇集

CyclicBarrier使用示例

public static void main(String[] args) { //第k步执行完才能执行第k 1步 CyclicBarrier barrier = new CyclicBarrier(3,new StageKPlusOne()); StageK[] stageKs = new StageK[3]; for (int i=0;i<3;i ){ stageKs[i] = new StageK(barrier,"k part " (i 1)); } for (int i=0;i<3;i ){ new Thread(stageKs[i]).start(); } } class StageKPlusOne implements Runnable{ @Override public void run() { System.out.println("stage k over"); System.out.println("stage k 1 start counting"); } } class StageK implements Runnable{ private CyclicBarrier barrier; private String stage; public StageK(CyclicBarrier barrier, String stage) { this.barrier = barrier; this.stage = stage; } @Override public void run() { System.out.println("stage " stage " counting..."); try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("stage " stage " count over"); try { barrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } } 输出为 stage k part 1 counting... stage k part 3 counting... stage k part 2 counting... stage k part 2 count over stage k part 3 count over stage k part 1 count over stage k over stage k 1 start counting

  • Exchanger。它是一种两方栅栏,各方在栅栏位置交换数据 Exchanger 使用示例:
代码语言:javascript复制
public static void main(String[] args) {
       Exchanger exchanger = new Exchanger();
        ExchangerRunnable er1 = new ExchangerRunnable(exchanger,"1");
        ExchangerRunnable er2 = new ExchangerRunnable(exchanger,"2");
        new Thread(er1).start();
        new Thread(er2).start();
    
    }
    class ExchangerRunnable implements Runnable{
    
    private Exchanger e;
    private Object o;

    public ExchangerRunnable(Exchanger e, Object o) {
       this.e = e;
        this.o = o;
}
   
    @Override
    public void run() {
       Object pre=o;
        try {
            o=e.exchange(o);
            System.out.println("pre:" pre " now:" o);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
    }
}

输出如下

代码语言:javascript复制
pre:1 now:2
pre:2 now:1

0 人点赞