Selector.select() не блокируется после чтения SelectionKey.isReadable

У меня есть сокет, который подключается к ServerSocketChannel, который переходит к другому селектору. Клиентский сокет отправляет одноразовое сообщение размером 8 байт, я его успешно прочитал, но затем селектор, который я вызываю selectorIO, должен заблокировать метод select(), но он тут же возвращается, а затем перечитывает то же самое сообщение, которое уже было отправлено.

public void readData()
    {
        int numberOfKeys = 0;
        buffer = ByteBuffer.allocate(8);
        buffer.clear();
        
        while(true)
        {
            try
            {
                //This is not blocking anymore?!
                numberOfKeys = selectorIO.select();
                
                Set<SelectionKey> keys = selectorIO.selectedKeys();
                Iterator<SelectionKey> itr = keys.iterator();
                
                while(itr.hasNext())
                {
                    SelectionKey key = itr.next();
                    if (key.isReadable())
                    {
                        SocketChannel channel = (SocketChannel)key.channel();
                        
                        int numread = channel.read(buffer);
                    
                        String s = new String(buffer.array());
                        System.out.println(s);
                        System.out.println(numread);
                        
                        
                        buffer.flip();
                        
                        //channel.write(buffer);
                        int numwrote = channel.write(buffer);
                        System.out.println(numwrote+" Bytes writtent");
                    
                    
                        buffer.flip();
                        
                        //buffer.reset();
                    }   
                    itr.remove();
                }
        
            }
            catch(Exception e)
            {
                System.out.println(e);
            }
        }
    }

Вы отменили регистрацию SelectKey.OP_WRITE? Если вы этого не сделали, вы можете написать, так что select() возвращается немедленно.

rzwitserloot 08.10.2022 05:43

@rzwitserloot, это помогло, но теперь цикл выполняется ровно дважды. Выбор заблокирован во второй раз. Любые идеи о том, почему он будет дважды зацикливаться перед блокировкой, а не один раз?

Matthew 08.10.2022 08:03

Я думал, что вам нужен OP_WRITE, потому что я собирался использовать SocketChannel для записи...

Matthew 08.10.2022 08:04

Вам нужно только OP_WRITE, если вы не смогли записать все данные. Таким образом, вы хотите получать уведомления, когда в буфере отправки сокета становится больше места. Если вам удалось записать все данные, вы не хотите регистрировать OP_WRITE.

pveentjer 08.10.2022 08:56

Когда вы создаете канал, обычно вы хотите зарегистрировать только OP_READ, а не OP_WRITE.

pveentjer 08.10.2022 09:00

Ваш последний флип должен быть четким (или компактным). Для получения дополнительной информации см. tutorialspoint.com/java_nio/java_nio_socket_channel.htm

pveentjer 08.10.2022 14:39
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
1
6
71
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Когда вы вызываете buffer.array() для создания строки, ByteBuffer не имеет ни малейшего представления о том, что байты были израсходованы, поэтому состояние ByteBuffer остается неизменным. Он по-прежнему содержит прочитанные байты и по-прежнему требует их использования. Это вызывает повторное чтение одного и того же сообщения.

Массив, возвращаемый функцией ByteBuffer.array(), не имеет представления о том, сколько полезных байтов доступно. Если массив имеет емкость 10 и установлено только 8 байт, вы пытаетесь создать строку с 2 фиктивными байтами. И если было прочитано только 2 байта, вы пытаетесь создать строку на основе 2 вместо 8 байтов. Подход к созданию строки неверен.

После того, как вы выполните команду channel.write, вы должны выполнить команду compact или clear в зависимости от того, доступны данные или нет.

Обычно я использую 2 отдельных байтовых буфера; один для чтения и один для записи.

Я попытался добавить как очистку, сброс, так и компактность, и он все еще продолжал зацикливаться. Если буфер не зависит от селектора, как и почему он узнает, находится ли буфер в таком состоянии?

