实现 SFTP
连接池进行大数据量文件上传
SFTP
连接池实现
主要基于 apache
的 commons-pool2 技术对 SFTP
连接对象进行池化。实现类:
SftpContractUtils
(SFTP
连接对象)SftpContractProperties
(SFTP
连接配置)SftpContractFactory
(SFTP
连接对象创建工厂)SftpContractPool
(SFTP
连接池)SftpPoolAutoConfiguration
(SFTP
连接池自动装配类)
SFTP
连接对象实现
JSch
是 Java Secure Channel 的缩写。 JSch
是一个 SSH2
的纯 Java 实现。它允许你连接到一个 SSH 服务器,并且可以使用端口转发, X11
转发,文件传输等。我们使用使用 JSch
实现的 SFTP
功能。
Maven 依赖(早期版本存在与服务端加密方法协商问题):
<dependency>
<groupId>com.jcraft</groupId>
<artifactId>jsch</artifactId>
<version>0.1.55</version>
</dependency>
SftpContractUtils
类:
package org.test.infra.util;
import com.alibaba.fastjson.JSON;
import com.jcraft.jsch.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.test.config.SftpContractProperties;
import org.thymeleaf.util.StringUtils;
import java.io.BufferedReader;
import java.io.File;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.util.*;
public class SftpContractUtils {
/** 日志记录器 */
private static final Logger logger = LoggerFactory.getLogger(SftpContractUtils.class);
// 设置编码格式
private String charset = "UTF-8";
/** Session */
private Session session = null;
/** Channel */
private ChannelSftp channel = null;
/** 规避多线程并发不断开问题 */
//private static ThreadLocal<SftpContractUtils> sftpLocal = new ThreadLocal<>();
private SftpContractUtils(){
}
/**
* SFTP 安全文件传送协议
*/
private SftpContractUtils(SftpContractProperties properties){
logger.info("【{}】start reconnect [contract]sftp-server...",System.currentTimeMillis());
login(properties);
}
/**
* 是否已连接
*
* @return
*/
public boolean isConnected() {
return null != channel && channel.isConnected();
}
/**
* 获取sftp客户端
* utilType == null || utilType == 1 return icon
* utilType == 2 return contract
* @return
*/
public static SftpContractUtils getSftpUtils(SftpContractProperties properties) {
return new SftpContractUtils(properties);
}
/**
* 获取本地线程存储的sftp客户端
* utilType == null || utilType == 1 return icon
* utilType == 2 return contract
* @return
*/
// public static SftpContractUtils getLocalSftpUtils(SftpContractProperties properties) throws Exception {
// SftpContractUtils sftpUtils = sftpLocal.get();
// if(sftpUtils == null || !sftpUtils.isConnected()){
// sftpLocal.set(new SftpContractUtils(properties));
// }
// return sftpLocal.get();
// }
/**
* 关闭通道
*
* @throws Exception
*/
public void closeChannel() {
if (null != channel) {
try {
channel.disconnect();
} catch (Exception e) {
logger.error("关闭SFTP通道发生异常:", e);
}
}
if (null != session) {
try {
session.disconnect();
} catch (Exception e) {
logger.error("SFTP关闭 session异常:", e);
}
}
}
/**
* 登陆SFTP服务器
* @return boolean
*/
public boolean login(SftpContractProperties properties) {
try {
JSch jsch = new JSch();
// 用户名
String contractUsername = properties.getUserName();
// 密码
String contractPassword = properties.getPassword();
// SFTP服务器端口
int sftpPort = properties.getPort();
// SFTP服务器IP地址
String sftpHost = properties.getHost();
session = jsch.getSession(contractUsername, sftpHost, sftpPort);
if(contractPassword != null){
session.setPassword(contractPassword);
}
Properties config = new Properties();
config.put("StrictHostKeyChecking", "no");
session.setConfig(config);
/**
* 连接超时时间,单位毫秒
*/
int sftpTimeout = 5000;
session.setTimeout(sftpTimeout);
session.connect();
logger.debug("sftp session connected");
logger.debug("opening channel");
channel = (ChannelSftp)session.openChannel("sftp");
channel.connect();
logger.debug("connected successfully");
return true;
} catch (JSchException e) {
logger.error("sftp login failed",e);
return false;
}
}
/**
* 上传文件
* <p>
* 使用示例,SFTP服务器上的目录结构如下:/testA/testA_B/
* <table border="1">
* <tr><td>当前目录</td><td>方法</td><td>参数:绝对路径/相对路径</td><td>上传后</td></tr>
* <tr><td>/</td><td>uploadFile("testA","upload.txt",new FileInputStream(new File("up.txt")))</td><td>相对路径</td><td>/testA/upload.txt</td></tr>
* <tr><td>/</td><td>uploadFile("testA/testA_B","upload.txt",new FileInputStream(new File("up.txt")))</td><td>相对路径</td><td>/testA/testA_B/upload.txt</td></tr>
* <tr><td>/</td><td>uploadFile("/testA/testA_B","upload.txt",new FileInputStream(new File("up.txt")))</td><td>绝对路径</td><td>/testA/testA_B/upload.txt</td></tr>
* </table>
* </p>
* @param pathName SFTP服务器目录
* @param fileName 服务器上保存的文件名
* @param input 输入文件流
* @return boolean
*/
public boolean uploadFile(String pathName,String fileName,InputStream input) throws SftpException {
String currentDir = currentDir();
if(!changeDir(pathName)){
return false;
}
try {
channel.put(input,fileName,ChannelSftp.OVERWRITE);
if(!existFile(fileName)){
logger.debug("upload failed");
return false;
}
logger.debug("upload successful");
return true;
} catch (SftpException e) {
logger.error("upload failed",e);
return false;
} finally {
changeDir(currentDir);
}
}
/**
* 上传文件
* <p>
* 使用示例,SFTP服务器上的目录结构如下:/testA/testA_B/
* <table border="1">
* <tr><td>当前目录</td><td>方法</td><td>参数:绝对路径/相对路径</td><td>上传后</td></tr>
* <tr><td>/</td><td>uploadFile("testA","upload.txt","up.txt")</td><td>相对路径</td><td>/testA/upload.txt</td></tr>
* <tr><td>/</td><td>uploadFile("testA/testA_B","upload.txt","up.txt")</td><td>相对路径</td><td>/testA/testA_B/upload.txt</td></tr>
* <tr><td>/</td><td>uploadFile("/testA/testA_B","upload.txt","up.txt")</td><td>绝对路径</td><td>/testA/testA_B/upload.txt</td></tr>
* </table>
* </p>
* @param pathName SFTP服务器目录
* @param fileName 服务器上保存的文件名
* @param localFile 本地文件
* @return boolean
*/
public boolean uploadFile(String pathName,String fileName,String localFile) throws SftpException{
String currentDir = currentDir();
if(!changeDir(pathName)){
return false;
}
try {
channel.put(localFile,fileName,ChannelSftp.OVERWRITE);
if(!existFile(fileName)){
logger.debug("upload failed");
return false;
}
logger.debug("upload successful");
return true;
} catch (SftpException e) {
logger.error("upload failed",e);
return false;
} finally {
changeDir(currentDir);
}
}
/**
* 下载文件
* <p>
* 使用示例,SFTP服务器上的目录结构如下:/testA/testA_B/
* <table border="1">
* <tr><td>当前目录</td><td>方法</td><td>参数:绝对路径/相对路径</td><td>下载后</td></tr>
* <tr><td>/</td><td>downloadFile("testA","down.txt","D:\\downDir")</td><td>相对路径</td><td>D:\\downDir\\down.txt</td></tr>
* <tr><td>/</td><td>downloadFile("testA/testA_B","down.txt","D:\\downDir")</td><td>相对路径</td><td>D:\\downDir\\down.txt</td></tr>
* <tr><td>/</td><td>downloadFile("/testA/testA_B","down.txt","D:\\downDir")</td><td>绝对路径</td><td>D:\\downDir\\down.txt</td></tr>
* </table>
* </p>
* @param remotePath SFTP服务器目录
* @param fileName 服务器上需要下载的文件名
* @param localPath 本地保存路径
* @return boolean
*/
public boolean downloadFile(String remotePath,String fileName,String localPath) throws SftpException{
String currentDir = currentDir();
if(!changeDir(remotePath)){
return false;
}
try {
String localFilePath = localPath + File.separator + fileName;
channel.get(fileName,localFilePath);
File localFile = new File(localFilePath);
if(!localFile.exists()){
logger.debug("download file failed");
return false;
}
logger.debug("download successful");
return true;
} catch (SftpException e) {
logger.error("download file failed",e);
return false;
} finally {
changeDir(currentDir);
}
}
/**
* 切换工作目录
* <p>
* 使用示例,SFTP服务器上的目录结构如下:/testA/testA_B/
* <table border="1">
* <tr><td>当前目录</td><td>方法</td><td>参数(绝对路径/相对路径)</td><td>切换后的目录</td></tr>
* <tr><td>/</td><td>changeDir("testA")</td><td>相对路径</td><td>/testA/</td></tr>
* <tr><td>/</td><td>changeDir("testA/testA_B")</td><td>相对路径</td><td>/testA/testA_B/</td></tr>
* <tr><td>/</td><td>changeDir("/testA")</td><td>绝对路径</td><td>/testA/</td></tr>
* <tr><td>/testA/testA_B/</td><td>changeDir("/testA")</td><td>绝对路径</td><td>/testA/</td></tr>
* </table>
* </p>
* @param pathName 路径
* @return boolean
*/
public boolean changeDir(String pathName) throws SftpException{
if(pathName == null || pathName.trim().equals("")){
logger.error("invalid pathName");
return false;
}
logger.debug("changeDir【{}】,before:directory successfully changed,current dir=" + channel.pwd(),pathName);
try {
channel.cd(pathName.replaceAll("\\\\", "/"));
} catch (Exception e1) {
logger.error("cd dir error", e1);
pathName = pathName.replaceAll("\\\\", "/");
String[] folders = pathName.split( "/" );
for ( String folder : folders ) {
if ( folder.length() > 0 ) {
try {
channel.cd( folder );
}
catch ( SftpException e ) {
channel.mkdir( folder );
channel.cd( folder );
}
}
}
}
logger.debug("changeDir【{}】,after:directory successfully changed,current dir=" + channel.pwd(),pathName);
return true;
}
/**
* 切换到上一级目录
* <p>
* 使用示例,SFTP服务器上的目录结构如下:/testA/testA_B/
* <table border="1">
* <tr><td>当前目录</td><td>方法</td><td>切换后的目录</td></tr>
* <tr><td>/testA/</td><td>changeToParentDir()</td><td>/</td></tr>
* <tr><td>/testA/testA_B/</td><td>changeToParentDir()</td><td>/testA/</td></tr>
* </table>
* </p>
* @return boolean
*/
public boolean changeToParentDir() throws SftpException{
return changeDir("..");
}
/**
* 切换到根目录
* @return boolean
*/
public boolean changeToHomeDir() throws SftpException{
String homeDir = null;
homeDir = channel.getHome();
return changeDir(homeDir);
}
/**
* 创建目录
* <p>
* 使用示例,SFTP服务器上的目录结构如下:/testA/testA_B/
* <table border="1">
* <tr><td>当前目录</td><td>方法</td><td>参数(绝对路径/相对路径)</td><td>创建成功后的目录</td></tr>
* <tr><td>/testA/testA_B/</td><td>makeDir("testA_B_C")</td><td>相对路径</td><td>/testA/testA_B/testA_B_C/</td></tr>
* <tr><td>/</td><td>makeDir("/testA/testA_B/testA_B_D")</td><td>绝对路径</td><td>/testA/testA_B/testA_B_D/</td></tr>
* </table>
* <br/>
* <b>注意</b>,当<b>中间目录不存在</b>的情况下,不能够使用绝对路径的方式期望创建中间目录及目标目录。
* 例如makeDir("/testNOEXIST1/testNOEXIST2/testNOEXIST3"),这是错误的。
* </p>
* @param dirName 目录
* @return boolean
*/
public boolean makeDir(String dirName){
try {
channel.mkdir(dirName);
logger.debug("directory successfully created,dir=" + dirName);
return true;
} catch (SftpException e) {
logger.error("failed to create directory", e);
return false;
}
}
/**
* 删除文件夹
* @param dirName
* @return boolean
*/
@SuppressWarnings("unchecked")
public boolean delDir(String dirName) throws SftpException{
if(!changeDir(dirName)){
return false;
}
Vector<ChannelSftp.LsEntry> list = null;
try {
list = channel.ls(channel.pwd());
} catch (SftpException e) {
logger.error("can not list directory",e);
return false;
}
for(ChannelSftp.LsEntry entry : list){
String fileName = entry.getFilename();
if(!fileName.equals(".") && !fileName.equals("..")){
if(entry.getAttrs().isDir()){
delDir(fileName);
} else {
delFile(fileName);
}
}
}
if(!changeToParentDir()){
return false;
}
try {
channel.rmdir(dirName);
logger.debug("directory " + dirName + " successfully deleted");
return true;
} catch (SftpException e) {
logger.error("failed to delete directory " + dirName,e);
return false;
}
}
/**
* 删除文件
* @param fileName 文件名
* @return boolean
*/
public boolean delFile(String fileName){
if(fileName == null || fileName.trim().equals("")){
logger.debug("invalid filename");
return false;
}
try {
channel.rm(fileName);
logger.debug("file " + fileName + " successfully deleted");
return true;
} catch (SftpException e) {
logger.error("failed to delete file " + fileName,e);
return false;
}
}
/**
* 当前目录下文件及文件夹名称列表
* @return String[]
*/
public String[] ls(){
return list(Filter.ALL);
}
/**
* 当前目录下文件及文件夹名称列表
* @return String[]
*/
public Optional<ChannelSftp.LsEntry> lsFile(String pathName, String fileName) throws SftpException {
if(!changeDir(pathName)){
logger.debug(" changeDir to pathName [{}] 失败",pathName);
return Optional.empty();
};
Vector<ChannelSftp.LsEntry> list = null;
try {
//ls方法会返回两个特殊的目录,当前目录(.)和父目录(..)
list = channel.ls(channel.pwd());
} catch (SftpException e) {
logger.error("can not list directory",e);
return Optional.empty();
}
for(ChannelSftp.LsEntry entry : list){
if(StringUtils.equals(entry.getFilename(), fileName)){
return Optional.of(entry);
}
}
return Optional.empty();
}
/**
* 指定目录下文件及文件夹名称列表
* @return String[]
*/
public String[] ls(String pathName) throws SftpException{
String currentDir = currentDir();
logger.debug("currentDir:{} , ls pathName:{} ",currentDir,pathName);
if(!changeDir(pathName)){
logger.debug(" changeDir to pathName [{}] 失败",pathName);
return new String[0];
};
String[] result = list(Filter.ALL);
if(!changeDir(currentDir)){
logger.debug(" changeDir to currentDir [{}] 失败",currentDir);
return new String[0];
}
logger.debug(" ls pathName result:{} ",JSON.toJSONString(result));
return result;
}
/**
* 当前目录下文件名称列表
* @return String[]
*/
public String[] lsFiles(){
return list(Filter.FILE);
}
/**
* 指定目录下文件名称列表
* @return String[]
*/
public String[] lsFiles(String pathName) throws SftpException{
String currentDir = currentDir();
if(!changeDir(pathName)){
return new String[0];
};
String[] result = list(Filter.FILE);
if(!changeDir(currentDir)){
return new String[0];
}
return result;
}
/**
* 当前目录下文件夹名称列表
* @return String[]
*/
public String[] lsDirs(){
return list(Filter.DIR);
}
/**
* 指定目录下文件夹名称列表
* @return String[]
*/
public String[] lsDirs(String pathName) throws SftpException{
String currentDir = currentDir();
if(!changeDir(pathName)){
return new String[0];
};
String[] result = list(Filter.DIR);
if(!changeDir(currentDir)){
return new String[0];
}
return result;
}
/**
* 当前目录是否存在文件或文件夹
* @param name 名称
* @return boolean
*/
public boolean exist(String name){
return exist(ls(), name);
}
/**
* 指定目录下,是否存在文件或文件夹
* @param path 目录
* @param name 名称
* @return boolean
*/
public boolean exist(String path,String name) throws SftpException{
return exist(ls(path),name);
}
/**
* 当前目录是否存在文件
* @param name 文件名
* @return boolean
*/
public boolean existFile(String name){
return exist(lsFiles(),name);
}
/**
* 指定目录下,是否存在文件
* @param path 目录
* @param name 文件名
* @return boolean
*/
public boolean existFile(String path,String name) throws SftpException{
return exist(lsFiles(path), name);
}
/**
* 当前目录是否存在文件夹
* @param name 文件夹名称
* @return boolean
*/
public boolean existDir(String name){
return exist(lsDirs(), name);
}
/**
* 指定目录下,是否存在文件夹
* @param path 目录
* @param name 文家夹名称
* @return boolean
*/
public boolean existDir(String path,String name) throws SftpException{
return exist(lsDirs(path), name);
}
/**
* 当前工作目录
* @return String
*/
public String currentDir() throws SftpException {
return channel.pwd();
}
/**
* 登出
*/
public void logout(){
if(channel != null){
channel.quit();
channel.disconnect();
}
if(session != null){
session.disconnect();
}
logger.debug("logout successfully");
}
//------private method ------
/** 枚举,用于过滤文件和文件夹 */
private enum Filter {/** 文件及文件夹 */ ALL ,/** 文件 */ FILE ,/** 文件夹 */ DIR };
/**
* 列出当前目录下的文件及文件夹
* @param filter 过滤参数
* @return String[]
*/
@SuppressWarnings("unchecked")
private String[] list(Filter filter){
Vector<ChannelSftp.LsEntry> list = null;
try {
//ls方法会返回两个特殊的目录,当前目录(.)和父目录(..)
list = channel.ls(channel.pwd());
} catch (SftpException e) {
logger.error("can not list directory",e);
return new String[0];
}
List<String> resultList = new ArrayList<String>();
for(ChannelSftp.LsEntry entry : list){
if(filter(entry, filter)){
resultList.add(entry.getFilename());
}
}
return resultList.toArray(new String[0]);
}
/**
* 判断是否是否过滤条件
* @param entry LsEntry
* @param f 过滤参数
* @return boolean
*/
private boolean filter(ChannelSftp.LsEntry entry, Filter f){
if(f.equals(Filter.ALL)){
return !".".equals(entry.getFilename()) && !"..".equals(entry.getFilename());
} else if(f.equals(Filter.FILE)){
return !".".equals(entry.getFilename()) && !"..".equals(entry.getFilename()) && !entry.getAttrs().isDir();
} else if(f.equals(Filter.DIR)){
return !".".equals(entry.getFilename()) && !"..".equals(entry.getFilename()) && entry.getAttrs().isDir();
}
return false;
}
/**
* 根目录
* @return String
*/
private String homeDir() throws SftpException {
return channel.getHome();
}
/**
* 判断字符串是否存在于数组中
* @param strArr 字符串数组
* @param str 字符串
* @return boolean
*/
private boolean exist(String[] strArr,String str){
if(strArr == null || strArr.length == 0){
return false;
}
if(str == null || "".equals(str.trim())){
return false;
}
for(String s : strArr){
if(s.equalsIgnoreCase(str)){
return true;
}
}
return false;
}
/**
* 远程打包zip,目标目录下的所有文件
* @param zipPathName 要打包的目录
* @param zipFileName 打包的文件名
* @return
* @throws SftpException
*/
public Boolean zipRemoteFiles(String zipPathName,String zipFileName) throws Exception{
if(!changeDir(zipPathName)){
return false;
}
String[] files = lsFiles();
if(files == null || files.length == 0){
return null;
}
String currentDir = currentDir();
changeDir(currentDir);
String comd = "zip -r -o -q "+currentDir+"/"+zipFileName+" "+currentDir+"/*";
String cmdRtn = execCmd(comd);
logger.info("currentDir:{} execCmd[{}] return:【{}】",currentDir,comd,cmdRtn);
return true;
}
/**
* 执行一条命令
*/
public String execCmd(String command) {
InputStream in = null;
BufferedReader reader = null;
Channel channel = null;
String buf = null;
try {
channel = session.openChannel("exec");
((ChannelExec) channel).setCommand(command);
channel.setInputStream(null);
((ChannelExec) channel).setErrStream(System.err);
channel.connect();
in = channel.getInputStream();
reader = new BufferedReader(new InputStreamReader(in, Charset.forName(charset)));
while ((buf = reader.readLine()) != null) {
System.out.println(buf);
}
channel.disconnect();
} catch (Exception e) {
logger.error(" SFTP execCmd ERROR:",e);
}
return buf;
}
}
SFTP
连接配置
配置 IP
,用户名,密码,连接池相关配置
package org.test.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
@ConfigurationProperties(prefix = SftpContractProperties.PROJECT_PREFIX)
public class SftpContractProperties {
public static final String PROJECT_PREFIX = "hzero.sftp";
/**
* 用户名
*/
private String userName;
/**
* 密码
*/
private String password;
/**
* SFTP服务器端口
*/
private int port;
/**
* SFTP服务器IP地址
*/
private String host;
private SftpContractPoolProperties poolProperties;
public static class SftpContractPoolProperties {
private String poolPrefix = "hzero.sftp";
/**
* 最大空闲
*/
private int maxIdle = 5;
/**
* 最大总数
*/
private int maxTotal = 10;
/**
* 最小空闲
*/
private int minIdle = 0;
/**
* 初始化连接数
*/
private int initialSize = 3;
public String getPoolPrefix() {
return poolPrefix;
}
public SftpContractPoolProperties setPoolPrefix(String poolPrefix) {
this.poolPrefix = poolPrefix;
return this;
}
public int getMaxIdle() {
return maxIdle;
}
public SftpContractPoolProperties setMaxIdle(int maxIdle) {
this.maxIdle = maxIdle;
return this;
}
public int getMaxTotal() {
return maxTotal;
}
public SftpContractPoolProperties setMaxTotal(int maxTotal) {
this.maxTotal = maxTotal;
return this;
}
public int getMinIdle() {
return minIdle;
}
public SftpContractPoolProperties setMinIdle(int minIdle) {
this.minIdle = minIdle;
return this;
}
public int getInitialSize() {
return initialSize;
}
public SftpContractPoolProperties setInitialSize(int initialSize) {
this.initialSize = initialSize;
return this;
}
}
public String getUserName() {
return userName;
}
public SftpContractProperties setUserName(String userName) {
this.userName = userName;
return this;
}
public String getPassword() {
return password;
}
public SftpContractProperties setPassword(String password) {
this.password = password;
return this;
}
public int getPort() {
return port;
}
public SftpContractProperties setPort(int port) {
this.port = port;
return this;
}
public String getHost() {
return host;
}
public SftpContractProperties setHost(String host) {
this.host = host;
return this;
}
public SftpContractPoolProperties getPoolProperties() {
return poolProperties;
}
public SftpContractProperties setPoolProperties(SftpContractPoolProperties poolProperties) {
this.poolProperties = poolProperties;
return this;
}
}
SFTP
连接对象创建工厂
继承 commons-pool2
的 BasePooledObjectFactory
类,实现 create
, wrap
等方法就可以实现一个对象创建工厂。
package org.test.infra.util;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.test.config.SftpContractProperties;
public class SftpContractFactory extends BasePooledObjectFactory<SftpContractUtils> {
private SftpContractProperties properties;
public SftpContractFactory(SftpContractProperties properties){
this.properties = properties;
}
@Override
public SftpContractUtils create() throws Exception {
// 创建新对象
return SftpContractUtils.getSftpUtils(properties);
}
@Override
public PooledObject<SftpContractUtils> wrap(SftpContractUtils sftpContractUtils) {
// 池化对象
return new DefaultPooledObject<>(sftpContractUtils);
}
@Override
public void passivateObject(PooledObject<SftpContractUtils> p) throws Exception {
// 将对象返回池时进行的操作,此处将工作目录设置为根目录
p.getObject().changeToHomeDir();
}
@Override
public void destroyObject(PooledObject<SftpContractUtils> p) throws Exception {
p.getObject().closeChannel();
}
@Override
public boolean validateObject(PooledObject<SftpContractUtils> p) {
return p.getObject().isConnected();
}
}
SFTP
连接池
继承 GenericObjectPool
,连接池将工厂对象与配置对象结合。 GenericObjectPool
中有 borrowObject
, returnObject
等方法供我们获取,回归对象。
package org.test.infra.util;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.AbandonedConfig;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
public class SftpContractPool extends GenericObjectPool<SftpContractUtils> {
public SftpContractPool(PooledObjectFactory<SftpContractUtils> factory) {
super(factory);
}
public SftpContractPool(PooledObjectFactory<SftpContractUtils> factory, GenericObjectPoolConfig config) {
super(factory, config);
}
public SftpContractPool(PooledObjectFactory<SftpContractUtils> factory, GenericObjectPoolConfig config, AbandonedConfig abandonedConfig) {
super(factory, config, abandonedConfig);
}
}
SFTP
连接池自动装配类
package org.test.autoconfigure;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.test.config.SftpContractProperties;
import org.stest.infra.util.SftpContractFactory;
import org.test.infra.util.SftpContractPool;
@EnableConfigurationProperties(SftpContractProperties.class)
@Configuration
public class SftpPoolAutoConfiguration {
private SftpContractPool pool;
private SftpContractProperties properties;
@Autowired
public SftpPoolAutoConfiguration(SftpContractProperties properties) {
this.properties = properties;
}
@ConditionalOnClass({SftpContractFactory.class})
@Bean
protected SftpContractPool faceSDKPool() {
SftpContractFactory faceSDKFactory = new SftpContractFactory(properties);
//设置对象池的相关参数
SftpContractProperties.SftpContractPoolProperties poolProperties = properties.getPoolProperties();
if (poolProperties == null) {
poolProperties = new SftpContractProperties.SftpContractPoolProperties();
}
GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
poolConfig.setMaxIdle(poolProperties.getMaxIdle());
poolConfig.setMaxTotal(poolProperties.getMaxTotal());
poolConfig.setMinIdle(poolProperties.getMinIdle());
poolConfig.setBlockWhenExhausted(true);
poolConfig.setTestOnBorrow(true);
poolConfig.setTestOnReturn(true);
poolConfig.setTestWhileIdle(true);
poolConfig.setTimeBetweenEvictionRunsMillis(1000 * 60 * 30);
//一定要关闭jmx,不然springboot启动会报已经注册了某个jmx的错误
poolConfig.setJmxEnabled(false);
//新建一个对象池,传入对象工厂和配置
pool = new SftpContractPool(faceSDKFactory, poolConfig);
return pool;
}
}
通过管道流进行文件上传
实例:
PipedInputStream in = new PipedInputStream();
PipedOutputStream out = new PipedOutputStream(in);
new Thread(
() -> {
generate(out);
}
).start();
SftpContractUtils sftpContractUtils = sftpContractPool.borrowObject();
String fileName = "test1.txt";
String path = "/test"
boolean uploadFileSuccess = sftpContractUtils.uploadFile(path, fileName, in);
sftpContractPool.returnObject(sftpContractUtils);
注意:
管道输入流应该连接到管道输出流;管道输入流提供要写入管道输出流的所有数据字节。通常,数据由某个线程从
PipedInputStream
对象读取, 并由其他线程将其写入到相应的PipedOutputStream
。不建议对这两个对象尝试使用单个线程,因为这样可能死锁线程。管道输入流包含一个缓冲区,可在缓冲区限定的 范 围内将读操作和写操作分离开。如果向连接管道输出流提供数据字节的线程不再存在,则认为该管道已损坏。
PipedInputStream
的缓冲区大小默认是 1024 字节,管道的读写操作是互相阻塞的,当缓冲区为空时,读操作阻塞;当缓冲区满时,写操作阻塞。
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于