并发容器的引出: 售票问题
有 N 张火车票,每张票都有一个编号同时有 10 个窗口对外售票,请写一个模拟程序。
实现 1:使用 List-非原子性操作
public class TicketSeller1 {
static List<String> tickets = new ArrayList<>();
static {
for (int i = 0; i < 1000; i++) {
tickets.add("票-" + i);
}
}
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
new Thread(() -> {
while (tickets.size() > 0) {
// List的remove操作不是原子性的
System.out.println("销售了:" + tickets.remove(0));
}
}).start();
}
}
}
输出如下,我们发现发生了重售:
...
销售了:票-998
销售了:票-999
销售了:票-999
实现 2:使用 Vector-判断与操作分离,复合操作不保证原子性
Vector 的所有操作均为原子性的,但仍会出现问题,因为判断与操作是分离的,形成的复合操作不能保证原子性。
public class TicketSeller2 {
static Vector<String> tickets = new Vector<>();
static {
for (int i = 0; i < 1000; i++) {
tickets.add("票-" + i);
}
}
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
new Thread(() -> {
while (tickets.size() > 0) {
// 将问题方法,睡1s
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("销售了:" + tickets.remove(0));
}
}).start();
}
}
}
实现 3: 使用同步代码块锁住复合操作-保证正确性但效率低
我们使用 synchronized 代码块将判断和取票操作锁在一起执行,保证其原子性。
这样可以保证售票过程的正确性,但每次取票都要锁定整个队列,效率低。
public class TicketSeller3 {
static List<String> tickets = new ArrayList<>();
static {
for (int i = 0; i < 1000; i++) {
tickets.add("票-" + i);
}
}
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
new Thread(() -> {
while (tickets.size() > 0) {
// sychronized 保证了原子性
synchronized (tickets) {
System.out.println("销售了:" + tickets.remove(0));
}
}
}).start();
}
}
}
实现 4: 使用并发队列,先取票再判断
使用 JDK1.5 之后提供的并发队列 ConcurrentLinkedQueue 存储元素,其底层使用 CAS 实现而非加锁实现的,其效率较高。
并发队列 ConcurrentLinkedQueue 的 poll()方法会尝试从队列头中取出一个元素,若获取不到,则返回 null,对其返回值做判断可以实现先取票后判断,可以避免加锁。
public class TicketSeller4 {
static ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
static {
for (int i = 0; i < 1000; i++) {
queue.add("票-" + i);
}
}
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
new Thread(() -> {
while (true) {
String t = queue.poll(); // 取出头,拿不到就是空值
if (t == null) {
break;
}
System.out.println("销售了:" + t);
}
}).start();
}
}
}
并发容器
Map/Set
Map
和 Set
容器类型是类似的,Set
无非就是屏蔽了 Map
的 value
项,只保留 key
项.
非并发容器
主要的非并发容器有 HashMap
,TreeMap
,LinkedHashMap
并发容器
主要的并发容器有 HashTable
,SynchronizedMap
,ConcurrentMap
。
-
HashTable
和SynchronizedMap
的效率较低,其同步的实现原理类似,都是给容器的所有方法都加锁.其中
SynchronizedMap
使用装饰器模式,调用其构造方法并传入一个Map
实现类,返回一个同步的Map
容器. -
ConcurrentMap 的效率较高,有两个实现类:
- ConcurrentHashMap: 使用哈希表实现,key 是无序的
- ConcurrentSkipListMap: 使用跳表实现,key 是有序的
其同步的实现原理在 JDK1.8 前后不同
- 在 JDK1.8 以前,其实现同步使用的是分段锁,将整个容器分为 16 段(Segment),每次操作只锁住操作的那一段,是一种细粒度更高的锁.
- 在 JDK1.8 及以后,其实现同步用的是 Node+CAS.关于 CAS 的实现,可以看这篇文章 CAS 乐观锁
public class ConcurrentMap {
public static void main(String[] args) {
//Map<String, String> map = new HashMap<>();
//Map<String, String> map = new Hashtable<>(); // 423 每次加锁,都锁一个对象
//Map<String, String> map = new ConcurrentHashMap<>(); // 309,加的是分段所,将容器分为16段,每段都有一个锁 segment; 1.8以后 使用 Node + synchronized+CAS
Map<String, String> map = new ConcurrentSkipListMap<>(); // 317 并发且排序,插入效率较低,但是读取很快
Random r = new Random();
Thread[] ths = new Thread[100];
CountDownLatch latch = new CountDownLatch(ths.length); // 启动了一个门闩,每有一个线程退出,门闩就减1,直到所有线程结束,门闩打开,主线程结束
long start = System.currentTimeMillis();
// 创建100个线程,每个线程添加10000个元素到map,并启动这些线程
for (int i = 0; i < ths.length; i++) {
ths[i] = new Thread(() -> {
for (int j = 0; j < 10000; j++) {
map.put("a" + r.nextInt(10000), "a" + r.nextInt(100000));
}
latch.countDown();
}, "t" + i);
}
Arrays.asList(ths).forEach(Thread::start);
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
long end = System.currentTimeMillis();
System.out.println(end - start);
System.out.println(map.size());
}
}
队列
低并发队列
低并发队列有:Vector
和 SynchronizedList
,其中 vector
类似 HashTable
,是 JDK1.2 就提供的类;SynchronizedList
类似 SynchronizedMap
使用装饰器模式,其构造函数接受一个 List
实现类并返回同步 List
,在 java.util.Collections
包下。
它们实现同步的原理都是将所有方法用同步代码块包裹起来。
public class SynchronizedList {
public static void main(String[] args) {
List<String> list = new ArrayList<>();
// 返回的实例,每个方法都加了一个互斥锁
List<String> syncList = Collections.synchronizedList(list);
}
}
写时复制 CopyOnWriteList
CopyOnWriteArrayList
位于 java.util.concurrent
包下,它实现同步的方式是: 当发生写操作(添加,删除,修改)时,就会复制原有容器然后对新复制出的容器进行写操作,操作完成后将引用指向新的容器.其写效率非常低,读效率非常高
- 优点: 读写分离,使得读操作不需要加锁,效率极高。
- 缺点: 写操作效率极低
- 应用场合: 应用在读少写多的情况,如事件监听器
public class CopyOnWriteList {
public static void main(String[] args) {
List<String> list =
//new ArrayList<>(); //会出现并发问题
//new Vector<>();
new CopyOnWriteArrayList<>(); // 写速极慢,读取快
Random r = new Random();
Thread[] ths = new Thread[100];
for (int i = 0; i < ths.length; i++) {
Runnable task = () -> {
for (int j = 0; j < 1000; j++) {
list.add("a" + r.nextInt(100));
}
};
ths[i] = new Thread(task);
}
runAndComputeTime(ths);
System.out.println(list.size());
}
static void runAndComputeTime(Thread[] ths) {
long start = System.currentTimeMillis();
Arrays.asList(ths).forEach(Thread::start);
Arrays.asList(ths).forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long end = System.currentTimeMillis();
System.out.println(end - start);
}
}
高并发队列
| 方法 | 抛出异常 | 返回特殊值 | 一直阻塞(非阻塞队列不可用) | 阻塞一段时间(非阻塞队列不可用) |
| --- | --- | --- |
| 插入元素 |add(element)|offer(element)|put(element)|offer(element,time,unit)|
| 移除首个元素 |remove()|poll()|take()|poll(time,unit)|
| 返回首个元素 |element()|peek()| 不可用 | 不可用 |
对于高并发队列,若使用不同的方法对空队列执行查询和删除,以及对满队列执行插入,会产生不同行为。
- 抛出异常: 使用 add(),remove(),element()方法,若执行错误操作会直接抛出异常
- 返回特殊值: 若使用 offer(),poll(),peek()方法执行错误操作会返回 false 或 null,并放弃当前错误操作,不抛出异常.
- 一直阻塞: 若使用 put(),take()方法执行错误操作,当前线程会一直阻塞直到条件允许才唤醒线程执行操作.
- 阻塞一段时间: 若使用 offer(),poll()方法并传入时间单位,会将当前方法阻塞一段时间,若阻塞时间结束后仍不满足条件则返回 false 或 null,并放弃当前错误操作,不抛出异常.
非阻塞队列 ConcurrentLinkedQueue
非阻塞队列使用 CAS 保证操作的原子性,不会因为加锁而阻塞线程.类似于 ConcurrentMap
public class T04_ConcurrentQueue {
public static void main(String[] args) {
Queue<String> queue = new ConcurrentLinkedQueue<>(); // LinkedQueue,无界队列
for (int i = 0; i < 10; i++) {
queue.offer("a" + i); // 有返回值,返回false代表没有加入成功,true 代表成功,并且此方法不会阻塞
}
System.out.println(queue);
System.out.println(queue.size());
System.out.println(queue.poll()); // 取出队头
System.out.println(queue.size());
System.out.println(queue.peek()); // 取出队头,但是不删除队头
System.out.println(queue.size());
// 双端队列 Deque 发音: dai ke
//Deque<String> deque = new ConcurrentLinkedDeque<>();
//deque.addFirst();
//deque.addLast();
//deque.pollFirst();
//deque.pollLast();
//deque.peekFirst();
//deque.peekLast();
}
}
阻塞队列 BlockingQueue
阻塞队列的常用实现类有 LinkedBlockingQueue
,ArrayBlockingQueue
,DelayedQueue
,TransferQueue
,SynchronousQueue
。分别对应于不同的应用场景。
经典阻塞队列 LinkedBlockingQueue
和 ArrayBlockingQueue
LinkedBlockingQueue
和 ArrayBlockingQueue
是阻塞队列的最常用实现类,用来更容易地实现 生产者/消费者模式
。
public class LinkedBlockingQueue {
public static void main(String[] args) {
BlockingQueue<String> queue = new LinkedBlockingQueue<>();
// 启动生产者线程生产
new Thread(() -> {
for (int j = 0; j < 100; j++) {
try {
queue.put("aaa" + j); // put 方法,给容器添加元素,如果容器已经满了,则会阻塞等待
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "p").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 启用消费者线程消费
for (int i = 0; i < 5; i++) {
new Thread(() -> {
while (true) {
try {
System.out.println(Thread.currentThread().getName() + ":" + queue.take()); // 从队列中拿数据,如果空了,则会阻塞等待
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "c" + i).start();
}
}
}
public class ArrayBlockingQueue {
public static void main(String[] args) throws InterruptedException {
BlockingQueue queue = new ArrayBlockingQueue<>(10);
for (int i = 0; i < 10; i++) {
queue.put("a" + i);
}
//queue.put("a11"); // 会阻塞
//queue.add("a11"); // 会抛出异常
//System.out.println(queue.offer("a11")); // 会返回false
System.out.println(queue.offer("a11", 1, TimeUnit.SECONDS)); // 会等待1s,返回false, 如果1s内有空闲,则添加成功后返回true
}
}
延迟队列 DelayedQueue
延迟队列 DelayedQueue 中存储的元素必须实现 Delay 接口,其中定义了 getDelay()方法;而 Delay 接口继承自 Comparable 接口,其中定义了 compareTo()方法.各方法作用如下:
- getDelay(): 规定当前元素的延时,Delay 类型的元素必须要等到其延时过期后才能从容器中取出,提前取会取不到.
- compareTo(): 规定元素在容器中的排列顺序,按照 compareTo()的结果升序排列。
Delayqueue
可以用来执行定时任务。
public class DelayQueue {
public static void main(String[] args) throws InterruptedException {
long timestamp = System.currentTimeMillis();
MyTask myTask1 = new MyTask(timestamp + 1000); // 1s后执行
MyTask myTask2 = new MyTask(timestamp + 2000);
MyTask myTask3 = new MyTask(timestamp + 1500);
MyTask myTask4 = new MyTask(timestamp + 2500);
MyTask myTask5 = new MyTask(timestamp + 500);
DelayQueue<MyTask> tasks = new DelayQueue<>();
tasks.put(myTask1);
tasks.put(myTask2);
tasks.put(myTask3);
tasks.put(myTask4);
tasks.put(myTask5);
System.out.println(tasks); // 确实按照我们排的顺序执行的
while (!tasks.isEmpty()){
System.out.println(tasks.take());
}
}
static class MyTask implements Delayed {
private long runningTime;
public MyTask(long runTime) {
this.runningTime = runTime;
}
// 这是每个元素的等待时间, 越是后加入的元素,时间等待的越长
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
// 这是排序规律, 执行等待时间最短的排在上面
@Override
public int compareTo(Delayed o) {
return (int) (o.getDelay(TimeUnit.MILLISECONDS) - this.getDelay(TimeUnit.MILLISECONDS));
}
@Override
public String toString() {
return runningTime + "";
}
}
}
程序输出如下,我们发现延迟队列中的元素按照 compareTo()
结果升序排列,且 5 个元素都被阻塞式的取出。
[1574580515467, 1574580514967, 1574580514467, 1574580513967, 1574580513467]
1574580515467
1574580514967
1574580514467
1574580513967
1574580513467
阻塞消费队列 TransferQueue
TransferQueue
继承自 BlockingQueue
,向其中添加元素的方法除了 BlockingQueue
的 add()
,offer()
,put()
之外,还有一个 transfer()
方法,该方法会使当前线程阻塞直到消费者将该线程消费为止。
transfer()
与 put()
的区别: put()
方法会阻塞直到元素成功添加进队列,transfer()
方法会阻塞直到元素成功被消费。
TransferQueue 特有的方法如下:
-
transfer(E): 阻塞当前线程直到元素 E 成功被消费者消费。
-
tryTransfer(E): 尝试将当前元素送给消费者线程消费,若没有消费者接受则返回 false 且放弃元素 E,不将其放入容器中。
-
tryTransfer(E,long,TimeUnit): 阻塞一段时间等待消费者线程消费,超时则返回 false 且放弃元素 E,不将其放入容器中。
-
hasWaitingConsumer(): 指示是否有阻塞在当前容器上的消费者线程。
-
getWaitingConsumerCount(): 返回阻塞在当前容器上的消费者线程的个数。
public class TransferQueue {
public static void main(String[] args) {
TransferQueue mq = new LinkedTransferQueue();
// 启动消费者线程,睡五秒后再来消费
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println(mq.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
// 再让生产者线程生产
try {
mq.transfer("aaa"); // put add 都不会阻塞,会添加到容器中,只有transfer才有此种功能(等待消费者直接获取),所以transfer是有容量的
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
运行程序,我们发现生产者线程会一直阻塞直到五秒后 "product2"
被消费者线程消费。
零容量的阻塞消费队列 SynchronousQueue
SynchronousQueue
是一种特殊的 TransferQueue
,特殊之处在于其容量为 0.。因此对其调用 add()
,offer()
方法都会使程序发生错误(抛出异常或阻塞线程).只能对其调用 put()
方法,其内部调用 transfer()
方法,将元素直接交给消费者而不存储在容器中。
public class T09_SynchronousQueue {
public static void main(String[] args) throws InterruptedException {
BlockingQueue synchronousQueue = new SynchronousQueue();
// 启动消费者线程,睡五秒后再来消费
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println(synchronousQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
System.out.println(synchronousQueue.size()); // 输出0
// 启动生产者线程,使用put()方法添加元素,其内部调用transfer()方法,会阻塞等待元素被消费
new Thread(() -> {
try {
// synchronousQueue.add("product"); // SynchronousQueue容量为0,调用add()方法会报错
synchronousQueue.put("product"); // put()方法内部调用transfer()方法会阻塞等待元素成功被消费
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
该程序的输出行为与上一程序类似,生产者线程调用 put()
方法后阻塞五秒直到消费者线程消费该元素。
SynchronousQueue
应用场景: 网游的玩家匹配: 若一个用户登录,相当于给服务器的消息队列发送一个 take()
请求;若一个用户准备成功,相当于给服务器的消息队列发送一个 put()
请求。因此若玩家登陆但未准备好 或 只有一个玩家准备好 时游戏线程都会阻塞,直到两个人都准备好了,游戏线程才会被唤醒,游戏继续。
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于