前言
以前在写爬虫的时候,要么是直接使用 HTTPClient 循环去抓取,这样有个弊端,就是人物是阻塞的,后一个任务必须等待上一个任务结束。后面使用了 Thread 线程去执行任务,有多少个任务就开多少条线程,但是频繁线程的创建与销毁又造成了不必要的资源浪费。于是我们用到了线程池,同时使用到 Guava 的并发类 ListenableFuture。
单线程
直接开一条线程去爬虫
new Thread(() -> { //爬虫任务 });
多线程
当然想爬多个页面也可以使用这样的方式开多条线程。。。
while (条件) { new Thread(() -> { //爬虫任务 }); }
那你就 out 太多了,new Thread 的弊端如下:
a. 每次 new Thread 新建对象性能差。
b. 线程缺乏统一管理,可能无限制新建线程,相互之间竞争,及可能占用过多系统资源导致死机或 oom。
c. 缺乏更多功能,如定时执行、定期执行、线程中断。
相比 new Thread,Java 提供的四种线程池的好处在于:
a. 重用存在的线程,减少对象创建、消亡的开销,性能佳。
b. 可有效控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞争,避免堵塞。
c. 提供定时执行、定期执行、单线程、并发数控制等功能。
线程池
无执行结果
无返回结果是实现了 Runable 接口,重写的 run 方法返回值为 void,线程执行完毕无返回结果。同时线程池 exeute 方法也是无返回值的。
//创建条三个线程的线程池 ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3); //提交3个任务 for (int i = 0; i < 3; i++) { fixedThreadPool.execute(new Thread(() -> { //爬虫任务 }) ); }
有执行结果
线程实现 Calable 接口,重写 cal 方法,并且在类上定义返回值泛型。将结果再 call 方法中返回即可。同时使用线程池有返回值的 Submit 方法。
public static void main(String[] args) throws ExecutionException, InterruptedException { //创建条三个线程的线程池 ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3); //提交3个任务 for (int i = 0; i < 3; i++) { //无返回值 fixedThreadPool.execute(new Thread(() -> { //爬虫任务 }) ); //有返回值 Future<String> future = fixedThreadPool.submit(new Crawler()); System.out.println(future.get()); } } static class Crawler implements Callable<String> { @Override public String call() { //执行爬虫任务 return "成功"; } }
输出
上面的线程池创建方式是有很多弊端的,一般使用都会根据自己的场景配置线程池。
这里就不细写了,网上很多文章讲都非常不错。下面贴两篇
Guava 线程池实战
google Guava 包的 ListenableFuture 解析
接口
传统 JDK 中的 Future 通过异步的方式计算返回结果:在多线程运算中可能或者可能在没有结束返回结果,Future 是运行中的多线程的一个引用句柄,确保在服务执行返回一个 Result。
ListenableFuture 可以允许你注册回调方法(callbacks),在运算(多线程执行)完成的时候进行调用, 或者在运算(多线程执行)完成后立即执行。这样简单的改进,使得可以明显的支持更多的操作,这样的功能在 JDK concurrent 中的 Future 是不支持的。
ListenableFuture 中的基础方法是 addListener(Runnable, Executor), 该方法会在多线程运算完的时候,指定的 Runnable 参数传入的对象会被指定的 Executor 执行。
添加回调(Callbacks)
多数用户喜欢使用 Futures.addCallback(ListenableFuture, FutureCallback, Executor)的方式, 或者 另外一个版本 version(译者注:addCallback(ListenableFuture future,FutureCallback callback)),默认是采用 MoreExecutors.sameThreadExecutor()线程池, 为了简化使用,Callback 采用轻量级的设计. FutureCallback 中实现了两个方法:
- onSuccess(V),在 Future 成功的时候执行,根据 Future 结果来判断。
- onFailure(Throwable), 在 Future 失败的时候执行,根据 Future 结果来判断。
爬虫妹子图全站图片
public class meizitu { //保存路径 static String path = "E:\\\\meizitu\\\\"; //图片地址 static String page = "http://meizitu.com/a/more_%d.html"; //起始页 static int pageCouont = 1; //当前可用CPU数 static final int PROCESSORS = Runtime.getRuntime().availableProcessors(); //线程名 static ThreadFactory threadName = new ThreadFactoryBuilder().setNameFormat("当前线程-%d").build(); /** * 合理配置线程池核心数: * 如果是CPU密集型任务,就需要尽量压榨CPU,参考值可以设为 NCPU+1 * 如果是IO密集型任务,参考值可以设置为2*NCPU || 3*NCPU * * 最大线程池为CPU*5 * * 最大线程池-核心线程的线程,空闲200毫秒,没有任务则收回。 * * 工作队列最大1000任务 * * 超过则采用丢弃策略:任务并抛出RejectedExecutionException异常。 * * 线程池基本知识:https://www.cnblogs.com/dolphin0520/p/3932921.html */ static ThreadPoolExecutor executor = new ThreadPoolExecutor(PROCESSORS * 3, PROCESSORS * 5, 200L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(1000), threadName, new ThreadPoolExecutor.AbortPolicy()); //guava并发 static ListeningExecutorService guavaExecutor = MoreExecutors.listeningDecorator(executor); public static void main(String[] args) throws IOException { while (true) { //初始化httpClient CloseableHttpClient httpClient = HttpClients.createDefault(); //格式化请求的图片页面地址 String url = String.format(page, pageCouont); //请求地址 CloseableHttpResponse httpResponse = httpClient.execute(new HttpGet(url)); //如果该页不是404则表示页面有数据 if (404 == httpResponse.getStatusLine().getStatusCode()) { break; } String html = EntityUtils.toString(httpResponse.getEntity()); //解析每个图片的URL Elements item = Jsoup.parse(html).getElementsByClass("wp-item"); for (Element element : item) { String href = element.getElementsByTag("a").attr("href"); //任务结果回调 把每个图片的URL提交到线程池 Futures.addCallback(guavaExecutor.submit(new Crawler(href, pageCouont)), new FutureCallback<String>() { @Override public void onSuccess(@Nullable String result) { //任务执行成功 System.out.println(result); } @Override public void onFailure(Throwable t) { //执行失败 System.out.println(t.getMessage()); } }, executor); } pageCouont++; } System.out.println("任务页数:" + pageCouont); } static class Crawler implements Callable<String> { private String url; private int pageCount; public Crawler(String url, int pageCount) { this.url = url; this.pageCount = pageCount; } @Override public String call() { FileOutputStream output = null; ArrayList<String> list = Lists.newArrayList(); try { /** * 以下是分析图片地址并下载代码 */ String body = HttpRequest.get(this.url).body(); Document parse = Jsoup.parse(body); String title = parse.title(); Element picture = parse.getElementById("picture"); Elements imgs = picture.getElementsByTag("img"); for (Element e : imgs) { String src = e.attr("src"); String alt = e.attr("alt"); byte[] bytes = HttpRequest.get(src).bytes(); //本来是打算获取到文章标题和图片的介绍保存为图片名字的,获取到的页面是乱码,暂时未解决。 //String imgPath = path.concat("第" + this.pageCount + "页").concat("\\").concat(title).concat("\\").concat(alt); String fileDirs = path.concat("第" + this.pageCount + "页"); String imgPath = fileDirs.concat("\\\\").concat(System.currentTimeMillis() + ".jpg"); File file = new File(imgPath); if (!new File(fileDirs).exists()) { file.mkdirs(); } File storeFile = new File(imgPath); output = new FileOutputStream(storeFile); //得到网络资源的字节数组,并写入文件 output.write(bytes); list.add(imgPath); String s = "CPU数:" + PROCESSORS + ", 当前线程:" + Thread.currentThread().getName() + ", 线程池中线程数目:" + executor.getPoolSize() + ",队列中等待执行的任务数目:" + executor.getQueue().size() + ",已执行任务数目:" + executor.getCompletedTaskCount() + ",请求页面:" + url + ",地址:" + imgPath; System.out.println(s); } return "任务执行成功:" + url + ":" + JSON.toJSONString(list); } catch (IOException e) { throw new RuntimeException("任务异常:" + this.url); } finally { if (Objects.equals(output, null)) { try { output.close(); } catch (IOException e) { throw new RuntimeException("任务异常:" + this.url); } } } } } }
结果
这里设置的工作队列有点小了,差点满了。最大 36 线程在执行,满了会抛异常,一定要合理设置该大小!!!不然任务会丢弃
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于