SSHJ SFTPException: превышено максимальное количество одновременных передач для текущего контекста

Я продолжаю получать

SFTPException: превышено максимальное количество одновременных передач для текущего контекста

при обработке файлов

Я использую последние зависимости SSHJ v0.38.0 с Spring Boot 3.0.3.

<dependency>
    <groupId>com.hierynomus</groupId>
    <artifactId>sshj</artifactId>
    <version>0.38.0</version>
</dependency>

Ниже приведен мой полный журнал ошибок и исключений.

2024-06-05T14:32:34.858+08:00 | INFO  | Now checking /043/Output/FILE//FILE2024-05-30-08-20-22.txt - com.vendor.robbijournal.service.SftpService.processFiles:154 [scheduling-1:16216]
2024-06-05T14:32:34.858+08:00 | DEBUG | Now read the content of file FILE2024-05-30-08-20-22.txt | current 6 / remaining 6 / total 12 - com.vendor.robbijournal.service.SftpService.processFiles:157 [scheduling-1:16216]
2024-06-05T14:32:34.859+08:00 | DEBUG | Opening `/043/Output/FILE/FILE2024-05-30-08-20-22.txt` - net.schmizz.sshj.sftp.SFTPClient.open:75 [scheduling-1:16216]
2024-06-05T14:32:35.097+08:00 | DEBUG | Sending close - net.schmizz.sshj.connection.channel.direct.SessionChannel.sendClose:287 [scheduling-1:16216]
2024-06-05T14:32:35.336+08:00 | DEBUG | Got chan request for `exit-status` - net.schmizz.sshj.connection.channel.direct.SessionChannel.gotChannelRequest:334 [sshj-Reader-xfer.robbi.com/127.0.0.1:22-1717568411096:16216]
2024-06-05T14:32:35.337+08:00 | DEBUG | Got close - net.schmizz.sshj.connection.channel.direct.SessionChannel.gotClose:220 [sshj-Reader-xfer.robbi.com/127.0.0.1:22-1717568411096:16216]
2024-06-05T14:32:35.339+08:00 | DEBUG | Forgetting `session` channel (#2) - net.schmizz.sshj.connection.ConnectionImpl.forget:89 [sshj-Reader-xfer.robbi.com/127.0.0.1:22-1717568411096:16216]
2024-06-05T14:32:35.340+08:00 | ERROR | We caught error : Maximum concurrent transfers exceeded for the current context - com.vendor.robbijournal.service.SftpService.fetchAndStoreFiles:137 [scheduling-1:16216]
net.schmizz.sshj.sftp.SFTPException: Maximum concurrent transfers exceeded for the current context
    at net.schmizz.sshj.sftp.Response.error(Response.java:140)
    at net.schmizz.sshj.sftp.Response.ensurePacketTypeIs(Response.java:117)
    at net.schmizz.sshj.sftp.SFTPEngine.open(SFTPEngine.java:169)
    at net.schmizz.sshj.sftp.SFTPClient.open(SFTPClient.java:76)
    at net.schmizz.sshj.sftp.SFTPClient.open(SFTPClient.java:81)
    at net.schmizz.sshj.sftp.SFTPClient.open(SFTPClient.java:86)
    at com.vendor.robbijournal.service.SftpService.processFiles(SftpService.java:158)
    at com.vendor.robbijournal.service.SftpService.fetchAndStoreFiles(SftpService.java:118)
    at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
    at java.base/java.lang.reflect.Method.invoke(Method.java:580)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:354)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:196)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:768)
    at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:123)
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:392)
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:768)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:720)
    at com.vendor.robbijournal.service.SftpService$$SpringCGLIB$$0.fetchAndStoreFiles(<generated>)
    at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
    at java.base/java.lang.reflect.Method.invoke(Method.java:580)
    at org.springframework.scheduling.support.ScheduledMethodRunnable.runInternal(ScheduledMethodRunnable.java:130)
    at org.springframework.scheduling.support.ScheduledMethodRunnable.lambda$run$2(ScheduledMethodRunnable.java:124)
    at io.micrometer.observation.Observation.observe(Observation.java:499)
    at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:124)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:358)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1583)

Ниже мой класс обслуживания

package com.vendor.robbijournal.service;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.util.List;
import java.util.Optional;

import org.apache.commons.codec.digest.DigestUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import com.vendor.robbijournal.constant.WhichXferFileStatus;
import com.vendor.robbijournal.constant.WhichXferFileType;
import com.vendor.robbijournal.model.Flag;
import com.vendor.robbijournal.model.Sol;
import com.vendor.robbijournal.model.XferFtpFiles;
import com.vendor.robbijournal.repository.FlagRepository;
import com.vendor.robbijournal.repository.XferFtpFilesRepository;
import com.vendor.robbijournal.utils.FilePermissionUtil;
import com.vendor.robbijournal.utils.LoggerFactoryUtil;

