使用 hadoop api 获取任务日志

本贴最后更新于 1698 天前,其中的信息可能已经事过景迁

正常情况下我们可以通过开启日志聚合在 yarn webUi 上查看任务日志,但是当我们需要定制日志呈现方式时就需要使用到 hadoop 提供的 api 来获取。以下为 demo。

引入依赖

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.2</version>
        </dependency>

代码实现:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.logaggregation.LogCLIHelpers;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Times;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.DataInputStream;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintStream;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class HadoopLogUtils {
    private static Configuration yarnConfiguration;
    private static LogCLIHelpers logCLIHelpers;
    private static final Logger LOGGER = LoggerFactory.getLogger(HadoopLogUtils.class);

    /**
     * 初始化配置
     */
    static {
        yarnConfiguration = new YarnConfiguration();

        yarnConfiguration.addResource("core-site.xml");
        yarnConfiguration.addResource("hdfs-site.xml");
        yarnConfiguration.addResource("yarn-site.xml");
        logCLIHelpers = new LogCLIHelpers();
        logCLIHelpers.setConf(yarnConfiguration);
    }

    public static Configuration getYarnConfiguration() {
        return yarnConfiguration;
    }

    /**
     * 获取容器日志
     *
     * @param appId
     * @param containerId
     * @param nodeId
     * @param jobOwner
     * @param out
     * @return
     * @throws IOException
     */
    public static int dumpAContainersLogs(String appId, String containerId, String nodeId, String jobOwner, PrintStream out, List<String> logType) throws IOException {
        Path remoteRootLogDir = new Path(getYarnConfiguration().get("yarn.nodemanager.remote-app-log-dir", "/tmp/logs"));

        String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(getYarnConfiguration());
        Path remoteAppLogDir = LogAggregationUtils.getRemoteAppLogDir(remoteRootLogDir, ConverterUtils.toApplicationId(appId), jobOwner, suffix);

        RemoteIterator nodeFiles;
        try {
            Path qualifiedLogDir = FileContext.getFileContext(getYarnConfiguration()).makeQualified(remoteAppLogDir);
            nodeFiles = FileContext.getFileContext(qualifiedLogDir.toUri(), getYarnConfiguration()).listStatus(remoteAppLogDir);
        } catch (FileNotFoundException var16) {
            logDirNotExist(remoteAppLogDir.toString());
            return -1;
        }

        boolean foundContainerLogs = false;

        while (nodeFiles.hasNext()) {
            FileStatus thisNodeFile = (FileStatus) nodeFiles.next();
            String fileName = thisNodeFile.getPath().getName();
            if (fileName.contains(LogAggregationUtils.getNodeString(nodeId)) && !fileName.endsWith(".tmp")) {
                AggregatedLogFormat.LogReader reader = null;

                try {
                    reader = new AggregatedLogFormat.LogReader(getYarnConfiguration(), thisNodeFile.getPath());
                    if (dumpAContainerLogs(containerId, reader, out, thisNodeFile.getModificationTime(), logType) > -1) {
                        foundContainerLogs = true;
                    }
                } finally {
                    if (reader != null) {
                        reader.close();
                    }

                }
            }
        }

        if (!foundContainerLogs) {
            containerLogNotFound(containerId);
            return -1;
        } else {
            return 0;
        }
    }

    private static void logDirNotExist(String remoteAppLogDir) {
        System.out.println(remoteAppLogDir + " does not exist.");
        System.out.println("Log aggregation has not completed or is not enabled.");
    }

    private static void containerLogNotFound(String containerId) {
        System.out.println("Logs for container " + containerId + " are not present in this log-file.");
    }

    public static int dumpAContainerLogs(String containerIdStr, AggregatedLogFormat.LogReader reader, PrintStream out, long logUploadedTime, List<String> logType) throws IOException {
        AggregatedLogFormat.LogKey key = new AggregatedLogFormat.LogKey();

        DataInputStream valueStream;
        for (valueStream = reader.next(key); valueStream != null && !key.toString().equals(containerIdStr); valueStream = reader.next(key)) {
            key = new AggregatedLogFormat.LogKey();
        }

        if (valueStream == null) {
            return -1;
        } else {
            boolean foundContainerLogs = false;

            while(true) {
                try {
                    readContainerLogs(valueStream, out, logUploadedTime, logType);
                    foundContainerLogs = true;
                } catch (EOFException var10) {
                    if (foundContainerLogs) {
                        return 0;
                    }

                    return -1;
                }
            }
        }
    }


    /**
     * 获取Containe nodeId列表
     *
     * @param appId
     * @param appOwner
     * @return
     * @throws IOException
     */
    public static Map<String, String> getContaines(String appId, String appOwner) throws IOException {
        Path remoteRootLogDir = new Path(yarnConfiguration.get(
                YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
                YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
        String user = appOwner;
        String logDirSuffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(yarnConfiguration);
        // TODO Change this to get a list of files from the LAS.
        Path remoteAppLogDir = LogAggregationUtils.getRemoteAppLogDir(
                remoteRootLogDir, ConverterUtils.toApplicationId(appId), user, logDirSuffix);
        RemoteIterator<FileStatus> nodeFiles;
        Map<String, String> containerAndNodeId = new LinkedHashMap<>();
        try {
            Path qualifiedLogDir =
                    FileContext.getFileContext(yarnConfiguration).makeQualified(remoteAppLogDir);
            nodeFiles = FileContext.getFileContext(qualifiedLogDir.toUri(),
                    yarnConfiguration).listStatus(remoteAppLogDir);
        } catch (FileNotFoundException fnf) {
            logDirNotExist(remoteAppLogDir.toString());
            return Collections.emptyMap();
        }
        boolean foundAnyLogs = false;
        while (nodeFiles.hasNext()) {
            FileStatus thisNodeFile = nodeFiles.next();
            if (!thisNodeFile.getPath().getName()
                    .endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
                AggregatedLogFormat.LogReader reader =
                        new AggregatedLogFormat.LogReader(yarnConfiguration, thisNodeFile.getPath());
                try {

                    DataInputStream valueStream;
                    AggregatedLogFormat.LogKey key = new AggregatedLogFormat.LogKey();
                    valueStream = reader.next(key);

                    while (valueStream != null) {
                        // Container: container_1587284642166_0001_01_000003 on master_42757
                        containerAndNodeId.put(key.toString(), thisNodeFile.getPath().getName().replace("_", ":"));

                        foundAnyLogs = true;
                        // Next container
                        key = new AggregatedLogFormat.LogKey();
                        valueStream = reader.next(key);
                    }
                } finally {
                    reader.close();
                }
            }
        }
        if (!foundAnyLogs) {
            emptyLogDir(remoteAppLogDir.toString());
            return Collections.emptyMap();
        }
        return containerAndNodeId;
    }


    private static void emptyLogDir(String remoteAppLogDir) {
        System.out.println(remoteAppLogDir + " does not have any log files.");
    }


    private static void readContainerLogs(DataInputStream valueStream,
                                          PrintStream out, long logUploadedTime, List<String> logType) throws IOException {
        byte[] buf = new byte[65535];

        String fileType = valueStream.readUTF();
        //if (logType.contains(fileType)) {
            String fileLengthStr = valueStream.readUTF();
            long fileLength = Long.parseLong(fileLengthStr);
            out.print("LogType:");
            out.println(fileType);
            if (logUploadedTime != -1) {
                out.print("Log Upload Time:");
                out.println(Times.format(logUploadedTime));
            }
            out.print("LogLength:");
            out.println(fileLengthStr);
            out.println("Log Contents:");

            long curRead = 0;
            long pendingRead = fileLength - curRead;
            int toRead =
                    pendingRead > buf.length ? buf.length : (int) pendingRead;
            int len = valueStream.read(buf, 0, toRead);
            while (len != -1 && curRead < fileLength) {
                out.write(buf, 0, len);
                curRead += len;

                pendingRead = fileLength - curRead;
                toRead =
                        pendingRead > buf.length ? buf.length : (int) pendingRead;
                len = valueStream.read(buf, 0, toRead);
            }
            out.println("End of LogType:" + fileType);
            out.println("");
       // }
    }

    /**
     * covert appId
     * @param appId
     * @return
     */
    public static ApplicationId convert(String appId) {
        return ConverterUtils.toApplicationId(appId);
    }


}

实现效果:

image20200428221031425.pngimage20200428221152282.png

源码地址: https://github.com/arrayMi/hadoop_learning

  • Hadoop

    Hadoop 是由 Apache 基金会所开发的一个分布式系统基础架构。用户可以在不了解分布式底层细节的情况下,开发分布式程序。充分利用集群的威力进行高速运算和存储。

    86 引用 • 122 回帖 • 627 关注
  • Yarn
    11 引用 • 5 回帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...

推荐标签 标签

  • PostgreSQL

    PostgreSQL 是一款功能强大的企业级数据库系统,在 BSD 开源许可证下发布。

    22 引用 • 22 回帖
  • Wide

    Wide 是一款基于 Web 的 Go 语言 IDE。通过浏览器就可以进行 Go 开发,并有代码自动完成、查看表达式、编译反馈、Lint、实时结果输出等功能。

    欢迎访问我们运维的实例: https://wide.b3log.org

    30 引用 • 218 回帖 • 635 关注
  • 脑图

    脑图又叫思维导图,是表达发散性思维的有效图形思维工具 ,它简单却又很有效,是一种实用性的思维工具。

    30 引用 • 96 回帖
  • 运维

    互联网运维工作,以服务为中心,以稳定、安全、高效为三个基本点,确保公司的互联网业务能够 7×24 小时为用户提供高质量的服务。

    149 引用 • 257 回帖
  • SVN

    SVN 是 Subversion 的简称,是一个开放源代码的版本控制系统,相较于 RCS、CVS,它采用了分支管理系统,它的设计目标就是取代 CVS。

    29 引用 • 98 回帖 • 694 关注
  • LaTeX

    LaTeX(音译“拉泰赫”)是一种基于 ΤΕΧ 的排版系统,由美国计算机学家莱斯利·兰伯特(Leslie Lamport)在 20 世纪 80 年代初期开发,利用这种格式,即使使用者没有排版和程序设计的知识也可以充分发挥由 TeX 所提供的强大功能,能在几天,甚至几小时内生成很多具有书籍质量的印刷品。对于生成复杂表格和数学公式,这一点表现得尤为突出。因此它非常适用于生成高印刷质量的科技和数学类文档。

    12 引用 • 54 回帖 • 49 关注
  • Gzip

    gzip (GNU zip)是 GNU 自由软件的文件压缩程序。我们在 Linux 中经常会用到后缀为 .gz 的文件,它们就是 Gzip 格式的。现今已经成为互联网上使用非常普遍的一种数据压缩格式,或者说一种文件格式。

    9 引用 • 12 回帖 • 147 关注
  • IBM

    IBM(国际商业机器公司)或万国商业机器公司,简称 IBM(International Business Machines Corporation),总公司在纽约州阿蒙克市。1911 年托马斯·沃森创立于美国,是全球最大的信息技术和业务解决方案公司,拥有全球雇员 30 多万人,业务遍及 160 多个国家和地区。

    17 引用 • 53 回帖 • 140 关注
  • Kafka

    Kafka 是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是现代系统中许多功能的基础。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。

    36 引用 • 35 回帖
  • 倾城之链
    23 引用 • 66 回帖 • 138 关注
  • OpenStack

    OpenStack 是一个云操作系统,通过数据中心可控制大型的计算、存储、网络等资源池。所有的管理通过前端界面管理员就可以完成,同样也可以通过 Web 接口让最终用户部署资源。

    10 引用 • 1 关注
  • Redis

    Redis 是一个开源的使用 ANSI C 语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value 数据库,并提供多种语言的 API。从 2010 年 3 月 15 日起,Redis 的开发工作由 VMware 主持。从 2013 年 5 月开始,Redis 的开发由 Pivotal 赞助。

    286 引用 • 248 回帖 • 44 关注
  • OkHttp

    OkHttp 是一款 HTTP & HTTP/2 客户端库,专为 Android 和 Java 应用打造。

    16 引用 • 6 回帖 • 76 关注
  • WiFiDog

    WiFiDog 是一套开源的无线热点认证管理工具,主要功能包括:位置相关的内容递送;用户认证和授权;集中式网络监控。

    1 引用 • 7 回帖 • 591 关注
  • Vditor

    Vditor 是一款浏览器端的 Markdown 编辑器,支持所见即所得、即时渲染(类似 Typora)和分屏预览模式。它使用 TypeScript 实现,支持原生 JavaScript、Vue、React 和 Angular。

    354 引用 • 1823 回帖 • 1 关注
  • 单点登录

    单点登录(Single Sign On)是目前比较流行的企业业务整合的解决方案之一。SSO 的定义是在多个应用系统中,用户只需要登录一次就可以访问所有相互信任的应用系统。

    9 引用 • 25 回帖
  • WordPress

    WordPress 是一个使用 PHP 语言开发的博客平台,用户可以在支持 PHP 和 MySQL 数据库的服务器上架设自己的博客。也可以把 WordPress 当作一个内容管理系统(CMS)来使用。WordPress 是一个免费的开源项目,在 GNU 通用公共许可证(GPLv2)下授权发布。

    66 引用 • 114 回帖 • 223 关注
  • SendCloud

    SendCloud 由搜狐武汉研发中心孵化的项目,是致力于为开发者提供高质量的触发邮件服务的云端邮件发送平台,为开发者提供便利的 API 接口来调用服务,让邮件准确迅速到达用户收件箱并获得强大的追踪数据。

    2 引用 • 8 回帖 • 486 关注
  • 深度学习

    深度学习(Deep Learning)是机器学习的分支,是一种试图使用包含复杂结构或由多重非线性变换构成的多个处理层对数据进行高层抽象的算法。

    53 引用 • 40 回帖 • 2 关注
  • Python

    Python 是一种面向对象、直译式电脑编程语言,具有近二十年的发展历史,成熟且稳定。它包含了一组完善而且容易理解的标准库,能够轻松完成很多常见的任务。它的语法简捷和清晰,尽量使用无异义的英语单词,与其它大多数程序设计语言使用大括号不一样,它使用缩进来定义语句块。

    545 引用 • 672 回帖
  • GAE

    Google App Engine(GAE)是 Google 管理的数据中心中用于 WEB 应用程序的开发和托管的平台。2008 年 4 月 发布第一个测试版本。目前支持 Python、Java 和 Go 开发部署。全球已有数十万的开发者在其上开发了众多的应用。

    14 引用 • 42 回帖 • 780 关注
  • 心情

    心是产生任何想法的源泉,心本体会陷入到对自己本体不能理解的状态中,因为心能产生任何想法,不能分出对错,不能分出自己。

    59 引用 • 369 回帖
  • Scala

    Scala 是一门多范式的编程语言,集成面向对象编程和函数式编程的各种特性。

    13 引用 • 11 回帖 • 138 关注
  • 域名

    域名(Domain Name),简称域名、网域,是由一串用点分隔的名字组成的 Internet 上某一台计算机或计算机组的名称,用于在数据传输时标识计算机的电子方位(有时也指地理位置)。

    43 引用 • 208 回帖
  • Openfire

    Openfire 是开源的、基于可拓展通讯和表示协议 (XMPP)、采用 Java 编程语言开发的实时协作服务器。Openfire 的效率很高,单台服务器可支持上万并发用户。

    6 引用 • 7 回帖 • 101 关注
  • 电影

    这是一个不能说的秘密。

    121 引用 • 604 回帖 • 1 关注
  • SQLite

    SQLite 是一个进程内的库,实现了自给自足的、无服务器的、零配置的、事务性的 SQL 数据库引擎。SQLite 是全世界使用最为广泛的数据库引擎。

    5 引用 • 7 回帖 • 1 关注