实现 SFTP 连接池进行大数据量文件上传

本贴最后更新于 1308 天前,其中的信息可能已经斗转星移

实现 SFTP 连接池进行大数据量文件上传

SFTP 连接池实现

主要基于 apachecommons-pool2 技术对 SFTP 连接对象进行池化。实现类:

  • SftpContractUtils (SFTP 连接对象)
  • SftpContractProperties (SFTP 连接配置)
  • SftpContractFactory (SFTP 连接对象创建工厂)
  • SftpContractPool (SFTP 连接池)
  • SftpPoolAutoConfigurationSFTP 连接池自动装配类)

SFTP 连接对象实现

JSch 是 Java Secure Channel 的缩写。 JSch 是一个 SSH2 的纯 Java 实现。它允许你连接到一个 SSH 服务器,并且可以使用端口转发, X11 转发,文件传输等。我们使用使用 JSch 实现的 SFTP 功能。

Maven 依赖(早期版本存在与服务端加密方法协商问题):

<dependency>
      <groupId>com.jcraft</groupId>
      <artifactId>jsch</artifactId>
      <version>0.1.55</version>
</dependency>

SftpContractUtils 类:

package org.test.infra.util;

import com.alibaba.fastjson.JSON;
import com.jcraft.jsch.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.test.config.SftpContractProperties;
import org.thymeleaf.util.StringUtils;

import java.io.BufferedReader;
import java.io.File;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.util.*;

public class SftpContractUtils {

    /** 日志记录器  */
    private static final Logger logger = LoggerFactory.getLogger(SftpContractUtils.class);

    // 设置编码格式
    private String charset = "UTF-8";

    /** Session */
    private Session session = null;
    /** Channel */
    private ChannelSftp channel = null;

    /** 规避多线程并发不断开问题 */
    //private static ThreadLocal<SftpContractUtils> sftpLocal = new ThreadLocal<>();


    private SftpContractUtils(){

    }

    /**
     * SFTP 安全文件传送协议
     */
    private SftpContractUtils(SftpContractProperties properties){
        logger.info("【{}】start reconnect [contract]sftp-server...",System.currentTimeMillis());
        login(properties);
    }

    /**
     * 是否已连接
     *
     * @return
     */
    public boolean isConnected() {
        return null != channel && channel.isConnected();
    }


    /**
     * 获取sftp客户端
     * utilType == null || utilType == 1 return icon
     * utilType == 2 return contract
     * @return
     */
    public static SftpContractUtils getSftpUtils(SftpContractProperties properties) {
        return new SftpContractUtils(properties);
    }

    /**
     * 获取本地线程存储的sftp客户端
     * utilType == null || utilType == 1 return icon
     * utilType == 2 return contract
     * @return
     */
//    public static SftpContractUtils getLocalSftpUtils(SftpContractProperties properties) throws Exception {
//        SftpContractUtils sftpUtils = sftpLocal.get();
//        if(sftpUtils == null || !sftpUtils.isConnected()){
//            sftpLocal.set(new SftpContractUtils(properties));
//        }
//        return sftpLocal.get();
//    }


    /**
     * 关闭通道
     *
     * @throws Exception
     */
    public void closeChannel() {
        if (null != channel) {
            try {
                channel.disconnect();
            } catch (Exception e) {
                logger.error("关闭SFTP通道发生异常:", e);
            }
        }
        if (null != session) {
            try {
                session.disconnect();
            } catch (Exception e) {
                logger.error("SFTP关闭 session异常:", e);
            }
        }
    }



