Spring Integration не перемещает файл после обработки

Мое требование - переместить файлы из каталога ввода в каталог вывода. В настоящее время я получаю XML-файл, разбираю его, обрабатываю и хочу перейти в новую папку. Я использую SPring boot 2.0, Spring INtegration 5. Код прилагается. Эти потоки интеграции обрабатывают файл, но после обработки не перемещают файл в новый каталог.

Не могли бы вы сообщить мне, чего не хватает и как это исправить?

Журналы

2018-04-06 15:55:16.473[0;39m [32mDEBUG[0;39m [35m6364[0;39m [2m---[0;39m [2m[ask-scheduler-1][0;39m [36mo.s.i.handler.ServiceActivatingHandler  [0;39m [2m:[0;39m handler 'ServiceActivator for [org.springframework.integration.handler.BeanNameMessageProcessor@33a55bd8] (org.springframework.integration.handler.ServiceActivatingHandler#0)' produced no reply for request Message: GenericMessage [payload=Producers {id: -2147483648, parent-id: 0}, headers = {file_originalFile=C:\slim\OBDF\Entire_IMO_hierarchy.xml, id=3ee00fca-1f2b-be84-742a-b5c6edfaf42a, file_name=Entire_IMO_hierarchy.xml, file_relativePath=Entire_IMO_hierarchy.xml, timestamp=1523055316426}]
2018-04-06 15:55:16.475[0;39m [32mDEBUG[0;39m [35m6364[0;39m ---[0;39m [ask-scheduler-1][0;39m [36mo.s.integration.channel.DirectChannel   [0;39m :[0;39m postSend (sent=true) on channel 'slimflow.channel#1', message: GenericMessage [payload=Producers {id: -2147483648, parent-id: 0}, headers = {file_originalFile=C:\slim\OBDF\Entire_IMO_hierarchy.xml, id=3ee00fca-1f2b-be84-742a-b5c6edfaf42a, file_name=Entire_IMO_hierarchy.xml, file_relativePath=Entire_IMO_hierarchy.xml, timestamp=1523055316426}]
2018-04-06 15:55:16.480[0;39m [32mDEBUG[0;39m [35m6364[0;39m ---[0;39m [ask-scheduler-1][0;39m [36mo.s.integration.channel.DirectChannel   [0;39m :[0;39m postSend (sent=true) on channel 'slimflow.channel#0', message: GenericMessage [payload=C:\slim\OBDF\Entire_IMO_hierarchy.xml, headers = {file_originalFile=C:\slim\OBDF\Entire_IMO_hierarchy.xml, id=0f673954-bceb-6e64-0d47-639522002569, file_name=Entire_IMO_hierarchy.xml, file_relativePath=Entire_IMO_hierarchy.xml, timestamp=1523055316320}]

Конфигурация потока интеграции

import java.io.File;
import java.util.function.Function;

import javax.xml.bind.JAXBException;
import javax.xml.stream.XMLStreamException;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.file.FileReadingMessageSource;
import org.springframework.integration.file.FileWritingMessageHandler;
import org.springframework.integration.file.filters.AcceptOnceFileListFilter;
import org.springframework.integration.file.filters.ChainFileListFilter;
import org.springframework.integration.file.filters.RegexPatternFileListFilter;
import org.springframework.integration.transformer.PayloadTypeConvertingTransformer;
import org.springframework.messaging.MessageHandler;

@Configuration
@EnableIntegration
public class SlimIntegrationConfig {
    @Value("${input.directory}")
    private String inputDir;

    @Value("${outputDir.directory}")
    private String outputDir;

    @Value("${input.scan.frequency: 100000}")
    private long scanFrequency;

    @Autowired
    private XmlBeanExtractor<Producers> xmlBeanExtractor;

    @Bean
    public MessageSource<File> inputFileSource() {
        FileReadingMessageSource src = new FileReadingMessageSource(
                (f1, f2) -> Long.valueOf(f1.lastModified()).compareTo(f2.lastModified()));

        src.setDirectory(new File(inputDir));
        src.setAutoCreateDirectory(true);

        ChainFileListFilter<File> chainFileListFilter = new ChainFileListFilter<>();        
        chainFileListFilter.addFilter(new AcceptOnceFileListFilter<>() );
        chainFileListFilter.addFilter(new RegexPatternFileListFilter("(?i)^.+\\.xml$"));        
        src.setFilter(chainFileListFilter);

        return src;
    }

    @Bean
    public DirectChannel outputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageHandler fileOutboundChannelAdapter() {

        FileWritingMessageHandler adapter = new FileWritingMessageHandler(new File(outputDir));
        adapter.setDeleteSourceFiles(true);
        adapter.setAutoCreateDirectory(true);
        adapter.setExpectReply(false);
        return adapter;
    }

    @Bean
    PayloadTypeConvertingTransformer<File, Producers> xmlBeanTranformer() {
        PayloadTypeConvertingTransformer<File, Producers> tranformer = new PayloadTypeConvertingTransformer<>();
        tranformer.setConverter(file -> {
            Producers p = null;
            try {
                p = xmlBeanExtractor.extract(file.getAbsolutePath(), Producers.class);
            } catch (JAXBException | XMLStreamException e) {
                e.printStackTrace();
            }

            return p;
        });

        return tranformer;
    }

    @Bean
    public IntegrationFlow slimflow() {
        return IntegrationFlows
                .from(inputFileSource(), spec -> spec.poller(Pollers.fixedDelay(scanFrequency)))
                .transform(xmlBeanTranformer())
                .handle("slimFileProcessor","processfile")
                .channel(outputChannel())
                .handle(fileOutboundChannelAdapter())
                .get()
                ;
    }
}
1
0
720
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Нам нужно знать, что делает ваш slimFileProcessor.processfile(). Однако это не отражает того, что вы делаете в xmlBeanTranformer. Вы конвертируете полезную нагрузку File в объект Producers, и именно она отправляется в slimFileProcessor.

Итак, это первое: в File нет payload для FileWritingMessageHandler. Но мы можем исправить это чуть позже.

Теперь у вас есть журнал вроде:

ServiceActivatingHandler#0)' produced no reply for request

Итак, ваш slimFileProcessor не возвращает что-то для отправки на outputChannel() для возможного перемещения файла из одного каталога в другой.

Если возврат чего-либо вообще невозможен по логике, вы можете рассмотреть возможность использования .publishSubscribeChannel(). Сделайте этот xmlBeanTranformer() одним подписчиком, а fileOutboundChannelAdapter() другим. Таким образом, один и тот же объект File будет отправлен в две ветки. Только дело в том, что вторая ветка не будет вызвана, пока первая не закончит свою работу. Конечно, если все делать в одном потоке.

Вы все еще можете жить с простым линейным потоком только потому, что вы получаете усиление заголовка FileHeaders.ORIGINAL_FILE, который будет использоваться в FileWritingMessageHandler. Но следует иметь в виду, что последний поддерживает только следующие типы полезной нагрузки сообщения запроса: File, InputStream, byte[] или String. Для вашего варианта использования двигаться после процесса, конечно, было бы лучше иметь дело с типом File. Поэтому предлагаю рассмотреть вариант опубликовать-подписаться.

Другие вопросы по теме