Matthew 08.10.2022 07:38

Можете ли вы обновить код с этими изменениями @Matthew

pveentjer 08.10.2022 07:42
Ответ принят как подходящий

Несколько проблем на работе.

Управление буфером

Создание строки не работает

Вы создаете ByteBuffer, и, как и все BB, они имеют заданную емкость. Затем вы читаете его (int numRead = channel.read(buffer)), и это делает одну из двух вещей:

  1. Емкость ББ меньше, чем количество байтов, которые можно сразу запихнуть в этот буфер, скопировав прямо из буферов вашей сетевой карты.

В этом случае весь ББ заполнен (numRead будет равно емкости ББ), а статус «ЧИТАТЬ ГОТОВ» этого канала остается активным (поскольку есть еще байты, готовые для копирования).

Обратите внимание, что bb.array() возвращает весь резервный массив, но в этом сценарии, учитывая, что весь BB заполнен до отказа, это, так сказать, «срабатывает».

  1. Емкость ББ больше, чем количество байт, которое можно сразу запихнуть в тот буфер.

В этом случае numRead будет меньше, чем общая емкость этого байтового буфера, а new String(bb.array()) сломан — это будет попытка превратить в строку байты, которые вы прочитали до сих пор, и целую кучу мусора в конце.

new String(bb.array(), 0, bb.position()) сделает свою работу, но в целом это не то, как вы должны это делать. Во-первых, вы теперь запутались с кодировкой (вам действительно следует использовать new String(bb.array(), 0, bb.position(), StandardCharsets.UTF_8) - никогда не конвертируйте байты в символы или наоборот, если вы не укажете, какая кодировка используется, иначе система выбирает за вас, и это редко бывает правильно, и всегда сбивает с толку).

Нет правильного сброса

Общий способ использования буфера выглядит следующим образом:

  • заполните его (либо вы его заполните, либо вы вызовете read() для чего-то).
  • переверните его.
  • процесс с ним (либо вы отдаете его чему-то, что отправляет данные в нем, либо вы проходите по байтам).
  • очистить его.
  • повторение.

Вы заполняете его (channel.read()), затем используете прямую манипуляцию с массивом вместо переворачивания + чтения, чтобы «обработать» его (путем передачи резервного массива в строковый конструктор), а затем вы .flip() это неправильный вызов, который вы хотите .clear().

BB работают таким образом, потому что, ну, логика:

  • У них есть установленная емкость, и вы не обязательно используете всю эту емкость. Часто вы используете немного меньше. Итак, вы сначала заполняете его, а затем вы хотите, чтобы этот BB разрешал вводить данные от 0 до емкости: «позиция» равна 0 (и когда мы заполняем эту вещь, она обновляется), «лимит» установлен на емкость.
  • Затем, чтобы обработать его, мы хотим, чтобы position снова было 0 (мы начинаем обработку с начала, конечно), но мы хотим, чтобы limit не был концом, потому что, возможно, он не был полностью заполнен точно до предела каким-либо процессом, поставившим данные в эту вещь ... мы хотим, чтобы limit была позицией, как она была (поскольку это то, где «процесс, который заполнил буфер» оставил вещи). flip() делает это: устанавливает позицию обратно в 0 и ограничивает позицию до того места, где она была.

После того, как вы прочитали данные в свой буфер, а затем обработали эти данные, вы хотите очистить: вы хотите, чтобы позиция вернулась к 0, а предел вернулся к емкости, готовой к тому, чтобы процесс, который заполняет буфер, начал заполнять его снова. clear() делает это. Ваш код вызывает flip(), что неверно.

Путаница с селекторами

Селектор настроен на определенную вещь, которая вас «интересует». Когда вы спрашиваете об этом .select(), вы говорите:

  1. Возможно ли что-то из того, что меня интересует, прямо сейчас? Если да, немедленно возвращайтесь.
  2. Если нет, иди спать, пока не станет возможным то, что меня интересует.

