Normally, with log aggregation enabled, you can view job logs through the YARN web UI. But when you need to customize how those logs are presented, you have to fetch them yourself through the APIs that Hadoop provides. The following is a demo.
Add the dependency:
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.2</version>
</dependency>
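Note that the cluster's core-site.xml, hdfs-site.xml, and yarn-site.xml must also be on the classpath, because the static initializer below loads them via Configuration.addResource; otherwise the code falls back to the default settings bundled in the Hadoop jars.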
Implementation:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.logaggregation.LogCLIHelpers;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Times;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintStream;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
public class HadoopLogUtils {
    private static Configuration yarnConfiguration;
    private static LogCLIHelpers logCLIHelpers;
    private static final Logger LOGGER = LoggerFactory.getLogger(HadoopLogUtils.class);

    /**
     * Initialize the configuration from the cluster config files on the classpath.
     */
    static {
        yarnConfiguration = new YarnConfiguration();
        yarnConfiguration.addResource("core-site.xml");
        yarnConfiguration.addResource("hdfs-site.xml");
        yarnConfiguration.addResource("yarn-site.xml");
        logCLIHelpers = new LogCLIHelpers();
        logCLIHelpers.setConf(yarnConfiguration);
    }

    public static Configuration getYarnConfiguration() {
        return yarnConfiguration;
    }
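    // Aggregated logs are laid out on the remote file system as
    //   {yarn.nodemanager.remote-app-log-dir}/{user}/{suffix, "logs" by default}/{applicationId}/,
    // with one file per NodeManager (named after the node's host_port). Each file holds the logs
    // of every container that ran on that node, keyed by container id.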
    /**
     * Dump the aggregated logs of a single container to the given stream.
     *
     * @param appId       application id
     * @param containerId container id whose logs should be dumped
     * @param nodeId      node the container ran on, in "host:port" form
     * @param jobOwner    user that submitted the application
     * @param out         stream the logs are written to
     * @param logType     log file names to include (e.g. stdout, stderr); null or empty means all
     * @return 0 if container logs were found, -1 otherwise
     * @throws IOException
     */
    public static int dumpAContainersLogs(String appId, String containerId, String nodeId,
            String jobOwner, PrintStream out, List<String> logType) throws IOException {
        Path remoteRootLogDir = new Path(getYarnConfiguration().get(
                "yarn.nodemanager.remote-app-log-dir", "/tmp/logs"));
        String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(getYarnConfiguration());
        Path remoteAppLogDir = LogAggregationUtils.getRemoteAppLogDir(
                remoteRootLogDir, ConverterUtils.toApplicationId(appId), jobOwner, suffix);
        RemoteIterator<FileStatus> nodeFiles;
        try {
            Path qualifiedLogDir = FileContext.getFileContext(getYarnConfiguration()).makeQualified(remoteAppLogDir);
            nodeFiles = FileContext.getFileContext(qualifiedLogDir.toUri(), getYarnConfiguration())
                    .listStatus(remoteAppLogDir);
        } catch (FileNotFoundException e) {
            logDirNotExist(remoteAppLogDir.toString());
            return -1;
        }
        boolean foundContainerLogs = false;
        while (nodeFiles.hasNext()) {
            FileStatus thisNodeFile = nodeFiles.next();
            String fileName = thisNodeFile.getPath().getName();
            if (fileName.contains(LogAggregationUtils.getNodeString(nodeId)) && !fileName.endsWith(".tmp")) {
                AggregatedLogFormat.LogReader reader = null;
                try {
                    reader = new AggregatedLogFormat.LogReader(getYarnConfiguration(), thisNodeFile.getPath());
                    if (dumpAContainerLogs(containerId, reader, out, thisNodeFile.getModificationTime(), logType) > -1) {
                        foundContainerLogs = true;
                    }
                } finally {
                    if (reader != null) {
                        reader.close();
                    }
                }
            }
        }
        if (!foundContainerLogs) {
            containerLogNotFound(containerId);
            return -1;
        }
        return 0;
    }
    private static void logDirNotExist(String remoteAppLogDir) {
        System.out.println(remoteAppLogDir + " does not exist.");
        System.out.println("Log aggregation has not completed or is not enabled.");
    }

    private static void containerLogNotFound(String containerId) {
        System.out.println("Logs for container " + containerId + " are not present in this log-file.");
    }
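    // Scans one node's aggregated log file for the entry keyed by the given container id, then keeps
    // dumping that container's log files until the reader signals the end of the entry with an EOFException.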
    public static int dumpAContainerLogs(String containerIdStr, AggregatedLogFormat.LogReader reader,
            PrintStream out, long logUploadedTime, List<String> logType) throws IOException {
        AggregatedLogFormat.LogKey key = new AggregatedLogFormat.LogKey();
        DataInputStream valueStream;
        for (valueStream = reader.next(key); valueStream != null && !key.toString().equals(containerIdStr);
                valueStream = reader.next(key)) {
            key = new AggregatedLogFormat.LogKey();
        }
        if (valueStream == null) {
            return -1;
        }
        boolean foundContainerLogs = false;
        while (true) {
            try {
                readContainerLogs(valueStream, out, logUploadedTime, logType);
                foundContainerLogs = true;
            } catch (EOFException e) {
                if (foundContainerLogs) {
                    return 0;
                }
                return -1;
            }
        }
    }
    /**
     * Get the mapping of container id to the nodeId it ran on.
     *
     * @param appId    application id
     * @param appOwner user that submitted the application
     * @return map of container id -> nodeId ("host:port")
     * @throws IOException
     */
    public static Map<String, String> getContainers(String appId, String appOwner) throws IOException {
        Path remoteRootLogDir = new Path(yarnConfiguration.get(
                YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
                YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
        String user = appOwner;
        String logDirSuffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(yarnConfiguration);
        // TODO Change this to get a list of files from the LAS.
        Path remoteAppLogDir = LogAggregationUtils.getRemoteAppLogDir(
                remoteRootLogDir, ConverterUtils.toApplicationId(appId), user, logDirSuffix);
        RemoteIterator<FileStatus> nodeFiles;
        Map<String, String> containerAndNodeId = new LinkedHashMap<>();
        try {
            Path qualifiedLogDir =
                    FileContext.getFileContext(yarnConfiguration).makeQualified(remoteAppLogDir);
            nodeFiles = FileContext.getFileContext(qualifiedLogDir.toUri(),
                    yarnConfiguration).listStatus(remoteAppLogDir);
        } catch (FileNotFoundException fnf) {
            logDirNotExist(remoteAppLogDir.toString());
            return Collections.emptyMap();
        }
        boolean foundAnyLogs = false;
        while (nodeFiles.hasNext()) {
            FileStatus thisNodeFile = nodeFiles.next();
            if (!thisNodeFile.getPath().getName().endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
                AggregatedLogFormat.LogReader reader =
                        new AggregatedLogFormat.LogReader(yarnConfiguration, thisNodeFile.getPath());
                try {
                    AggregatedLogFormat.LogKey key = new AggregatedLogFormat.LogKey();
                    DataInputStream valueStream = reader.next(key);
                    while (valueStream != null) {
                        // Key: container_1587284642166_0001_01_000003, file name: master_42757 -> "master:42757"
                        containerAndNodeId.put(key.toString(), thisNodeFile.getPath().getName().replace("_", ":"));
                        foundAnyLogs = true;
                        // Move on to the next container in this node's file.
                        key = new AggregatedLogFormat.LogKey();
                        valueStream = reader.next(key);
                    }
                } finally {
                    reader.close();
                }
            }
        }
        if (!foundAnyLogs) {
            emptyLogDir(remoteAppLogDir.toString());
            return Collections.emptyMap();
        }
        return containerAndNodeId;
    }

    private static void emptyLogDir(String remoteAppLogDir) {
        System.out.println(remoteAppLogDir + " does not have any log files.");
    }
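    // Each container entry in the value stream is a sequence of log files, each stored as
    // [UTF log type name][UTF content length as a string][raw content bytes].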
    private static void readContainerLogs(DataInputStream valueStream, PrintStream out,
            long logUploadedTime, List<String> logType) throws IOException {
        byte[] buf = new byte[65535];
        String fileType = valueStream.readUTF();
        String fileLengthStr = valueStream.readUTF();
        long fileLength = Long.parseLong(fileLengthStr);
        if (logType != null && !logType.isEmpty() && !logType.contains(fileType)) {
            // Not a requested log type: consume its bytes anyway so the stream stays
            // positioned at the start of the container's next log file.
            long skipped = 0;
            while (skipped < fileLength) {
                long pending = fileLength - skipped;
                int toSkip = pending > buf.length ? buf.length : (int) pending;
                int n = valueStream.read(buf, 0, toSkip);
                if (n == -1) {
                    break;
                }
                skipped += n;
            }
            return;
        }
        out.print("LogType:");
        out.println(fileType);
        if (logUploadedTime != -1) {
            out.print("Log Upload Time:");
            out.println(Times.format(logUploadedTime));
        }
        out.print("LogLength:");
        out.println(fileLengthStr);
        out.println("Log Contents:");
        long curRead = 0;
        long pendingRead = fileLength - curRead;
        int toRead = pendingRead > buf.length ? buf.length : (int) pendingRead;
        int len = valueStream.read(buf, 0, toRead);
        while (len != -1 && curRead < fileLength) {
            out.write(buf, 0, len);
            curRead += len;
            pendingRead = fileLength - curRead;
            toRead = pendingRead > buf.length ? buf.length : (int) pendingRead;
            len = valueStream.read(buf, 0, toRead);
        }
        out.println("End of LogType:" + fileType);
        out.println("");
    }
    /**
     * Convert an application id string to an ApplicationId.
     *
     * @param appId application id string
     * @return the parsed ApplicationId
     */
    public static ApplicationId convert(String appId) {
        return ConverterUtils.toApplicationId(appId);
    }
}
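The utility class can then be driven from a small main method. Below is a minimal, untested sketch; the demo class name, application id, and owner are placeholders you would replace with a real, finished application on your cluster, and it assumes log aggregation has already completed for that application:
import java.util.Arrays;
import java.util.Map;

public class HadoopLogUtilsDemo {
    public static void main(String[] args) throws Exception {
        // Placeholder values: use a real (finished) application id and the user that submitted it.
        String appId = "application_1587284642166_0001";
        String appOwner = "hadoop";

        // List every container of the application together with the node it ran on.
        Map<String, String> containers = HadoopLogUtils.getContainers(appId, appOwner);
        for (Map.Entry<String, String> entry : containers.entrySet()) {
            String containerId = entry.getKey();
            String nodeId = entry.getValue(); // e.g. "master:42757"
            System.out.println("==== " + containerId + " on " + nodeId + " ====");
            // Dump only stdout/stderr to the console; pass null to dump every log type.
            HadoopLogUtils.dumpAContainersLogs(appId, containerId, nodeId, appOwner,
                    System.out, Arrays.asList("stdout", "stderr"));
        }
    }
}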
Result: each container's logs are printed in the same LogType / LogLength / Log Contents blocks that the yarn logs CLI produces.