Проблема с потоками динамических потоков Spring Integration

Я создаю потоки динамической интеграции на основе данных из базы данных.

Каталоги, которые нам нужно опросить с помощью шаблона имени файла, находятся в базе данных

например

Instance, directory , filename 
ABC     , c:/input1  , test.txt
DEF     , d:/input2 ,  fresh.xlsx

У меня есть около 200-300 записей, поэтому я создаю поток интеграции для каждой записи, поскольку у нее будет другой процессор и т. д.

для каждой записи

 IntegrationFlowBuilder flowBuilder =
                        IntegrationFlows
                                .from(new CustomFileReadingSource(input), consumer);

     flowBuilder.transform(new ObjectToJsonTransformer());

      flowBuilder.handle(o -> {

    //                System.out.println(o.getPayload());
                });
context.registration(flowBuilder.get()).register();

После того, как все они зарегистрированы, но когда я смотрю VisualVM или журнал, я вижу только 8-10 потоков вместо 100 или 200.

из журнала

2018-11-13 16:00:41.399 [task-scheduler-3] INFO  c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32) 
2018-11-13 16:00:41.587 [task-scheduler-10] INFO  c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:41.807 [task-scheduler-4] INFO  c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32) 
2018-11-13 16:00:42.071 [task-scheduler-5] INFO  c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32) 
2018-11-13 16:00:42.323 [task-scheduler-7] INFO  c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32) 
2018-11-13 16:00:42.569 [task-scheduler-6] INFO  c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32) 
2018-11-13 16:00:42.878 [task-scheduler-8] INFO  c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32) 
2018-11-13 16:00:43.197 [task-scheduler-9] INFO  c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32) 
2018-11-13 16:00:43.588 [task-scheduler-1] INFO  c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32) 
2018-11-13 16:00:43.951 [task-scheduler-2] INFO  c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32) 
2018-11-13 16:00:44.305 [task-scheduler-3] INFO  c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32) 
2018-11-13 16:00:44.598 [task-scheduler-10] INFO  c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
2018-11-13 16:00:45.031 [task-scheduler-4] INFO  c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32) 
2018-11-13 16:00:45.414 [task-scheduler-5] INFO  c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32) 
2018-11-13 16:00:45.974 [task-scheduler-7] INFO  c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32) 

Как вы можете видеть, его всего несколько потоков опрашивают поток

Может ли кто-нибудь помочь, почему он не создает потоки или какой-либо лучший способ достижения параллельных опросов?

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
2
0
350
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

См. документация.

Планировщик по умолчанию имеет только 10 потоков. Обычно запланированные задачи выполняются недолго, и в этом случае обычно достаточно; в противном случае либо увеличьте количество потоков, либо добавьте исполнителя задачи, чтобы планировщик передал работу другому потоку.

Спасибо, Гэри. Вы, ребята, действительно гуру интеграции Spring :)

Makky 13.11.2018 17:39
Ответ принят как подходящий

Это правильно. Поскольку все Polling Ednpoints основаны на предопределенном глобальном ThreadPoolTaskScheduler с размером пула 10 по умолчанию: https://docs.spring.io/spring-integration/docs/5.1.0.RELEASE/reference/html/configuration.html#namespace-taskscheduler

С другой стороны, действительно бессмысленно пытаться иметь 100 потоков, в то время как ваш процессор имеет максимум 16 ядер. Создание большего количества потоков может даже замедлить работу вашего приложения.

Спасибо @Artem Bilan. Есть ли более разумный / эффективный способ достичь того, что я пытаюсь сделать? Могу ли я использовать один опросчик, и как только я получу файл, я могу вызвать определенный поток?

Makky 13.11.2018 17:19

Я бы посоветовал вам не создавать так много динамических потоков, а иметь только один с некоторой реализацией AbstractMessageSourceAdvice для перебора этих каталогов после каждого опроса. Вы можете найти некоторые идеи в RotatingServerAdvice. После опроса вы можете выполнить маршрутизацию на основе содержимого сообщения. Не вижу причин злоупотреблять динамическими потоками в вашем случае.

Artem Bilan 13.11.2018 17:24

Артем Билан. Большое спасибо. Я чувствовал то же самое, что злоупотреблял динамическими потоками. У вас есть пример на RotatingServerAdvice?

Makky 13.11.2018 17:28

RotatingServerAdvice вам не подойдет, потому что он предназначен для удаленных протоколов, (S) FTP, S3 и т. д. В вашем случае это обычная локальная файловая система, поэтому вам нужно просто позаимствовать алгоритм вращения из этого класса. Образец здесь, в тестовом кейсе: github.com/spring-projects/spring-integration/blob/master/…

Artem Bilan 13.11.2018 17:32

Спасибо. Я понимаю, что вы предлагаете. И последний вопрос. Допустим, я все еще создаю потоки для обработки файла, поскольку каждый файл будет полностью отличаться от шагов процесса. Как только я получу файл, как мне вызвать конкретный поток? ИЛИ могу я настроить каналы динамически, а затем на основе имени файла / шаблона, который я могу отправить на этот канал. Приносим извинения за многие вопросы, но мы будем благодарны за вашу помощь.

Makky 13.11.2018 17:34

да. вы тоже можете это сделать: на основе содержимого сообщения вы можете выполнять маршрутизацию к каналам, созданным во время выполнения с помощью этого механизма динамических потоков. Дело в том, что старайтесь, чтобы динамические потоки были как можно меньше и легче. Более быстрый запуск, более чистое отключение.

Artem Bilan 13.11.2018 17:37

Большое спасибо. Извините за досаду, сэр, но есть ли у вас пример «маршрутизации к каналам, созданным во время выполнения с помощью этого механизма динамических потоков». . :)

Makky 13.11.2018 17:38

Хм? Вы просто используете обычный маршрутизатор, который может полагаться на имя канала, которое вы возвращаете из функции маршрутизации: docs.spring.io/spring-integration/docs/5.1.0.RELEASE/referen‌ ce /…

Artem Bilan 13.11.2018 17:42

Чувак, ты просто потрясающий. Извините за то, что был здесь новичком, но пытался научиться чему-то новому :) Большое спасибо! шутки в сторону

Makky 13.11.2018 17:43

Другие вопросы по теме