Java 并发编程(四)并发容器

本贴最后更新于 1961 天前,其中的信息可能已经事过景迁

并发容器的引出: 售票问题

  有 N 张火车票,每张票都有一个编号同时有 10 个窗口对外售票,请写一个模拟程序。

实现 1:使用 List-非原子性操作

public class TicketSeller1 { static List<String> tickets = new ArrayList<>(); static { for (int i = 0; i < 1000; i++) { tickets.add("票-" + i); } } public static void main(String[] args) { for (int i = 0; i < 10; i++) { new Thread(() -> { while (tickets.size() > 0) { // List的remove操作不是原子性的 System.out.println("销售了:" + tickets.remove(0)); } }).start(); } } }

输出如下,我们发现发生了重售:

... 销售了:票-998 销售了:票-999 销售了:票-999

实现 2:使用 Vector-判断与操作分离,复合操作不保证原子性

  Vector 的所有操作均为原子性的,但仍会出现问题,因为判断与操作是分离的,形成的复合操作不能保证原子性。

public class TicketSeller2 { static Vector<String> tickets = new Vector<>(); static { for (int i = 0; i < 1000; i++) { tickets.add("票-" + i); } } public static void main(String[] args) { for (int i = 0; i < 10; i++) { new Thread(() -> { while (tickets.size() > 0) { // 将问题方法,睡1s try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("销售了:" + tickets.remove(0)); } }).start(); } } }

实现 3: 使用同步代码块锁住复合操作-保证正确性但效率低

  我们使用 synchronized 代码块将判断和取票操作锁在一起执行,保证其原子性。

  这样可以保证售票过程的正确性,但每次取票都要锁定整个队列,效率低。

public class TicketSeller3 { static List<String> tickets = new ArrayList<>(); static { for (int i = 0; i < 1000; i++) { tickets.add("票-" + i); } } public static void main(String[] args) { for (int i = 0; i < 10; i++) { new Thread(() -> { while (tickets.size() > 0) { // sychronized 保证了原子性 synchronized (tickets) { System.out.println("销售了:" + tickets.remove(0)); } } }).start(); } } }

实现 4: 使用并发队列,先取票再判断

  使用 JDK1.5 之后提供的并发队列 ConcurrentLinkedQueue 存储元素,其底层使用 CAS 实现而非加锁实现的,其效率较高。
  并发队列 ConcurrentLinkedQueue 的 poll()方法会尝试从队列头中取出一个元素,若获取不到,则返回 null,对其返回值做判断可以实现先取票后判断,可以避免加锁。

public class TicketSeller4 { static ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>(); static { for (int i = 0; i < 1000; i++) { queue.add("票-" + i); } } public static void main(String[] args) { for (int i = 0; i < 10; i++) { new Thread(() -> { while (true) { String t = queue.poll(); // 取出头,拿不到就是空值 if (t == null) { break; } System.out.println("销售了:" + t); } }).start(); } } }

并发容器

Map/Set

  MapSet 容器类型是类似的,Set 无非就是屏蔽了 Mapvalue 项,只保留 key 项.
2019072108573679.png

非并发容器

  主要的非并发容器有 HashMap,TreeMap,LinkedHashMap

并发容器

  主要的并发容器有 HashTable,SynchronizedMap,ConcurrentMap

  • HashTableSynchronizedMap 的效率较低,其同步的实现原理类似,都是给容器的所有方法都加锁.

    其中 SynchronizedMap 使用装饰器模式,调用其构造方法并传入一个 Map 实现类,返回一个同步的 Map 容器.

  • ConcurrentMap 的效率较高,有两个实现类:

    • ConcurrentHashMap: 使用哈希表实现,key 是无序的
    • ConcurrentSkipListMap: 使用跳表实现,key 是有序的

    其同步的实现原理在 JDK1.8 前后不同

    • 在 JDK1.8 以前,其实现同步使用的是分段锁,将整个容器分为 16 段(Segment),每次操作只锁住操作的那一段,是一种细粒度更高的锁.
    • 在 JDK1.8 及以后,其实现同步用的是 Node+CAS.关于 CAS 的实现,可以看这篇文章 CAS 乐观锁
public class ConcurrentMap { public static void main(String[] args) { //Map<String, String> map = new HashMap<>(); //Map<String, String> map = new Hashtable<>(); // 423 每次加锁,都锁一个对象 //Map<String, String> map = new ConcurrentHashMap<>(); // 309,加的是分段所,将容器分为16段,每段都有一个锁 segment; 1.8以后 使用 Node + synchronized+CAS Map<String, String> map = new ConcurrentSkipListMap<>(); // 317 并发且排序,插入效率较低,但是读取很快 Random r = new Random(); Thread[] ths = new Thread[100]; CountDownLatch latch = new CountDownLatch(ths.length); // 启动了一个门闩,每有一个线程退出,门闩就减1,直到所有线程结束,门闩打开,主线程结束 long start = System.currentTimeMillis(); // 创建100个线程,每个线程添加10000个元素到map,并启动这些线程 for (int i = 0; i < ths.length; i++) { ths[i] = new Thread(() -> { for (int j = 0; j < 10000; j++) { map.put("a" + r.nextInt(10000), "a" + r.nextInt(100000)); } latch.countDown(); }, "t" + i); } Arrays.asList(ths).forEach(Thread::start); try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } long end = System.currentTimeMillis(); System.out.println(end - start); System.out.println(map.size()); } }

队列

2019072108582080.png

低并发队列

  低并发队列有:VectorSynchronizedList,其中 vector 类似 HashTable,是 JDK1.2 就提供的类;SynchronizedList 类似 SynchronizedMap 使用装饰器模式,其构造函数接受一个 List 实现类并返回同步 List,在 java.util.Collections 包下。
  它们实现同步的原理都是将所有方法用同步代码块包裹起来。

public class SynchronizedList { public static void main(String[] args) { List<String> list = new ArrayList<>(); // 返回的实例,每个方法都加了一个互斥锁 List<String> syncList = Collections.synchronizedList(list); } }

写时复制 CopyOnWriteList

  CopyOnWriteArrayList 位于 java.util.concurrent 包下,它实现同步的方式是: 当发生写操作(添加,删除,修改)时,就会复制原有容器然后对新复制出的容器进行写操作,操作完成后将引用指向新的容器.其写效率非常低,读效率非常高

  • 优点: 读写分离,使得读操作不需要加锁,效率极高。
  • 缺点: 写操作效率极低
  • 应用场合: 应用在读少写多的情况,如事件监听器
public class CopyOnWriteList { public static void main(String[] args) { List<String> list = //new ArrayList<>(); //会出现并发问题 //new Vector<>(); new CopyOnWriteArrayList<>(); // 写速极慢,读取快 Random r = new Random(); Thread[] ths = new Thread[100]; for (int i = 0; i < ths.length; i++) { Runnable task = () -> { for (int j = 0; j < 1000; j++) { list.add("a" + r.nextInt(100)); } }; ths[i] = new Thread(task); } runAndComputeTime(ths); System.out.println(list.size()); } static void runAndComputeTime(Thread[] ths) { long start = System.currentTimeMillis(); Arrays.asList(ths).forEach(Thread::start); Arrays.asList(ths).forEach(t -> { try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); long end = System.currentTimeMillis(); System.out.println(end - start); } }

高并发队列

| 方法 | 抛出异常 | 返回特殊值 | 一直阻塞(非阻塞队列不可用) | 阻塞一段时间(非阻塞队列不可用) |
| --- | --- | --- |
| 插入元素 |add(element)|offer(element)|put(element)|offer(element,time,unit)|
| 移除首个元素 |remove()|poll()|take()|poll(time,unit)|
| 返回首个元素 |element()|peek()| 不可用 | 不可用 |
对于高并发队列,若使用不同的方法对空队列执行查询和删除,以及对满队列执行插入,会产生不同行为。

  • 抛出异常: 使用 add(),remove(),element()方法,若执行错误操作会直接抛出异常
  • 返回特殊值: 若使用 offer(),poll(),peek()方法执行错误操作会返回 false 或 null,并放弃当前错误操作,不抛出异常.
  • 一直阻塞: 若使用 put(),take()方法执行错误操作,当前线程会一直阻塞直到条件允许才唤醒线程执行操作.
  • 阻塞一段时间: 若使用 offer(),poll()方法并传入时间单位,会将当前方法阻塞一段时间,若阻塞时间结束后仍不满足条件则返回 false 或 null,并放弃当前错误操作,不抛出异常.

非阻塞队列 ConcurrentLinkedQueue

  非阻塞队列使用 CAS 保证操作的原子性,不会因为加锁而阻塞线程.类似于 ConcurrentMap

public class T04_ConcurrentQueue { public static void main(String[] args) { Queue<String> queue = new ConcurrentLinkedQueue<>(); // LinkedQueue,无界队列 for (int i = 0; i < 10; i++) { queue.offer("a" + i); // 有返回值,返回false代表没有加入成功,true 代表成功,并且此方法不会阻塞 } System.out.println(queue); System.out.println(queue.size()); System.out.println(queue.poll()); // 取出队头 System.out.println(queue.size()); System.out.println(queue.peek()); // 取出队头,但是不删除队头 System.out.println(queue.size()); // 双端队列 Deque 发音: dai ke //Deque<String> deque = new ConcurrentLinkedDeque<>(); //deque.addFirst(); //deque.addLast(); //deque.pollFirst(); //deque.pollLast(); //deque.peekFirst(); //deque.peekLast(); } }

阻塞队列 BlockingQueue

  阻塞队列的常用实现类有 LinkedBlockingQueue,ArrayBlockingQueue,DelayedQueue,TransferQueue,SynchronousQueue。分别对应于不同的应用场景。

经典阻塞队列 LinkedBlockingQueueArrayBlockingQueue

  LinkedBlockingQueueArrayBlockingQueue 是阻塞队列的最常用实现类,用来更容易地实现 生产者/消费者模式

public class LinkedBlockingQueue { public static void main(String[] args) { BlockingQueue<String> queue = new LinkedBlockingQueue<>(); // 启动生产者线程生产 new Thread(() -> { for (int j = 0; j < 100; j++) { try { queue.put("aaa" + j); // put 方法,给容器添加元素,如果容器已经满了,则会阻塞等待 } catch (InterruptedException e) { e.printStackTrace(); } } }, "p").start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } // 启用消费者线程消费 for (int i = 0; i < 5; i++) { new Thread(() -> { while (true) { try { System.out.println(Thread.currentThread().getName() + ":" + queue.take()); // 从队列中拿数据,如果空了,则会阻塞等待 } catch (InterruptedException e) { e.printStackTrace(); } } }, "c" + i).start(); } } }
public class ArrayBlockingQueue { public static void main(String[] args) throws InterruptedException { BlockingQueue queue = new ArrayBlockingQueue<>(10); for (int i = 0; i < 10; i++) { queue.put("a" + i); } //queue.put("a11"); // 会阻塞 //queue.add("a11"); // 会抛出异常 //System.out.println(queue.offer("a11")); // 会返回false System.out.println(queue.offer("a11", 1, TimeUnit.SECONDS)); // 会等待1s,返回false, 如果1s内有空闲,则添加成功后返回true } }

延迟队列 DelayedQueue

  延迟队列 DelayedQueue 中存储的元素必须实现 Delay 接口,其中定义了 getDelay()方法;而 Delay 接口继承自 Comparable 接口,其中定义了 compareTo()方法.各方法作用如下:

  • getDelay(): 规定当前元素的延时,Delay 类型的元素必须要等到其延时过期后才能从容器中取出,提前取会取不到.
  • compareTo(): 规定元素在容器中的排列顺序,按照 compareTo()的结果升序排列。

Delayqueue 可以用来执行定时任务。

public class DelayQueue { public static void main(String[] args) throws InterruptedException { long timestamp = System.currentTimeMillis(); MyTask myTask1 = new MyTask(timestamp + 1000); // 1s后执行 MyTask myTask2 = new MyTask(timestamp + 2000); MyTask myTask3 = new MyTask(timestamp + 1500); MyTask myTask4 = new MyTask(timestamp + 2500); MyTask myTask5 = new MyTask(timestamp + 500); DelayQueue<MyTask> tasks = new DelayQueue<>(); tasks.put(myTask1); tasks.put(myTask2); tasks.put(myTask3); tasks.put(myTask4); tasks.put(myTask5); System.out.println(tasks); // 确实按照我们排的顺序执行的 while (!tasks.isEmpty()){ System.out.println(tasks.take()); } } static class MyTask implements Delayed { private long runningTime; public MyTask(long runTime) { this.runningTime = runTime; } // 这是每个元素的等待时间, 越是后加入的元素,时间等待的越长 @Override public long getDelay(TimeUnit unit) { return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } // 这是排序规律, 执行等待时间最短的排在上面 @Override public int compareTo(Delayed o) { return (int) (o.getDelay(TimeUnit.MILLISECONDS) - this.getDelay(TimeUnit.MILLISECONDS)); } @Override public String toString() { return runningTime + ""; } } }

  程序输出如下,我们发现延迟队列中的元素按照 compareTo() 结果升序排列,且 5 个元素都被阻塞式的取出。

[1574580515467, 1574580514967, 1574580514467, 1574580513967, 1574580513467] 1574580515467 1574580514967 1574580514467 1574580513967 1574580513467

阻塞消费队列 TransferQueue

  TransferQueue 继承自 BlockingQueue,向其中添加元素的方法除了 BlockingQueueadd(),offer(),put() 之外,还有一个 transfer() 方法,该方法会使当前线程阻塞直到消费者将该线程消费为止。

  transfer()put() 的区别: put() 方法会阻塞直到元素成功添加进队列,transfer() 方法会阻塞直到元素成功被消费。

  TransferQueue 特有的方法如下:

  • transfer(E): 阻塞当前线程直到元素 E 成功被消费者消费。

  • tryTransfer(E): 尝试将当前元素送给消费者线程消费,若没有消费者接受则返回 false 且放弃元素 E,不将其放入容器中。

  • tryTransfer(E,long,TimeUnit): 阻塞一段时间等待消费者线程消费,超时则返回 false 且放弃元素 E,不将其放入容器中。

  • hasWaitingConsumer(): 指示是否有阻塞在当前容器上的消费者线程。

  • getWaitingConsumerCount(): 返回阻塞在当前容器上的消费者线程的个数。

public class TransferQueue { public static void main(String[] args) { TransferQueue mq = new LinkedTransferQueue(); // 启动消费者线程,睡五秒后再来消费 new Thread(() -> { try { TimeUnit.SECONDS.sleep(5); System.out.println(mq.take()); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); // 再让生产者线程生产 try { mq.transfer("aaa"); // put add 都不会阻塞,会添加到容器中,只有transfer才有此种功能(等待消费者直接获取),所以transfer是有容量的 } catch (InterruptedException e) { e.printStackTrace(); } } }

  运行程序,我们发现生产者线程会一直阻塞直到五秒后 "product2" 被消费者线程消费。

零容量的阻塞消费队列 SynchronousQueue

  SynchronousQueue 是一种特殊的 TransferQueue,特殊之处在于其容量为 0.。因此对其调用 add(),offer() 方法都会使程序发生错误(抛出异常或阻塞线程).只能对其调用 put() 方法,其内部调用 transfer() 方法,将元素直接交给消费者而不存储在容器中。

public class T09_SynchronousQueue { public static void main(String[] args) throws InterruptedException { BlockingQueue synchronousQueue = new SynchronousQueue(); // 启动消费者线程,睡五秒后再来消费 new Thread(() -> { try { TimeUnit.SECONDS.sleep(5); System.out.println(synchronousQueue.take()); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); System.out.println(synchronousQueue.size()); // 输出0 // 启动生产者线程,使用put()方法添加元素,其内部调用transfer()方法,会阻塞等待元素被消费 new Thread(() -> { try { // synchronousQueue.add("product"); // SynchronousQueue容量为0,调用add()方法会报错 synchronousQueue.put("product"); // put()方法内部调用transfer()方法会阻塞等待元素成功被消费 } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } }

  该程序的输出行为与上一程序类似,生产者线程调用 put() 方法后阻塞五秒直到消费者线程消费该元素。

  SynchronousQueue 应用场景: 网游的玩家匹配: 若一个用户登录,相当于给服务器的消息队列发送一个 take() 请求;若一个用户准备成功,相当于给服务器的消息队列发送一个 put() 请求。因此若玩家登陆但未准备好 或 只有一个玩家准备好 时游戏线程都会阻塞,直到两个人都准备好了,游戏线程才会被唤醒,游戏继续。

  • 并发
    75 引用 • 73 回帖 • 1 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...