Here's the code:
import java.util.LinkedList;
import java.util.Queue;

class BlockingQ {
    private Object notEmpty = new Object();
    private Object notFull = new Object();
    private Queue linkedList = new LinkedList();
    private int maxLength = 10;

    public Object take() throws InterruptedException {
        synchronized (notEmpty) {
            if (linkedList.size() == 0) {
                notEmpty.wait();
            }
            synchronized (notFull) {
                if (linkedList.size() == maxLength) {
                    notFull.notifyAll();
                }
                return linkedList.poll();
            }
        }
    }

    public void offer(Object object) throws InterruptedException {
        synchronized (notEmpty) {
            if (linkedList.size() == 0) {
                notEmpty.notifyAll();
            }
            synchronized (notFull) {
                if (linkedList.size() == maxLength) {
                    notFull.wait();
                }
                linkedList.add(object);
            }
        }
    }
}
I don't quite understand one thing: does notFull.wait() also release the notEmpty monitor? Could someone explain it briefly?
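As far as I understand the Java monitor rules, Object.wait() releases only the monitor of the object it is called on, so a thread that calls inner.wait() inside a nested synchronized block keeps holding the outer monitor. The following is a minimal sketch to check that behavior. The class and method names (NestedMonitorDemo, canAcquire, probeWhileWaiting) are made up for this illustration, and the one-second timeout is an arbitrary choice.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative sketch only: all names here are hypothetical.
class NestedMonitorDemo {

    /** Returns true if a fresh thread can enter synchronized (monitor) within one second. */
    static boolean canAcquire(Object monitor) {
        Thread probe = new Thread(() -> {
            synchronized (monitor) {
                // Entering the block is all we need to observe.
            }
        }, "probe");
        probe.setDaemon(true);
        probe.start();
        try {
            probe.join(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !probe.isAlive();
    }

    /** Returns {innerAcquirable, outerAcquirable}, observed while another thread waits on inner. */
    static boolean[] probeWhileWaiting() {
        final Object outer = new Object();
        final Object inner = new Object();
        final AtomicBoolean done = new AtomicBoolean(false);

        Thread waiter = new Thread(() -> {
            synchronized (outer) {
                synchronized (inner) {
                    try {
                        while (!done.get()) {
                            inner.wait(); // releases inner only; outer stays held
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }, "waiter");
        waiter.setDaemon(true);
        waiter.start();

        try {
            // Poll until the waiter is parked inside inner.wait().
            while (waiter.getState() != Thread.State.WAITING) {
                Thread.sleep(10);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        boolean innerFree = canAcquire(inner);
        boolean outerFree = canAcquire(outer);

        // Let the waiter finish so the blocked probe can finish too.
        synchronized (inner) {
            done.set(true);
            inner.notifyAll();
        }
        return new boolean[] {innerFree, outerFree};
    }

    public static void main(String[] args) {
        boolean[] r = probeWhileWaiting();
        System.out.println("inner acquirable while waiting: " + r[0]);
        System.out.println("outer acquirable while waiting: " + r[1]);
    }
}
```

Applied to the queue above, this would mean that a thread blocked in notFull.wait() inside offer() still holds notEmpty, so take() cannot even enter its outer synchronized block.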
Update: I ran a test and found that it does indeed block, so the blocking queue implementation above is broken:
public class BlockingQTest {
    public static void main(String[] as) throws InterruptedException {
        BlockingQ blockingQ = new BlockingQ();
        Thread offerThread = new Thread(new OfferTask(blockingQ), "offerThread");
        Thread takeThread = new Thread(new TakeTask(blockingQ), "takeThread");
        offerThread.start();
        takeThread.start();
    }
}

class TakeTask implements Runnable {
    private BlockingQ blockingQ;

    public TakeTask(BlockingQ blockingQ) {
        this.blockingQ = blockingQ;
    }

    @Override
    public void run() {
        try {
            while (true) {
                Object o = blockingQ.take();
                System.out.println("take:" + o);
                Thread.sleep(1000);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class OfferTask implements Runnable {
    private BlockingQ blockingQ;

    public OfferTask(BlockingQ blockingQ) {
        this.blockingQ = blockingQ;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 20; i++) {
                blockingQ.offer(i);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
The following implementation, which uses a single lock with two conditions, works in my test:
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class BlockingQ {
    // One lock guards the queue; both conditions belong to that same lock.
    private final Lock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();
    private final Queue<Object> linkedList = new LinkedList<>();
    private final int maxLength = 10;

    public Object take() throws InterruptedException {
        lock.lock();
        try {
            // Loop rather than if: await() may return spuriously, or another
            // consumer may have emptied the queue before this thread resumes.
            while (linkedList.size() == 0) {
                notEmpty.await();
            }
            Object head = linkedList.poll();
            // Signal after removing, so a waiting producer sees the free slot.
            notFull.signalAll();
            return head;
        } finally {
            lock.unlock();
        }
    }

    public void offer(Object object) throws InterruptedException {
        lock.lock();
        try {
            while (linkedList.size() == maxLength) {
                notFull.await();
            }
            linkedList.add(object);
            System.out.println("offer:" + object);
            // Signal after adding, so a waiting consumer sees the new element.
            notEmpty.signalAll();
        } finally {
            lock.unlock();
        }
    }
}
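For comparison, and not taken from the code's original source: the synchronized/wait/notifyAll approach can also work if both waits go through one monitor, so that wait() releases the only lock the thread holds. This is a rough sketch under that assumption. MonitorBlockingQ, MonitorBlockingQDemo, and transfer are hypothetical names, and the capacity is passed to the constructor purely for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical name; a sketch of a single-monitor bounded queue.
class MonitorBlockingQ<T> {
    private final Queue<T> items = new ArrayDeque<>();
    private final int maxLength;

    MonitorBlockingQ(int maxLength) {
        this.maxLength = maxLength;
    }

    public synchronized T take() throws InterruptedException {
        while (items.isEmpty()) {
            wait(); // releases this queue's monitor, the only one held
        }
        T head = items.poll();
        notifyAll(); // wake producers that may be waiting for space
        return head;
    }

    public synchronized void offer(T item) throws InterruptedException {
        while (items.size() == maxLength) {
            wait();
        }
        items.add(item);
        notifyAll(); // wake consumers that may be waiting for an element
    }
}

class MonitorBlockingQDemo {
    /** Moves 0..count-1 through a small queue and returns what the consumer received. */
    static List<Integer> transfer(int count, int capacity) {
        MonitorBlockingQ<Integer> q = new MonitorBlockingQ<>(capacity);
        List<Integer> received = new ArrayList<>();
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    q.offer(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "producer");
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    received.add(q.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "consumer");
        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(transfer(20, 3));
    }
}
```

With one producer and one consumer, the consumer should receive the values in the order they were offered. Because a single monitor serves both conditions, notifyAll() wakes producers and consumers alike, which is why each wait sits in a while loop that rechecks its condition.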
Attached is a screenshot of where the code came from: