Я создал метод, который читает файл csv и записывает данные в новый файл csv, я хочу вызвать этот метод в моем потоке интеграции, поэтому в основном я извлекаю файл csv с моего FTP-сервера с именем FEFOexportBEY.csv и Я хочу сгенерировать новый файл из этого csv и назвать его finalBEY.csv, настраиваемый метод должен выполнять запись, но я хочу вызвать его в потоке после опроса csv с FTP. Ниже приведено кодирование настраиваемого метода и интеграции потока. Я обнаружил, что это можно сделать с помощью .handle, но не смог найти правильный способ. В моем старом проекте я смог сделать это в конфигурации xml, создав bean-компонент с именем класса метода, а затем внедрив метод в маршрут, теперь я использую DSL, и должен быть возможный способ, если кто-то может помочь .
Способ обращения
@Component
public class CSVToCSVNoQ {
public CSVToCSVNoQ() {
}
public void writeCSVfinal(String payload,@Header("new") String newCSV,@Header("old") String oldCsv) throws IOException {
CSVReader reader = null;
reader = new CSVReader(new FileReader(oldCsv));
FileWriter fileWriter = new FileWriter(newCSV);
//try (CSVWriter writer = new CSVWriter(new FileWriter(newCSV), ',', CSVWriter.NO_QUOTE_CHARACTER)) {
try(CSVWriter csvWriter = new CSVWriter(fileWriter,CSVWriter.DEFAULT_SEPARATOR,
CSVWriter.NO_QUOTE_CHARACTER)){
List<String[]> line;
reader.readNext();
reader.readNext();
SimpleDateFormat from = new SimpleDateFormat("yyyy-mm-dd");
SimpleDateFormat to = new SimpleDateFormat("ddMMMyyyy");
line = reader.readAll();
Iterator<String[]> itr = line.iterator();
while (itr.hasNext()){
String[] array = itr.next();
if (array[0].equals("DET")) {
// System.out.println("Change Format " + to.format(from.parse(array[5])));
array[5] = to.format(from.parse(array[5]));
}
}
while (itr.hasNext()){
String[] array = itr.next();
System.out.println("Line " + itr.next());
}
csvWriter.writeAll(line);
csvWriter.close();
} catch (IOException e) {
e.printStackTrace();
} catch (ParseException e) {
e.printStackTrace();
}
try {
reader.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
Интеграционный поток на входящем.
public IntegrationFlow fileInboundFlowFromFTPServer(Branch myBranch) throws IOException {
final FtpInboundChannelAdapterSpec sourceSpecFtp = Ftp.inboundAdapter(createNewFtpSessionFactory(myBranch))
.preserveTimestamp(true)
//.patternFilter("*.csv")
.maxFetchSize(MAX_MESSAGES_PER_POLL)
.remoteDirectory(myBranch.getFolderPath())
.regexFilter("FEFOexport"+myBranch.getBranchCode()+".csv")
.deleteRemoteFiles(true)
.localDirectory(new File(myBranch.getBranchCode()))
.temporaryFileSuffix(TEMPORARY_FILE_SUFFIX)
/*.localFilenameExpression(new FunctionExpression<String>(s -> {
final int fileTypeSepPos = s.lastIndexOf('.');
return DateTimeFormatter
.ofPattern(TIMESTAMP_FORMAT_OF_FILES)
.withZone(ZoneId.of(TIMEZONE_UTC))
.format(Instant.now())
+ "_"
+ s.substring(0,fileTypeSepPos)
+ s.substring(fileTypeSepPos);
}))*/;
// Poller definition
final Consumer<SourcePollingChannelAdapterSpec> stockInboundPoller = endpointConfigurer -> endpointConfigurer
.id("stockInboundPoller")
.autoStartup(true)
.poller(poller());
IntegrationFlow flow = IntegrationFlows
.from(sourceSpecFtp, stockInboundPoller)
.transform(File.class, p ->{
// log step
LOG1.info("flow=stockInboundFlowFromAFT, message=incoming file: " + p);
return p;
})
.channel(CHANNEL_INTERMEDIATE_STAGE)
.handle(m -> {
try {
this.csvToCSVNoQ.writeCSVfinal("test", myBranch.getBranchCode() + "/final" + myBranch.getBranchCode() + ".csv", myBranch.getBranchCode() + "/FEFOexport" + myBranch.getBranchCode() + ".csv");
LOG1.info("Writing final file .csv " + m);
} catch (IOException e) {
e.printStackTrace();
}
})
.get();
return flow;
}
Старая конфигурация XML
<bean id = "myBeanId" class = "com.preparationforinterview.csvprocessing.CSVTOCSVNOQUT"/>
<!--BEY routing-->
<route>
<from uri = "ftp://[email protected]:21/ftp/erbranch/EDMS/FEFO/?fileName=FEFOexportBEY.csv"/>
<log message = "Level=INFO&showBody=true&showHeaders=true"/>
<to uri = "file:input"/>
<bean ref = "myBeanId" method = "writeCSVfinal(output\finalBEY.csv,input\FEFOexportBEY.csv)"/>
<bean ref = "myBeanId" method = "readCSVOrder(input\FEFOexportBEY.csv)"/>
<log message = "Inserting orders and details in the database from BEY"/>
<to uri = "ftp://[email protected]:21/ftp/erbranch/EDMS/FEFO/History/?autoCreate=true&"/>
</route>
Прежде всего, ваше определение ссылки на метод полностью неверно в этом .handle().
В любом случае давайте пока об этом забудем!
С другой стороны, ваша подпись метода writeCSVfinal() не очень удобна для обмена сообщениями. У вас есть два аргумента, которые с точки зрения инициатора идентичны, и у вас будет двусмысленность, потому что фреймворк не может определить, какой из них относится к payload, из сообщения запроса.
Но мы все еще можем достичь цели с помощью вашего метода и .handle() следующим образом:
.handle(m -> this.cSVToCSVNoQ.writeCSVfinal(m.getPayload(), m.getHeaders().get(...)))
Или какая-то другая логика для реального определения значений аргументов метода.
Также имейте в виду, что ваш метод возвращает void, поэтому такой обработчик можно использовать только в конце потока. Нет ничего, что могло бы быть отправлено на следующий канал в цепочке.
Совершенно непонятно, как работает ваш код Camel.
Пожалуйста, прочтите это: docs.spring.io/spring-integration/docs/current/reference/htm l /…. Идентификатор не имеет отношения к вашей проблеме, но, по крайней мере, он говорит, что у вас не может быть двух аргументов без конкретной аннотации, поэтому фреймворк не поймет, какой из них предназначен для payload.
Если вы говорите, что он хорошо работает вне Spring Integration, вам просто нужно познакомиться с Spring Integration. Пожалуйста, прочтите справочное руководство с самого начала, чтобы определить, что между ними находится сообщение, обработчик и канал. Как работает запрос-ответ и что такое сопоставление аргументов. Я не знаю, где вы берете свои new file location и old file location, поэтому я просто показал вам возможный вариант вызова вашего метода. Вы должны определить для себя, какая часть сообщения запроса несет соответствующую информацию.
Извините, мне потребовалось время, чтобы разобраться в этом еще раз, я смог отсортировать это, добавив заголовки в метод ... Я редактирую код выше ... единственная проблема, с которой я сталкиваюсь, - это наличие двух серверов, которые я ' m опрашивает новые файлы, и если один сервер получает новый файл, он вытягивается на локальный, и логически он должен преобразовать новый файл в finalXXX.csv и отправить его на удаленный, но то, что он делает, он выполняет метод на всех файлы, которые существуют в моей локальной папке, а не только в новой ... не уверен, в чем проблема ... пытаюсь понять .. @ Artem Bilan
То, что вы описываете, не имеет отношения к теме этой темы. Пожалуйста, поднимите новый вопрос SO с более подробной информацией, и давайте рассмотрим этот вопрос как ответ: stackoverflow.com/help/someone-answers!
Извините за мои знания о весенней интеграции, но я все еще изучаю ее, поэтому мой метод writeCSVfinal () сначала принимает новое местоположение и имя файла в виде строки, а затем второе местоположение и имя старого файла. Он записывает новый файл на основе старого ... это работает плавно вне Spring интеграции, поэтому я пытался поместить его в дескриптор, как лучше всего создать его, чтобы он был более дружелюбным с обменом сообщениями ... Итак, что я понял из вашей помощи выше, так это то, что я должен передавать аргументы в .get (..), есть ли какой-нибудь пример, на который вы можете указать мне, чтобы я мог ссылаться на него? @Artem Bilan