    /**
     * 登陆SFTP服务器
     * @return boolean
     */
    public boolean login(SftpContractProperties properties) {

        try {
            JSch jsch = new JSch();
            // 用户名
            String contractUsername = properties.getUserName();
            // 密码
            String contractPassword = properties.getPassword();
            // SFTP服务器端口
            int sftpPort = properties.getPort();
            // SFTP服务器IP地址
            String sftpHost = properties.getHost();
            session = jsch.getSession(contractUsername, sftpHost, sftpPort);
            if(contractPassword != null){
                session.setPassword(contractPassword);
            }
            Properties config = new Properties();
            config.put("StrictHostKeyChecking", "no");
            session.setConfig(config);
            /**
             * 连接超时时间,单位毫秒
             */
            int sftpTimeout = 5000;
            session.setTimeout(sftpTimeout);
            session.connect();
            logger.debug("sftp session connected");

            logger.debug("opening channel");
            channel = (ChannelSftp)session.openChannel("sftp");
            channel.connect();

            logger.debug("connected successfully");
            return true;
        } catch (JSchException e) {
            logger.error("sftp login failed",e);
            return false;
        }
    }

    /**
     * 上传文件
     * <p>
     * 使用示例,SFTP服务器上的目录结构如下:/testA/testA_B/
     * <table border="1">
     * <tr><td>当前目录</td><td>方法</td><td>参数:绝对路径/相对路径</td><td>上传后</td></tr>
     * <tr><td>/</td><td>uploadFile("testA","upload.txt",new FileInputStream(new File("up.txt")))</td><td>相对路径</td><td>/testA/upload.txt</td></tr>
     * <tr><td>/</td><td>uploadFile("testA/testA_B","upload.txt",new FileInputStream(new File("up.txt")))</td><td>相对路径</td><td>/testA/testA_B/upload.txt</td></tr>
     * <tr><td>/</td><td>uploadFile("/testA/testA_B","upload.txt",new FileInputStream(new File("up.txt")))</td><td>绝对路径</td><td>/testA/testA_B/upload.txt</td></tr>
     * </table>
     * </p>
     * @param pathName SFTP服务器目录
     * @param fileName 服务器上保存的文件名
     * @param input 输入文件流
     * @return boolean
     */
    public boolean uploadFile(String pathName,String fileName,InputStream input) throws SftpException {

        String currentDir = currentDir();
        if(!changeDir(pathName)){
            return false;
        }

        try {
            channel.put(input,fileName,ChannelSftp.OVERWRITE);
            if(!existFile(fileName)){
                logger.debug("upload failed");
                return false;
            }
            logger.debug("upload successful");
            return true;
        } catch (SftpException e) {
            logger.error("upload failed",e);
            return false;
        } finally {
            changeDir(currentDir);
        }
    }

    /**
     * 上传文件
     * <p>
     * 使用示例,SFTP服务器上的目录结构如下:/testA/testA_B/
     * <table border="1">
     * <tr><td>当前目录</td><td>方法</td><td>参数:绝对路径/相对路径</td><td>上传后</td></tr>
     * <tr><td>/</td><td>uploadFile("testA","upload.txt","up.txt")</td><td>相对路径</td><td>/testA/upload.txt</td></tr>
     * <tr><td>/</td><td>uploadFile("testA/testA_B","upload.txt","up.txt")</td><td>相对路径</td><td>/testA/testA_B/upload.txt</td></tr>
     * <tr><td>/</td><td>uploadFile("/testA/testA_B","upload.txt","up.txt")</td><td>绝对路径</td><td>/testA/testA_B/upload.txt</td></tr>
     * </table>
     * </p>
     * @param pathName SFTP服务器目录
     * @param fileName 服务器上保存的文件名
     * @param localFile 本地文件
     * @return boolean
     */
    public boolean uploadFile(String pathName,String fileName,String localFile) throws SftpException{

        String currentDir = currentDir();
        if(!changeDir(pathName)){
            return false;
        }

        try {
            channel.put(localFile,fileName,ChannelSftp.OVERWRITE);
            if(!existFile(fileName)){
                logger.debug("upload failed");
                return false;
            }
            logger.debug("upload successful");
            return true;
        } catch (SftpException e) {
            logger.error("upload failed",e);
            return false;
        } finally {
            changeDir(currentDir);
        }
    }

