Понимание поведения канала/емкости очереди по умолчанию

У меня странная проблема с приоритетным каналом интеграции spring (или, по крайней мере, я думаю, что все идет не так). У меня есть следующий поток:

IntegrationFlows
                .from(fileReadingMessageSource,
                        c -> c.poller(Pollers.fixedDelay(period)
                                             .taskExecutor(Executors.newFixedThreadPool(poolSize))
                                             .maxMessagesPerPoll(maxMessagesPerPoll)))
                .channel("alphabetically")
                .bridge(s -> s.poller(Pollers.fixedDelay(100)))
                .channel(ApplicationConfiguration.INBOUND_CHANNEL)
                .get();

И приоритетный канал емкостью 1'000:

@Bean
    public PriorityChannel alphabetically(@Value("${inbound.sort.queue-capacity}") int capacity) {
        return new PriorityChannel(capacity, Comparator.comparing(left -> ((File) left.getPayload()).getName()));
    }

Я использую этот поток для чтения около 20 000 файлов из входного каталога. Все работает нормально, но примерно через 2000 файлов поток перестает работать и не подхватывает новые файлы.

Я думал, что поведение канала очереди по умолчанию заключается в том, что, когда он достигает емкости, он просто ждет освобождения емкости и принимает следующие файлы, которые будут поставлены в очередь? Но я мог ошибиться... Если это не так, и есть некоторый тайм-аут для файлов, которые будут выбраны средством опроса, и не будет достаточно места в приоритетном канале, что бы вы предложили обойти? Это?

3 метода стилизации элементов HTML
3 метода стилизации элементов HTML
Когда дело доходит до применения какого-либо стиля к нашему HTML, существует три подхода: встроенный, внутренний и внешний. Предпочтительным обычно...
Формы c голосовым вводом в React с помощью Speechly
Формы c голосовым вводом в React с помощью Speechly
Пытались ли вы когда-нибудь заполнить веб-форму в области электронной коммерции, которая требует много кликов и выбора? Вас попросят заполнить дату,...
Стилизация и валидация html-формы без использования JavaScript (только HTML/CSS)
Стилизация и валидация html-формы без использования JavaScript (только HTML/CSS)
Будучи разработчиком веб-приложений, легко впасть в заблуждение, считая, что приложение без JavaScript не имеет права на жизнь. Нам становится удобно...
Flatpickr: простой модуль календаря для вашего приложения на React
Flatpickr: простой модуль календаря для вашего приложения на React
Если вы ищете пакет для быстрой интеграции календаря с выбором даты в ваше приложения, то библиотека Flatpickr отлично справится с этой задачей....
В чем разница между Promise и Observable?
В чем разница между Promise и Observable?
Разберитесь в этом вопросе, и вы значительно повысите уровень своей компетенции.
Что такое cURL в PHP? Встроенные функции и пример GET запроса
Что такое cURL в PHP? Встроенные функции и пример GET запроса
Клиент для URL-адресов, cURL, позволяет взаимодействовать с множеством различных серверов по множеству различных протоколов с синтаксисом URL.
0
0
31
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Добавьте ведение журнала, чтобы увидеть, что происходит; это отлично работает для меня...

@SpringBootApplication
public class So56259801Application {

    public static void main(String[] args) {
        SpringApplication.run(So56259801Application.class, args);
    }

    private int i;

    @Bean
    public IntegrationFlow flow() {
        return IntegrationFlows.from(() -> "foo" + i++, e -> e.poller(Pollers.fixedDelay(5_000)
                    .taskExecutor(Executors.newFixedThreadPool(1))))
                .log()
                .channel(MessageChannels.queue(3))
                .bridge(b -> b.poller(Pollers.fixedDelay(10_000)))
                .log()
                .get();
    }

}

а также

2019-05-28 13:43:59.719  INFO 75315 --- [pool-1-thread-1] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=foo0, headers={id=d87cba1d-dc6b-fdf4-56ed-61f08048851b, timestamp=1559065439718}]
2019-05-28 13:43:59.719  INFO 75315 --- [ask-scheduler-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=foo0, headers={id=d87cba1d-dc6b-fdf4-56ed-61f08048851b, timestamp=1559065439718}]
2019-05-28 13:43:59.724  INFO 75315 --- [           main] com.example.So56259801Application        : Started So56259801Application in 0.832 seconds (JVM running for 1.242)
2019-05-28 13:44:04.719  INFO 75315 --- [pool-1-thread-1] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=foo1, headers={id=8b7676e6-9bac-cdf0-4f4c-38513267b666, timestamp=1559065444719}]
2019-05-28 13:44:09.721  INFO 75315 --- [pool-1-thread-1] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=foo2, headers={id=3b5346f8-d007-dd33-bee3-28eed4cfbd00, timestamp=1559065449721}]
2019-05-28 13:44:10.727  INFO 75315 --- [ask-scheduler-3] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=foo1, headers={id=8b7676e6-9bac-cdf0-4f4c-38513267b666, timestamp=1559065444719}]
2019-05-28 13:44:10.727  INFO 75315 --- [ask-scheduler-3] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=foo2, headers={id=3b5346f8-d007-dd33-bee3-28eed4cfbd00, timestamp=1559065449721}]
2019-05-28 13:44:14.723  INFO 75315 --- [pool-1-thread-1] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=foo3, headers={id=84df8665-1aa9-df90-2037-4dd1781b1bf3, timestamp=1559065454723}]
2019-05-28 13:44:19.727  INFO 75315 --- [pool-1-thread-1] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=foo4, headers={id=4e81897b-a19c-4789-529a-46266762ccc6, timestamp=1559065459727}]
2019-05-28 13:44:21.733  INFO 75315 --- [ask-scheduler-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=foo3, headers={id=84df8665-1aa9-df90-2037-4dd1781b1bf3, timestamp=1559065454723}]
2019-05-28 13:44:21.733  INFO 75315 --- [ask-scheduler-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=foo4, headers={id=4e81897b-a19c-4789-529a-46266762ccc6, timestamp=1559065459727}]
2019-05-28 13:44:24.730  INFO 75315 --- [pool-1-thread-1] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=foo5, headers={id=3fb96f4a-7d25-f94a-23d2-d4a121932554, timestamp=1559065464730}]

Другие вопросы по теме