读写锁 / 阻塞队列(Juc-05)

本贴最后更新于 1444 天前,其中的信息可能已经时异事殊

读写锁简介

image.png

代码实现

大致意思就是可以被多线程同时读写的时候只能有一个线程去写!

加入读锁是为了允许别人一起读,防止其他线程写

package net.yscxy.rw;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * @Author WangFuKun
 * @create 2020/11/21 16:07
 */
/*
 *独占锁(写锁)只能一个线程可以占有
 * 共享锁(读锁)多个线程可以 同时占有
 * 读-读 可以共存
 * 读-写 不能共存
 * 写-写 不能共存
 * */
public class ReentrantLockDemo {

    public static void main(String[] args) {
        MyCacheLock myCache = new MyCacheLock();

        for (int i = 1; i < 5; i++) {
            final int temp = i;
            new Thread(() -> {
                myCache.put(String.valueOf(temp), temp);
            }, String.valueOf(i)).start();
        }
        for (int i = 1; i < 5; i++) {
            final int temp = i;
            new Thread(() -> {
                myCache.get(String.valueOf(temp));
            }, String.valueOf(i)).start();
        }
    }
}

class MyCacheLock {
    private volatile Map<String, Object> map = new HashMap<>();
    //这是一把读写锁
    private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

    //储存,也就是写入,我们写入的时候只希望只有一个线程写
    public void put(String key, Object value) {
        readWriteLock.writeLock().lock();
        try {
            System.out.println(Thread.currentThread().getName() + "写入" + key);
            map.put(key, value);
            System.out.println(Thread.currentThread().getName() + "写入OK");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            readWriteLock.writeLock().unlock();
        }
    }

    //取,读 ,读的时候我们希望所有人都可以读
    public Object get(String key) {
        readWriteLock.readLock().lock();
        Object o = null;
        try {
            System.out.println(Thread.currentThread().getName() + "读取" + key);
            o = map.get(key);
            System.out.println(Thread.currentThread().getName() + "读取OK");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            readWriteLock.readLock().unlock();
        }
        return o;
    }
}

class MyCache {
    private volatile Map<String, Object> map = new HashMap<>();

    //储存
    public void put(String key, Object value) {
        System.out.println(Thread.currentThread().getName() + "写入" + key);
        map.put(key, value);
        System.out.println(Thread.currentThread().getName() + "写入OK");
    }

    //取,读
    public Object get(String key) {
        System.out.println(Thread.currentThread().getName() + "读取" + key);
        Object o = map.get(key);
        System.out.println(Thread.currentThread().getName() + "读取OK");
        return o;
    }
}

阻塞队列

image.png

image.png

阻塞队列的四组 API

方式 抛出异常 有返回值,不抛出异常 阻塞等待 超时等待
添加 add offer() put offer(,,)
移除 remove pull() take poll(,)
监测队列首 element peek()
  1. 抛出异常
  2. 不会抛出异常
  3. 阻塞,等待
  4. 超时等待

代码实现

package net.yscxy.bq;

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * @Author WangFuKun
 * @create 2020/11/21 17:37
 */
/*
 *阻塞队列
 * 那么什么时候会用到阻塞队列呢?
 * 多线程A要调用B,但是B还没有执行完成
 * 也就是多线程并发处理,线程池
 * */
public class Test {
    public static void main(String[] args) throws InterruptedException {
        test4();
    }

    /*抛出异常*/
    public static void test1() {
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(3);
        System.out.println(arrayBlockingQueue.add("a"));
        //查看队首元素是谁
        System.out.println(arrayBlockingQueue.element());

        System.out.println(arrayBlockingQueue.add("b"));
        System.out.println(arrayBlockingQueue.add("c"));
        //ava.lang.IllegalStateException: Queue full
        //System.out.println(arrayBlockingQueue.add("d"));

        System.out.println(arrayBlockingQueue.remove());
        System.out.println(arrayBlockingQueue.remove());
        System.out.println(arrayBlockingQueue.remove());
        //java.util.NoSuchElementException
        //System.out.println(arrayBlockingQueue.remove());
    }

    /*
     * 没有异常的方式
     * */
    public static void test2() {
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(3);
        System.out.println(arrayBlockingQueue.offer("a"));
        //查看队首元素
        System.out.println(arrayBlockingQueue.peek());
        System.out.println(arrayBlockingQueue.offer("a"));
        System.out.println(arrayBlockingQueue.offer("a"));
        //这样就不抛出异常,返回false
        System.out.println(arrayBlockingQueue.offer("a"));
        System.out.println(arrayBlockingQueue.poll());
        System.out.println(arrayBlockingQueue.poll());
        System.out.println(arrayBlockingQueue.poll());
        //返回null,但是不报错
        System.out.println(arrayBlockingQueue.poll());
    }

    /*
     * 等待,阻塞(一直阻塞)
     * */
    public static void test3() throws InterruptedException {
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(3);
        arrayBlockingQueue.put("a");
        arrayBlockingQueue.put("a");
        arrayBlockingQueue.put("a");
        // arrayBlockingQueue.put("a");一直等待
        System.out.println(arrayBlockingQueue.take());
        System.out.println(arrayBlockingQueue.take());
        System.out.println(arrayBlockingQueue.take());
        //System.out.println(arrayBlockingQueue.take());一直阻塞状态
    }

    public static void test4() throws InterruptedException {
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(3);
        arrayBlockingQueue.offer("a");
        arrayBlockingQueue.offer("b");
        arrayBlockingQueue.offer("c");
        //超时退出
        //arrayBlockingQueue.offer("d", 2, TimeUnit.SECONDS);
        System.out.println(arrayBlockingQueue.poll());
        System.out.println(arrayBlockingQueue.poll());
        System.out.println(arrayBlockingQueue.poll());
        //超时退出
        System.out.println(arrayBlockingQueue.poll(2, TimeUnit.SECONDS));
    }
}

同步队列

SynchronousQueue

他和其他的队列不一样,它不存储元素,put 了一个元素,必须从里面先 take 出来,否则不能再 put 进去

package net.yscxy.bq;

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

/**
 * @Author WangFuKun
 * @create 2020/11/21 20:36
 */
/*
 * 同步队列
 * */
public class SynchronousQueueDemo {
    public static void main(String[] args) {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + "put 1");
                queue.put("1");
                System.out.println(Thread.currentThread().getName() + "put 2");
                queue.put("2");
                System.out.println(Thread.currentThread().getName() + "put 3");
                queue.put("3");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName() + "->" + queue.take());
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName() + "->" + queue.take());
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName() + "->" + queue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}
  • JUC
    17 引用 • 3 回帖 • 1 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...