JUC 多线程进阶
1.什么是 JUC
源码 + 官方文档
JUC 是 java util concurrent
业务:普通的线程代码 Thread
Runnable:** 没有返回值、效率相比于Callable 相对较低!**
2.线程和进程
线程与进程的关系
进程是一个程序的执行;一个进程可以包含多个线程,线程是程序执行的最小单位(资源调度的最小单位)
java 默认有几个线程?
2 个,main与GC
线程:开了一个进程 Typore,写字,自动保存(线程负责的,输入)
对于 Java 而言:Thread、Runnable、Callable
Java 可以开启线程吗?
答:不能,调用 Thread.start()方法实质上是调用的本地方法(native 修饰),底层是 C++ 来实现的,因为 Java 无法操作硬件。
public synchronized void start() {
/**
* This method is not invoked for the main method thread or "system"
* group threads created/set up by the VM. Any new functionality added
* to this method in the future may have to also be added to the VM.
*
* A zero status value corresponds to state "NEW".
*/
if (threadStatus != 0)
throw new IllegalThreadStateException();
/* Notify the group that this thread is about to be started
* so that it can be added to the group's list of threads
* and the group's unstarted count can be decremented. */
group.add(this);
boolean started = false;
try {
start0();
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
/* do nothing. If start0 threw a Throwable then
it will be passed up the call stack */
}
}
}
//本地方法,底层C++,Java无法直接操作硬件
private native void start0();
并发、并行
并发(多线程操作同一个资源)
- CPU 一核,模拟出来多条线程。快速交替
并行(多个人一起干事)
- CPU 多核,多个线程可以同时执行。
3.Lock 锁
在真正的多线程开发中(公司),线程就是一个单独的资源类,没有任何附属的操作。
这里以卖票的 demo 来进行讲解
传统 synchronized 解决
本质:排队
tryLock()
),尝试获取可被中断的锁( lockInterruptibly()
) ,以及尝试获取可以超时( tryLock(long, TimeUnit)
)。
LOCK 中声明的所有方法:
Modifier and Type
Method
Description
void
lock()
获得锁。
void
lockInterruptibly()
获取锁定,除非当前线程是 interrupted 。
Condition
newCondition()
返回一个新 Condition 绑定到该实例 Lock 实例。
boolean
tryLock()
只有在调用时才可以获得锁。
boolean
tryLock(long time, TimeUnit unit)
如果在给定的等待时间内是空闲的,并且当前的线程尚未得到 interrupted,则获取该锁。
void
unlock()
释放锁。
实现类**:**
所有已知实现类:
- ReentrantLock** (可重入锁---常用)**
- ReentrantReadWriteLock.ReadLock** (读锁)**
- ReentrantReadWriteLock.WriteLock(写锁)
ReentrantLock 源码:
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();
}
/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
NonfairSync:非公平锁,可以插队
FairSync:公平锁,先来后到
4.生产者和消费者问题
线程之间的通信问题:生产者和消费者问题。 等待换新,通知唤醒
线程交替执行 A B 操作统一变量 num = 0
A num+1
B num-1
面试的:
单例模式、排序算法、生产者和消费者问题、死锁
JUC 版本的生产者和消费者问题
传统的:Synchronized wait notifyAll/notify
新版:Lock await signal
官方文档:
public interface Condition
Condition
因素出 Object
监视器方法( wait
, notify
和notifyAll
)成不同的对象,以得到具有多个等待集的每个对象,通过将它们与使用任意的组合的效果Lock
个**实现。 **Lock
替换 synchronized
**方法和语句的使用, **Condition
取代了对象监视器方法的使用。
条件(也称为条件队列或条件变量** )为一个线程暂停执行(“等待”)提供了一种方法,直到另一个线程通知某些状态现在可能为真。 因为访问此共享状态信息发生在不同的线程中,所以它必须被保护,因此某种形式的锁与该条件相关联。 等待条件的关键属性是它原子地释放相关的锁并挂起当前线程,就像**Object.wait
** 。**
class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putptr, takeptr, count;
public void put(Object x) throws InterruptedException {
lock.lock(); try {
while (count == items.length)
notFull.await();
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal();
} finally { lock.unlock(); }
}
public Object take() throws InterruptedException {
lock.lock(); try {
while (count == 0)
notEmpty.await();
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
notFull.signal();
return x;
} finally { lock.unlock(); }
}
}
代码实现:
5.8 锁现象
8 锁,就是关于锁的八个问题
顺序:发信息在前,打电话在后
- 标准情况下,两个线程先打印 发信息还是打电话?
- send()方法延迟 4s,两个线程先打印发信息 还是打电话?
- 在加入普通方法 hello 后,两个线程先打印发信息 还是 hello?
- 两个对象,2 个同步方法, 发信息还是 打电话?
- 增加两个静态的同步方法,只有一个对象,先打印 发短信还是打电话?
- 两个对象,增加两个静态的同步方法,只有一个对象,先打印 发短信还是打电话?
- static 方法的 send,同步方法 call,只有一个对象,先打印 发短信还是打电话?
- static 方法的 send,同步方法 call,只有两个对象,先打印 发短信还是打电话?
回复:
问题1,2: 按照顺序执行,原因是因为这两个方法都是加了锁的,而synchronized是修饰在方法上面的,所以锁的是这个方法的调用对象,在这里两个方法都是同一个调用对象,所以谁先拿到锁谁先执行。
问题3: 不是同步的方法,不考虑锁的因素。这里其实还是应该先执行发信息,只不过因为sleep了,而hello不需要去等待这个锁,所以先执行的hello。
问题4:
问题5: 先打印发短信,再打印打电话。因为在加入static修饰后,变成了静态方法,在类初始化的时候就会执行,相当于类一加载就有了!锁的是Class对象(模板)
问题6: 先打印发短信,再打印打电话。因为锁的是Class对象,类模板是同一个,所以谁先拿到锁谁先执行,这里就是顺序执行。
问题7: 先打印打电话,再打印发信息。因为锁的对象不一样,发信息方法锁的对象是Class对象,而打电话方法锁的是类调用对象。锁不一样,不需要去等待锁的释放。
问题8: 同上。
小结:
- new this 具体的一个类对象
- static Class 唯一的一个模板
- 在主线程与子线程中的代码执行顺序问题
- 当主线程代码段在(一个或多个)子线程中间时,首先执行的还是主线程代码段。因为刚开始时,只有主线程在使用 CPU 的执行权,因为其他两个线程还没有被创建,这时主线程的代码就自上而下的去执行。在线程都创建完成后,此时就存在多个线程了,而线程的执行需要抢到 CPU 资源去执行,所以在后续就是谁先抢到 CPU 资源谁先执行了。
6.集合类不安全
List 不安全
hashSet 底层是什么?
public HashSet() {
map = new HashMap<>();
}
// add set 本质就是 map key 是无法重复的!
public boolean add(E e) {
return map.put(e, PRESENT)==null;
}
private static final Object PRESENT = new Object(); //不变的值
Map 不安全
/**
* The default initial capacity - MUST be a power of two.
*/
static final int DEFAULT_INITIAL_CAPACITY = 1 << 4; // aka 16 默认容量
/**
* The maximum capacity, used if a higher value is implicitly specified
* by either of the constructors with arguments.
* MUST be a power of two <= 1<<30.
*/
static final int MAXIMUM_CAPACITY = 1 << 30;
/**
* The load factor used when none specified in constructor.
*/
static final float DEFAULT_LOAD_FACTOR = 0.75f; //加载因子
7.Callable
@FunctionalInterface
public interface Callable<V>
返回结果并可能引发异常的任务。 实现者定义一个没有参数的单一方法,称为 call
** 。**
Callable
接口类似于Runnable
** ,因为它们都是为其实例可能由另一个线程执行的类设计的。 然而,A **Runnable
不返回结果,也不能抛出被检查的异常。
该Executors
类包含的实用方法,从其他普通形式转换为 Callable
类。
- 有返回值
- 可以跑出异常
- 方法不同,run()/call()
- 通过 FutureTask 来实现
- 一个
FutureTask
可以用来包装Callable
或Runnable
对象。因为 FutureTask
实现 Runnable
,一 FutureTask
可以提交执行Executor
**。**** **
/**
* Creates a {@code FutureTask} that will, upon running, execute the
* given {@code Callable}.
*
* @param callable the callable task
* @throws NullPointerException if the callable is null
*/
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
/**
* Creates a {@code FutureTask} that will, upon running, execute the
* given {@code Runnable}, and arrange that {@code get} will return the
* given result on successful completion.
*
* @param runnable the runnable task
* @param result the result to return on successful completion. If
* you don't need a particular result, consider using
* constructions of the form:
* {@code Future<?> f = new FutureTask<Void>(runnable, null)}
* @throws NullPointerException if the runnable is null
*/
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
代码实现
8.常用辅助类
-
CountDownLatch(减法计数器)
****官方解释:****CountDownLatch 是一个同步的辅助类,允许一个或多个线程,等待其他一组线程完成操作,再继续执行。
可以理解为倒计时锁**。就比如玩 LOL 匹配/排位。一场对局的开始必须等待所有人加载 100% 才能开始,所以就算你是外星人,**是小霸王,你也得等着。
demo 01
原理:
=countDown();=** **** **** 数量减 1**
=await()=** ****等待计数器归零,然后再向下执行**
当计数器归零时,=await()=**方法就会被唤醒,继续执行。**
-
CyclicBarrier(加法计数器)
****官方解释:****CyclicBarrier 是一个同步的辅助类,允许一组线程相互之间等待,达到一个共同点,再继续执行。
可以把 CyclicBarrier 看成是个障碍,所有的线程必须到齐后才能一起通过这个障碍。常用于多线程分组计算。比如一个大型的任务,常常需要分配好多子任务去执行,只有当所有子任务都执行完成时候,才能执行主任务;通俗点可以理解为一个公会副本奖励,需要公会所有人成员都完成至少在线一小时才能发布奖励。
// 计数,并执行一个线程
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
Semaphore
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
计数信号量**。从概念上讲,一个信号量维护一组允许。每个 **acquire()
**块如果必要的许可证前,是可用的,然后把它。每个 **release()
**添加许可,潜在收购方释放阻塞。然而,不使用实际允许的对象; **Semaphore
**只是计数的数量和相应的行为。 **
信号量通常是用来限制线程的数量比可以访问一些(物理或逻辑)资源。例如,这里是一个类使用一个信号量来控制访问的项目池:
9.读写锁
ReadWriteLock
**所有已知实现类: **
ReentrantReadWriteLock
public interface ReadWriteLock
**一个 **ReadWriteLock
**保持一对一联系 locks
,只读操作和书写。的 **read lock
**可能被多个线程同时举行的读者,只要没有作家。 **write lock
**是独家的。 **
读的时候可以被多线程同时读,写的时候只能由一个线程去写。
- 读写锁允许访问共享数据比用互斥锁允许更多的并发级别。它利用的事实,而只有一个单一的线程在一个时间(一个作家线程)可以修改共享数据,在许多情况下,任何数量的线程可以同时读取数据(因此读者线程)。在理论上,并发的读写锁的使用允许的增加将导致在一个互斥锁的使用性能的改进。在实践中,这种增加的并发性只会完全实现一个多处理器,然后,只有当共享数据的访问模式是合适的。
10.阻塞队列
-
阻塞队列(FIFO)
- 写入:如果队列满了,就必须阻塞等待(取出)
- 取:如果队列是空的,必须阻塞等待生产(写入)
InInterface BlockingQueue****
-
- 参数类型 **
E
-元素举行此集合中的类型 **
- All Superinterfaces: **
Collection , Iterable , Queue **
- All Known Subinterfaces: **
BlockingDeque , TransferQueue **
- 所有已知实现类: **
ArrayBlockingQueue, DelayQueue, LinkedBlockingDeque, LinkedBlockingQueue, LinkedTransferQueue, PriorityBlockingQueue, SynchronousQueue **
什么情况下会使用阻塞队列:
- 多线程并发处理
- 线程池
四组 API
BlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(3);
方式
抛出异常
有返回值
阻塞等待
超时等待
添加
add()
offer()
put()
offer(,,)
移除
remove()
poll()
take()
poll(,)
检测队首元素
element()
peek()
-
-
SynchronousQueue 同步队列
没有容量
进去一个元素,必须等待元素取出来过后,才能 put 进去
11.线程池
线程池必会:三大方法、七大参数、四种拒绝策略
池化技术
程序的运行,本质:占用系统资源。我们需要做的是优化资源的使用!
常见:线程池、连接池、对象池
池化技术**:事先准备好一些资源,有人要用就直接来这里拿,用完过后回收。**
线程池的好处:
- 降低资源的消耗
- 提高相应的速度
- 方便管理
=线程复用、可以控制最大并发数、管理线程=
三大方法
- Executors.newSingleThreadExecutor();
- Executors.newFixedThreadPool(线程数量);
- Executors.newCachedThreadPool();
四种拒绝策略
-
new ThreadPoolExecutor.AbortPolicy()
如果超出了最大处理量(maxImumPoolSize + capacity),则不进行处理,抛出异常。
-
new ThreadPoolExecutor.CallerRunsPolicy()
哪来的去哪里。
-
new ThreadPoolExecutor.DiscardOldestPolicy(由调用线程处理该任务)
(放弃最旧的策略)队列满了,尝试去和最早的竞争。也不会抛出异常
-
new ThreadPoolExecutor.DiscardPolicy()
队列满了,直接丢掉任务,不会抛出异常。(解决不了问题,就解决提出问题的人)
小结
最大线程倒地如何定义?(调优)
- CPU 密集型
- 看电脑配置,几核就是定义几,可以保持 CPU 型效率最高
- IO 密集
- 判断你的程序中,十分耗 IO 的线程,然后设置大于这个数的线程
12.四大函数式接口
作用:简化底层编程模型
- lambda 表达式
- 链式编程
- 函数式接口
- Stream 流式计算
函数式接口
只有只一个方法的接口
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
//简化编程模型,在新版本的框架底层中大量应用
//foreach(消费者类型的函数式接口)
四大函数式接口
- Consumer
- Function
- Predicate
- Supplier
代码测试
1.Function
@FunctionalInterface
public interface Function<T, R> {
/**
* Applies this function to the given argument.
*
* @param t the function argument
* @return the function result
*/
R apply(T t);
}
3.Consumer
@FunctionalInterface
public interface Consumer<T> {
/**
* Performs this operation on the given argument.
*
* @param t the input argument
*/
void accept(T t);
13.Stream 流式计算
什么是 Stream 流式计算
大数据:存储 + 计算
集合、MySQL 本质就是存储东西的;计算都是交给流来操作的!
Interface Stream****
- 参数类型 **
T
-流中的元素的类型 **
All Superinterfaces: **
AutoCloseable, BaseStream<t, Stream < T > > **
14.ForkJoin
什么是 ForkJoin
ForkJoin 在 JDK1.7,并行执行任务!提高效率,大数据量
大数据:Map Reduce(把大任务拆分成小任务)
FockJoin 特点:工作窃取
这个里面维护的都是双端队列
假设 2 个线程 A 和 B。
** **A 线程有 4 个任务,当前执行到第 2 个任务,B 线程也有 4 个任务,当前已执行完。这时候 B 线程不会一直等待 A 线程执行完,而是会转过去执行 A 线程未执行的任务,这就是工作窃取。
FockJoin 测试
执行
execute(ForkJoinTask<?> task)
Class ForkJoinTask****
- java.lang.Object** **
- java.util.concurrent.ForkJoinTask****** **
- All Implemented Interfaces: **
Serializable, Future **
已知直接子类: **
CountedCompleter, RecursiveAction, RecursiveTask **
RecursiveAction:递归事件(没有返回值)
RecursiveTask:递归任务(有返回值)
ForkJoinDemo
运行结果
普通累加求和结果:500000000500000000
求和时间:6083
----------------------------------------------------
ForkJoin求和结果:500000000500000000
求和时间:3876
----------------------------------------------------
Stream 并行流求和结果:500000000500000000
求和时间:243
Process finished with exit code 0
15.异步回调
Future 设计初衷
对将来的某个时间的结果进行建模
这一节不是很懂,等我理解了再来写吧。
16.JMM
谈谈你对 Volatile 的理解
Volatile 是 Java 虚拟机提供的轻量级的同步机制(和 Synchronized 差不多,但是没有他那么强)
- 保证可见性
- 不保证原子性
- 禁止指令重排
什么是 JMM
JMM: Java 内存模型,不存在的东西。是一个概念、约定
关于 JMM 的一些同步的约定:
- 线程解锁前,必须把共享变量=立刻=**刷回主存。**
- 线程加锁前,必须读取主存中的最新值到自己的工作内存中。
- 加锁和解锁是同一把锁。
线程工作内存、主内存
内存交互操作
** 内存交互操作有**=8 种=**,虚拟机实现必须保证每一个操作都是原子的,不可在分的(对于 double 和 long 类型的变量来说,load、store、read 和 write 操作在某些平台上允许例外)**
-
- lock (锁定):作用于主内存的变量,把一个变量标识为线程独占状态
- unlock (解锁):作用于主内存的变量,它把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定
- read (读取):作用于主内存变量,它把一个变量的值从主内存传输到线程的工作内存中,以便随后的 load 动作使用
- load (载入):作用于工作内存的变量,它把 read 操作从主存中变量放入工作内存中
- use (使用):作用于工作内存中的变量,它把工作内存中的变量传输给执行引擎,每当虚拟机遇到一个需要使用到变量的值,就会使用到这个指令
- assign (赋值):作用于工作内存中的变量,它把一个从执行引擎中接受到的值放入工作内存的变量副本中
- store (存储):作用于主内存中的变量,它把一个从工作内存中一个变量的值传送到主内存中,以便后续的 write 使用
- write (写入):作用于主内存中的变量,它把 store 操作从工作内存中得到的变量的值放入主内存的变量中
** JMM 对这**=八种指令=**的使用,制定了如下**=**规则**=**:**
-
- 不允许 read 和 load、store 和 write 操作之一单独出现。即使用了 read 必须 load,使用了 store 必须 write
- 不允许线程丢弃他最近的 assign 操作,即工作变量的数据改变了之后,必须告知主存
- 不允许一个线程将没有 assign 的数据从工作内存同步回主内存
- 一个新的变量必须在主内存中诞生,不允许工作内存直接使用一个未被初始化的变量。就是怼变量实施 use、store 操作之前,必须经过 assign 和 load 操作
- 一个变量同一时间只有一个线程能对其进行 lock。多次 lock 后,必须执行相同次数的 unlock 才能解锁
- 如果对一个变量进行 lock 操作,会清空所有工作内存中此变量的值,在执行引擎使用这个变量前,必须重新 load 或 assign 操作初始化变量的值
- 如果一个变量没有被 lock,就不能对其进行 unlock 操作。也不能 unlock 一个被其他线程锁住的变量
- 对一个变量进行 unlock 操作之前,必须把此变量同步回主内存
** JMM 对这八种操作规则和对**volatile 的一些特殊规则就能确定哪里操作是线程安全,哪些操作是线程不安全的了。但是这些规则实在复杂,很难在实践中直接分析。所以一般我们也不会通过上述规则进行分析。更多的时候,使用 java 的 happen-before 规则来进行分析。
问题:程序不知道主内存的值已经被修改过了
17.Volatile
保证可见性
private static volatile AtomicInteger number = new AtomicInteger();
public static void add (){
// number++;
//AtomicInteger +1操作
number.getAndIncrement();
}
问题:不使用 synchronized、Lock 锁,如何解决原子性操作?
答:使用原子类操作
原子类为何这么牛逼
** 这些类的底层都是直接和操作系统内存挂钩****。在内存中修改值!Unsafe 类是一个很特殊的存在。**
指令重排
**概念:**你写的程序,计算机并不是按照你写的那样执行的。
源代码 → 编译器优化的重拍 → 指令并行也可能重排 → 内存系统也会重排 → 执行
=系统处理器在指令重排的时候,会考虑数据之间的依赖性=
volatile 可以避免指令重排
在加入 volatile 后,系统会自动生成一个=内存屏障=**。CPU 指令。作用:**
- 保证特定的操作的执行顺序。
- 可以保证某些变量的内存可见性(利用这些特性,volatile 实现了可见性)。
18.彻底玩转单例模式
最重要的一点:构造器私有化
饿汉模式
这种方法可能会浪费大量空间
静态内部类
反射尝试暴力破解
Exception in thread "main" java.lang.IllegalArgumentException: Cannot reflectively create enum objects
at java.lang.reflect.Constructor.newInstance(Constructor.java:417)
at cn.fyyice.juc.single.EnumSingle.main(EnumSingle.java:22)
Process finished with exit code 1
枚举类不能被破解,实属牛批
19.深入理解 CAS
什么是 CAS
//compareAndSet 比较并交换
//如果我期望的值达到了,那么就更新,否则不更新
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
20.原子引用
带版本号的操作(类似乐观锁操作)
AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(1,1);
** **Integer 使用了对象缓存机制,默认范围-128~127,推荐侍弄静态工厂方法 valueOf 获取对象实例,而不是 new,因为 valueOf 使用缓存,而 new 一定会创建新的对象分配新的内存空间。
21.各种锁的理解
公平锁、非公平锁
公平锁:非常公平,线程顺序执行,先来后到,不能插队
非公平锁:非常不公平,可以插队(默认都是非公平锁)
通过构造器来创建
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();
}
/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
可重入锁
又称为递归锁。
拿到了外面的锁之后,就可以拿到里面的锁(自动获得的)
Synnchronized 锁
自旋锁
SpinLock(核心 CAS)
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
自己写一个自旋锁
结果
A--->myLock
B--->myLock
A--->myUnLock
B--->myUnLock
Process finished with exit code 0
死锁
多个线程相互争抢资源。
总结
** **视频学习大概花了 5 天时间,有些地方仍然有些问题,还需要多多理解呀!
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于