У меня есть сокет, который подключается к ServerSocketChannel, который переходит к другому селектору. Клиентский сокет отправляет одноразовое сообщение размером 8 байт, я его успешно прочитал, но затем селектор, который я вызываю selectorIO, должен заблокировать метод select(), но он тут же возвращается, а затем перечитывает то же самое сообщение, которое уже было отправлено.
public void readData()
{
int numberOfKeys = 0;
buffer = ByteBuffer.allocate(8);
buffer.clear();
while(true)
{
try
{
//This is not blocking anymore?!
numberOfKeys = selectorIO.select();
Set<SelectionKey> keys = selectorIO.selectedKeys();
Iterator<SelectionKey> itr = keys.iterator();
while(itr.hasNext())
{
SelectionKey key = itr.next();
if (key.isReadable())
{
SocketChannel channel = (SocketChannel)key.channel();
int numread = channel.read(buffer);
String s = new String(buffer.array());
System.out.println(s);
System.out.println(numread);
buffer.flip();
//channel.write(buffer);
int numwrote = channel.write(buffer);
System.out.println(numwrote+" Bytes writtent");
buffer.flip();
//buffer.reset();
}
itr.remove();
}
}
catch(Exception e)
{
System.out.println(e);
}
}
}
@rzwitserloot, это помогло, но теперь цикл выполняется ровно дважды. Выбор заблокирован во второй раз. Любые идеи о том, почему он будет дважды зацикливаться перед блокировкой, а не один раз?
Я думал, что вам нужен OP_WRITE, потому что я собирался использовать SocketChannel для записи...
Вам нужно только OP_WRITE, если вы не смогли записать все данные. Таким образом, вы хотите получать уведомления, когда в буфере отправки сокета становится больше места. Если вам удалось записать все данные, вы не хотите регистрировать OP_WRITE.
Когда вы создаете канал, обычно вы хотите зарегистрировать только OP_READ, а не OP_WRITE.
Ваш последний флип должен быть четким (или компактным). Для получения дополнительной информации см. tutorialspoint.com/java_nio/java_nio_socket_channel.htm




