使用 hadoop api 获取任务日志

本贴最后更新于 1911 天前,其中的信息可能已经事过景迁

正常情况下我们可以通过开启日志聚合在 yarn webUi 上查看任务日志,但是当我们需要定制日志呈现方式时就需要使用到 hadoop 提供的 api 来获取。以下为 demo。

引入依赖

<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.7.2</version> </dependency>

代码实现:

import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat; import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; import org.apache.hadoop.yarn.logaggregation.LogCLIHelpers; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Times; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.DataInputStream; import java.io.EOFException; import java.io.FileNotFoundException; import java.io.IOException; import java.io.PrintStream; import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; public class HadoopLogUtils { private static Configuration yarnConfiguration; private static LogCLIHelpers logCLIHelpers; private static final Logger LOGGER = LoggerFactory.getLogger(HadoopLogUtils.class); /** * 初始化配置 */ static { yarnConfiguration = new YarnConfiguration(); yarnConfiguration.addResource("core-site.xml"); yarnConfiguration.addResource("hdfs-site.xml"); yarnConfiguration.addResource("yarn-site.xml"); logCLIHelpers = new LogCLIHelpers(); logCLIHelpers.setConf(yarnConfiguration); } public static Configuration getYarnConfiguration() { return yarnConfiguration; } /** * 获取容器日志 * * @param appId * @param containerId * @param nodeId * @param jobOwner * @param out * @return * @throws IOException */ public static int dumpAContainersLogs(String appId, String containerId, String nodeId, String jobOwner, PrintStream out, List<String> logType) throws IOException { Path remoteRootLogDir = new Path(getYarnConfiguration().get("yarn.nodemanager.remote-app-log-dir", "/tmp/logs")); String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(getYarnConfiguration()); Path remoteAppLogDir = LogAggregationUtils.getRemoteAppLogDir(remoteRootLogDir, ConverterUtils.toApplicationId(appId), jobOwner, suffix); RemoteIterator nodeFiles; try { Path qualifiedLogDir = FileContext.getFileContext(getYarnConfiguration()).makeQualified(remoteAppLogDir); nodeFiles = FileContext.getFileContext(qualifiedLogDir.toUri(), getYarnConfiguration()).listStatus(remoteAppLogDir); } catch (FileNotFoundException var16) { logDirNotExist(remoteAppLogDir.toString()); return -1; } boolean foundContainerLogs = false; while (nodeFiles.hasNext()) { FileStatus thisNodeFile = (FileStatus) nodeFiles.next(); String fileName = thisNodeFile.getPath().getName(); if (fileName.contains(LogAggregationUtils.getNodeString(nodeId)) && !fileName.endsWith(".tmp")) { AggregatedLogFormat.LogReader reader = null; try { reader = new AggregatedLogFormat.LogReader(getYarnConfiguration(), thisNodeFile.getPath()); if (dumpAContainerLogs(containerId, reader, out, thisNodeFile.getModificationTime(), logType) > -1) { foundContainerLogs = true; } } finally { if (reader != null) { reader.close(); } } } } if (!foundContainerLogs) { containerLogNotFound(containerId); return -1; } else { return 0; } } private static void logDirNotExist(String remoteAppLogDir) { System.out.println(remoteAppLogDir + " does not exist."); System.out.println("Log aggregation has not completed or is not enabled."); } private static void containerLogNotFound(String containerId) { System.out.println("Logs for container " + containerId + " are not present in this log-file."); } public static int dumpAContainerLogs(String containerIdStr, AggregatedLogFormat.LogReader reader, PrintStream out, long logUploadedTime, List<String> logType) throws IOException { AggregatedLogFormat.LogKey key = new AggregatedLogFormat.LogKey(); DataInputStream valueStream; for (valueStream = reader.next(key); valueStream != null && !key.toString().equals(containerIdStr); valueStream = reader.next(key)) { key = new AggregatedLogFormat.LogKey(); } if (valueStream == null) { return -1; } else { boolean foundContainerLogs = false; while(true) { try { readContainerLogs(valueStream, out, logUploadedTime, logType); foundContainerLogs = true; } catch (EOFException var10) { if (foundContainerLogs) { return 0; } return -1; } } } } /** * 获取Containe nodeId列表 * * @param appId * @param appOwner * @return * @throws IOException */ public static Map<String, String> getContaines(String appId, String appOwner) throws IOException { Path remoteRootLogDir = new Path(yarnConfiguration.get( YarnConfiguration.NM_REMOTE_APP_LOG_DIR, YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR)); String user = appOwner; String logDirSuffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(yarnConfiguration); // TODO Change this to get a list of files from the LAS. Path remoteAppLogDir = LogAggregationUtils.getRemoteAppLogDir( remoteRootLogDir, ConverterUtils.toApplicationId(appId), user, logDirSuffix); RemoteIterator<FileStatus> nodeFiles; Map<String, String> containerAndNodeId = new LinkedHashMap<>(); try { Path qualifiedLogDir = FileContext.getFileContext(yarnConfiguration).makeQualified(remoteAppLogDir); nodeFiles = FileContext.getFileContext(qualifiedLogDir.toUri(), yarnConfiguration).listStatus(remoteAppLogDir); } catch (FileNotFoundException fnf) { logDirNotExist(remoteAppLogDir.toString()); return Collections.emptyMap(); } boolean foundAnyLogs = false; while (nodeFiles.hasNext()) { FileStatus thisNodeFile = nodeFiles.next(); if (!thisNodeFile.getPath().getName() .endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) { AggregatedLogFormat.LogReader reader = new AggregatedLogFormat.LogReader(yarnConfiguration, thisNodeFile.getPath()); try { DataInputStream valueStream; AggregatedLogFormat.LogKey key = new AggregatedLogFormat.LogKey(); valueStream = reader.next(key); while (valueStream != null) { // Container: container_1587284642166_0001_01_000003 on master_42757 containerAndNodeId.put(key.toString(), thisNodeFile.getPath().getName().replace("_", ":")); foundAnyLogs = true; // Next container key = new AggregatedLogFormat.LogKey(); valueStream = reader.next(key); } } finally { reader.close(); } } } if (!foundAnyLogs) { emptyLogDir(remoteAppLogDir.toString()); return Collections.emptyMap(); } return containerAndNodeId; } private static void emptyLogDir(String remoteAppLogDir) { System.out.println(remoteAppLogDir + " does not have any log files."); } private static void readContainerLogs(DataInputStream valueStream, PrintStream out, long logUploadedTime, List<String> logType) throws IOException { byte[] buf = new byte[65535]; String fileType = valueStream.readUTF(); //if (logType.contains(fileType)) { String fileLengthStr = valueStream.readUTF(); long fileLength = Long.parseLong(fileLengthStr); out.print("LogType:"); out.println(fileType); if (logUploadedTime != -1) { out.print("Log Upload Time:"); out.println(Times.format(logUploadedTime)); } out.print("LogLength:"); out.println(fileLengthStr); out.println("Log Contents:"); long curRead = 0; long pendingRead = fileLength - curRead; int toRead = pendingRead > buf.length ? buf.length : (int) pendingRead; int len = valueStream.read(buf, 0, toRead); while (len != -1 && curRead < fileLength) { out.write(buf, 0, len); curRead += len; pendingRead = fileLength - curRead; toRead = pendingRead > buf.length ? buf.length : (int) pendingRead; len = valueStream.read(buf, 0, toRead); } out.println("End of LogType:" + fileType); out.println(""); // } } /** * covert appId * @param appId * @return */ public static ApplicationId convert(String appId) { return ConverterUtils.toApplicationId(appId); } }

实现效果:

image20200428221031425.pngimage20200428221152282.png

源码地址: https://github.com/arrayMi/hadoop_learning

  • Hadoop

    Hadoop 是由 Apache 基金会所开发的一个分布式系统基础架构。用户可以在不了解分布式底层细节的情况下,开发分布式程序。充分利用集群的威力进行高速运算和存储。

    93 引用 • 122 回帖 • 613 关注
  • Yarn
    13 引用 • 5 回帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...

推荐标签 标签

  • Anytype
    3 引用 • 31 回帖 • 28 关注
  • TensorFlow

    TensorFlow 是一个采用数据流图(data flow graphs),用于数值计算的开源软件库。节点(Nodes)在图中表示数学操作,图中的线(edges)则表示在节点间相互联系的多维数据数组,即张量(tensor)。

    20 引用 • 19 回帖 • 4 关注
  • Rust

    Rust 是一门赋予每个人构建可靠且高效软件能力的语言。Rust 由 Mozilla 开发,最早发布于 2014 年 9 月。

    59 引用 • 22 回帖 • 7 关注
  • Love2D

    Love2D 是一个开源的, 跨平台的 2D 游戏引擎。使用纯 Lua 脚本来进行游戏开发。目前支持的平台有 Windows, Mac OS X, Linux, Android 和 iOS。

    14 引用 • 53 回帖 • 565 关注
  • B3log

    B3log 是一个开源组织,名字来源于“Bulletin Board Blog”缩写,目标是将独立博客与论坛结合,形成一种新的网络社区体验,详细请看 B3log 构思。目前 B3log 已经开源了多款产品:SymSoloVditor思源笔记

    1062 引用 • 3455 回帖 • 154 关注
  • Flume

    Flume 是一套分布式的、可靠的,可用于有效地收集、聚合和搬运大量日志数据的服务架构。

    9 引用 • 6 回帖 • 663 关注
  • GAE

    Google App Engine(GAE)是 Google 管理的数据中心中用于 WEB 应用程序的开发和托管的平台。2008 年 4 月 发布第一个测试版本。目前支持 Python、Java 和 Go 开发部署。全球已有数十万的开发者在其上开发了众多的应用。

    14 引用 • 42 回帖 • 824 关注
  • SOHO

    为成为自由职业者在家办公而努力吧!

    7 引用 • 55 回帖 • 3 关注
  • Docker

    Docker 是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的操作系统上。容器完全使用沙箱机制,几乎没有性能开销,可以很容易地在机器和数据中心中运行。

    498 引用 • 934 回帖
  • Redis

    Redis 是一个开源的使用 ANSI C 语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value 数据库,并提供多种语言的 API。从 2010 年 3 月 15 日起,Redis 的开发工作由 VMware 主持。从 2013 年 5 月开始,Redis 的开发由 Pivotal 赞助。

    284 引用 • 248 回帖 • 1 关注
  • 浅吟主题

    Jeffrey Chen 制作的思源笔记主题,项目仓库:https://github.com/TCOTC/Whisper

    1 引用 • 31 回帖
  • 爬虫

    网络爬虫(Spider、Crawler),是一种按照一定的规则,自动地抓取万维网信息的程序。

    106 引用 • 275 回帖
  • OneNote
    1 引用 • 3 回帖
  • RIP

    愿逝者安息!

    8 引用 • 92 回帖 • 407 关注
  • SSL

    SSL(Secure Sockets Layer 安全套接层),及其继任者传输层安全(Transport Layer Security,TLS)是为网络通信提供安全及数据完整性的一种安全协议。TLS 与 SSL 在传输层对网络连接进行加密。

    70 引用 • 193 回帖 • 408 关注
  • 微服务

    微服务架构是一种架构模式,它提倡将单一应用划分成一组小的服务。服务之间互相协调,互相配合,为用户提供最终价值。每个服务运行在独立的进程中。服务于服务之间才用轻量级的通信机制互相沟通。每个服务都围绕着具体业务构建,能够被独立的部署。

    96 引用 • 155 回帖
  • Mac

    Mac 是苹果公司自 1984 年起以“Macintosh”开始开发的个人消费型计算机,如:iMac、Mac mini、Macbook Air、Macbook Pro、Macbook、Mac Pro 等计算机。

    167 引用 • 597 回帖 • 3 关注
  • 架构

    我们平时所说的“架构”主要是指软件架构,这是有关软件整体结构与组件的抽象描述,用于指导软件系统各个方面的设计。另外还有“业务架构”、“网络架构”、“硬件架构”等细分领域。

    142 引用 • 442 回帖
  • Hexo

    Hexo 是一款快速、简洁且高效的博客框架,使用 Node.js 编写。

    22 引用 • 148 回帖 • 12 关注
  • 房星科技

    房星网,我们不和没有钱的程序员谈理想,我们要让程序员又有理想又有钱。我们有雄厚的房地产行业线下资源,遍布昆明全城的 100 家门店、四千地产经纪人是我们坚实的后盾。

    6 引用 • 141 回帖 • 615 关注
  • flomo

    flomo 是新一代 「卡片笔记」 ,专注在碎片化时代,促进你的记录,帮你积累更多知识资产。

    6 引用 • 143 回帖
  • H2

    H2 是一个开源的嵌入式数据库引擎,采用 Java 语言编写,不受平台的限制,同时 H2 提供了一个十分方便的 web 控制台用于操作和管理数据库内容。H2 还提供兼容模式,可以兼容一些主流的数据库,因此采用 H2 作为开发期的数据库非常方便。

    11 引用 • 54 回帖 • 672 关注
  • Swagger

    Swagger 是一款非常流行的 API 开发工具,它遵循 OpenAPI Specification(这是一种通用的、和编程语言无关的 API 描述规范)。Swagger 贯穿整个 API 生命周期,如 API 的设计、编写文档、测试和部署。

    26 引用 • 35 回帖 • 2 关注
  • Angular

    AngularAngularJS 的新版本。

    26 引用 • 66 回帖 • 562 关注
  • LaTeX

    LaTeX(音译“拉泰赫”)是一种基于 ΤΕΧ 的排版系统,由美国计算机学家莱斯利·兰伯特(Leslie Lamport)在 20 世纪 80 年代初期开发,利用这种格式,即使使用者没有排版和程序设计的知识也可以充分发挥由 TeX 所提供的强大功能,能在几天,甚至几小时内生成很多具有书籍质量的印刷品。对于生成复杂表格和数学公式,这一点表现得尤为突出。因此它非常适用于生成高印刷质量的科技和数学类文档。

    12 引用 • 59 回帖 • 1 关注
  • OpenCV
    15 引用 • 36 回帖
  • 支付宝

    支付宝是全球领先的独立第三方支付平台,致力于为广大用户提供安全快速的电子支付/网上支付/安全支付/手机支付体验,及转账收款/水电煤缴费/信用卡还款/AA 收款等生活服务应用。

    29 引用 • 347 回帖