通过 Java API 远程操作 HDFS 文件系统的工具类
package com.huatec.edu.cloud.hdata.core.ooziejob.utils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.io.IOUtils;
import org.springframework.http.ResponseEntity;import java.io.*;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;/**
@author houwenjie
@ Date: 2018/9/8 10:57
@ Description:远程操纵 HDFS 工具类
*/
public class HDFSIOUtil {

    /** Shared Hadoop configuration; only read by the methods below, never modified. */
    private static final Configuration conf = new Configuration();

    /** Display format for modification times (24-hour clock). */
    private static final String TIME_PATTERN = "yyyy/MM/dd HH:mm:ss";

    // Each method obtains its FileSystem via Path#getFileSystem into a LOCAL variable, so
    // concurrent callers no longer race on a shared static field. The instance is deliberately
    // not closed (the original code also left fs.close() commented out); presumably Hadoop hands
    // out a cached instance shared with other callers — confirm against the cluster config.

    /** Static utility class; not meant to be instantiated. */
    private HDFSIOUtil() {
    }

    /**
     * Reads a text file from HDFS, decoding it as UTF-8.
     *
     * <p>Every line, including the last one, is terminated with {@code "\r\n"} in the result.
     *
     * @param filePath HDFS path (or URI) of the file to read
     * @return the whole file content
     * @throws IOException if the file cannot be opened or read
     * @throws InterruptedException declared for backward compatibility; not thrown here
     */
    public static String readFile(String filePath) throws IOException, InterruptedException {
        Path srcPath = new Path(filePath);
        FileSystem fs = srcPath.getFileSystem(conf);
        // try-with-resources closes the reader (and the underlying HDFS stream) on every path.
        try (FSDataInputStream in = fs.open(srcPath);
                BufferedReader reader = new BufferedReader(new InputStreamReader(in, "utf-8"))) {
            StringBuilder content = new StringBuilder();
            String line;
            while ((line = reader.readLine()) != null) {
                content.append(line).append("\r\n");
            }
            return content.toString();
        }
    }

    /**
     * Uploads a local file to HDFS.
     *
     * @param file     local file to upload
     * @param HDFSPath destination HDFS path; must include the target file name
     * @throws IOException if the local file cannot be read or the HDFS file cannot be written
     * @throws InterruptedException declared for backward compatibility; not thrown here
     */
    public static void copyFileToHDFS(File file, String HDFSPath) throws IOException, InterruptedException {
        Path path = new Path(HDFSPath);
        FileSystem fs = path.getFileSystem(conf);
        // Both streams are closed even if fs.create(...) or the copy fails.
        try (InputStream in = new BufferedInputStream(new FileInputStream(file));
                OutputStream out = fs.create(path)) {
            IOUtils.copyBytes(in, out, 4096, false);
        }
    }

    /**
     * Lists the direct children of an HDFS directory.
     *
     * @param srcpath HDFS directory path
     * @return a list of {@link HDFSFileEntity}, one per direct child; empty when the path does
     *         not exist or is not a directory (raw type kept for caller compatibility)
     * @throws IOException on HDFS access failure
     */
    @SuppressWarnings("rawtypes")
    public static List getFileList(String srcpath) throws IOException {
        Path path = new Path(srcpath);
        FileSystem fs = path.getFileSystem(conf);
        List<HDFSFileEntity> files = new ArrayList<>();
        if (!fs.exists(path) || !fs.isDirectory(path)) {
            return files;
        }
        // A fresh formatter per call: SimpleDateFormat is not thread-safe.
        SimpleDateFormat formatter = new SimpleDateFormat(TIME_PATTERN);
        for (FileStatus status : fs.listStatus(path)) {
            files.add(toEntity(status, formatter));
        }
        return files;
    }

    /**
     * Creates a directory (and any missing parents) in HDFS and prints whether it succeeded.
     *
     * @param path HDFS directory path to create
     * @throws IOException on HDFS access failure
     */
    public static void mkdir(String path) throws IOException {
        Path srcPath = new Path(path);
        FileSystem fs = srcPath.getFileSystem(conf);
        if (fs.mkdirs(srcPath)) {
            System.out.println("create dir ok!");
        } else {
            System.out.println("create dir failure");
        }
    }