    /**
     * 下载文件
     * <p>
     * 使用示例,SFTP服务器上的目录结构如下:/testA/testA_B/
     * <table border="1">
     * <tr><td>当前目录</td><td>方法</td><td>参数:绝对路径/相对路径</td><td>下载后</td></tr>
     * <tr><td>/</td><td>downloadFile("testA","down.txt","D:\\downDir")</td><td>相对路径</td><td>D:\\downDir\\down.txt</td></tr>
     * <tr><td>/</td><td>downloadFile("testA/testA_B","down.txt","D:\\downDir")</td><td>相对路径</td><td>D:\\downDir\\down.txt</td></tr>
     * <tr><td>/</td><td>downloadFile("/testA/testA_B","down.txt","D:\\downDir")</td><td>绝对路径</td><td>D:\\downDir\\down.txt</td></tr>
     * </table>
     * </p>
     * @param remotePath SFTP服务器目录
     * @param fileName 服务器上需要下载的文件名
     * @param localPath 本地保存路径
     * @return boolean
     */
    public boolean downloadFile(String remotePath,String fileName,String localPath) throws SftpException{

        String currentDir = currentDir();
        if(!changeDir(remotePath)){
            return false;
        }

        try {
            String localFilePath = localPath + File.separator + fileName;
            channel.get(fileName,localFilePath);

            File localFile = new File(localFilePath);
            if(!localFile.exists()){
                logger.debug("download file failed");
                return false;
            }
            logger.debug("download successful");
            return true;
        } catch (SftpException e) {
            logger.error("download file failed",e);
            return false;
        } finally {
            changeDir(currentDir);
        }
    }

    /**
     * 切换工作目录
     * <p>
     * 使用示例,SFTP服务器上的目录结构如下:/testA/testA_B/
     * <table border="1">
     * <tr><td>当前目录</td><td>方法</td><td>参数(绝对路径/相对路径)</td><td>切换后的目录</td></tr>
     * <tr><td>/</td><td>changeDir("testA")</td><td>相对路径</td><td>/testA/</td></tr>
     * <tr><td>/</td><td>changeDir("testA/testA_B")</td><td>相对路径</td><td>/testA/testA_B/</td></tr>
     * <tr><td>/</td><td>changeDir("/testA")</td><td>绝对路径</td><td>/testA/</td></tr>
     * <tr><td>/testA/testA_B/</td><td>changeDir("/testA")</td><td>绝对路径</td><td>/testA/</td></tr>
     * </table>
     * </p>
     * @param pathName 路径
     * @return boolean
     */
    public boolean changeDir(String pathName) throws SftpException{
        if(pathName == null || pathName.trim().equals("")){
            logger.error("invalid pathName");
            return false;
        }
        logger.debug("changeDir【{}】,before:directory successfully changed,current dir=" + channel.pwd(),pathName);
        try {
            channel.cd(pathName.replaceAll("\\\\", "/"));
        } catch (Exception e1) {
            logger.error("cd dir error", e1);
            pathName = pathName.replaceAll("\\\\", "/");
            String[] folders = pathName.split( "/" );
            for ( String folder : folders ) {
                if ( folder.length() > 0 ) {
                    try {
                        channel.cd( folder );
                    }
                    catch ( SftpException e ) {
                        channel.mkdir( folder );
                        channel.cd( folder );
                    }
                }
            }
        }

        logger.debug("changeDir【{}】,after:directory successfully changed,current dir=" + channel.pwd(),pathName);
        return true;

    }


    /**
     * 切换到上一级目录
     * <p>
     * 使用示例,SFTP服务器上的目录结构如下:/testA/testA_B/
     * <table border="1">
     * <tr><td>当前目录</td><td>方法</td><td>切换后的目录</td></tr>
     * <tr><td>/testA/</td><td>changeToParentDir()</td><td>/</td></tr>
     * <tr><td>/testA/testA_B/</td><td>changeToParentDir()</td><td>/testA/</td></tr>
     * </table>
     * </p>
     * @return boolean
     */
    public boolean changeToParentDir() throws SftpException{
        return changeDir("..");
    }

