Java 并发编程之(二)管程 (转)

本贴最后更新于 2935 天前,其中的信息可能已经时异事殊

定义

维基百科中定义管程为:在并发编程中,管程(monitor)为一个同步结构,具有线程互斥特性,以及能够根据某些条件来阻塞线程。根据定义,管程有三个要素:同步、互斥、条件。恰好在Java的Concurrent包中ReentrantLock具有上述所有特性,可以用来实现管程。管程是一个非常实用且常见的技术,可以用来实现很多常用的并发数据结构,例如阻塞队列。

阻塞队列

 队列常用于生产者消费者模型,生产者发送消息并存储到队列,消费者从队列中取出消息。一般情况下会存在多个生产者和多个消费者,普通的队列不能保证并发的安全,因此需要用到线程安全的技术。线程安全的队列又可以分为两种:

  • 阻塞队列(BlockingQueue)
  • 非阻塞队列(NoBlockingQueue)

阻塞队列的特征为,当队列为空时会阻塞消费者,当队列满时阻塞生产者。这种机制能够平衡生产者和消费者的负载。

分析上述定义,阻塞队列具有线程安全,阻塞,条件的特性;线程安全和阻塞就意味着要实现同步以及互斥,因此,管程能够很好地满足阻塞队列的要求。

Java源码中的管程

 如下代码所示是Concurrent包中ArrayBlockingQueue的实现,队列有两种实现方式,数组和链表。Array是数组的实现,而数组通常是有界的。

ArrayBlockingQueue结构将ReentrantLock(可重入锁)作为全局锁来实现线程安全,所谓全局锁就是整个数据结构中就这么一个锁,当一个线程在访问该对象并获得锁之后,其他线程要访问该对象都得阻塞。在后续的学习中,我们会发现全局锁有一定的弊端,因为他锁住了整个对象,使得整体的并发性不高。源代码中的newCondition()操作会生成一个条件对象,条件对象具有唤醒线程和挂起线程的能力,具体的操作如第二段代码所示。

public ArrayBlockingQueue(int capacity,boolean fair)
{ if(capacity <=0){
throw new IllegalArgumentException();
}
this.items =newObject[capacity];
lock=newReentrantLock(fair); notEmpty =lock.newCondition(); notFull =lock.newCondition();
}

 第二段代码是ArrayBlockingQueue的put操作,put操作的第一步就是获得全局锁,然后判断队列当前元素是否已满,如果队列慢就会使用notFull条件变量将线程挂起,否则就会调用enqueue函数进行入队操作。入队操作同时会使用notEmpty条件变量来唤醒一个被notEmpty阻塞的的线程(take操作会调用notEmpty来阻塞线程)。

public void put(E e)throwsInterruptedException{
        checkNotNull(e);
finalReentrantLocklock=this.lock;
lock.lockInterruptibly();
try{
while(count == items.length) notFull.await();//挂起线程,当其他线程调用notFull.signal()或者notFull.signlAll()时,该线程唤醒。 enqueue(e);
}finally{
lock.unlock();
}}
private void enqueue(E x){
//assert lock.getHoldCount() == 1;
//assert items[putIndex] == null;
finalObject[] items =this.items; items[putIndex]= x;
if(++putIndex == items.length) putIndex =0; count++; notEmpty.signal();//唤醒一个被notEmpty阻塞的线程,因为此时队列已经非空了
}

实例

 下面我为了熟悉管程的使用,会亲自造轮子体会一下。

package Monitors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class BlockingQueue<E>{
private ReentrantLocklock;//全局锁
private Condition notFull;//条件变量,用来监控队列是否已满
private Condition notEmpty;//条件变量,用来监控队列是否为空
private final int capacity;//容量,这是一个定容的队列
private int head;//队列头部哨兵
private int tail;//队列尾部哨兵
private int count;//当然元素个数
private E[] data;//保存元素的数组
/*初始化阻塞队列*/
public BlockingQueue(int c){
this.lock=newReentrantLock();
this.notEmpty=lock.newCondition();
this.notFull=lock.newCondition();
this.capacity=c;
this.count=0;
this.head=0;
this.tail=0; data=(E[]) newObject[c+1];
}
/*向队列中添加一个元素 */
public void put(E e){
lock.lock();
try{
while(this.count==this.capacity)//判断队列是否已满 notFull.await();//挂起线程 ail++; count++;
if(tail==this.capacity+1) tail=0; data[tail]=e;//插入元素 notEmpty.signal();//通知其他线程,队列不为空
}catch(InterruptedException e1){
// TODO Auto-generated catch block e1.printStackTrace();
}finally{
lock.unlock();
}}
/* 获取队列容量 */
public int getCap(){returnthis.capacity;}
/* 判读队列是否为空 */
public boolean isEmpty(){lock.lock();try{return count==0?true:false;}finally{lock.unlock();}}
/*判断队列是否已满*/
public boolean isFull(){lock.lock();try{return count==capacity?true:false;}finally{lock.unlock();}}
/* 从队列中取出一个元素 */
public E take() throws InterruptedException{
lock.lock();
try{while(this.count==0)//判断队列是否为空 notEmpty.await();//阻塞队列 head++; count--;
if(head==this.capacity+1)head=0; E x=data[head];//获得头部元素 notFull.signal();//通知其他线程,队列未满return x;
}finally{
lock.unlock();
}}
}

 在上锁所有操作中,几乎每个操作都在如下的结构体中进行,为了防止代码异常退出而导致锁没有被释放,必须使用finally关键字确保锁的释放。该代码块的作用和synchronized同步块作用类似,在进入同步块之前都要先获得锁,然后进行同步操作;在退出同步块的时候都要释放锁并再次同步。

lock.lock();
try{
return count==capacity?true:false;
}finally{
lock.unlock();
}

 

 
  • Java

    Java 是一种可以撰写跨平台应用软件的面向对象的程序设计语言,是由 Sun Microsystems 公司于 1995 年 5 月推出的。Java 技术具有卓越的通用性、高效性、平台移植性和安全性。

    3187 引用 • 8213 回帖
  • 并发
    75 引用 • 73 回帖 • 1 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...