    /**
     * Recursively lists everything below an HDFS directory, appending one line per entry.
     *
     * <p>Each entry has the form {@code <type+permission>,<length>,<modification time>,<path>},
     * where the type flag is {@code d} for directories and {@code -} otherwise.
     *
     * @param srcpath HDFS directory path to walk
     * @param files   list that receives the entry strings (mutated in place)
     * @return the same {@code files} list, for convenience
     * @throws IOException on HDFS access failure
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    public static List getFileSystem(String srcpath, List files) throws IOException {
        Path path = new Path(srcpath);
        FileSystem fs = path.getFileSystem(conf);
        if (fs.exists(path) && fs.isDirectory(path)) {
            SimpleDateFormat formatter = new SimpleDateFormat(TIME_PATTERN);
            for (FileStatus status : fs.listStatus(path)) {
                files.add(permissionString(status) + "," + status.getLen() + ","
                        + formatter.format(new Date(status.getModificationTime())) + ","
                        + status.getPath());
                // Only directories can have children; skipping plain files saves two RPCs each.
                if (status.isDirectory()) {
                    getFileSystem(status.getPath().toString(), files);
                }
            }
        }
        return files;
    }

    /**
     * Tells whether the given HDFS path is a file.
     *
     * @param HDFSPath HDFS path to check
     * @return {@code true} if the path exists and is a file
     * @throws IOException on HDFS access failure
     */
    public static boolean isFile(String HDFSPath) throws IOException {
        Path path = new Path(HDFSPath);
        return path.getFileSystem(conf).isFile(path);
    }

    /**
     * For a high-availability cluster, returns the address of the active NameNode.
     *
     * @param HDFSPath any path on the target file system (used to resolve the FileSystem)
     * @return {@code "host:port"} of the active NameNode, or {@code null} on any failure
     */
    public static String getActiveNameNode(String HDFSPath) {
        try {
            Path path = new Path(HDFSPath);
            FileSystem fs = path.getFileSystem(conf);
            InetSocketAddress active = HAUtil.getAddressOfActive(fs);
            InetAddress address = active.getAddress();
            return address.getHostAddress() + ":" + active.getPort();
        } catch (Exception e) {
            // Deliberate best effort: callers treat null as "active NameNode unknown"
            // (e.g. a non-HA cluster or an unreachable file system).
            return null;
        }
    }

    /** Builds the {@code d}/{@code -} type flag followed by the permission string. */
    private static String permissionString(FileStatus status) {
        return (status.isDirectory() ? "d" : "-") + status.getPermission();
    }

    /** Converts one listing entry into an {@link HDFSFileEntity}, using its real modification time. */
    private static HDFSFileEntity toEntity(FileStatus status, SimpleDateFormat formatter) {
        HDFSFileEntity entity = new HDFSFileEntity();
        entity.setPermission(permissionString(status));
        entity.setSize(status.getLen());
        entity.setModification_time(formatter.format(new Date(status.getModificationTime())));
        entity.setPath(status.getPath().toString());
        entity.setBlock_replication((int) status.getReplication());
        entity.setOwner(status.getOwner());
        entity.setGroup(status.getGroup());
        entity.setBlocksize(status.getBlockSize());
        return entity;
    }
}
HDFSFileEntity 实体类
package com.huatec.edu.cloud.hdata.core.ooziejob.utils;
import org.apache.hadoop.fs.Path;
/**
@author houwenjie
@ Date: 2018/10/15 14:08
@ Description:
*/
public class HDFSFileEntity {

    /** Path of the file or directory, as a string. */
    private String path;
    /** Size (length) of the entry. */
    private long size;
    /** Replication factor of the entry. */
    private int block_replication;
    /** Block size of the entry. */
    private long blocksize;
    /** Modification time, already formatted as a display string by whoever fills this entity. */
    private String modification_time;
    /** Permission string; may carry a leading type flag, depending on the caller. */
    private String permission;
    /** Owning user. */
    private String owner;
    /** Owning group. */
    private String group;

    /**
     * Returns a debug representation listing every field; string fields are single-quoted.
     */
    @Override
    public String toString() {
        // '\'' is the quote character; the previous ''' was not a valid char literal.
        return "HDFSFileEntity{" +
                "path='" + path + '\'' +
                ", size=" + size +
                ", block_replication=" + block_replication +
                ", blocksize=" + blocksize +
                ", modification_time='" + modification_time + '\'' +
                ", permission='" + permission + '\'' +
                ", owner='" + owner + '\'' +
                ", group='" + group + '\'' +
                '}';
    }

    public String getPath() {
        return path;
    }

    public void setPath(String path) {
        this.path = path;
    }

    public long getSize() {
        return size;
    }

    public void setSize(long size) {
        this.size = size;
    }

    public int getBlock_replication() {
        return block_replication;
    }

    public void setBlock_replication(int block_replication) {
        this.block_replication = block_replication;
    }

    public long getBlocksize() {
        return blocksize;
    }

    public void setBlocksize(long blocksize) {
        this.blocksize = blocksize;
    }

    public String getModification_time() {
        return modification_time;
    }

    public void setModification_time(String modification_time) {
        this.modification_time = modification_time;
    }

    public String getPermission() {
        return permission;
    }

    public void setPermission(String permission) {
        this.permission = permission;
    }

    public String getOwner() {
        return owner;
    }

    public void setOwner(String owner) {
        this.owner = owner;
    }

    public String getGroup() {
        return group;
    }

    public void setGroup(String group) {
        this.group = group;
    }
}
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于