Когда вы вызываете buffer.array() для создания строки, ByteBuffer не имеет ни малейшего представления о том, что байты были израсходованы, поэтому состояние ByteBuffer остается неизменным. Он по-прежнему содержит прочитанные байты и по-прежнему требует их использования. Это вызывает повторное чтение одного и того же сообщения.
Массив, возвращаемый функцией ByteBuffer.array(), не имеет представления о том, сколько полезных байтов доступно. Если массив имеет емкость 10 и установлено только 8 байт, вы пытаетесь создать строку с 2 фиктивными байтами. И если было прочитано только 2 байта, вы пытаетесь создать строку на основе 2 вместо 8 байтов. Подход к созданию строки неверен.
После того, как вы выполните команду channel.write, вы должны выполнить команду compact или clear в зависимости от того, доступны данные или нет.
Обычно я использую 2 отдельных байтовых буфера; один для чтения и один для записи.
Я попытался добавить как очистку, сброс, так и компактность, и он все еще продолжал зацикливаться. Если буфер не зависит от селектора, как и почему он узнает, находится ли буфер в таком состоянии?
Можете ли вы обновить код с этими изменениями @Matthew
Несколько проблем на работе.
Вы создаете ByteBuffer, и, как и все BB, они имеют заданную емкость. Затем вы читаете его (int numRead = channel.read(buffer)), и это делает одну из двух вещей:
В этом случае весь ББ заполнен (numRead будет равно емкости ББ), а статус «ЧИТАТЬ ГОТОВ» этого канала остается активным (поскольку есть еще байты, готовые для копирования).
Обратите внимание, что bb.array() возвращает весь резервный массив, но в этом сценарии, учитывая, что весь BB заполнен до отказа, это, так сказать, «срабатывает».
В этом случае numRead будет меньше, чем общая емкость этого байтового буфера, а new String(bb.array()) сломан — это будет попытка превратить в строку байты, которые вы прочитали до сих пор, и целую кучу мусора в конце.
new String(bb.array(), 0, bb.position()) сделает свою работу, но в целом это не то, как вы должны это делать. Во-первых, вы теперь запутались с кодировкой (вам действительно следует использовать new String(bb.array(), 0, bb.position(), StandardCharsets.UTF_8) - никогда не конвертируйте байты в символы или наоборот, если вы не укажете, какая кодировка используется, иначе система выбирает за вас, и это редко бывает правильно, и всегда сбивает с толку).
Общий способ использования буфера выглядит следующим образом:
Вы заполняете его (channel.read()), затем используете прямую манипуляцию с массивом вместо переворачивания + чтения, чтобы «обработать» его (путем передачи резервного массива в строковый конструктор), а затем вы .flip() это неправильный вызов, который вы хотите .clear().
BB работают таким образом, потому что, ну, логика:
limit не был концом, потому что, возможно, он не был полностью заполнен точно до предела каким-либо процессом, поставившим данные в эту вещь ... мы хотим, чтобы limit была позицией, как она была (поскольку это то, где «процесс, который заполнил буфер» оставил вещи). flip() делает это: устанавливает позицию обратно в 0 и ограничивает позицию до того места, где она была.После того, как вы прочитали данные в свой буфер, а затем обработали эти данные, вы хотите очистить: вы хотите, чтобы позиция вернулась к 0, а предел вернулся к емкости, готовой к тому, чтобы процесс, который заполняет буфер, начал заполнять его снова. clear() делает это. Ваш код вызывает flip(), что неверно.
Селектор настроен на определенную вещь, которая вас «интересует». Когда вы спрашиваете об этом .select(), вы говорите:
Дело в том, что когда вы обрабатываете канал, ваше мнение о том, что вас интересует, со временем меняется, и вам нужно обновлять этот селектор и включать/выключать SelectorKey по мере необходимости.
Например, допустим, вы пишете простую программу чата. Алиса только что вставила половину собрания сочинений Шекспира и теперь ваша программа чата должна отправить все это Бобу. Теперь вы должны включить SelectorKey.OP_WRITE на сетевом канале Боба. Это должно было быть выключено раньше, так как вам нечего было отправить Бобу. Но у вас есть что отправить сейчас, так что включите его.
Затем вы переходите к select(), который, скорее всего, вернется немедленно (на сетевой карте есть свободное место в буфере для подключения Боба). Вы начинаете копировать это собрание сочинений Шекспира в байтовый буфер, но у вас ничего не получается — емкость этого буфера меньше, чем общий размер Шекспира. В том-то и дело, что хорошо. Затем вы передаете этот буфер в сеть и возвращаетесь к выбору, все еще интересуясь OP_WRITE, потому что вы еще не скопировали все собрание сочинений Шекспира, вам пока понравилась только четверть.
В конце концов сеть очистит этот буфер через сетевой кабель, и только тогда ваш селектор скажет: О, эй, мы готовы написать еще немного!
Вы продолжаете выполнять этот процесс (добавляете еще немного шекспировского текста, который вам нужно отправить), пока не запихнете последнее в буфер, который затем передадите каналу. Затем вы должны удалить SelectorKey.OP_WRITE, потому что теперь вам больше не важно, есть ли место в сетевом буфере.
Пока все это происходило, у вас возникла проблема: что, если Алиса будет посылать все больше и больше книг, и она будет отправлять их быстрее, чем Боб сможет их получить? Это, конечно, возможно — может быть, Алиса использует оптоволокно, а Боб — спутниковый телефон. Конечно, вы можете буферизовать всю эту серверную часть, но все ограничено: наступает момент, когда Алиса ставит в очередь 50 ГБ книжного контента, который вам все еще нужно отправить Бобу. Вы можете либо решить, что ваш сервер просто рухнет, если Алиса сделает это, либо вам придется установить ограничение: как только «данные, отправленные Алисой, которые я еще не засунул в канал Боба», достигают определенного количество, вы должны идти: Хорошо, Алиса, не больше.
Когда это происходит, вы должны отменить регистрацию ключа OP_READ — вы знаете, что Алиса прислала вам некоторые данные, готовые для чтения, но вы не хотите их читать, ваши буферы заполнены. Это разумно: если у Боба более медленное соединение, а Алиса отправляет Бобу кучу данных, вы не сможете обрабатывать байты Алисы так же быстро, как она может их отправлять.
Помните также, что .select() может вернуться ложно (без причины). Ваш код не может предполагать, что «о, привет select() возвращено, поэтому здесь ДОЛЖНА быть хотя бы одна готовая вещь». Возможно, нет. Почему он «быстро возвращается дважды»? Потому что JVM разрешено.
Такой низкоуровневый асинхронный NIO обычно приводит к вращению вентиляторов. Это логика:
while (true) {} прокладываете свой путь: делайте все, что я могу сделать. Пишите все, что я могу написать, читайте все, что я могу читать, а затем зацикливайтесь, чтобы делать это снова и снова во веки веков.x.select() — единственное, что останавливает безудержный цикл while. Это единственное место, где вы когда-либо «спите». Это асинхронно: спящий поток — это смерть вашего приложения (в асинхронной модели ничто никогда не может спать, кроме как при выборе).OP_WRITE и нечего писать, селектор всегда будет мгновенно возвращаться.async NIO — это ракетостроение; это действительно очень сложно сделать правильно. Обычно вы хотите использовать фреймворки, которые упрощают работу, такие как grizzly или netty.
Написание низкоуровневого асинхронного кода похоже на написание приложения на низкоуровневом машинном коде. Люди склонны делать это, потому что «думают, что так будет быстрее», но единственное, чего они добиваются, это то, что они делают задачу, на программирование которой уходил час, и превращают ее в задачу, на выполнение которой уходит неделя, конечный результат трудно осуществим. test, глючный беспорядок и на самом деле медленнее, потому что вы не знаете, что делаете, и недооцениваете, насколько хорошо все промежуточные уровни (компилятор, среда выполнения, ОС и т. д.) оптимизируются.
Есть причины перейти на этот низкий уровень (например, когда вы пишете драйвер ядра), но обычно вы этого не делаете.
То же самое. Почему именно здесь вы используете NIO? Это не быстрее, за исключением довольно экзотических обстоятельств, и определенно намного сложнее кодировать его. Java страдает (как и большинство языков) от «проблемы красного/синего» довольно сильно.
У ОС нет абсолютно никаких проблем с обработкой 5000 потоков, и они могут делать это достаточно эффективно. Да, «о, нет, переключение контекста», но обратите внимание, что переключение контекста является неотъемлемой частью одновременной обработки множества подключений: промахи кэша будут частыми, и асинхронность не решит эту проблему. Сообщения в блогах, в которых пишут о том, насколько асинхронность крута, потому что она позволяет избежать «переключения контекста», похоже, забывают, что промах кеша из-за необходимости перехода к буферам другого соединения является таким же «переключением контекста», как и переход потока.
Единственное, о чем вам нужно позаботиться при написании этого кода в многопоточном режиме, который намного, намного проще писать, поддерживать и тестировать, это то, что вы хотите управлять размерами стека вашего потока: вы оба хотите, чтобы ваши потоки используйте ограниченный размер стека (если возникает исключение и трассировка стека заполнена экраном, это проблема), и вы хотите настроить их с ограниченными размерами. Вы можете указать размеры стека при создании потоков (конструктор потоков позволяет это, и различные вещи, которые создают потоки, такие как многопоточный ExecutorPool, позволяют указать либо размер стека, либо замыкание, создающее потоки). Используйте это, и вы сможете просто написать код, который быстро обрабатывает 5000 одновременных подключений, используя 10 000 потоков, и все это гораздо проще написать, чем асинхронный. Если вы должны использовать асинхронность, используйте фреймворк, чтобы избежать осложнений.
Чтобы вернуться к тому, что Алиса отправляет быстрее, чем Боб может получить модель, обратите внимание, насколько это проще:
InputStream заполнить массив байтов.OutputStream отправить байты в этом массиве от 0 до того, сколько вы прочитали.Вот и все. С асинхронным материалом либо Алиса опережает Боба (в этом случае вам лучше отключить OP_READ на соединении Алисы, чтобы закрыть ее ввод), либо Боб опережает Алису (в этом случае вам нужно отключить и включить Боба OP_WRITE), или даже это из-за капризов в скорости сети, иногда Алиса опережает Боба, а иногда Боб опережает Алису.
С синхронизацией, как и выше, все это не имеет значения — вызов Алисы read() или вызов Боба write() по мере необходимости блокирует, и это все исправляет. Видите, насколько это проще?
Вы отменили регистрацию
SelectKey.OP_WRITE? Если вы этого не сделали, вы можете написать, так чтоselect()возвращается немедленно.