Spring Integration, как динамически устанавливать локальный/удаленный каталог на основе сообщения для отправки на удаленный sftp или получения файлов из него

В настоящее время я пытаюсь внести изменения в проект, который использует интеграцию Spring для отправки и получения файлов с/на SFTP-сервер.

Изменения заключаются в добавлении других SFTP-серверов, выборе правильного сервера с условиями и обработке файлов таким же образом.

В настоящее время я борюсь с двумя вещами.

Первый :

У меня есть канал, сообщения которого содержат заголовок с удаленным каталогом, и мне нужно получить доступ к нужному сеансу SFTP. Но чтобы сделать сессию, мне нужны правильные свойства. (либо config1, либо config2, которые определены в моем application.yml) Я не уверен, как передать эту информацию моему ServiceActivator. (Второй TODO в моем коде)

Второй :

Мне нужно получить файлы с нескольких SFTP-серверов и сохранить эти файлы в нескольких локальных каталогах. Путь между удаленным и локальным доступом не одинаков и определяется в свойствах config1 и config2 так же, как я описал в своей первой проблеме. Я думаю, что правильно делегирую сеанс SFTP, но не знаю, как настроить localDirectory на основе сеанса SFTP. (Первый TODO в моем коде)

Если кто-то может мне немного помочь, я буду очень признателен.

Заранее спасибо.

Вот мой код:

    SftpConfig.Sftp1 sftpConfig1;
    SftpConfig.Sftp2 sftpConfig2;

    @Bean
    @BridgeTo
    public MessageChannel uploadChannel() {
        return new PublishSubscribeChannel();
    }

    @Bean
    public ExpressionParser spelExpressionParser() {
        return new SpelExpressionParser();
    }

    public SessionFactory<ChannelSftp.LsEntry> getSftpSession(SftpConfig sftp) {

        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost(sftp.getHost());
        factory.setPort(sftp.getPort());
        factory.setUser(sftp.getUser());
        factory.setPassword(sftp.getPassword());
        factory.setAllowUnknownKeys(true);
        factory.setTimeout(sftp.getTimeout());
        log.info("timeout is set to: {}", sftp.getTimeout());

        return new CachingSessionFactory<>(factory);
    }

    @Bean
    public DelegatingSessionFactory<ChannelSftp.LsEntry> delegatingSf() {

        Map<Object, SessionFactory<ChannelSftp.LsEntry>> mapSession = new HashMap<>();
        mapSession.put("config1", getSftpSession(sftpConfig1));
        mapSession.put("config2", getSftpSession(sftpConfig2));

        SessionFactoryLocator<ChannelSftp.LsEntry> sessionFactoryLocator = new DefaultSessionFactoryLocator<>(mapSession);

        return new DelegatingSessionFactory<>(sessionFactoryLocator);
    }

    @Bean
    public RotatingServerAdvice advice() {

        List<RotationPolicy.KeyDirectory> keyDirectories = sftpConfig1.getCodes().stream()
                .map(code -> new RotationPolicy.KeyDirectory("config1", sftpConfig1.getReaderDirectory() + SEPARATOR + code))
                .collect(Collectors.toList());

        keyDirectories.addAll(sftpConfig2.getCodes().stream()
                .map(code -> new RotationPolicy.KeyDirectory("config2", sftpConfig2.getReaderDirectory() + SEPARATOR + code))
                .collect(Collectors.toList()));

        return new RotatingServerAdvice(delegatingSf(), keyDirectories);
    }

    @Bean
    public IntegrationFlow sftpIntegrationFlow() {
        return IntegrationFlows.from(
                        Sftp.inboundAdapter(delegatingSf())
                                .filter(new SftpSimplePatternFileListFilter("*.csv"))
                                .filter(new SftpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "rotate"))
                                .localFilter(new AbstractFileListFilter<File>() {
                                    @Override
                                    public boolean accept(final File file) {
                                        return file.getName().endsWith(".csv");
                                    }
                                })
                                .deleteRemoteFiles(false)
                                .temporaryFileSuffix(".new")
                                .localDirectory(new File()) // TODO dynamic local directory based on sftp session
                                .remoteDirectory("."),
                                    e -> e.poller(Pollers.fixedDelay(1, MINUTES).advice(advice()).advice(logNoFileFoundAdvice())))
                .log(LoggingHandler.Level.INFO, "[SFTP]", m -> "Received file: " + m.getHeaders().get(FileHeaders.FILENAME))
                .channel("filesReceptionChannel")
                .enrichHeaders(h -> h.header("errorChannel", "errorChannel"))
                .get();
    }

    @Bean
    public MethodInterceptor logNoFileFoundAdvice() {
        return invocation -> {
            Object result = invocation.proceed();
            if (result == null) {
                log.info("[SFTP] No files found");
            }
            return result;
        };
    }

    @Bean
    public SftpRemoteFileTemplate sftpTemplate() {
        return new SftpRemoteFileTemplate(sftpSession());
    }

    @Bean
    public SessionFactory<ChannelSftp.LsEntry> sftpSession() {
        return getSftpSession(); // TODO dynamic sftp session based on message received in serviceActivator bellow
    }

    @Bean
    @ServiceActivator(inputChannel = "uploadChannel")
    public MessageHandler uploadHandler() {
        return getFtpMessageHandler(sftpSession());
    }

    public MessageHandler getFtpMessageHandler(SessionFactory<ChannelSftp.LsEntry> sftpSession) {
        SftpMessageHandler handler = new SftpMessageHandler(sftpSession);
        handler.setRemoteDirectoryExpressionString("headers['remoteDirectory']");
        handler.setFileNameGenerator(message -> {
            if (message.getPayload() instanceof File) {
                return ((File) message.getPayload()).getName();
            } else {
                throw new IllegalArgumentException("File expected as payload.");
            }
        });
        handler.setUseTemporaryFileName(false);
        return handler;
    }

