В настоящее время я пытаюсь внести изменения в проект, который использует интеграцию Spring для отправки и получения файлов с/на SFTP-сервер.
Изменения заключаются в добавлении других SFTP-серверов, выборе правильного сервера с условиями и обработке файлов таким же образом.
В настоящее время я борюсь с двумя вещами.
Первый :
У меня есть канал, сообщения которого содержат заголовок с удаленным каталогом, и мне нужно получить доступ к нужному сеансу SFTP. Но чтобы сделать сессию, мне нужны правильные свойства. (либо config1, либо config2, которые определены в моем application.yml) Я не уверен, как передать эту информацию моему ServiceActivator. (Второй TODO в моем коде)
Второй :
Мне нужно получить файлы с нескольких SFTP-серверов и сохранить эти файлы в нескольких локальных каталогах. Путь между удаленным и локальным доступом не одинаков и определяется в свойствах config1 и config2 так же, как я описал в своей первой проблеме. Я думаю, что правильно делегирую сеанс SFTP, но не знаю, как настроить localDirectory на основе сеанса SFTP. (Первый TODO в моем коде)
Если кто-то может мне немного помочь, я буду очень признателен.
Заранее спасибо.
Вот мой код:
SftpConfig.Sftp1 sftpConfig1;
SftpConfig.Sftp2 sftpConfig2;
@Bean
@BridgeTo
public MessageChannel uploadChannel() {
return new PublishSubscribeChannel();
}
@Bean
public ExpressionParser spelExpressionParser() {
return new SpelExpressionParser();
}
public SessionFactory<ChannelSftp.LsEntry> getSftpSession(SftpConfig sftp) {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost(sftp.getHost());
factory.setPort(sftp.getPort());
factory.setUser(sftp.getUser());
factory.setPassword(sftp.getPassword());
factory.setAllowUnknownKeys(true);
factory.setTimeout(sftp.getTimeout());
log.info("timeout is set to: {}", sftp.getTimeout());
return new CachingSessionFactory<>(factory);
}
@Bean
public DelegatingSessionFactory<ChannelSftp.LsEntry> delegatingSf() {
Map<Object, SessionFactory<ChannelSftp.LsEntry>> mapSession = new HashMap<>();
mapSession.put("config1", getSftpSession(sftpConfig1));
mapSession.put("config2", getSftpSession(sftpConfig2));
SessionFactoryLocator<ChannelSftp.LsEntry> sessionFactoryLocator = new DefaultSessionFactoryLocator<>(mapSession);
return new DelegatingSessionFactory<>(sessionFactoryLocator);
}
@Bean
public RotatingServerAdvice advice() {
List<RotationPolicy.KeyDirectory> keyDirectories = sftpConfig1.getCodes().stream()
.map(code -> new RotationPolicy.KeyDirectory("config1", sftpConfig1.getReaderDirectory() + SEPARATOR + code))
.collect(Collectors.toList());
keyDirectories.addAll(sftpConfig2.getCodes().stream()
.map(code -> new RotationPolicy.KeyDirectory("config2", sftpConfig2.getReaderDirectory() + SEPARATOR + code))
.collect(Collectors.toList()));
return new RotatingServerAdvice(delegatingSf(), keyDirectories);
}
@Bean
public IntegrationFlow sftpIntegrationFlow() {
return IntegrationFlows.from(
Sftp.inboundAdapter(delegatingSf())
.filter(new SftpSimplePatternFileListFilter("*.csv"))
.filter(new SftpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "rotate"))
.localFilter(new AbstractFileListFilter<File>() {
@Override
public boolean accept(final File file) {
return file.getName().endsWith(".csv");
}
})
.deleteRemoteFiles(false)
.temporaryFileSuffix(".new")
.localDirectory(new File()) // TODO dynamic local directory based on sftp session
.remoteDirectory("."),
e -> e.poller(Pollers.fixedDelay(1, MINUTES).advice(advice()).advice(logNoFileFoundAdvice())))
.log(LoggingHandler.Level.INFO, "[SFTP]", m -> "Received file: " + m.getHeaders().get(FileHeaders.FILENAME))
.channel("filesReceptionChannel")
.enrichHeaders(h -> h.header("errorChannel", "errorChannel"))
.get();
}
@Bean
public MethodInterceptor logNoFileFoundAdvice() {
return invocation -> {
Object result = invocation.proceed();
if (result == null) {
log.info("[SFTP] No files found");
}
return result;
};
}
@Bean
public SftpRemoteFileTemplate sftpTemplate() {
return new SftpRemoteFileTemplate(sftpSession());
}
@Bean
public SessionFactory<ChannelSftp.LsEntry> sftpSession() {
return getSftpSession(); // TODO dynamic sftp session based on message received in serviceActivator bellow
}
@Bean
@ServiceActivator(inputChannel = "uploadChannel")
public MessageHandler uploadHandler() {
return getFtpMessageHandler(sftpSession());
}
public MessageHandler getFtpMessageHandler(SessionFactory<ChannelSftp.LsEntry> sftpSession) {
SftpMessageHandler handler = new SftpMessageHandler(sftpSession);
handler.setRemoteDirectoryExpressionString("headers['remoteDirectory']");
handler.setFileNameGenerator(message -> {
if (message.getPayload() instanceof File) {
return ((File) message.getPayload()).getName();
} else {
throw new IllegalArgumentException("File expected as payload.");
}
});
handler.setUseTemporaryFileName(false);
return handler;
}
РЕДАКТИРОВАТЬ :
С вашей помощью мне удалось исправить первую часть, так что большое спасибо.
Чтобы это исправить, я сначала добавил фабрику по умолчанию в метод delegatingSf()
.
Затем я использовал delegatingSf()
в serviceActivator
вместо sftpSession()
, который удалил.
Наконец, в методе, который отправляет сообщение на мой uploadChannel
, я использовал delegatingSf.setThreadKey(mapSessionKey)
для динамического выбора правильной фабрики в методе, который выводит сообщение на мой uploadChannel
. И конечно delegatingSf.clearThreadKey()
сразу после отправки.
Я постараюсь сделать вторую часть функциональной, а код исправления выложу после.
Обновлено еще раз:
Я использовал localFileNameExpression
, как было предложено, и динамически восстановил localDirectory
, но меня это не устраивает, поскольку, на мой взгляд, это не очень хороший способ сделать это.
Я использовал remoteDirectory
для идентификации SFTP-сервера, но если у двух серверов однажды будет одинаковый путь, он не будет работать.
Но пока это работает, так что спасибо за помощь.
SftpConfig.Sftp1 sftpConfig1;
SftpConfig.Sftp2 sftpConfig2;
@Bean
@BridgeTo
public MessageChannel uploadChannel() {
return new PublishSubscribeChannel();
}
@Bean
public ExpressionParser spelExpressionParser() {
return new SpelExpressionParser();
}
public SessionFactory<ChannelSftp.LsEntry> getSftpSession(SftpConfig sftp) {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost(sftp.getHost());
factory.setPort(sftp.getPort());
factory.setUser(sftp.getUser());
factory.setPassword(sftp.getPassword());
factory.setAllowUnknownKeys(true);
factory.setTimeout(sftp.getTimeout());
log.info("timeout is set to: {}", sftp.getTimeout());
return new CachingSessionFactory<>(factory);
}
@Bean
public DelegatingSessionFactory<ChannelSftp.LsEntry> delegatingSf() {
Map<Object, SessionFactory<ChannelSftp.LsEntry>> mapSession = new HashMap<>();
mapSession.put("config1", getSftpSession(sftpConfig1));
mapSession.put("config2", getSftpSession(sftpConfig2));
SessionFactoryLocator<ChannelSftp.LsEntry> sessionFactoryLocator = new DefaultSessionFactoryLocator<>(mapSession, mapSession.get("config1"));
return new DelegatingSessionFactory<>(sessionFactoryLocator);
}
@Bean
public RotatingServerAdvice advice() {
List<RotationPolicy.KeyDirectory> keyDirectories = sftpConfig1.getCodes().stream()
.map(code -> new RotationPolicy.KeyDirectory("config1", sftpConfig1.getReaderDirectory() + SEPARATOR + code))
.collect(Collectors.toList());
keyDirectories.addAll(sftpConfig2.getCodes().stream()
.map(code -> new RotationPolicy.KeyDirectory("config2", sftpConfig2.getReaderDirectory() + SEPARATOR + code))
.collect(Collectors.toList()));
return new RotatingServerAdvice(delegatingSf(), keyDirectories);
}
@Bean
public IntegrationFlow sftpIntegrationFlow() {
return IntegrationFlows.from(
Sftp.inboundAdapter(delegatingSf())
.filter(new SftpSimplePatternFileListFilter("*.csv"))
.filter(new SftpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "rotate"))
.localFilter(new AbstractFileListFilter<File>() {
@Override
public boolean accept(final File file) {
return file.getName().endsWith(".csv");
}
})
.deleteRemoteFiles(false)
.temporaryFileSuffix(".new")
.localFilenameExpression("@sftpEIPConfig.getLocalDirectoryReader(#remoteDirectory) + #this")
.localDirectory(new File("/"))
.remoteDirectory("."),
e -> e.poller(Pollers.fixedDelay(1, MINUTES).advice(advice()).advice(logNoFileFoundAdvice())))
.log(LoggingHandler.Level.INFO, "[SFTP]", m -> "Received file: " + m.getHeaders().get(FileHeaders.FILENAME))
.channel("filesReceptionChannel")
.enrichHeaders(h -> h.header("errorChannel", "errorChannel"))
.get();
}
@Bean
public MethodInterceptor logNoFileFoundAdvice() {
return invocation -> {
Object result = invocation.proceed();
if (result == null) {
log.info("[SFTP] No files found");
}
return result;
};
}
@Bean
public SftpRemoteFileTemplate sftpTemplate() {
return new SftpRemoteFileTemplate(delegatingSf());
}
@Bean
@ServiceActivator(inputChannel = "uploadChannel")
public MessageHandler uploadHandler() {
return getFtpMessageHandler(delegatingSf());
}
public MessageHandler getFtpMessageHandler(SessionFactory<ChannelSftp.LsEntry> sftpSession) {
SftpMessageHandler handler = new SftpMessageHandler(sftpSession);
handler.setRemoteDirectoryExpressionString("headers['remoteDirectory']");
handler.setFileNameGenerator(message -> {
if (message.getPayload() instanceof File) {
return ((File) message.getPayload()).getName();
} else {
throw new IllegalArgumentException("File expected as payload.");
}
});
handler.setUseTemporaryFileName(false);
return handler;
}
localDirectory
нельзя изменить на AbstractInboundFileSynchronizingMessageSource
. Вы можете использовать localFilenameExpression()
для динамического создания каталога. Хотя localDirectory
может быть просто корнем — /
.
Для отправки на разные SFTP-серверы обязательно используйте DelegatingSessionFactory
. Дополнительную информацию см. в документации: https://docs.spring.io/spring-integration/reference/sftp/dsf.html . А также: https://docs.spring.io/spring-integration/reference/handler-advice/context-holder.html
Большое спасибо ! Мне удалось отправить файл по двум разным SFTP с помощью делегирующейSessionFactory. Я не знаю, почему я не попробовал это, учитывая, что я уже использовал его для чтения с этих SFTP. Теперь мне нужно изменить часть inboundAdapter для localDir. Я обновлю пост первым решением. Еще раз спасибо :)