У меня есть вариант использования, где у меня есть набор потоковых данных, такой как номер мобильного телефона, время начала и продолжительность звонка. Мне нужно выполнить группировку по номеру мобильного телефона, отсортировать группу по времени начала и отфильтровать звонки, в которых сумма (время начала + продолжительность) больше следующей суммы (время начала + продолжительность).
Я попробовал Window.partitionby("mobilenumber").orderby("starttime") Но позже понял, что для потоковых наборов данных это не сработает.
Стандартно это невозможно, если не используется режим complete
. Более конкретно:
Операции сортировки поддерживаются для потоковых наборов данных только после агрегирования и в режиме полного вывода.
Основной принцип структурированной потоковой передачи Spark заключается в том, что запрос должен вернуть тот же ответ в потоковом или пакетном режиме. Мы поддерживаем сортировку в полном режиме, потому что у нас есть все данные и мы можем их отсортировать правильно и верните полный ответ. В режиме обновления или добавления сортировка вернет правильный ответ только в том случае, если мы сможем пообещать, что записи с более низким уровнем сортировки поступят позже (а мы не можем). Поэтому это запрещено.
Записывайте данные в таблицу или HDFS, а затем в базу данных с материализованными представлениями, которые могут помочь в аспектах инкрементной сборки. Это предпочтительнее, чем UDAF, который работает - до тех пор, пока вы не измените некоторые операции с состоянием таким образом, что вы потеряете все эти данные с состоянием. При прежнем подходе у вас все еще есть эти данные - сохранились. См. документы. В наши дни эта база данных может быть таблицей Delta.
Нет. Не со сбором. Просто не поддерживаемый подход.
Ну, не на 100% верно то, что я говорю. UDAF не для pyspark и при перезапуске потока?
Это предпочтительнее, чем UDAF, который работает - до тех пор, пока вы не измените некоторые операции с состоянием таким образом, что вы потеряете все эти данные с состоянием. При прежнем подходе у вас все еще есть эти данные - сохранились. См. документы. Обновленный ответ.
Да, я пробовал использовать udaf, но каждый раз, когда в поток данных добавляется новый файл или данные, необходимо сортировать все данные, при этом производительность снижается.
Я попытался создать udaf и передать столбцы времени начала и продолжительности, собрать их все в arraylist, затем отсортировать этот список и сравнить соседние индексы, но здесь проблема в том, что агрегатные функции возвращают одно значение (но мои условия в наборе данных возвращали бы больше значений), и если мой поток получит новые данные, будет ли весь процесс пересчитан, и правильный ли это способ сортировки?