import ch.qos.logback.classic.Logger;
import jakarta.transaction.Transactional;
import net.schmizz.sshj.SSHClient;
import net.schmizz.sshj.sftp.FileAttributes;
import net.schmizz.sshj.sftp.RemoteResourceInfo;
import net.schmizz.sshj.sftp.SFTPClient;
import net.schmizz.sshj.transport.verification.PromiscuousVerifier;

@Service
@Transactional
public class SftpService {

    private static final Logger LOG = LoggerFactoryUtil.getLogger();

    @Autowired
    private SSHClient sshClient;

    @Autowired
    private XferFtpFilesRepository repository;

    @Autowired
    private FlagRepository flagRepository;

    @Value("${spring.profiles.active}")
    private String activeProfile;

    @Value("${spring.jpa.properties.hibernate.jdbc.time_zone}")
    private String hibernateTimeZone;

    @Value("${sftp.host}")
    private String host;

    @Value("${sftp.port}")
    private int port;

    @Value("${sftp.username}")
    private String username;

    @Value("${sftp.password}")
    private String password;

    private String[] DIRECTORIES = {  
            "/043/Output/FILE/" 
    };

    @Scheduled(fixedRate = 60000) // Run every 60 seconds
    //@Scheduled(fixedRate = 600000) // run every hour
    public void fetchAndStoreFiles() throws IOException {
        LOG.trace("Starting to fetch files from SFTP server..");
        try {
            // find Flag record SFTP_FETCHING
            List<Flag> flag = flagRepository.findByKey("SFTP_FETCHING");

            if (flag.size() == 0) {
                Flag f = new Flag();
                f.setKey("SFTP_FETCHING");
                f.setValue1("ON");
                f.setTimeZone(hibernateTimeZone);
                flagRepository.save(f);
            }

            if (flag.size() > 0) {
                Flag f = flag.get(0);
                if (f.getValue1().equalsIgnoreCase("ON")) {
                    LOG.trace("SFTP_FETCHING in progress..abort this process");
                    return; // Abort the current execution
                }
                f.setValue1("ON");
                flagRepository.save(f);
            }

            // Check if FTP session is active
            LOG.debug("sshClient.isConnected() = " + sshClient.isConnected());
            if (!sshClient.isConnected()) {
                LOG.debug("Look like not connected to SFTP server.. let reconnecting..");
                sshClient = new SSHClient();
                sshClient.addHostKeyVerifier(new PromiscuousVerifier());
                // Configure your SSH connection details here
                sshClient.connect(host, port);
                // KeyProvider keys = sshClient.loadKeys("path/to/private/key", "keyPassword");
                // sshClient.authPublickey("username", keys);
                LOG.debug("Connected to SFTP server: " + host + ":" + port + "using username: " + username+ " and password: " + password);
                sshClient.authPassword(username, password);
            }

            LOG.debug("sshClient.isAuthenticated() = " + sshClient.isAuthenticated());

            try (SFTPClient sftpClient = sshClient.newSFTPClient()) {
                for (String directory : DIRECTORIES) {
                    processFiles(sftpClient, directory);
                }
                LOG.debug("Finish checked {} directories!",DIRECTORIES.length);
                sftpClient.close();
                LOG.debug("sftpClient.close()");
            }

            sshClient.disconnect();
            LOG.debug("Done fetching file and sshClient.isConnected() = " + sshClient.isConnected() + "! ");

            // find Flag record
            flag = flagRepository.findByKey("SFTP_FETCHING");
            if (flag.size() > 0) {
                Flag f = flag.get(0);
                f.setValue1("OFF");
                flagRepository.save(f);
            }

        } catch (Exception e) {
            LOG.error("We caught error : {}", e.getMessage(), e);
            List<Flag> flag = flagRepository.findByKey("SFTP_FETCHING");
            flag = flagRepository.findByKey("SFTP_FETCHING");
            if (flag.size() > 0) {
                Flag f = flag.get(0);
                f.setValue1("OFF");
                flagRepository.save(f);
            }
            e.printStackTrace();
        }
    }

