多线程三 并发容器简单使用

2021-01-29 11:06:31 浏览数 (1)

1.threadLocal

本地线程变量,在每个线程会独立开辟内存空间。

在高并发先不要使用。

代码语言:javascript复制
private static ThreadLocal local = new ThreadLocal();

public static void main(String[] args) {
    new Thread(()->{
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        local.set("A");
        System.out.println(Thread.currentThread().getName() " " local.get());
    },"线程 1").start();
    new Thread(()->{
        local.set("B");
        System.out.println(Thread.currentThread().getName() " " local.get());
    },"线程 2").start();
    System.out.println("main " local.get());
}

2.ArrayList、Vector、CopyOnWriteArrayList

这里三个容器,ArrayList、vector和CopyOnwriteArrayList

从测试结果中可以看出:ArrayList不是线程安全,会进行很多重复的操作,致使在所有线程运行完后,大小达不到10000,而Vector是线程安全的,不会出现重复读,重复写的问题,而copyOnWriterArrayList呢,是读快,写慢,因为他写的时候要进行复制,大大拉低了效率;

代码语言:javascript复制
 public static void main(String[] args) {
//        List<String> list1 = new ArrayList<>();// 耗时:12  大小:9754
//        Vector<String> list1 = new Vector<>(); // 耗时:10 大小:10000
        List<String> list1 = new CopyOnWriteArrayList<>(); // 写耗时:75 写大小:10000 读耗时:22
        Thread[] threads = new Thread[100];

        Random r = new Random();
        for (int i = 0; i < 100; i  ) {
            threads[i] =  new Thread(()->{
                for (int j = 0; j < 100; j  ) {
                    int i1 = r.nextInt(10000);
                    list1.add("A" i1);
                    ;
                }
            });
        }
        long time = System.currentTimeMillis();
        for (Thread thread : threads) {
            thread.start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        long end = System.currentTimeMillis();
        System.out.println("大小 " list1.size());
        System.out.println("耗时:" (end - time));

        Thread[] threads2 = new Thread[100];
        for (int i = 0; i < 100; i  ) {
            threads2[i] = new Thread(()->{
                for (int j = 0; j < 10000; j  ) {
                    list1.get(j);
                }
            });
        }
        long time2 = System.currentTimeMillis();
        for (Thread thread : threads2) {
            thread.start();
        }
        for (Thread thread : threads2) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("耗时:" (System.currentTimeMillis() - time2));
    }

Collections提供了一个方法synchronizedList可以将非线程安全的容器转换为线程安全的容器,看它的实现就是在方法上加了一个synchronized关键字,但是使用iterator的时候就不行了,因为他调用的还是List本身的迭代器,没有做同步处理,所以在这种情况下要手动上锁;注意:我们常使用的增加for也是利用了迭代器。

代码语言:javascript复制
List<String> list = new ArrayList<>();
List<String> strings = Collections.synchronizedList(list);

Synchronized 和Vecotr最主要的区别:

  1. vector扩容为原来的2倍长度,ArrayList扩容为原来的1.5倍
  2. synchronized有很好的扩展和兼容功能,它可以将所有的list的子类转成线程安全的类
  3. 使用synchronizedList的时候,进行遍历需要手动进行同步处理
  4. synchronizedlist可以指定锁定的对象

3.map相关并发容器

代码语言:javascript复制
public static void main(String[] args) {
//    Map<String, String> map = new ConcurrentHashMap<>(); // 302
//        Map<String, String> map = new ConcurrentSkipListMap<>(); // 358
//    Map<String, String> map = new Hashtable<>(); // 400

      Map<String, String> m = new HashMap<>();
      Map<String, String> map = Collections.synchronizedMap(m); // 465

        Random random = new Random();
        Thread[] threads = new Thread[100];
        CountDownLatch latch = new CountDownLatch(threads.length);
        long start = System.currentTimeMillis();
        for (int i = 0; i < threads.length; i  ) {
            threads[i] = new Thread(()->{
                for(int j=0; j<10000;j  ) {
                    map.put("a"   random.nextInt(100000), "a"   random.nextInt(100000));
//             map1.put("a"   random.nextInt(100000), "a"   random.nextInt(100000));
                }
                latch.countDown();
            });
        }
        Arrays.asList(threads).forEach(t->t.start());
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        long end = System.currentTimeMillis();
        System.out.println(end-start);
    }

hashMap

hashTable

concurrentHashMap

key/value = null

可以存储null键,nullvalue

key和value都不能存储

key和value都不能存储

初始容量

16

11

16

线程安全

效率

< concurrentHashMap

>hashTable

ConcurrentSkipListMap 和上面不同,他的实现方式不一样,且key是有序的,支持更高的并发,存取是是log(N),和线程数无关,线程越多越能体现出优势。

4.ConcurrentLinkedQueue

模拟场景:1000张票,10个售票窗口

代码语言:javascript复制
public class ThreadContainer2 {

    private static List<Integer> list = new ArrayList<Integer>();
    private Integer tickets = 100;

    public synchronized void sall(){
        if (tickets > 0) {
            list.add(tickets--);
            System.out.println(Thread.currentThread().getName() " sall " tickets);
        }
    }

    public Integer getTicket(){
        return tickets;
    }

    public static void main(String[] args) {
        ThreadContainer2 t = new ThreadContainer2();

        System.out.println("开始售票。。。");
        for (int i = 0; i < 10; i  ) {
            new Thread(()->{
                while (true) {
                        try {
                            TimeUnit.SECONDS.sleep(1);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        t.sall();
                    }
            },"售票口 " (i 1)).start();
        }
    }
}

使用ConcurrentLinkedQueue来实现看看:

代码语言:javascript复制
private static Queue<Integer> list = new ConcurrentLinkedQueue<Integer>();

static{
    for (int i = 0; i < 1000; i  ) {
        list.add(i);
    }
}

public static void main(String[] args) {
    for (int i = 0; i < 10; i  ) {
        new Thread(()->{
            while (true) {
                // 这个方法是线程安全的
                Integer poll = list.poll();
                if (poll == null) {
                    break;
                }
                // 就算多个线程都同时来到这里,因为上一个代码是线程安全的,所以就算来到这,也是空
                System.out.println(Thread.currentThread().getName() " ticket " poll);
            }
        }," 售票口 " i).start();
    }
}

代码简化了,没有之前的那种复杂

这个容器比较有用的方法:

代码语言:javascript复制
ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue();

for (int i = 0; i < 10; i  ) {
    queue.add("a" i);
}
System.out.println("容器" queue.size());
System.out.println("poll取值:" queue.poll());
System.out.println("poll 取值后的容器" queue.size());
System.out.println("peek取值:" queue.peek());
System.out.println("peek取值后的容量:" queue.size());

这个容器多用于生产者和消费者问题

offer(e) // 队列可以放入,则返回true,不能返回false

poll() // 取不到数据,返回null,取到的对象会被丢弃

peek() //和poll一样,但是取出的对象不会被丢弃

add() // 底层调用offer,不会抛出异常

5.BlockingQueue提供的方法

抛异常

特定值

阻塞

超时

插入

add(E e)

offer(E e)

put(E e)

offer(E e, long timeout, TimeUnit unit)

移除

remove(Object o)

E poll()

take()

poll(long timeout, TimeUnit unit)

offer(E e); // 队列可以放入,则返回true,不能返回false

offer(E e, long timeout, TimeUnit unit) // 指定时间内,还不能放入就返回false

E poll() // 取不到数据,返回null,取到的对象会被丢弃

poll(long timeout, TimeUnit unit) // 指定时间内,能取到数据就取队头,没有则返回异常

E peek() // 和poll一样,但是取出的对象不会被丢弃

5.1ArrayBlockingQueue

基于数组的有界队列,创建后无法改变容量

代码语言:javascript复制
public static void main(String[] args) throws InterruptedException {
   BlockingQueue<String> strings = new ArrayBlockingQueue<>(10);
   for (int i = 0; i < 10; i  ) {
      strings.put("a"   i);
   }
   strings.add("aaaa");
   strings.offer("aaaa",1, TimeUnit.SECONDS);
   System.out.println(strings);
}

5.2LinkedBlockingQeque

无界,最大长度为Integer.MAX_VALUE

代码语言:javascript复制
public static void main(String[] args) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();

        Thread[] threads = new Thread[1];
        Thread[] threads2 = new Thread[10];

        for (int i = 0; i < 1; i  ) {
            threads[i] = new Thread(()->{
//                while (true) {
                    for (int j = 0; j < 100; j  ) {
                        try {
                            queue.put(j);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
//                }
            },"线程 " i);
        }

        for (int i = 0; i < 10; i  ) {
            threads2[i] = new Thread(()->{
                while (true) {
                    try {
                        System.out.println(Thread.currentThread().getName() " 获取 " queue.take());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }

        Arrays.asList(threads).forEach(a->a.start());
        Arrays.asList(threads2).forEach(a->a.start());
    }

5.3DelayQueue

队列中的元素需要实现Delayed接口,实现排序和延时的定义

代码语言:javascript复制
public class ThreadContainer11 {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<MyQ> queue = new DelayQueue<>();
        long start = System.currentTimeMillis();
        MyQ m1 = new MyQ(start 1000,"延迟1秒");
        MyQ m2 = new MyQ(start 2000,"延迟2秒");
        MyQ m3 = new MyQ(start 3000,"延迟3秒");
        MyQ m4 = new MyQ(start 4000,"延迟4秒");
        MyQ m5 = new MyQ(start 5000,"延迟5秒");
        queue.put(m1);
        queue.put(m2);
        queue.put(m3);
        queue.put(m4);
        queue.put(m5);

        for (int i = 0; i < 5; i  ) {
            System.out.println(queue.take().getName());
        }
    }

    static class MyQ implements Delayed {
        long time ;
        String name;
        public MyQ(long time,String name){
            this.time = time;
            this.name = name;
        }
        // 定义剩余到期时间
        @Override
        public long getDelay(TimeUnit unit) {
            return time - System.currentTimeMillis();
        }

        // 定义排序规则
        @Override
        public int compareTo(Delayed o) {
            if (this.getDelay(TimeUnit.MICROSECONDS) < o.getDelay(TimeUnit.MICROSECONDS)) {
                return 1;
            }else if(this.getDelay(TimeUnit.MICROSECONDS) > o.getDelay(TimeUnit.MICROSECONDS)){
                return -1;
            }else {
                return 0;
            }
        }
        public String getName(){
            return name;
        }
    }
}

5.4LinkedTransferQueue

比其他队列多一个transfer ;使用transfer 的方式例如有多个消费者,和一个生产者,当存在消费者的情况下,他会将对象给消费者,不会放到队列里,再让消费取,比较快

代码语言:javascript复制
public static void main(String[] args) throws InterruptedException {
    LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();

    Thread t1 = new Thread(()->{
        try {
            System.out.println("线程1 " queue.take());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });

    Thread t2 = new Thread(()->{
        try {
            System.out.println("线程2 " queue.take());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });

    t1.start();
    t2.start();
    
    queue.transfer("aaaaaa");
    System.out.println(queue.size());
}

5.5 PriorityBlockingQueue

这个队列,会对放入的对象进行优先级排序,而优先级排序是有compareTo方法定义的,必须实现接口Comparable,但是他的排序只会将排序后的第一个放到队列第一个,其他仍是乱的

代码语言:javascript复制
public class ThreadContainer8 implements Comparable {

    private Integer i;

    public ThreadContainer8(){}

    public ThreadContainer8(Integer i) {
        this.i = i;
    }

    @Override
    public int compareTo(Object o) {
        return i - ((ThreadContainer8) o).getI();
    }

    public Integer getI() {
        return i;
    }

    public static void main(String[] args) {
        PriorityBlockingQueue<ThreadContainer8> q = new PriorityBlockingQueue();
        Random r = new Random();
        Thread[] threads = new Thread[10];

      for (int i = 0; i < 10; i  ) {
          threads[i] = new Thread(()->{
              for (int j = 0; j < 10; j  ) {
                  q.put(new ThreadContainer8(r.nextInt(100)));
              }
          });
      }
        Arrays.asList(threads).forEach(t->t.start());
        Arrays.asList(threads).forEach(t->
                {
                    try {
                        t.join();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
        );
        q.forEach(e-> System.out.print(e.getI() " "));
        System.out.println();
        System.out.println("s=" q.peek().getI());
    }
}

5.6 SynchronousQueue

同步队列;容量为1,放入一个就要消费掉,不要用add

代码语言:javascript复制
public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<Integer> queue = new SynchronousQueue<>();

        // 报queue full异常
//        for (int i = 0; i < 5; i  ) {
//            queue.add(i);
//        }
        System.out.println(queue.size());

        Thread t1 = new Thread(()->{
            try {
                System.out.println("线程1 " queue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        Thread t2 = new Thread(()->{
            try {
                System.out.println("线程2 " queue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        t1.start();
        t2.start();
        for (int i = 0; i < 2; i  ) {
            queue.put(i);
        }
    }

总结:

名称

类名

说明

List相关并发容器

CopyOnWriteArrayList

读快写慢;写前复制,写完替换

Map/Set相关并发容器

ConcurrentHashMap

key和value都不能存储

ConcurrentSkipListMap

key和value都不能存储;key有序;它的存取时间log(N)

队列相关容器

ConcurrentLinkedQeque

FIFO(先入先出);cas实现

ConcurrentLinkedDueue

双向队列

ArrayBlockingQueue

有界队列,固定容量,不支持空元素

DelayQueue

提供过期时间功能;队列头是最接近过期的元素,没有过期元素则会取出null

LinkedBlockingQeque

无界队列,最大是Integer.MAX_VALUE;添加删除不是互斥的

LinkedBlockingDueue

双向阻塞队列

LinkedTransferQueue

比其他队列多transfer方法,这个方法是会将元素给一个正在等待的消费者,不会放入队列,没有消费者则等待

PriorityBlockingQueue

放入对象必须实现Comparable接口,按compareTo实现方法,将优先级高的放在第一个,其他对象无序

SynchronousQueue

容量为1,即时消费,提供了put的阻塞方法,底层使用transfer

0 人点赞