前言
以前在写爬虫的时候,要么是直接使用 HTTPClient 循环去抓取,这样有个弊端,就是人物是阻塞的,后一个任务必须等待上一个任务结束。后面使用了 Thread 线程去执行任务,有多少个任务就开多少条线程,但是频繁线程的创建与销毁又造成了不必要的资源浪费。于是我们用到了线程池,同时使用到 Guava 的并发类 ListenableFuture。
单线程
直接开一条线程去爬虫
new Thread(() -> {
//爬虫任务
});
多线程
当然想爬多个页面也可以使用这样的方式开多条线程。。。
while (条件) {
new Thread(() -> {
//爬虫任务
});
}
那你就 out 太多了,new Thread 的弊端如下:
a. 每次 new Thread 新建对象性能差。
b. 线程缺乏统一管理,可能无限制新建线程,相互之间竞争,及可能占用过多系统资源导致死机或 oom。
c. 缺乏更多功能,如定时执行、定期执行、线程中断。
相比 new Thread,Java 提供的四种线程池的好处在于:
a. 重用存在的线程,减少对象创建、消亡的开销,性能佳。
b. 可有效控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞争,避免堵塞。
c. 提供定时执行、定期执行、单线程、并发数控制等功能。
线程池
无执行结果
无返回结果是实现了 Runable 接口,重写的 run 方法返回值为 void,线程执行完毕无返回结果。同时线程池 exeute 方法也是无返回值的。
//创建条三个线程的线程池
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
//提交3个任务
for (int i = 0; i < 3; i++) {
fixedThreadPool.execute(new Thread(() -> {
//爬虫任务
})
);
}
有执行结果
线程实现 Calable 接口,重写 cal 方法,并且在类上定义返回值泛型。将结果再 call 方法中返回即可。同时使用线程池有返回值的 Submit 方法。
public static void main(String[] args) throws ExecutionException, InterruptedException {
//创建条三个线程的线程池
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
//提交3个任务
for (int i = 0; i < 3; i++) {
//无返回值
fixedThreadPool.execute(new Thread(() -> {
//爬虫任务
})
);
//有返回值
Future<String> future = fixedThreadPool.submit(new Crawler());
System.out.println(future.get());
}
}
static class Crawler implements Callable<String> {
@Override
public String call() {
//执行爬虫任务
return "成功";
}
}
输出
上面的线程池创建方式是有很多弊端的,一般使用都会根据自己的场景配置线程池。
这里就不细写了,网上很多文章讲都非常不错。下面贴两篇
Guava 线程池实战
google Guava 包的 ListenableFuture 解析
接口
传统 JDK 中的 Future 通过异步的方式计算返回结果:在多线程运算中可能或者可能在没有结束返回结果,Future 是运行中的多线程的一个引用句柄,确保在服务执行返回一个 Result。
ListenableFuture 可以允许你注册回调方法(callbacks),在运算(多线程执行)完成的时候进行调用, 或者在运算(多线程执行)完成后立即执行。这样简单的改进,使得可以明显的支持更多的操作,这样的功能在 JDK concurrent 中的 Future 是不支持的。
ListenableFuture 中的基础方法是 addListener(Runnable, Executor), 该方法会在多线程运算完的时候,指定的 Runnable 参数传入的对象会被指定的 Executor 执行。
添加回调(Callbacks)
多数用户喜欢使用 Futures.addCallback(ListenableFuture, FutureCallback, Executor)的方式, 或者 另外一个版本 version(译者注:addCallback(ListenableFuture future,FutureCallback callback)),默认是采用 MoreExecutors.sameThreadExecutor()线程池, 为了简化使用,Callback 采用轻量级的设计. FutureCallback 中实现了两个方法:
- onSuccess(V),在 Future 成功的时候执行,根据 Future 结果来判断。
- onFailure(Throwable), 在 Future 失败的时候执行,根据 Future 结果来判断。
爬虫妹子图全站图片
public class meizitu {
//保存路径
static String path = "E:\\\\meizitu\\\\";
//图片地址
static String page = "http://meizitu.com/a/more_%d.html";
//起始页
static int pageCouont = 1;
//当前可用CPU数
static final int PROCESSORS = Runtime.getRuntime().availableProcessors();
//线程名
static ThreadFactory threadName = new ThreadFactoryBuilder().setNameFormat("当前线程-%d").build();
/**
* 合理配置线程池核心数:
* 如果是CPU密集型任务,就需要尽量压榨CPU,参考值可以设为 NCPU+1
* 如果是IO密集型任务,参考值可以设置为2*NCPU || 3*NCPU
*
* 最大线程池为CPU*5
*
* 最大线程池-核心线程的线程,空闲200毫秒,没有任务则收回。
*
* 工作队列最大1000任务
*
* 超过则采用丢弃策略:任务并抛出RejectedExecutionException异常。
*
* 线程池基本知识:https://www.cnblogs.com/dolphin0520/p/3932921.html
*/ static ThreadPoolExecutor executor = new ThreadPoolExecutor(PROCESSORS * 3, PROCESSORS * 5, 200L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(1000), threadName, new ThreadPoolExecutor.AbortPolicy());
//guava并发
static ListeningExecutorService guavaExecutor = MoreExecutors.listeningDecorator(executor);
public static void main(String[] args) throws IOException {
while (true) {
//初始化httpClient
CloseableHttpClient httpClient = HttpClients.createDefault();
//格式化请求的图片页面地址
String url = String.format(page, pageCouont);
//请求地址
CloseableHttpResponse httpResponse = httpClient.execute(new HttpGet(url));
//如果该页不是404则表示页面有数据
if (404 == httpResponse.getStatusLine().getStatusCode()) {
break;
}
String html = EntityUtils.toString(httpResponse.getEntity());
//解析每个图片的URL
Elements item = Jsoup.parse(html).getElementsByClass("wp-item");
for (Element element : item) {
String href = element.getElementsByTag("a").attr("href");
//任务结果回调 把每个图片的URL提交到线程池
Futures.addCallback(guavaExecutor.submit(new Crawler(href, pageCouont)), new FutureCallback<String>() {
@Override
public void onSuccess(@Nullable String result) {
//任务执行成功
System.out.println(result);
}
@Override
public void onFailure(Throwable t) {
//执行失败
System.out.println(t.getMessage());
}
}, executor);
}
pageCouont++;
}
System.out.println("任务页数:" + pageCouont);
}
static class Crawler implements Callable<String> {
private String url;
private int pageCount;
public Crawler(String url, int pageCount) {
this.url = url;
this.pageCount = pageCount;
}
@Override
public String call() {
FileOutputStream output = null;
ArrayList<String> list = Lists.newArrayList();
try {
/**
* 以下是分析图片地址并下载代码
*/
String body = HttpRequest.get(this.url).body();
Document parse = Jsoup.parse(body);
String title = parse.title();
Element picture = parse.getElementById("picture");
Elements imgs = picture.getElementsByTag("img");
for (Element e : imgs) {
String src = e.attr("src");
String alt = e.attr("alt");
byte[] bytes = HttpRequest.get(src).bytes();
//本来是打算获取到文章标题和图片的介绍保存为图片名字的,获取到的页面是乱码,暂时未解决。
//String imgPath = path.concat("第" + this.pageCount + "页").concat("\\").concat(title).concat("\\").concat(alt);
String fileDirs = path.concat("第" + this.pageCount + "页");
String imgPath = fileDirs.concat("\\\\").concat(System.currentTimeMillis() + ".jpg");
File file = new File(imgPath);
if (!new File(fileDirs).exists()) {
file.mkdirs();
}
File storeFile = new File(imgPath);
output = new FileOutputStream(storeFile);
//得到网络资源的字节数组,并写入文件
output.write(bytes);
list.add(imgPath);
String s = "CPU数:" + PROCESSORS + ", 当前线程:" + Thread.currentThread().getName() + ", 线程池中线程数目:" + executor.getPoolSize() + ",队列中等待执行的任务数目:" +
executor.getQueue().size() + ",已执行任务数目:" + executor.getCompletedTaskCount() + ",请求页面:" + url + ",地址:" + imgPath;
System.out.println(s);
}
return "任务执行成功:" + url + ":" + JSON.toJSONString(list);
} catch (IOException e) {
throw new RuntimeException("任务异常:" + this.url);
} finally {
if (Objects.equals(output, null)) {
try {
output.close();
} catch (IOException e) {
throw new RuntimeException("任务异常:" + this.url);
}
} }
} }
}
结果
这里设置的工作队列有点小了,差点满了。最大 36 线程在执行,满了会抛异常,一定要合理设置该大小!!!不然任务会丢弃
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于