Я создаю потоки динамической интеграции на основе данных из базы данных.
Каталоги, которые нам нужно опросить с помощью шаблона имени файла, находятся в базе данных
например
Instance, directory , filename
ABC , c:/input1 , test.txt
DEF , d:/input2 , fresh.xlsx
У меня есть около 200-300 записей, поэтому я создаю поток интеграции для каждой записи, поскольку у нее будет другой процессор и т. д.
для каждой записи
IntegrationFlowBuilder flowBuilder =
IntegrationFlows
.from(new CustomFileReadingSource(input), consumer);
flowBuilder.transform(new ObjectToJsonTransformer());
flowBuilder.handle(o -> {
// System.out.println(o.getPayload());
});
context.registration(flowBuilder.get()).register();
После того, как все они зарегистрированы, но когда я смотрю VisualVM или журнал, я вижу только 8-10 потоков вместо 100 или 200.
из журнала
2018-11-13 16:00:41.399 [task-scheduler-3] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:41.587 [task-scheduler-10] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:41.807 [task-scheduler-4] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:42.071 [task-scheduler-5] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:42.323 [task-scheduler-7] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:42.569 [task-scheduler-6] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:42.878 [task-scheduler-8] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:43.197 [task-scheduler-9] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:43.588 [task-scheduler-1] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:43.951 [task-scheduler-2] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:44.305 [task-scheduler-3] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:44.598 [task-scheduler-10] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:45.031 [task-scheduler-4] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:45.414 [task-scheduler-5] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:45.974 [task-scheduler-7] INFO c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
Как вы можете видеть, его всего несколько потоков опрашивают поток
Может ли кто-нибудь помочь, почему он не создает потоки или какой-либо лучший способ достижения параллельных опросов?




См. документация.
Планировщик по умолчанию имеет только 10 потоков. Обычно запланированные задачи выполняются недолго, и в этом случае обычно достаточно; в противном случае либо увеличьте количество потоков, либо добавьте исполнителя задачи, чтобы планировщик передал работу другому потоку.
Это правильно. Поскольку все Polling Ednpoints основаны на предопределенном глобальном ThreadPoolTaskScheduler с размером пула 10 по умолчанию: https://docs.spring.io/spring-integration/docs/5.1.0.RELEASE/reference/html/configuration.html#namespace-taskscheduler
С другой стороны, действительно бессмысленно пытаться иметь 100 потоков, в то время как ваш процессор имеет максимум 16 ядер. Создание большего количества потоков может даже замедлить работу вашего приложения.
Спасибо @Artem Bilan. Есть ли более разумный / эффективный способ достичь того, что я пытаюсь сделать? Могу ли я использовать один опросчик, и как только я получу файл, я могу вызвать определенный поток?
Я бы посоветовал вам не создавать так много динамических потоков, а иметь только один с некоторой реализацией AbstractMessageSourceAdvice для перебора этих каталогов после каждого опроса. Вы можете найти некоторые идеи в RotatingServerAdvice. После опроса вы можете выполнить маршрутизацию на основе содержимого сообщения. Не вижу причин злоупотреблять динамическими потоками в вашем случае.
Артем Билан. Большое спасибо. Я чувствовал то же самое, что злоупотреблял динамическими потоками. У вас есть пример на RotatingServerAdvice?
RotatingServerAdvice вам не подойдет, потому что он предназначен для удаленных протоколов, (S) FTP, S3 и т. д. В вашем случае это обычная локальная файловая система, поэтому вам нужно просто позаимствовать алгоритм вращения из этого класса. Образец здесь, в тестовом кейсе: github.com/spring-projects/spring-integration/blob/master/…
Спасибо. Я понимаю, что вы предлагаете. И последний вопрос. Допустим, я все еще создаю потоки для обработки файла, поскольку каждый файл будет полностью отличаться от шагов процесса. Как только я получу файл, как мне вызвать конкретный поток? ИЛИ могу я настроить каналы динамически, а затем на основе имени файла / шаблона, который я могу отправить на этот канал. Приносим извинения за многие вопросы, но мы будем благодарны за вашу помощь.
да. вы тоже можете это сделать: на основе содержимого сообщения вы можете выполнять маршрутизацию к каналам, созданным во время выполнения с помощью этого механизма динамических потоков. Дело в том, что старайтесь, чтобы динамические потоки были как можно меньше и легче. Более быстрый запуск, более чистое отключение.
Большое спасибо. Извините за досаду, сэр, но есть ли у вас пример «маршрутизации к каналам, созданным во время выполнения с помощью этого механизма динамических потоков». . :)
Хм? Вы просто используете обычный маршрутизатор, который может полагаться на имя канала, которое вы возвращаете из функции маршрутизации: docs.spring.io/spring-integration/docs/5.1.0.RELEASE/referen ce /…
Чувак, ты просто потрясающий. Извините за то, что был здесь новичком, но пытался научиться чему-то новому :) Большое спасибо! шутки в сторону
Спасибо, Гэри. Вы, ребята, действительно гуру интеграции Spring :)