Дело в том, что когда вы обрабатываете канал, ваше мнение о том, что вас интересует, со временем меняется, и вам нужно обновлять этот селектор и включать/выключать SelectorKey по мере необходимости.

Например, допустим, вы пишете простую программу чата. Алиса только что вставила половину собрания сочинений Шекспира и теперь ваша программа чата должна отправить все это Бобу. Теперь вы должны включить SelectorKey.OP_WRITE на сетевом канале Боба. Это должно было быть выключено раньше, так как вам нечего было отправить Бобу. Но у вас есть что отправить сейчас, так что включите его.

Затем вы переходите к select(), который, скорее всего, вернется немедленно (на сетевой карте есть свободное место в буфере для подключения Боба). Вы начинаете копировать это собрание сочинений Шекспира в байтовый буфер, но у вас ничего не получается — емкость этого буфера меньше, чем общий размер Шекспира. В том-то и дело, что хорошо. Затем вы передаете этот буфер в сеть и возвращаетесь к выбору, все еще интересуясь OP_WRITE, потому что вы еще не скопировали все собрание сочинений Шекспира, вам пока понравилась только четверть.

В конце концов сеть очистит этот буфер через сетевой кабель, и только тогда ваш селектор скажет: О, эй, мы готовы написать еще немного!

Вы продолжаете выполнять этот процесс (добавляете еще немного шекспировского текста, который вам нужно отправить), пока не запихнете последнее в буфер, который затем передадите каналу. Затем вы должны удалить SelectorKey.OP_WRITE, потому что теперь вам больше не важно, есть ли место в сетевом буфере.

Пока все это происходило, у вас возникла проблема: что, если Алиса будет посылать все больше и больше книг, и она будет отправлять их быстрее, чем Боб сможет их получить? Это, конечно, возможно — может быть, Алиса использует оптоволокно, а Боб — спутниковый телефон. Конечно, вы можете буферизовать всю эту серверную часть, но все ограничено: наступает момент, когда Алиса ставит в очередь 50 ГБ книжного контента, который вам все еще нужно отправить Бобу. Вы можете либо решить, что ваш сервер просто рухнет, если Алиса сделает это, либо вам придется установить ограничение: как только «данные, отправленные Алисой, которые я еще не засунул в канал Боба», достигают определенного количество, вы должны идти: Хорошо, Алиса, не больше.

Когда это происходит, вы должны отменить регистрацию ключа OP_READ — вы знаете, что Алиса прислала вам некоторые данные, готовые для чтения, но вы не хотите их читать, ваши буферы заполнены. Это разумно: если у Боба более медленное соединение, а Алиса отправляет Бобу кучу данных, вы не сможете обрабатывать байты Алисы так же быстро, как она может их отправлять.

Помните также, что .select() может вернуться ложно (без причины). Ваш код не может предполагать, что «о, привет select() возвращено, поэтому здесь ДОЛЖНА быть хотя бы одна готовая вещь». Возможно, нет. Почему он «быстро возвращается дважды»? Потому что JVM разрешено.

Такой низкоуровневый асинхронный NIO обычно приводит к вращению вентиляторов. Это логика:

  1. Ваш цикл кода работает очень просто: вы while (true) {} прокладываете свой путь: делайте все, что я могу сделать. Пишите все, что я могу написать, читайте все, что я могу читать, а затем зацикливайтесь, чтобы делать это снова и снова во веки веков.
  2. Вызов x.select() — единственное, что останавливает безудержный цикл while. Это единственное место, где вы когда-либо «спите». Это асинхронно: спящий поток — это смерть вашего приложения (в асинхронной модели ничто никогда не может спать, кроме как при выборе).
  3. Если селектор настроен неправильно, например, вы зарегистрировались OP_WRITE и нечего писать, селектор всегда будет мгновенно возвращаться.
  4. Таким образом, ваш код выходит из-под контроля: он постоянно зацикливается, никогда не спит, что приводит к 100%-ной загрузке ЦП, включаются вентиляторы, ноутбуки разряжают свои батареи за считанные минуты, энергия тратится впустую, вещи нагреваются, затраты на IAAS зашкаливают.

