Как вызвать метод в моем потоке интеграции

Я создал метод, который читает файл csv и записывает данные в новый файл csv, я хочу вызвать этот метод в моем потоке интеграции, поэтому в основном я извлекаю файл csv с моего FTP-сервера с именем FEFOexportBEY.csv и Я хочу сгенерировать новый файл из этого csv и назвать его finalBEY.csv, настраиваемый метод должен выполнять запись, но я хочу вызвать его в потоке после опроса csv с FTP. Ниже приведено кодирование настраиваемого метода и интеграции потока. Я обнаружил, что это можно сделать с помощью .handle, но не смог найти правильный способ. В моем старом проекте я смог сделать это в конфигурации xml, создав bean-компонент с именем класса метода, а затем внедрив метод в маршрут, теперь я использую DSL, и должен быть возможный способ, если кто-то может помочь .

Способ обращения

@Component
public class CSVToCSVNoQ {

public CSVToCSVNoQ() {
}

public void writeCSVfinal(String payload,@Header("new") String newCSV,@Header("old") String oldCsv) throws IOException {

    CSVReader reader = null;
    reader = new CSVReader(new FileReader(oldCsv));
    FileWriter fileWriter = new FileWriter(newCSV);

    //try (CSVWriter writer = new CSVWriter(new FileWriter(newCSV), ',', CSVWriter.NO_QUOTE_CHARACTER)) {
    try(CSVWriter csvWriter = new CSVWriter(fileWriter,CSVWriter.DEFAULT_SEPARATOR,
                                                        CSVWriter.NO_QUOTE_CHARACTER)){
        List<String[]> line;
        reader.readNext();
        reader.readNext();
        SimpleDateFormat from = new SimpleDateFormat("yyyy-mm-dd");
        SimpleDateFormat to = new SimpleDateFormat("ddMMMyyyy");

        line = reader.readAll();

        Iterator<String[]> itr = line.iterator();
        while (itr.hasNext()){
            String[] array = itr.next();
            if (array[0].equals("DET")) {
                // System.out.println("Change Format " + to.format(from.parse(array[5])));
                array[5] = to.format(from.parse(array[5]));
            }
        }

        while (itr.hasNext()){
            String[] array = itr.next();
            System.out.println("Line " + itr.next());
        }

        csvWriter.writeAll(line);
        csvWriter.close();

    } catch (IOException e) {
        e.printStackTrace();
    } catch (ParseException e) {
        e.printStackTrace();
    }

    try {
        reader.close();
    } catch (IOException e) {
        e.printStackTrace();
    }

}

}

