Я продолжаю получать
SFTPException: превышено максимальное количество одновременных передач для текущего контекста
при обработке файлов
Я использую последние зависимости SSHJ v0.38.0 с Spring Boot 3.0.3.
<dependency>
<groupId>com.hierynomus</groupId>
<artifactId>sshj</artifactId>
<version>0.38.0</version>
</dependency>
Ниже приведен мой полный журнал ошибок и исключений.
2024-06-05T14:32:34.858+08:00 | INFO | Now checking /043/Output/FILE//FILE2024-05-30-08-20-22.txt - com.vendor.robbijournal.service.SftpService.processFiles:154 [scheduling-1:16216]
2024-06-05T14:32:34.858+08:00 | DEBUG | Now read the content of file FILE2024-05-30-08-20-22.txt | current 6 / remaining 6 / total 12 - com.vendor.robbijournal.service.SftpService.processFiles:157 [scheduling-1:16216]
2024-06-05T14:32:34.859+08:00 | DEBUG | Opening `/043/Output/FILE/FILE2024-05-30-08-20-22.txt` - net.schmizz.sshj.sftp.SFTPClient.open:75 [scheduling-1:16216]
2024-06-05T14:32:35.097+08:00 | DEBUG | Sending close - net.schmizz.sshj.connection.channel.direct.SessionChannel.sendClose:287 [scheduling-1:16216]
2024-06-05T14:32:35.336+08:00 | DEBUG | Got chan request for `exit-status` - net.schmizz.sshj.connection.channel.direct.SessionChannel.gotChannelRequest:334 [sshj-Reader-xfer.robbi.com/127.0.0.1:22-1717568411096:16216]
2024-06-05T14:32:35.337+08:00 | DEBUG | Got close - net.schmizz.sshj.connection.channel.direct.SessionChannel.gotClose:220 [sshj-Reader-xfer.robbi.com/127.0.0.1:22-1717568411096:16216]
2024-06-05T14:32:35.339+08:00 | DEBUG | Forgetting `session` channel (#2) - net.schmizz.sshj.connection.ConnectionImpl.forget:89 [sshj-Reader-xfer.robbi.com/127.0.0.1:22-1717568411096:16216]
2024-06-05T14:32:35.340+08:00 | ERROR | We caught error : Maximum concurrent transfers exceeded for the current context - com.vendor.robbijournal.service.SftpService.fetchAndStoreFiles:137 [scheduling-1:16216]
net.schmizz.sshj.sftp.SFTPException: Maximum concurrent transfers exceeded for the current context
at net.schmizz.sshj.sftp.Response.error(Response.java:140)
at net.schmizz.sshj.sftp.Response.ensurePacketTypeIs(Response.java:117)
at net.schmizz.sshj.sftp.SFTPEngine.open(SFTPEngine.java:169)
at net.schmizz.sshj.sftp.SFTPClient.open(SFTPClient.java:76)
at net.schmizz.sshj.sftp.SFTPClient.open(SFTPClient.java:81)
at net.schmizz.sshj.sftp.SFTPClient.open(SFTPClient.java:86)
at com.vendor.robbijournal.service.SftpService.processFiles(SftpService.java:158)
at com.vendor.robbijournal.service.SftpService.fetchAndStoreFiles(SftpService.java:118)
at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
at java.base/java.lang.reflect.Method.invoke(Method.java:580)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:354)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:196)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:768)
at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:123)
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:392)
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:768)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:720)
at com.vendor.robbijournal.service.SftpService$$SpringCGLIB$$0.fetchAndStoreFiles(<generated>)
at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
at java.base/java.lang.reflect.Method.invoke(Method.java:580)
at org.springframework.scheduling.support.ScheduledMethodRunnable.runInternal(ScheduledMethodRunnable.java:130)
at org.springframework.scheduling.support.ScheduledMethodRunnable.lambda$run$2(ScheduledMethodRunnable.java:124)
at io.micrometer.observation.Observation.observe(Observation.java:499)
at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:124)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:358)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
Ниже мой класс обслуживания
package com.vendor.robbijournal.service;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.util.List;
import java.util.Optional;
import org.apache.commons.codec.digest.DigestUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import com.vendor.robbijournal.constant.WhichXferFileStatus;
import com.vendor.robbijournal.constant.WhichXferFileType;
import com.vendor.robbijournal.model.Flag;
import com.vendor.robbijournal.model.Sol;
import com.vendor.robbijournal.model.XferFtpFiles;
import com.vendor.robbijournal.repository.FlagRepository;
import com.vendor.robbijournal.repository.XferFtpFilesRepository;
import com.vendor.robbijournal.utils.FilePermissionUtil;
import com.vendor.robbijournal.utils.LoggerFactoryUtil;
import ch.qos.logback.classic.Logger;
import jakarta.transaction.Transactional;
import net.schmizz.sshj.SSHClient;
import net.schmizz.sshj.sftp.FileAttributes;
import net.schmizz.sshj.sftp.RemoteResourceInfo;
import net.schmizz.sshj.sftp.SFTPClient;
import net.schmizz.sshj.transport.verification.PromiscuousVerifier;
@Service
@Transactional
public class SftpService {
private static final Logger LOG = LoggerFactoryUtil.getLogger();
@Autowired
private SSHClient sshClient;
@Autowired
private XferFtpFilesRepository repository;
@Autowired
private FlagRepository flagRepository;
@Value("${spring.profiles.active}")
private String activeProfile;
@Value("${spring.jpa.properties.hibernate.jdbc.time_zone}")
private String hibernateTimeZone;
@Value("${sftp.host}")
private String host;
@Value("${sftp.port}")
private int port;
@Value("${sftp.username}")
private String username;
@Value("${sftp.password}")
private String password;
private String[] DIRECTORIES = {
"/043/Output/FILE/"
};
@Scheduled(fixedRate = 60000) // Run every 60 seconds
//@Scheduled(fixedRate = 600000) // run every hour
public void fetchAndStoreFiles() throws IOException {
LOG.trace("Starting to fetch files from SFTP server..");
try {
// find Flag record SFTP_FETCHING
List<Flag> flag = flagRepository.findByKey("SFTP_FETCHING");
if (flag.size() == 0) {
Flag f = new Flag();
f.setKey("SFTP_FETCHING");
f.setValue1("ON");
f.setTimeZone(hibernateTimeZone);
flagRepository.save(f);
}
if (flag.size() > 0) {
Flag f = flag.get(0);
if (f.getValue1().equalsIgnoreCase("ON")) {
LOG.trace("SFTP_FETCHING in progress..abort this process");
return; // Abort the current execution
}
f.setValue1("ON");
flagRepository.save(f);
}
// Check if FTP session is active
LOG.debug("sshClient.isConnected() = " + sshClient.isConnected());
if (!sshClient.isConnected()) {
LOG.debug("Look like not connected to SFTP server.. let reconnecting..");
sshClient = new SSHClient();
sshClient.addHostKeyVerifier(new PromiscuousVerifier());
// Configure your SSH connection details here
sshClient.connect(host, port);
// KeyProvider keys = sshClient.loadKeys("path/to/private/key", "keyPassword");
// sshClient.authPublickey("username", keys);
LOG.debug("Connected to SFTP server: " + host + ":" + port + "using username: " + username+ " and password: " + password);
sshClient.authPassword(username, password);
}
LOG.debug("sshClient.isAuthenticated() = " + sshClient.isAuthenticated());
try (SFTPClient sftpClient = sshClient.newSFTPClient()) {
for (String directory : DIRECTORIES) {
processFiles(sftpClient, directory);
}
LOG.debug("Finish checked {} directories!",DIRECTORIES.length);
sftpClient.close();
LOG.debug("sftpClient.close()");
}
sshClient.disconnect();
LOG.debug("Done fetching file and sshClient.isConnected() = " + sshClient.isConnected() + "! ");
// find Flag record
flag = flagRepository.findByKey("SFTP_FETCHING");
if (flag.size() > 0) {
Flag f = flag.get(0);
f.setValue1("OFF");
flagRepository.save(f);
}
} catch (Exception e) {
LOG.error("We caught error : {}", e.getMessage(), e);
List<Flag> flag = flagRepository.findByKey("SFTP_FETCHING");
flag = flagRepository.findByKey("SFTP_FETCHING");
if (flag.size() > 0) {
Flag f = flag.get(0);
f.setValue1("OFF");
flagRepository.save(f);
}
e.printStackTrace();
}
}
private void processFiles(SFTPClient sftpClient, String directory) throws IOException {
LOG.info("Let check {}", directory);
List<RemoteResourceInfo> files = sftpClient.ls(directory);
int fileNumber = 0;
for (RemoteResourceInfo entry : files) {
LOG.info("Now checking {}/{}", directory, entry.getName());
if (!entry.isDirectory()) {
fileNumber = fileNumber+1;
LOG.debug("Now read the content of file {} | current {} / remaining {} / total {}", entry.getName(), fileNumber, files.size() - fileNumber, files.size());
try (InputStream inputStream = sftpClient.open(entry.getPath()).new RemoteFileInputStream()) {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
/*
* 1024 byte = 1K
* 8192 byte = 8K
* 16384 byte = 16K
*/
byte[] buffer = new byte[16384];
int readCount;
while ((readCount = inputStream.read(buffer)) > 0) {
byteArrayOutputStream.write(buffer, 0, readCount);
}
String fileContent = byteArrayOutputStream.toString(StandardCharsets.UTF_8);
// Count the number of lines in the file content
long lineCount = fileContent.lines().count();
String md5sum = DigestUtils.md5Hex(byteArrayOutputStream.toByteArray());
// Check if a file with the same MD5 already exists
Optional<XferFtpFiles> existingFile = repository.findByMd5sum(md5sum);
if (existingFile.isPresent()) {
if (entry.getName().equalsIgnoreCase(existingFile.get().getFilename())) {
LOG.warn("File {} with the same MD5 {} already exists, now skip this file", entry.getName(),
md5sum);
} else {
LOG.warn("File {} with MD5 {} have same content as {}, now skip this file", entry.getName(),
md5sum, existingFile.get().getFilename());
}
continue;
}
// Logic to determine version : Check if a file with the same filename already
// exists
List<XferFtpFiles> existingFiles = repository.findByFilename(entry.getName());
Integer newVersion = 1;
if (!existingFiles.isEmpty()) {
LOG.debug("Seem we have {} record with same file name {}", existingFiles.size(),
entry.getName());
List<XferFtpFiles> maxVersionOptList = repository
.findByFilenameOrderByVersionAsc(entry.getName());
for (XferFtpFiles maxVersionOpt : maxVersionOptList) {
newVersion = maxVersionOpt.getVersion();
maxVersionOpt.setStatus(WhichXferFileStatus.IGNORED_USE_NEW_VERSION);
repository.save(maxVersionOpt);
// find Sol and update the status also
List<Sol> solList = maxVersionOpt.getSol();
for (Sol sol : solList) {
// sol.setStatus(WhichXferFileStatus.IGNORED_USE_NEW_VERSION);
// repository.save(sol);
}
}
newVersion = newVersion + 1;
LOG.warn("Seems {} have an update, set new update as version {}", entry.getName(), newVersion);
}
FileAttributes attrs = entry.getAttributes();
String permissions = FilePermissionUtil.getPermissionString(attrs);
XferFtpFiles xferFtpFile = new XferFtpFiles();
xferFtpFile.setHost(sshClient.getRemoteHostname());
xferFtpFile.setPort(Integer.toString(sshClient.getRemotePort()));
xferFtpFile.setDirectory(directory);
xferFtpFile.setPermission(permissions);
xferFtpFile.setFilename(entry.getName());
xferFtpFile.setPayload(fileContent);
xferFtpFile.setPayloadLine((int) lineCount);
xferFtpFile.setMd5sum(md5sum);
xferFtpFile.setVersion(newVersion);
xferFtpFile.setTs_ftp(new Timestamp((long) attrs.getMtime() * 1000));
xferFtpFile.setTimeZone(hibernateTimeZone);
xferFtpFile.setStatus(WhichXferFileStatus.NEW);
xferFtpFile.setFileType(WhichXferFileType.TO_CHECK);
xferFtpFile.setFilesize(attrs.getSize());
// Save the file to DB
repository.save(xferFtpFile);
}
}
}
}
}
и это компонент для SSHClient
package com.vendor.robbijournal.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import net.schmizz.sshj.SSHClient;
@Configuration
public class SshClientConfig {
@Bean
public SSHClient sshClient() {
SSHClient sshClient = new SSHClient();
// Additional configuration if needed
return sshClient;
}
}
package com.vendor.robbijournal.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import net.schmizz.sshj.SSHClient;
@Configuration
public class SshClientConfig {
@Bean
public SSHClient sshClient() {
SSHClient sshClient = new SSHClient();
// Additional configuration if needed
return sshClient;
}
}
Я не знаю sshj (на самом деле и Java), но, проверив исходный код sshj, не похоже, что RemoteFileOutputStream.close
действительно закрывает удаленный файл.
Это RemoteFile.close
только закрывает его.
Поэтому вам нужно убедиться, что и поток, и файл закрыты:
try (
RemoteFile remoteFile = sftpClient.open(entry.getPath());
InputStream inputStream = remoteFile.new RemoteFileInputStream()
) {
Кажется, попытка с ресурсами работает, поскольку она автоматически закроет как удаленный файл, так и входной поток. Спасибо