Группировка и сортировка в структурированной потоковой передаче Spark

У меня есть вариант использования, где у меня есть набор потоковых данных, такой как номер мобильного телефона, время начала и продолжительность звонка. Мне нужно выполнить группировку по номеру мобильного телефона, отсортировать группу по времени начала и отфильтровать звонки, в которых сумма (время начала + продолжительность) больше следующей суммы (время начала + продолжительность).

Я попробовал Window.partitionby("mobilenumber").orderby("starttime") Но позже понял, что для потоковых наборов данных это не сработает.

Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
0
0
136
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Стандартно это невозможно, если не используется режим complete. Более конкретно:

Операции сортировки поддерживаются для потоковых наборов данных только после агрегирования и в режиме полного вывода.

Основной принцип структурированной потоковой передачи Spark заключается в том, что запрос должен вернуть тот же ответ в потоковом или пакетном режиме. Мы поддерживаем сортировку в полном режиме, потому что у нас есть все данные и мы можем их отсортировать правильно и верните полный ответ. В режиме обновления или добавления сортировка вернет правильный ответ только в том случае, если мы сможем пообещать, что записи с более низким уровнем сортировки поступят позже (а мы не можем). Поэтому это запрещено.

Записывайте данные в таблицу или HDFS, а затем в базу данных с материализованными представлениями, которые могут помочь в аспектах инкрементной сборки. Это предпочтительнее, чем UDAF, который работает - до тех пор, пока вы не измените некоторые операции с состоянием таким образом, что вы потеряете все эти данные с состоянием. При прежнем подходе у вас все еще есть эти данные - сохранились. См. документы. В наши дни эта база данных может быть таблицей Delta.

Я попытался создать udaf и передать столбцы времени начала и продолжительности, собрать их все в arraylist, затем отсортировать этот список и сравнить соседние индексы, но здесь проблема в том, что агрегатные функции возвращают одно значение (но мои условия в наборе данных возвращали бы больше значений), и если мой поток получит новые данные, будет ли весь процесс пересчитан, и правильный ли это способ сортировки?

Harsha 17.04.2024 03:59

Нет. Не со сбором. Просто не поддерживаемый подход.

thebluephantom 17.04.2024 08:31

Ну, не на 100% верно то, что я говорю. UDAF не для pyspark и при перезапуске потока?

thebluephantom 17.04.2024 09:35

Это предпочтительнее, чем UDAF, который работает - до тех пор, пока вы не измените некоторые операции с состоянием таким образом, что вы потеряете все эти данные с состоянием. При прежнем подходе у вас все еще есть эти данные - сохранились. См. документы. Обновленный ответ.

thebluephantom 17.04.2024 17:57

Да, я пробовал использовать udaf, но каждый раз, когда в поток данных добавляется новый файл или данные, необходимо сортировать все данные, при этом производительность снижается.

Harsha 18.04.2024 05:29

Другие вопросы по теме