    /**
     * 切换到根目录
     * @return boolean
     */
    public boolean changeToHomeDir() throws SftpException{
        String homeDir = null;
        homeDir = channel.getHome();
        return changeDir(homeDir);
    }

    /**
     * 创建目录
     * <p>
     * 使用示例,SFTP服务器上的目录结构如下:/testA/testA_B/
     * <table border="1">
     * <tr><td>当前目录</td><td>方法</td><td>参数(绝对路径/相对路径)</td><td>创建成功后的目录</td></tr>
     * <tr><td>/testA/testA_B/</td><td>makeDir("testA_B_C")</td><td>相对路径</td><td>/testA/testA_B/testA_B_C/</td></tr>
     * <tr><td>/</td><td>makeDir("/testA/testA_B/testA_B_D")</td><td>绝对路径</td><td>/testA/testA_B/testA_B_D/</td></tr>
     * </table>
     * <br/>
     * <b>注意</b>,当<b>中间目录不存在</b>的情况下,不能够使用绝对路径的方式期望创建中间目录及目标目录。
     * 例如makeDir("/testNOEXIST1/testNOEXIST2/testNOEXIST3"),这是错误的。
     * </p>
     * @param dirName 目录
     * @return boolean
     */
    public boolean makeDir(String dirName){
        try {
            channel.mkdir(dirName);
            logger.debug("directory successfully created,dir=" + dirName);
            return true;
        } catch (SftpException e) {
            logger.error("failed to create directory", e);
            return false;
        }
    }

    /**
     * 删除文件夹
     * @param dirName
     * @return boolean
     */
    @SuppressWarnings("unchecked")
    public boolean delDir(String dirName) throws SftpException{
        if(!changeDir(dirName)){
            return false;
        }

        Vector<ChannelSftp.LsEntry> list = null;
        try {
            list = channel.ls(channel.pwd());
        } catch (SftpException e) {
            logger.error("can not list directory",e);
            return false;
        }

        for(ChannelSftp.LsEntry entry : list){
            String fileName = entry.getFilename();
            if(!fileName.equals(".") && !fileName.equals("..")){
                if(entry.getAttrs().isDir()){
                    delDir(fileName);
                } else {
                    delFile(fileName);
                }
            }
        }

        if(!changeToParentDir()){
            return false;
        }

        try {
            channel.rmdir(dirName);
            logger.debug("directory " + dirName + " successfully deleted");
            return true;
        } catch (SftpException e) {
            logger.error("failed to delete directory " + dirName,e);
            return false;
        }
    }

    /**
     * 删除文件
     * @param fileName 文件名
     * @return boolean
     */
    public boolean delFile(String fileName){
        if(fileName == null || fileName.trim().equals("")){
            logger.debug("invalid filename");
            return false;
        }

        try {
            channel.rm(fileName);
            logger.debug("file " + fileName + " successfully deleted");
            return true;
        } catch (SftpException e) {
            logger.error("failed to delete file " + fileName,e);
            return false;
        }
    }

    /**
     * 当前目录下文件及文件夹名称列表
     * @return String[]
     */
    public String[] ls(){
        return list(Filter.ALL);
    }

    /**
     * 当前目录下文件及文件夹名称列表
     * @return String[]
     */
    public Optional<ChannelSftp.LsEntry> lsFile(String pathName, String fileName) throws SftpException {
        if(!changeDir(pathName)){
            logger.debug(" changeDir to pathName [{}] 失败",pathName);
            return Optional.empty();
        };
        Vector<ChannelSftp.LsEntry> list = null;
        try {
            //ls方法会返回两个特殊的目录,当前目录(.)和父目录(..)
            list = channel.ls(channel.pwd());
        } catch (SftpException e) {
            logger.error("can not list directory",e);
            return Optional.empty();
        }

        for(ChannelSftp.LsEntry entry : list){
            if(StringUtils.equals(entry.getFilename(), fileName)){
                return Optional.of(entry);
            }
        }
        return Optional.empty();
    }