Интеграционный поток на входящем.

 public IntegrationFlow fileInboundFlowFromFTPServer(Branch myBranch) throws IOException {

    final FtpInboundChannelAdapterSpec sourceSpecFtp = Ftp.inboundAdapter(createNewFtpSessionFactory(myBranch))
            .preserveTimestamp(true)
          //.patternFilter("*.csv")
            .maxFetchSize(MAX_MESSAGES_PER_POLL)
            .remoteDirectory(myBranch.getFolderPath())
            .regexFilter("FEFOexport"+myBranch.getBranchCode()+".csv")
            .deleteRemoteFiles(true)
            .localDirectory(new File(myBranch.getBranchCode()))
            .temporaryFileSuffix(TEMPORARY_FILE_SUFFIX)

            /*.localFilenameExpression(new FunctionExpression<String>(s -> {
                final int fileTypeSepPos = s.lastIndexOf('.');
                return DateTimeFormatter
                        .ofPattern(TIMESTAMP_FORMAT_OF_FILES)
                        .withZone(ZoneId.of(TIMEZONE_UTC))
                        .format(Instant.now())
                        + "_"
                        + s.substring(0,fileTypeSepPos)
                        + s.substring(fileTypeSepPos);
            }))*/;

    // Poller definition
    final Consumer<SourcePollingChannelAdapterSpec> stockInboundPoller = endpointConfigurer -> endpointConfigurer
            .id("stockInboundPoller")
            .autoStartup(true)
            .poller(poller());

    IntegrationFlow flow = IntegrationFlows
            .from(sourceSpecFtp, stockInboundPoller)

            .transform(File.class, p ->{
                // log step
                LOG1.info("flow=stockInboundFlowFromAFT, message=incoming file: " + p);
                return p;
            })

            .channel(CHANNEL_INTERMEDIATE_STAGE)
            .handle(m -> {
                try {
                    this.csvToCSVNoQ.writeCSVfinal("test", myBranch.getBranchCode() + "/final" + myBranch.getBranchCode() + ".csv", myBranch.getBranchCode() + "/FEFOexport" + myBranch.getBranchCode() + ".csv");
                    LOG1.info("Writing final file .csv " + m);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            })
            .get();

    return flow;
}

Старая конфигурация XML

 <bean id = "myBeanId" class = "com.preparationforinterview.csvprocessing.CSVTOCSVNOQUT"/>


    <!--BEY routing-->
    <route>
        <from uri = "ftp://[email protected]:21/ftp/erbranch/EDMS/FEFO/?fileName=FEFOexportBEY.csv"/>
        <log message = "Level=INFO&amp;showBody=true&amp;showHeaders=true"/>
        <to uri = "file:input"/>
        <bean ref = "myBeanId" method = "writeCSVfinal(output\finalBEY.csv,input\FEFOexportBEY.csv)"/>
        <bean ref = "myBeanId" method = "readCSVOrder(input\FEFOexportBEY.csv)"/>
        <log message = "Inserting orders and details in the database from BEY"/>
        <to uri = "ftp://[email protected]:21/ftp/erbranch/EDMS/FEFO/History/?autoCreate=true&amp"/>
    </route>
0
0
790
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Прежде всего, ваше определение ссылки на метод полностью неверно в этом .handle().

В любом случае давайте пока об этом забудем!

С другой стороны, ваша подпись метода writeCSVfinal() не очень удобна для обмена сообщениями. У вас есть два аргумента, которые с точки зрения инициатора идентичны, и у вас будет двусмысленность, потому что фреймворк не может определить, какой из них относится к payload, из сообщения запроса.

Но мы все еще можем достичь цели с помощью вашего метода и .handle() следующим образом:

.handle(m -> this.cSVToCSVNoQ.writeCSVfinal(m.getPayload(), m.getHeaders().get(...)))

Или какая-то другая логика для реального определения значений аргументов метода.

Также имейте в виду, что ваш метод возвращает void, поэтому такой обработчик можно использовать только в конце потока. Нет ничего, что могло бы быть отправлено на следующий канал в цепочке.

Совершенно непонятно, как работает ваш код Camel.

Извините за мои знания о весенней интеграции, но я все еще изучаю ее, поэтому мой метод writeCSVfinal () сначала принимает новое местоположение и имя файла в виде строки, а затем второе местоположение и имя старого файла. Он записывает новый файл на основе старого ... это работает плавно вне Spring интеграции, поэтому я пытался поместить его в дескриптор, как лучше всего создать его, чтобы он был более дружелюбным с обменом сообщениями ... Итак, что я понял из вашей помощи выше, так это то, что я должен передавать аргументы в .get (..), есть ли какой-нибудь пример, на который вы можете указать мне, чтобы я мог ссылаться на него? @Artem Bilan

Elias Khattar 13.12.2018 13:30

Пожалуйста, прочтите это: docs.spring.io/spring-integration/docs/current/reference/htm‌ l /…. Идентификатор не имеет отношения к вашей проблеме, но, по крайней мере, он говорит, что у вас не может быть двух аргументов без конкретной аннотации, поэтому фреймворк не поймет, какой из них предназначен для payload.

Artem Bilan 13.12.2018 15:00

Если вы говорите, что он хорошо работает вне Spring Integration, вам просто нужно познакомиться с Spring Integration. Пожалуйста, прочтите справочное руководство с самого начала, чтобы определить, что между ними находится сообщение, обработчик и канал. Как работает запрос-ответ и что такое сопоставление аргументов. Я не знаю, где вы берете свои new file location и old file location, поэтому я просто показал вам возможный вариант вызова вашего метода. Вы должны определить для себя, какая часть сообщения запроса несет соответствующую информацию.

Artem Bilan 13.12.2018 15:04

Извините, мне потребовалось время, чтобы разобраться в этом еще раз, я смог отсортировать это, добавив заголовки в метод ... Я редактирую код выше ... единственная проблема, с которой я сталкиваюсь, - это наличие двух серверов, которые я ' m опрашивает новые файлы, и если один сервер получает новый файл, он вытягивается на локальный, и логически он должен преобразовать новый файл в finalXXX.csv и отправить его на удаленный, но то, что он делает, он выполняет метод на всех файлы, которые существуют в моей локальной папке, а не только в новой ... не уверен, в чем проблема ... пытаюсь понять .. @ Artem Bilan

Elias Khattar 19.12.2018 12:29

То, что вы описываете, не имеет отношения к теме этой темы. Пожалуйста, поднимите новый вопрос SO с более подробной информацией, и давайте рассмотрим этот вопрос как ответ: stackoverflow.com/help/someone-answers!

Artem Bilan 19.12.2018 14:17

Другие вопросы по теме