Я запускаю hadoop в кластере виртуальных машин в своем школьном облаке (честно говоря, не знаю подробностей). Я использую apache spark для общения с hadoop и запуска моего текущего кода.
Я пытался выполнить некоторые агрегации своих данных, чтобы найти общее значение потребления за час/день/месяц (столбец ENERGY_READING из данных)
ПОТРЕБЛЕНИЕ.tsv с некоторыми уже проделанными манипуляциями
+--------+-------------------+----+--------------+
|HOUSE_ID|CONDATE |HOUR|ENERGY_READING|
+--------+-------------------+----+--------------+
|9 |2015-05-30 00:00:00|0 |11000.001444 |
|9 |2015-05-30 00:00:10|0 |11000.002888 |
|9 |2015-05-30 00:00:20|0 |11000.004332 |
|9 |2015-05-30 00:00:30|0 |11000.005776 |
|9 |2015-05-30 00:00:40|0 |11000.00722 |
|9 |2015-05-30 00:00:50|0 |11000.008664 |
|9 |2015-05-30 00:01:00|0 |11000.010108 |
|9 |2015-05-30 00:01:10|0 |11000.011552 |
|9 |2015-05-30 00:01:20|0 |11000.012996 |
|9 |2015-05-30 00:01:30|0 |11000.01444 |
|9 |2015-05-30 00:01:40|0 |11000.015884 |
|9 |2015-05-30 00:01:50|0 |11000.017328 |
|9 |2015-05-30 00:02:00|0 |11000.018772 |
|9 |2015-05-30 00:02:10|0 |11000.020216 |
|9 |2015-05-30 00:02:20|0 |11000.02166 |
|9 |2015-05-30 00:02:30|0 |11000.023104 |
|9 |2015-05-30 00:02:40|0 |11000.024548 |
|9 |2015-05-30 00:02:50|0 |11000.025992 |
|9 |2015-05-30 00:03:00|0 |11000.027436 |
|9 |2015-05-30 00:03:10|0 |11000.02888 |
+--------+-------------------+----+--------------+
Java-класс
StructType schema = new StructType()
.add("LOG_ID",IntegerType)
.add("HOUSE_ID", IntegerType)
.add("CONDATE", StringType)
.add("ENERGY_READING", DoubleType)
.add("FLAG", IntegerType);
Dataset<Row> data = spark.read()
.option("header", true)
.option("delimiter", "\t")
.option("mode","DROPMALFORMED")
.schema(schema)
.csv("hdfs://hd-master:9820/CONSUMPTION.tsv");
data = data.withColumn("CONDATE", functions.to_timestamp(functions.col("CONDATE"),"yy-MM-dd HH:mm:ss.SSSSSSSSS").cast(TimestampType));
data = data.withColumn("HOUR", functions.hour(functions.col("CONDATE")));
Dataset<Row> df = data.select("HOUSE_ID","CONDATE","HOUR","ENERGY_READING");
Таким образом, данные, которые у меня есть, увеличиваются каждые 10 секунд. Я хочу получить первое и последнее значения для каждого часа/дня/месяца.
По сути, я хочу, чтобы первое значение дня было 11000.001444, а последнее значение, скажем, 11000.01444 в данном случае. А затем вычесть второе из первого, чтобы получить общее потребление за этот час/день/месяц.
Что дало бы мне вывод
HOUSE_ID CONDATE HOUR ENERGY_READING
9 15-05-30 0 0.013
9 15-05-30 1 ...
Я вижу, что ты говоришь. Таким образом, первое значение каждой группы будет вычтено из предыдущей группы. Итак, как мне написать такой код?
Да, точно. это то, что вы хотите сделать вместо этого?
Да, это точно такая же идея, как и то, что я делал, просто другой подход, и в любом случае это даст относительно тот же результат.
Приведенный ниже код группирует по минутам и вычисляет потребление за эту минуту:
import org.apache.spark.sql.expressions.Window
Dataset<Row> df2 = df.groupBy(
functions.col("HOUSE_ID"),
functions.minute(col("CONDATE")).alias("minute")
).agg(
functions.min("ENERGY_READING").alias("ENERGY_READING")
).withColumn(
"LAG_ENERGY_READING",
functions.lag(functions.col("ENERGY_READING"), 1).over(Window.partitionBy("HOUSE_ID").orderBy("minute"))
).withColumn(
"consumption",
functions.expr("ENERGY_READING - LAG_ENERGY_READING")
)
Оператор "-" не может быть применен к "org.apache.spark.sql.Column", "org.apache.spark.sql.Column"
в самом деле :'(
да, плохо проверить это. Дай мне пару минут
не беспокойся. Для меня это немного процесс. Создайте файл jar, затем перенесите его на сервер, а затем запустите, и только сейчас я забыл распечатать вывод.
Давайте продолжим обсуждение в чате.
Если вы получаете последнее значение за каждый час, вы делаете 00:50 - 00:00, 01:50 - 01:00 и т. д., и вы пропустите энергию, потребленную в период с 00:50 до 01:00, 01: от 50 до 02:00 и т. д. Вместо этого вы хотите вычесть значение в 01:00 из значения в 00:00? (т.е. 01:00 - 00:00) . В этом случае вам понадобится первое значение каждой группы.