    /**
     * 指定目录下文件及文件夹名称列表
     * @return String[]
     */
    public String[] ls(String pathName) throws SftpException{
        String currentDir = currentDir();
        logger.debug("currentDir:{} , ls pathName:{} ",currentDir,pathName);
        if(!changeDir(pathName)){
            logger.debug(" changeDir to pathName [{}] 失败",pathName);
            return new String[0];
        };
        String[] result = list(Filter.ALL);
        if(!changeDir(currentDir)){
            logger.debug(" changeDir to currentDir [{}] 失败",currentDir);
            return new String[0];
        }
        logger.debug(" ls pathName result:{} ",JSON.toJSONString(result));
        return result;
    }

    /**
     * 当前目录下文件名称列表
     * @return String[]
     */
    public String[] lsFiles(){
        return list(Filter.FILE);
    }

    /**
     * 指定目录下文件名称列表
     * @return String[]
     */
    public String[] lsFiles(String pathName) throws SftpException{
        String currentDir = currentDir();
        if(!changeDir(pathName)){
            return new String[0];
        };
        String[] result = list(Filter.FILE);
        if(!changeDir(currentDir)){
            return new String[0];
        }
        return result;
    }

    /**
     * 当前目录下文件夹名称列表
     * @return String[]
     */
    public String[] lsDirs(){
        return list(Filter.DIR);
    }

    /**
     * 指定目录下文件夹名称列表
     * @return String[]
     */
    public String[] lsDirs(String pathName) throws SftpException{
        String currentDir = currentDir();
        if(!changeDir(pathName)){
            return new String[0];
        };
        String[] result = list(Filter.DIR);
        if(!changeDir(currentDir)){
            return new String[0];
        }
        return result;
    }

    /**
     * 当前目录是否存在文件或文件夹
     * @param name 名称
     * @return boolean
     */
    public boolean exist(String name){
        return exist(ls(), name);
    }

    /**
     * 指定目录下,是否存在文件或文件夹
     * @param path 目录
     * @param name 名称
     * @return boolean
     */
    public boolean exist(String path,String name) throws SftpException{
        return exist(ls(path),name);
    }

    /**
     * 当前目录是否存在文件
     * @param name 文件名
     * @return boolean
     */
    public boolean existFile(String name){
        return exist(lsFiles(),name);
    }

    /**
     * 指定目录下,是否存在文件
     * @param path 目录
     * @param name 文件名
     * @return boolean
     */
    public boolean existFile(String path,String name) throws SftpException{
        return exist(lsFiles(path), name);
    }

    /**
     * 当前目录是否存在文件夹
     * @param name 文件夹名称
     * @return boolean
     */
    public boolean existDir(String name){
        return exist(lsDirs(), name);
    }

    /**
     * 指定目录下,是否存在文件夹
     * @param path 目录
     * @param name 文家夹名称
     * @return boolean
     */
    public boolean existDir(String path,String name) throws SftpException{
        return exist(lsDirs(path), name);
    }

    /**
     * 当前工作目录
     * @return String
     */
    public String currentDir() throws SftpException {
        return channel.pwd();
    }

    /**
     * 登出
     */
    public void logout(){
        if(channel != null){
            channel.quit();
            channel.disconnect();
        }
        if(session != null){
            session.disconnect();
        }
        logger.debug("logout successfully");
    }


    //------private method ------

    /** 枚举,用于过滤文件和文件夹  */
    private enum Filter {/** 文件及文件夹 */ ALL ,/** 文件 */ FILE ,/** 文件夹 */ DIR };

