Мое требование - переместить файлы из каталога ввода в каталог вывода. В настоящее время я получаю XML-файл, разбираю его, обрабатываю и хочу перейти в новую папку. Я использую SPring boot 2.0, Spring INtegration 5. Код прилагается. Эти потоки интеграции обрабатывают файл, но после обработки не перемещают файл в новый каталог.
Не могли бы вы сообщить мне, чего не хватает и как это исправить?
Журналы
2018-04-06 15:55:16.473[0;39m [32mDEBUG[0;39m [35m6364[0;39m [2m---[0;39m [2m[ask-scheduler-1][0;39m [36mo.s.i.handler.ServiceActivatingHandler [0;39m [2m:[0;39m handler 'ServiceActivator for [org.springframework.integration.handler.BeanNameMessageProcessor@33a55bd8] (org.springframework.integration.handler.ServiceActivatingHandler#0)' produced no reply for request Message: GenericMessage [payload=Producers {id: -2147483648, parent-id: 0}, headers = {file_originalFile=C:\slim\OBDF\Entire_IMO_hierarchy.xml, id=3ee00fca-1f2b-be84-742a-b5c6edfaf42a, file_name=Entire_IMO_hierarchy.xml, file_relativePath=Entire_IMO_hierarchy.xml, timestamp=1523055316426}]
2018-04-06 15:55:16.475[0;39m [32mDEBUG[0;39m [35m6364[0;39m ---[0;39m [ask-scheduler-1][0;39m [36mo.s.integration.channel.DirectChannel [0;39m :[0;39m postSend (sent=true) on channel 'slimflow.channel#1', message: GenericMessage [payload=Producers {id: -2147483648, parent-id: 0}, headers = {file_originalFile=C:\slim\OBDF\Entire_IMO_hierarchy.xml, id=3ee00fca-1f2b-be84-742a-b5c6edfaf42a, file_name=Entire_IMO_hierarchy.xml, file_relativePath=Entire_IMO_hierarchy.xml, timestamp=1523055316426}]
2018-04-06 15:55:16.480[0;39m [32mDEBUG[0;39m [35m6364[0;39m ---[0;39m [ask-scheduler-1][0;39m [36mo.s.integration.channel.DirectChannel [0;39m :[0;39m postSend (sent=true) on channel 'slimflow.channel#0', message: GenericMessage [payload=C:\slim\OBDF\Entire_IMO_hierarchy.xml, headers = {file_originalFile=C:\slim\OBDF\Entire_IMO_hierarchy.xml, id=0f673954-bceb-6e64-0d47-639522002569, file_name=Entire_IMO_hierarchy.xml, file_relativePath=Entire_IMO_hierarchy.xml, timestamp=1523055316320}]
Конфигурация потока интеграции
import java.io.File;
import java.util.function.Function;
import javax.xml.bind.JAXBException;
import javax.xml.stream.XMLStreamException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.file.FileReadingMessageSource;
import org.springframework.integration.file.FileWritingMessageHandler;
import org.springframework.integration.file.filters.AcceptOnceFileListFilter;
import org.springframework.integration.file.filters.ChainFileListFilter;
import org.springframework.integration.file.filters.RegexPatternFileListFilter;
import org.springframework.integration.transformer.PayloadTypeConvertingTransformer;
import org.springframework.messaging.MessageHandler;
@Configuration
@EnableIntegration
public class SlimIntegrationConfig {
@Value("${input.directory}")
private String inputDir;
@Value("${outputDir.directory}")
private String outputDir;
@Value("${input.scan.frequency: 100000}")
private long scanFrequency;
@Autowired
private XmlBeanExtractor<Producers> xmlBeanExtractor;
@Bean
public MessageSource<File> inputFileSource() {
FileReadingMessageSource src = new FileReadingMessageSource(
(f1, f2) -> Long.valueOf(f1.lastModified()).compareTo(f2.lastModified()));
src.setDirectory(new File(inputDir));
src.setAutoCreateDirectory(true);
ChainFileListFilter<File> chainFileListFilter = new ChainFileListFilter<>();
chainFileListFilter.addFilter(new AcceptOnceFileListFilter<>() );
chainFileListFilter.addFilter(new RegexPatternFileListFilter("(?i)^.+\\.xml$"));
src.setFilter(chainFileListFilter);
return src;
}
@Bean
public DirectChannel outputChannel() {
return new DirectChannel();
}
@Bean
public MessageHandler fileOutboundChannelAdapter() {
FileWritingMessageHandler adapter = new FileWritingMessageHandler(new File(outputDir));
adapter.setDeleteSourceFiles(true);
adapter.setAutoCreateDirectory(true);
adapter.setExpectReply(false);
return adapter;
}
@Bean
PayloadTypeConvertingTransformer<File, Producers> xmlBeanTranformer() {
PayloadTypeConvertingTransformer<File, Producers> tranformer = new PayloadTypeConvertingTransformer<>();
tranformer.setConverter(file -> {
Producers p = null;
try {
p = xmlBeanExtractor.extract(file.getAbsolutePath(), Producers.class);
} catch (JAXBException | XMLStreamException e) {
e.printStackTrace();
}
return p;
});
return tranformer;
}
@Bean
public IntegrationFlow slimflow() {
return IntegrationFlows
.from(inputFileSource(), spec -> spec.poller(Pollers.fixedDelay(scanFrequency)))
.transform(xmlBeanTranformer())
.handle("slimFileProcessor","processfile")
.channel(outputChannel())
.handle(fileOutboundChannelAdapter())
.get()
;
}
}
Нам нужно знать, что делает ваш slimFileProcessor.processfile(). Однако это не отражает того, что вы делаете в xmlBeanTranformer. Вы конвертируете полезную нагрузку File в объект Producers, и именно она отправляется в slimFileProcessor.
Итак, это первое: в File нет payload для FileWritingMessageHandler. Но мы можем исправить это чуть позже.
Теперь у вас есть журнал вроде:
ServiceActivatingHandler#0)' produced no reply for request
Итак, ваш slimFileProcessor не возвращает что-то для отправки на outputChannel() для возможного перемещения файла из одного каталога в другой.
Если возврат чего-либо вообще невозможен по логике, вы можете рассмотреть возможность использования .publishSubscribeChannel(). Сделайте этот xmlBeanTranformer() одним подписчиком, а fileOutboundChannelAdapter() другим. Таким образом, один и тот же объект File будет отправлен в две ветки. Только дело в том, что вторая ветка не будет вызвана, пока первая не закончит свою работу. Конечно, если все делать в одном потоке.
Вы все еще можете жить с простым линейным потоком только потому, что вы получаете усиление заголовка FileHeaders.ORIGINAL_FILE, который будет использоваться в FileWritingMessageHandler. Но следует иметь в виду, что последний поддерживает только следующие типы полезной нагрузки сообщения запроса: File, InputStream, byte[] или String. Для вашего варианта использования двигаться после процесса, конечно, было бы лучше иметь дело с типом File. Поэтому предлагаю рассмотреть вариант опубликовать-подписаться.