У меня есть фрейм данных PySpark, который выглядит так:
+------+-------------------+
|port | timestamp |
+------+-------------------+
|9200 |2020-06-19 02:12:41|
|9200 |2020-06-19 03:54:23|
|51 |2020-06-19 05:32:11|
|22 |2020-06-20 06:07:43|
|22 |2020-06-20 01:11:12|
|51 |2020-06-20 07:38:49|
+------+-------------------+
Я пытаюсь найти, сколько раз в день используется отдельный порт.
Например, результирующий фрейм данных должен выглядеть так:
+------------+----------------+
|window | ports |
+------------+----------------+
|2020-06-19 |{9200: 2, 51: 1}|
|2020-06-20 |{22: 2, 51:1 } |
+------------+----------------+
Его точно не нужно хранить в словаре, я просто не уверен, как он должен выглядеть, чтобы захватить все порты за день.
В настоящее время я пробовал следующее:
df.groupBy(window(df['timestamp'], "1 day")).agg(count('port'))
что приводит к:
+------------+----------------+
|window | count(port) |
+------------+----------------+
|2020-06-19 |3 |
|2020-06-20 |3 |
+------------+----------------+
Это не то, что я ищу, так как он подсчитывает только количество портов в день и не разбивается по отдельным портам.
ПиСпарк, пожалуйста!
Сгруппируйте по окну и порту, агрегируйте количество портов, затем сгруппируйте по окну и соберите количество портов в массив.
df.groupBy(
F.window(df['timestamp'], "1 day"), 'port'
).agg(
F.array(
F.col('port'),
F.count('port')
).alias('ports')
).groupBy(
'window'
).agg(
F.collect_list('ports').alias('ports')
).withColumn(
'window',
F.col('window')['start'].cast('date')
)
+----------+--------------------+
| window| ports|
+----------+--------------------+
|2020-06-19|[[9200, 2], [51, 1]]|
|2020-06-20| [[51, 1], [22, 2]]|
+----------+--------------------+
Спасибо за это, это отлично работает! В качестве дополнения: есть ли способ получить количество только указанных портов? Например, _ports = [9200, 22], что означает, что 51 не включен в фрейм данных.
да, просто сделайте df = df.filter('port != 51')
в самом начале.
Если вы намерены получить только количество раз, когда отдельный порт используется в день, то это не что иное, как случай агрегирования количества записей по группе «столбец даты» и «порт».
import pyspark.sql.functions as F
df.groupBy(F.to_date('timestamp').alias('date'),'port').count().orderBy('date','port').show()
+----------+----+-----+
| date|port|count|
+----------+----+-----+
|2020-06-19| 51| 1|
|2020-06-19|9200| 2|
|2020-06-20| 22| 2|
|2020-06-20| 51| 1|
+----------+----+-----+
Решение Spark-sql:
val df = spark.sql("""
with t1 (
select 9200 x, '2020-06-19 02:12:41' y union all
select 9200 x, '2020-06-19 03:54:23' y union all
select 51 x, '2020-06-19 05:32:11' y union all
select 22 x, '2020-06-20 06:07:43' y union all
select 22 x, '2020-06-20 01:11:12' y union all
select 51 x, '2020-06-20 07:38:49' y
) select x as port, y as timestamp from t1
""")
df.show(false)
+----+-------------------+
|port|timestamp |
+----+-------------------+
|9200|2020-06-19 02:12:41|
|9200|2020-06-19 03:54:23|
|51 |2020-06-19 05:32:11|
|22 |2020-06-20 06:07:43|
|22 |2020-06-20 01:11:12|
|51 |2020-06-20 07:38:49|
+----+-------------------+
df.createOrReplaceTempView("logtable")
spark.sql("""
select window, collect_set(struct(port,t)) ports from
( select cast(timestamp as date) window, port, count(port) over(partition by cast(timestamp as date), port ) t from logtable ) temp
group by 1
""").show(false)
+----------+--------------------+
|window |ports |
+----------+--------------------+
|2020-06-20|[[22, 2], [51, 1]] |
|2020-06-19|[[9200, 2], [51, 1]]|
+----------+--------------------+
Вы ищете решение в
Pyspark
илиpandas
?