    /**
     * 列出当前目录下的文件及文件夹
     * @param filter 过滤参数
     * @return String[]
     */
    @SuppressWarnings("unchecked")
    private String[] list(Filter filter){
        Vector<ChannelSftp.LsEntry> list = null;
        try {
            //ls方法会返回两个特殊的目录,当前目录(.)和父目录(..)
            list = channel.ls(channel.pwd());
        } catch (SftpException e) {
            logger.error("can not list directory",e);
            return new String[0];
        }

        List<String> resultList = new ArrayList<String>();
        for(ChannelSftp.LsEntry entry : list){
            if(filter(entry, filter)){
                resultList.add(entry.getFilename());
            }
        }
        return resultList.toArray(new String[0]);
    }

    /**
     * 判断是否是否过滤条件
     * @param entry LsEntry
     * @param f 过滤参数
     * @return boolean
     */
    private boolean filter(ChannelSftp.LsEntry entry, Filter f){
        if(f.equals(Filter.ALL)){
            return !".".equals(entry.getFilename()) && !"..".equals(entry.getFilename());
        } else if(f.equals(Filter.FILE)){
            return !".".equals(entry.getFilename()) && !"..".equals(entry.getFilename()) && !entry.getAttrs().isDir();
        } else if(f.equals(Filter.DIR)){
            return !".".equals(entry.getFilename()) && !"..".equals(entry.getFilename()) && entry.getAttrs().isDir();
        }
        return false;
    }

    /**
     * 根目录
     * @return String
     */
    private String homeDir() throws SftpException {
        return channel.getHome();
    }

    /**
     * 判断字符串是否存在于数组中
     * @param strArr 字符串数组
     * @param str 字符串
     * @return boolean
     */
    private boolean exist(String[] strArr,String str){
        if(strArr == null || strArr.length == 0){
            return false;
        }
        if(str == null || "".equals(str.trim())){
            return false;
        }
        for(String s : strArr){
            if(s.equalsIgnoreCase(str)){
                return true;
            }
        }
        return false;
    }

    /**
     * 远程打包zip,目标目录下的所有文件
     * @param zipPathName 要打包的目录
     * @param zipFileName 打包的文件名
     * @return
     * @throws SftpException
     */
    public Boolean zipRemoteFiles(String zipPathName,String zipFileName) throws Exception{

        if(!changeDir(zipPathName)){
            return false;
        }
        String[] files = lsFiles();
        if(files == null || files.length == 0){
            return null;
        }
        String currentDir = currentDir();
        changeDir(currentDir);
        String comd = "zip -r -o -q "+currentDir+"/"+zipFileName+" "+currentDir+"/*";
        String cmdRtn = execCmd(comd);
        logger.info("currentDir:{} execCmd[{}] return:【{}】",currentDir,comd,cmdRtn);
        return true;
    }


    /**
     * 执行一条命令
     */
    public String execCmd(String command) {
        InputStream in = null;
        BufferedReader reader = null;
        Channel channel = null;

        String buf = null;
        try {
            channel = session.openChannel("exec");
            ((ChannelExec) channel).setCommand(command);
            channel.setInputStream(null);
            ((ChannelExec) channel).setErrStream(System.err);
            channel.connect();
            in = channel.getInputStream();
            reader = new BufferedReader(new InputStreamReader(in, Charset.forName(charset)));

            while ((buf = reader.readLine()) != null) {
                System.out.println(buf);
            }
            channel.disconnect();
        } catch (Exception e) {
            logger.error(" SFTP execCmd ERROR:",e);
        }
        return buf;
    }
}

SFTP 连接配置

配置 IP ,用户名,密码,连接池相关配置

package org.test.config;

