В Apache Beam Python SDK можно выполнить следующее:
input
| GroupBy(account=lambda s: s["account"])
.aggregate_field(lambda x: x["wordsAddup"] - x["wordsSubtract"], sum, 'wordsRead')
Как мы можем выполнить подобное действие в Java SDK? Как ни странно, в руководстве по программированию есть только примеры на Python для этого преобразования.
Вот моя попытка создать эквивалент на Java:
input.apply(
Group.byFieldNames("account")
.aggregateField(<INSERT EQUIVALENT HERE>, Sum.ofIntegers(), "wordsRead"));
Есть несколько примеров Java на https://beam.apache.org/documentation/programming-guide/#using-schemas. (Обратите внимание, что вам, возможно, придется выбрать вкладку java
в селекторе, в котором есть как Java, так и Python, чтобы увидеть их.)
Я не думаю, что в Java первый аргумент агрегатного поля может принимать произвольное выражение; это должно быть имя поля. Вы можете продолжить операцию группировки с проекцией, которая добавляет новое поле для желаемого выражения. Например
input
.apply(SqlTransform.query(
"SELECT *, wordsAddup - wordsSubtract AS wordsDiff from PCOLLECTION")
.apply(Group.byFieldNames("account")
.aggregateField("wordsDiff", Sum.ofIntegers(), "wordsRead"));
Добавил пример, надеюсь поможет.
Спасибо. Хотя это главная страница руководства по программированию. Похоже, что нет эквивалентной отдельной страницы, на которой подробно рассматриваются подробные примеры одного и того же преобразования как такового, как для Python (см. Гиперссылку в вопросе). Я был бы признателен, если бы вы могли уточнить, что вы подразумеваете под «вы можете продолжить операцию группировки с проекцией, которая добавляет новое поле для желаемого выражения». Вы имеете в виду целый алгоритм PTransforms для получения нужного столбца? Я отмечу это как правильный ответ, если есть пример кода, выполняющий нужную задачу.