async NIO — это ракетостроение; это действительно очень сложно сделать правильно. Обычно вы хотите использовать фреймворки, которые упрощают работу, такие как grizzly или netty.

Вероятный: сосредоточение внимания не на том

Написание низкоуровневого асинхронного кода похоже на написание приложения на низкоуровневом машинном коде. Люди склонны делать это, потому что «думают, что так будет быстрее», но единственное, чего они добиваются, это то, что они делают задачу, на программирование которой уходил час, и превращают ее в задачу, на выполнение которой уходит неделя, конечный результат трудно осуществим. test, глючный беспорядок и на самом деле медленнее, потому что вы не знаете, что делаете, и недооцениваете, насколько хорошо все промежуточные уровни (компилятор, среда выполнения, ОС и т. д.) оптимизируются.

Есть причины перейти на этот низкий уровень (например, когда вы пишете драйвер ядра), но обычно вы этого не делаете.

То же самое. Почему именно здесь вы используете NIO? Это не быстрее, за исключением довольно экзотических обстоятельств, и определенно намного сложнее кодировать его. Java страдает (как и большинство языков) от «проблемы красного/синего» довольно сильно.

У ОС нет абсолютно никаких проблем с обработкой 5000 потоков, и они могут делать это достаточно эффективно. Да, «о, нет, переключение контекста», но обратите внимание, что переключение контекста является неотъемлемой частью одновременной обработки множества подключений: промахи кэша будут частыми, и асинхронность не решит эту проблему. Сообщения в блогах, в которых пишут о том, насколько асинхронность крута, потому что она позволяет избежать «переключения контекста», похоже, забывают, что промах кеша из-за необходимости перехода к буферам другого соединения является таким же «переключением контекста», как и переход потока.

Единственное, о чем вам нужно позаботиться при написании этого кода в многопоточном режиме, который намного, намного проще писать, поддерживать и тестировать, это то, что вы хотите управлять размерами стека вашего потока: вы оба хотите, чтобы ваши потоки используйте ограниченный размер стека (если возникает исключение и трассировка стека заполнена экраном, это проблема), и вы хотите настроить их с ограниченными размерами. Вы можете указать размеры стека при создании потоков (конструктор потоков позволяет это, и различные вещи, которые создают потоки, такие как многопоточный ExecutorPool, позволяют указать либо размер стека, либо замыкание, создающее потоки). Используйте это, и вы сможете просто написать код, который быстро обрабатывает 5000 одновременных подключений, используя 10 000 потоков, и все это гораздо проще написать, чем асинхронный. Если вы должны использовать асинхронность, используйте фреймворк, чтобы избежать осложнений.

Чтобы вернуться к тому, что Алиса отправляет быстрее, чем Боб может получить модель, обратите внимание, насколько это проще:

  1. Вы просите Алису InputStream заполнить массив байтов.
  2. Затем вы просите Боба OutputStream отправить байты в этом массиве от 0 до того, сколько вы прочитали.
  3. Вернитесь к 1.

Вот и все. С асинхронным материалом либо Алиса опережает Боба (в этом случае вам лучше отключить OP_READ на соединении Алисы, чтобы закрыть ее ввод), либо Боб опережает Алису (в этом случае вам нужно отключить и включить Боба OP_WRITE), или даже это из-за капризов в скорости сети, иногда Алиса опережает Боба, а иногда Боб опережает Алису.

С синхронизацией, как и выше, все это не имеет значения — вызов Алисы read() или вызов Боба write() по мере необходимости блокирует, и это все исправляет. Видите, насколько это проще?

Другие вопросы по теме