import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties(prefix = SftpContractProperties.PROJECT_PREFIX)
public class SftpContractProperties {
    public static final String PROJECT_PREFIX = "hzero.sftp";
    /**
     * 用户名
     */
    private String userName;
    /**
     * 密码
     */
    private String password;
    /**
     * SFTP服务器端口
     */
    private int port;
    /**
     * SFTP服务器IP地址
     */
    private String host;

    private SftpContractPoolProperties poolProperties;

    public static class SftpContractPoolProperties {
        private String poolPrefix = "hzero.sftp";
        /**
         * 最大空闲
         */
        private int maxIdle = 5;
        /**
         * 最大总数
         */
        private int maxTotal = 10;
        /**
         * 最小空闲
         */
        private int minIdle = 0;

        /**
         * 初始化连接数
         */
        private int initialSize = 3;

        public String getPoolPrefix() {
            return poolPrefix;
        }

        public SftpContractPoolProperties setPoolPrefix(String poolPrefix) {
            this.poolPrefix = poolPrefix;
            return this;
        }

        public int getMaxIdle() {
            return maxIdle;
        }

        public SftpContractPoolProperties setMaxIdle(int maxIdle) {
            this.maxIdle = maxIdle;
            return this;
        }

        public int getMaxTotal() {
            return maxTotal;
        }

        public SftpContractPoolProperties setMaxTotal(int maxTotal) {
            this.maxTotal = maxTotal;
            return this;
        }

        public int getMinIdle() {
            return minIdle;
        }

        public SftpContractPoolProperties setMinIdle(int minIdle) {
            this.minIdle = minIdle;
            return this;
        }

        public int getInitialSize() {
            return initialSize;
        }

        public SftpContractPoolProperties setInitialSize(int initialSize) {
            this.initialSize = initialSize;
            return this;
        }
    }

    public String getUserName() {
        return userName;
    }

    public SftpContractProperties setUserName(String userName) {
        this.userName = userName;
        return this;
    }

    public String getPassword() {
        return password;
    }

    public SftpContractProperties setPassword(String password) {
        this.password = password;
        return this;
    }

    public int getPort() {
        return port;
    }

    public SftpContractProperties setPort(int port) {
        this.port = port;
        return this;
    }

    public String getHost() {
        return host;
    }

    public SftpContractProperties setHost(String host) {
        this.host = host;
        return this;
    }

    public SftpContractPoolProperties getPoolProperties() {
        return poolProperties;
    }

    public SftpContractProperties setPoolProperties(SftpContractPoolProperties poolProperties) {
        this.poolProperties = poolProperties;
        return this;
    }
}

SFTP 连接对象创建工厂

继承 commons-pool2BasePooledObjectFactory 类,实现 create , wrap 等方法就可以实现一个对象创建工厂。

package org.test.infra.util;

import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.test.config.SftpContractProperties;


public class SftpContractFactory extends BasePooledObjectFactory<SftpContractUtils> {
    private SftpContractProperties properties;

    public SftpContractFactory(SftpContractProperties properties){
        this.properties = properties;
    }

    @Override
    public SftpContractUtils create() throws Exception {
        // 创建新对象
        return SftpContractUtils.getSftpUtils(properties);
    }

    @Override
    public PooledObject<SftpContractUtils> wrap(SftpContractUtils sftpContractUtils) {
        // 池化对象
        return new DefaultPooledObject<>(sftpContractUtils);
    }

    @Override
    public void passivateObject(PooledObject<SftpContractUtils> p) throws Exception {
        // 将对象返回池时进行的操作,此处将工作目录设置为根目录
        p.getObject().changeToHomeDir();
    }

    @Override
    public void destroyObject(PooledObject<SftpContractUtils> p) throws Exception {
        p.getObject().closeChannel();
    }

    @Override
    public boolean validateObject(PooledObject<SftpContractUtils> p) {
        return p.getObject().isConnected();
    }
}

SFTP 连接池

继承 GenericObjectPool ,连接池将工厂对象与配置对象结合。 GenericObjectPool 中有 borrowObject , returnObject 等方法供我们获取,回归对象。

