PySpark количество вхождений за дату

У меня есть фрейм данных PySpark, который выглядит так:

+------+-------------------+
|port  |  timestamp        |
+------+-------------------+
|9200  |2020-06-19 02:12:41|
|9200  |2020-06-19 03:54:23|
|51    |2020-06-19 05:32:11|
|22    |2020-06-20 06:07:43|
|22    |2020-06-20 01:11:12|
|51    |2020-06-20 07:38:49|
+------+-------------------+ 

Я пытаюсь найти, сколько раз в день используется отдельный порт.

Например, результирующий фрейм данных должен выглядеть так:

+------------+----------------+
|window      |  ports         |
+------------+----------------+
|2020-06-19  |{9200: 2, 51: 1}|
|2020-06-20  |{22: 2, 51:1 }  |
+------------+----------------+ 

Его точно не нужно хранить в словаре, я просто не уверен, как он должен выглядеть, чтобы захватить все порты за день.

В настоящее время я пробовал следующее:

df.groupBy(window(df['timestamp'], "1 day")).agg(count('port'))

что приводит к:

+------------+----------------+
|window      |  count(port)   |
+------------+----------------+
|2020-06-19  |3               |
|2020-06-20  |3               |
+------------+----------------+ 

Это не то, что я ищу, так как он подсчитывает только количество портов в день и не разбивается по отдельным портам.

Вы ищете решение в Pyspark или pandas?

Mayank Porwal 15.12.2020 16:02

ПиСпарк, пожалуйста!

Ryan 15.12.2020 16:12
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
0
2
794
3
Перейти к ответу Данный вопрос помечен как решенный

Ответы 3

Ответ принят как подходящий

Сгруппируйте по окну и порту, агрегируйте количество портов, затем сгруппируйте по окну и соберите количество портов в массив.

df.groupBy(
    F.window(df['timestamp'], "1 day"), 'port'
).agg(
    F.array(
        F.col('port'),
        F.count('port')
    ).alias('ports')
).groupBy(
    'window'
).agg(
    F.collect_list('ports').alias('ports')
).withColumn(
    'window',
    F.col('window')['start'].cast('date')
)

+----------+--------------------+
|    window|               ports|
+----------+--------------------+
|2020-06-19|[[9200, 2], [51, 1]]|
|2020-06-20|  [[51, 1], [22, 2]]|
+----------+--------------------+

Спасибо за это, это отлично работает! В качестве дополнения: есть ли способ получить количество только указанных портов? Например, _ports = [9200, 22], что означает, что 51 не включен в фрейм данных.

Ryan 15.12.2020 17:08

да, просто сделайте df = df.filter('port != 51') в самом начале.

mck 15.12.2020 17:09

Если вы намерены получить только количество раз, когда отдельный порт используется в день, то это не что иное, как случай агрегирования количества записей по группе «столбец даты» и «порт».

import pyspark.sql.functions as F
df.groupBy(F.to_date('timestamp').alias('date'),'port').count().orderBy('date','port').show()
+----------+----+-----+
|      date|port|count|
+----------+----+-----+
|2020-06-19|  51|    1|
|2020-06-19|9200|    2|
|2020-06-20|  22|    2|
|2020-06-20|  51|    1|
+----------+----+-----+

Решение Spark-sql:

val df = spark.sql("""
with t1 (
select 9200 x,  '2020-06-19 02:12:41' y union all 
select 9200 x, '2020-06-19 03:54:23' y union all 
select 51 x, '2020-06-19 05:32:11' y union all 
select 22 x, '2020-06-20 06:07:43' y union all 
select 22 x, '2020-06-20 01:11:12' y union all 
select 51 x, '2020-06-20 07:38:49' y
) select x as port, y as timestamp  from t1
""")
df.show(false)

+----+-------------------+
|port|timestamp          |
+----+-------------------+
|9200|2020-06-19 02:12:41|
|9200|2020-06-19 03:54:23|
|51  |2020-06-19 05:32:11|
|22  |2020-06-20 06:07:43|
|22  |2020-06-20 01:11:12|
|51  |2020-06-20 07:38:49|
+----+-------------------+

df.createOrReplaceTempView("logtable")

spark.sql(""" 
select window, collect_set(struct(port,t)) ports from 
( select cast(timestamp as date) window, port, count(port) over(partition by cast(timestamp as date), port ) t from  logtable ) temp 
group by 1
""").show(false)

+----------+--------------------+
|window    |ports               |
+----------+--------------------+
|2020-06-20|[[22, 2], [51, 1]]  |
|2020-06-19|[[9200, 2], [51, 1]]|
+----------+--------------------+

Другие вопросы по теме