Apache Spark Group By (получение первых и последних значений в группе)

Я запускаю hadoop в кластере виртуальных машин в своем школьном облаке (честно говоря, не знаю подробностей). Я использую apache spark для общения с hadoop и запуска моего текущего кода.

Я пытался выполнить некоторые агрегации своих данных, чтобы найти общее значение потребления за час/день/месяц (столбец ENERGY_READING из данных)

ПОТРЕБЛЕНИЕ.tsv с некоторыми уже проделанными манипуляциями

+--------+-------------------+----+--------------+
|HOUSE_ID|CONDATE            |HOUR|ENERGY_READING|
+--------+-------------------+----+--------------+
|9       |2015-05-30 00:00:00|0   |11000.001444  |
|9       |2015-05-30 00:00:10|0   |11000.002888  |
|9       |2015-05-30 00:00:20|0   |11000.004332  |
|9       |2015-05-30 00:00:30|0   |11000.005776  |
|9       |2015-05-30 00:00:40|0   |11000.00722   |
|9       |2015-05-30 00:00:50|0   |11000.008664  |
|9       |2015-05-30 00:01:00|0   |11000.010108  |
|9       |2015-05-30 00:01:10|0   |11000.011552  |
|9       |2015-05-30 00:01:20|0   |11000.012996  |
|9       |2015-05-30 00:01:30|0   |11000.01444   |
|9       |2015-05-30 00:01:40|0   |11000.015884  |
|9       |2015-05-30 00:01:50|0   |11000.017328  |
|9       |2015-05-30 00:02:00|0   |11000.018772  |
|9       |2015-05-30 00:02:10|0   |11000.020216  |
|9       |2015-05-30 00:02:20|0   |11000.02166   |
|9       |2015-05-30 00:02:30|0   |11000.023104  |
|9       |2015-05-30 00:02:40|0   |11000.024548  |
|9       |2015-05-30 00:02:50|0   |11000.025992  |
|9       |2015-05-30 00:03:00|0   |11000.027436  |
|9       |2015-05-30 00:03:10|0   |11000.02888   |
+--------+-------------------+----+--------------+

Java-класс

StructType schema = new StructType()
                .add("LOG_ID",IntegerType)
                .add("HOUSE_ID", IntegerType)
                .add("CONDATE", StringType)
                .add("ENERGY_READING", DoubleType)
                .add("FLAG", IntegerType);

        Dataset<Row> data = spark.read()
                .option("header", true)
                .option("delimiter", "\t")
                .option("mode","DROPMALFORMED")
                .schema(schema)
                .csv("hdfs://hd-master:9820/CONSUMPTION.tsv");

        data = data.withColumn("CONDATE", functions.to_timestamp(functions.col("CONDATE"),"yy-MM-dd HH:mm:ss.SSSSSSSSS").cast(TimestampType));

        data = data.withColumn("HOUR", functions.hour(functions.col("CONDATE")));

        Dataset<Row> df = data.select("HOUSE_ID","CONDATE","HOUR","ENERGY_READING");

Таким образом, данные, которые у меня есть, увеличиваются каждые 10 секунд. Я хочу получить первое и последнее значения для каждого часа/дня/месяца.

По сути, я хочу, чтобы первое значение дня было 11000.001444, а последнее значение, скажем, 11000.01444 в данном случае. А затем вычесть второе из первого, чтобы получить общее потребление за этот час/день/месяц.

Что дало бы мне вывод

HOUSE_ID   CONDATE      HOUR       ENERGY_READING
  9        15-05-30      0              0.013
  9        15-05-30      1              ...

Если вы получаете последнее значение за каждый час, вы делаете 00:50 - 00:00, 01:50 - 01:00 и т. д., и вы пропустите энергию, потребленную в период с 00:50 до 01:00, 01: от 50 до 02:00 и т. д. Вместо этого вы хотите вычесть значение в 01:00 из значения в 00:00? (т.е. 01:00 - 00:00) . В этом случае вам понадобится первое значение каждой группы.

mck 14.12.2020 19:31

Я вижу, что ты говоришь. Таким образом, первое значение каждой группы будет вычтено из предыдущей группы. Итак, как мне написать такой код?

Nikster 14.12.2020 19:44

Да, точно. это то, что вы хотите сделать вместо этого?

mck 14.12.2020 19:45

Да, это точно такая же идея, как и то, что я делал, просто другой подход, и в любом случае это даст относительно тот же результат.

Nikster 14.12.2020 19:46
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
0
4
132
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Приведенный ниже код группирует по минутам и вычисляет потребление за эту минуту:

import org.apache.spark.sql.expressions.Window

Dataset<Row> df2 = df.groupBy(
    functions.col("HOUSE_ID"),
    functions.minute(col("CONDATE")).alias("minute")
).agg(
    functions.min("ENERGY_READING").alias("ENERGY_READING")
).withColumn(
    "LAG_ENERGY_READING",
    functions.lag(functions.col("ENERGY_READING"), 1).over(Window.partitionBy("HOUSE_ID").orderBy("minute"))
).withColumn(
    "consumption",
    functions.expr("ENERGY_READING - LAG_ENERGY_READING")
)

Оператор "-" не может быть применен к "org.apache.spark.sql.Column", "org.apache.spark.sql.Column"

Nikster 14.12.2020 19:53

в самом деле :'(

Nikster 14.12.2020 19:54

да, плохо проверить это. Дай мне пару минут

Nikster 14.12.2020 19:56

не беспокойся. Для меня это немного процесс. Создайте файл jar, затем перенесите его на сервер, а затем запустите, и только сейчас я забыл распечатать вывод.

Nikster 14.12.2020 19:59

Давайте продолжим обсуждение в чате.

mck 14.12.2020 20:00

Другие вопросы по теме