РЕДАКТИРОВАТЬ :

С вашей помощью мне удалось исправить первую часть, так что большое спасибо.

Чтобы это исправить, я сначала добавил фабрику по умолчанию в метод delegatingSf().

Затем я использовал delegatingSf() в serviceActivator вместо sftpSession(), который удалил.

Наконец, в методе, который отправляет сообщение на мой uploadChannel, я использовал delegatingSf.setThreadKey(mapSessionKey) для динамического выбора правильной фабрики в методе, который выводит сообщение на мой uploadChannel. И конечно delegatingSf.clearThreadKey() сразу после отправки.

Я постараюсь сделать вторую часть функциональной, а код исправления выложу после.

Обновлено еще раз:

Я использовал localFileNameExpression, как было предложено, и динамически восстановил localDirectory, но меня это не устраивает, поскольку, на мой взгляд, это не очень хороший способ сделать это. Я использовал remoteDirectory для идентификации SFTP-сервера, но если у двух серверов однажды будет одинаковый путь, он не будет работать. Но пока это работает, так что спасибо за помощь.

SftpConfig.Sftp1 sftpConfig1;
SftpConfig.Sftp2 sftpConfig2;

@Bean
@BridgeTo
public MessageChannel uploadChannel() {
    return new PublishSubscribeChannel();
}

@Bean
public ExpressionParser spelExpressionParser() {
    return new SpelExpressionParser();
}

public SessionFactory<ChannelSftp.LsEntry> getSftpSession(SftpConfig sftp) {

    DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
    factory.setHost(sftp.getHost());
    factory.setPort(sftp.getPort());
    factory.setUser(sftp.getUser());
    factory.setPassword(sftp.getPassword());
    factory.setAllowUnknownKeys(true);
    factory.setTimeout(sftp.getTimeout());
    log.info("timeout is set to: {}", sftp.getTimeout());

    return new CachingSessionFactory<>(factory);
}

@Bean
public DelegatingSessionFactory<ChannelSftp.LsEntry> delegatingSf() {

    Map<Object, SessionFactory<ChannelSftp.LsEntry>> mapSession = new HashMap<>();
    mapSession.put("config1", getSftpSession(sftpConfig1));
    mapSession.put("config2", getSftpSession(sftpConfig2));

    SessionFactoryLocator<ChannelSftp.LsEntry> sessionFactoryLocator = new DefaultSessionFactoryLocator<>(mapSession, mapSession.get("config1"));

    return new DelegatingSessionFactory<>(sessionFactoryLocator);
}