    private void processFiles(SFTPClient sftpClient, String directory) throws IOException {
        LOG.info("Let check {}", directory);
        List<RemoteResourceInfo> files = sftpClient.ls(directory);
        int fileNumber = 0;
        for (RemoteResourceInfo entry : files) {
            LOG.info("Now checking {}/{}", directory, entry.getName());
            if (!entry.isDirectory()) {
                fileNumber = fileNumber+1;
                LOG.debug("Now read the content of file {} | current {} / remaining {} / total {}", entry.getName(), fileNumber, files.size() - fileNumber, files.size());
                try (InputStream inputStream = sftpClient.open(entry.getPath()).new RemoteFileInputStream()) {
                    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                    /*
                     * 1024  byte = 1K
                     * 8192  byte = 8K
                     * 16384 byte = 16K
                     */
                    byte[] buffer = new byte[16384];
                    int readCount;

                    while ((readCount = inputStream.read(buffer)) > 0) {
                        byteArrayOutputStream.write(buffer, 0, readCount);
                    }
                    String fileContent = byteArrayOutputStream.toString(StandardCharsets.UTF_8);
                    // Count the number of lines in the file content
                    long lineCount = fileContent.lines().count();
                    String md5sum = DigestUtils.md5Hex(byteArrayOutputStream.toByteArray());

                    // Check if a file with the same MD5 already exists
                    Optional<XferFtpFiles> existingFile = repository.findByMd5sum(md5sum);
                    if (existingFile.isPresent()) {
                        if (entry.getName().equalsIgnoreCase(existingFile.get().getFilename())) {
                            LOG.warn("File {} with the same MD5 {} already exists, now skip this file", entry.getName(),
                                    md5sum);
                        } else {
                            LOG.warn("File {} with MD5 {} have same content as {}, now skip this file", entry.getName(),
                                    md5sum, existingFile.get().getFilename());
                        }
                        continue;
                    }

                    // Logic to determine version : Check if a file with the same filename already
                    // exists
                    List<XferFtpFiles> existingFiles = repository.findByFilename(entry.getName());
                    Integer newVersion = 1;

                    if (!existingFiles.isEmpty()) {
                        LOG.debug("Seem we have {} record with same file name {}", existingFiles.size(),
                                entry.getName());
                        List<XferFtpFiles> maxVersionOptList = repository
                                .findByFilenameOrderByVersionAsc(entry.getName());

                        for (XferFtpFiles maxVersionOpt : maxVersionOptList) {
                            newVersion = maxVersionOpt.getVersion();
                            maxVersionOpt.setStatus(WhichXferFileStatus.IGNORED_USE_NEW_VERSION);
                            repository.save(maxVersionOpt);

                            // find Sol and update the status also
                            List<Sol> solList = maxVersionOpt.getSol();
                            for (Sol sol : solList) {
                                // sol.setStatus(WhichXferFileStatus.IGNORED_USE_NEW_VERSION);
                                // repository.save(sol);
                            }
                        }
                        newVersion = newVersion + 1;
                        LOG.warn("Seems {} have an update, set new update as version {}", entry.getName(), newVersion);
                    }

                    FileAttributes attrs = entry.getAttributes();
                    String permissions = FilePermissionUtil.getPermissionString(attrs);
                    XferFtpFiles xferFtpFile = new XferFtpFiles();
                    xferFtpFile.setHost(sshClient.getRemoteHostname());
                    xferFtpFile.setPort(Integer.toString(sshClient.getRemotePort()));
                    xferFtpFile.setDirectory(directory);
                    xferFtpFile.setPermission(permissions);
                    xferFtpFile.setFilename(entry.getName());
                    xferFtpFile.setPayload(fileContent);
                    xferFtpFile.setPayloadLine((int) lineCount);
                    xferFtpFile.setMd5sum(md5sum);
                    xferFtpFile.setVersion(newVersion);
                    xferFtpFile.setTs_ftp(new Timestamp((long) attrs.getMtime() * 1000));
                    xferFtpFile.setTimeZone(hibernateTimeZone);
                    xferFtpFile.setStatus(WhichXferFileStatus.NEW);
                    xferFtpFile.setFileType(WhichXferFileType.TO_CHECK);
                    xferFtpFile.setFilesize(attrs.getSize());
                    // Save the file to DB
                    repository.save(xferFtpFile);
                }
            }
        }
    }
}

и это компонент для SSHClient

package com.vendor.robbijournal.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import net.schmizz.sshj.SSHClient;

@Configuration
public class SshClientConfig {
    @Bean
    public SSHClient sshClient() {
        SSHClient sshClient = new SSHClient();
        // Additional configuration if needed
        return sshClient;
    }
}
package com.vendor.robbijournal.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import net.schmizz.sshj.SSHClient;

@Configuration
public class SshClientConfig {
    @Bean
    public SSHClient sshClient() {
        SSHClient sshClient = new SSHClient();
        // Additional configuration if needed
        return sshClient;
    }
}
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
0
0
54
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Я не знаю sshj (на самом деле и Java), но, проверив исходный код sshj, не похоже, что RemoteFileOutputStream.close действительно закрывает удаленный файл.

Это RemoteFile.close только закрывает его.

Поэтому вам нужно убедиться, что и поток, и файл закрыты:

try (
    RemoteFile remoteFile = sftpClient.open(entry.getPath());
    InputStream inputStream = remoteFile.new RemoteFileInputStream()
) {

Кажется, попытка с ресурсами работает, поскольку она автоматически закроет как удаленный файл, так и входной поток. Спасибо

Robbi Nespu 09.06.2024 23:29

Другие вопросы по теме