package org.test.infra.util;

import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.AbandonedConfig;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

public class SftpContractPool extends GenericObjectPool<SftpContractUtils> {
    public SftpContractPool(PooledObjectFactory<SftpContractUtils> factory) {
        super(factory);
    }

    public SftpContractPool(PooledObjectFactory<SftpContractUtils> factory, GenericObjectPoolConfig config) {
        super(factory, config);
    }

    public SftpContractPool(PooledObjectFactory<SftpContractUtils> factory, GenericObjectPoolConfig config, AbandonedConfig abandonedConfig) {
        super(factory, config, abandonedConfig);
    }
}

SFTP 连接池自动装配类

package org.test.autoconfigure;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.test.config.SftpContractProperties;
import org.stest.infra.util.SftpContractFactory;
import org.test.infra.util.SftpContractPool;

@EnableConfigurationProperties(SftpContractProperties.class)
@Configuration
public class SftpPoolAutoConfiguration {
    private SftpContractPool pool;
    private SftpContractProperties properties;

    @Autowired
    public SftpPoolAutoConfiguration(SftpContractProperties properties) {
        this.properties = properties;
    }

    @ConditionalOnClass({SftpContractFactory.class})
    @Bean
    protected SftpContractPool faceSDKPool() {
        SftpContractFactory faceSDKFactory = new SftpContractFactory(properties);
        //设置对象池的相关参数
        SftpContractProperties.SftpContractPoolProperties poolProperties = properties.getPoolProperties();
        if (poolProperties == null) {
            poolProperties = new SftpContractProperties.SftpContractPoolProperties();
        }
        GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
        poolConfig.setMaxIdle(poolProperties.getMaxIdle());
        poolConfig.setMaxTotal(poolProperties.getMaxTotal());
        poolConfig.setMinIdle(poolProperties.getMinIdle());
        poolConfig.setBlockWhenExhausted(true);
        poolConfig.setTestOnBorrow(true);
        poolConfig.setTestOnReturn(true);
        poolConfig.setTestWhileIdle(true);
        poolConfig.setTimeBetweenEvictionRunsMillis(1000 * 60 * 30);
        //一定要关闭jmx,不然springboot启动会报已经注册了某个jmx的错误
        poolConfig.setJmxEnabled(false);

        //新建一个对象池,传入对象工厂和配置
        pool = new SftpContractPool(faceSDKFactory, poolConfig);

        return pool;
    }
}

通过管道流进行文件上传

实例:

PipedInputStream in = new PipedInputStream();
PipedOutputStream out = new PipedOutputStream(in);
new Thread(
        () -> {
            generate(out);
        }
).start();
SftpContractUtils sftpContractUtils = sftpContractPool.borrowObject();
String fileName = "test1.txt";
String path = "/test"
boolean uploadFileSuccess = sftpContractUtils.uploadFile(path, fileName, in);
sftpContractPool.returnObject(sftpContractUtils);

注意:

管道输入流应该连接到管道输出流;管道输入流提供要写入管道输出流的所有数据字节。通常,数据由某个线程从 PipedInputStream 对象读取, 并由其他线程将其写入到相应的 PipedOutputStream 。不建议对这两个对象尝试使用单个线程,因为这样可能死锁线程。管道输入流包含一个缓冲区,可在缓冲区限定的 范 围内将读操作和写操作分离开。如果向连接管道输出流提供数据字节的线程不再存在,则认为该管道已损坏。

PipedInputStream 的缓冲区大小默认是 1024 字节,管道的读写操作是互相阻塞的,当缓冲区为空时,读操作阻塞;当缓冲区满时,写操作阻塞。

  • 程序员

    程序员是从事程序开发、程序维护的专业人员。

    565 引用 • 3532 回帖
1 操作
sigeisment 在 2021-04-07 10:46:50 更新了该帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...