@Bean
public RotatingServerAdvice advice() {

    List<RotationPolicy.KeyDirectory> keyDirectories = sftpConfig1.getCodes().stream()
            .map(code -> new RotationPolicy.KeyDirectory("config1", sftpConfig1.getReaderDirectory() + SEPARATOR + code))
            .collect(Collectors.toList());

    keyDirectories.addAll(sftpConfig2.getCodes().stream()
            .map(code -> new RotationPolicy.KeyDirectory("config2", sftpConfig2.getReaderDirectory() + SEPARATOR + code))
            .collect(Collectors.toList()));

    return new RotatingServerAdvice(delegatingSf(), keyDirectories);
}

@Bean
public IntegrationFlow sftpIntegrationFlow() {
    return IntegrationFlows.from(
                    Sftp.inboundAdapter(delegatingSf())
                            .filter(new SftpSimplePatternFileListFilter("*.csv"))
                            .filter(new SftpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "rotate"))
                            .localFilter(new AbstractFileListFilter<File>() {
                                @Override
                                public boolean accept(final File file) {
                                    return file.getName().endsWith(".csv");
                                }
                            })
                            .deleteRemoteFiles(false)
                            .temporaryFileSuffix(".new")
                            .localFilenameExpression("@sftpEIPConfig.getLocalDirectoryReader(#remoteDirectory) + #this")
                            .localDirectory(new File("/"))
                            .remoteDirectory("."),
                                e -> e.poller(Pollers.fixedDelay(1, MINUTES).advice(advice()).advice(logNoFileFoundAdvice())))
            .log(LoggingHandler.Level.INFO, "[SFTP]", m -> "Received file: " + m.getHeaders().get(FileHeaders.FILENAME))
            .channel("filesReceptionChannel")
            .enrichHeaders(h -> h.header("errorChannel", "errorChannel"))
            .get();
}

@Bean
public MethodInterceptor logNoFileFoundAdvice() {
    return invocation -> {
        Object result = invocation.proceed();
        if (result == null) {
            log.info("[SFTP] No files found");
        }
        return result;
    };
}

@Bean
public SftpRemoteFileTemplate sftpTemplate() {
    return new SftpRemoteFileTemplate(delegatingSf());
}

@Bean
@ServiceActivator(inputChannel = "uploadChannel")
public MessageHandler uploadHandler() {
    return getFtpMessageHandler(delegatingSf());
}

public MessageHandler getFtpMessageHandler(SessionFactory<ChannelSftp.LsEntry> sftpSession) {
    SftpMessageHandler handler = new SftpMessageHandler(sftpSession);
    handler.setRemoteDirectoryExpressionString("headers['remoteDirectory']");
    handler.setFileNameGenerator(message -> {
        if (message.getPayload() instanceof File) {
            return ((File) message.getPayload()).getName();
        } else {
            throw new IllegalArgumentException("File expected as payload.");
        }
    });
    handler.setUseTemporaryFileName(false);
    return handler;
}
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
0
0
53
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

localDirectory нельзя изменить на AbstractInboundFileSynchronizingMessageSource. Вы можете использовать localFilenameExpression() для динамического создания каталога. Хотя localDirectory может быть просто корнем — /.

Для отправки на разные SFTP-серверы обязательно используйте DelegatingSessionFactory. Дополнительную информацию см. в документации: https://docs.spring.io/spring-integration/reference/sftp/dsf.html . А также: https://docs.spring.io/spring-integration/reference/handler-advice/context-holder.html

Большое спасибо ! Мне удалось отправить файл по двум разным SFTP с помощью делегирующейSessionFactory. Я не знаю, почему я не попробовал это, учитывая, что я уже использовал его для чтения с этих SFTP. Теперь мне нужно изменить часть inboundAdapter для localDir. Я обновлю пост первым решением. Еще раз спасибо :)

Djd0 08.07.2024 18:14

Другие вопросы по теме