Извлечь определенные строки в PySpark

У меня есть такой фрейм данных

data = [(("ID1", "A", 1)), (("ID1", "B", 5)), (("ID2", "A", 12)), 
       (("ID3", "A", 3)), (("ID3", "B", 3)), (("ID3", "C", 5)), (("ID4", "A", 10))]
df = spark.createDataFrame(data, ["ID", "Type", "Value"])
df.show()

+---+----+-----+
| ID|Type|Value|
+---+----+-----+
|ID1|   A|    1|
|ID1|   B|    5|
|ID2|   A|   12|
|ID3|   A|    3|
|ID3|   B|    3|
|ID3|   C|    5|
|ID4|   A|   10|
+---+----+-----+

Я хочу извлечь только те строки (или идентификаторы), которые содержат только один конкретный тип - "A"

Следовательно, мой ожидаемый результат будет содержать следующие строки

+---+----+-----+
| ID|Type|Value|
+---+----+-----+
|ID2|   A|    1|
|ID4|   A|   10|
+---+----+-----+

Для каждого идентификатора может быть любой тип - A, B, C и т. д. Я хочу извлечь те идентификаторы, которые содержат один и только один тип - «A».

Как я могу добиться этого в PySpark

Добавьте столбец подсчета групп и отфильтруйте, где количество равно 1.

pault 09.04.2019 17:48

Это должно сделать -from pyspark.sql.functions import col, when, collect_list, array_contains, size, first, а затем df = df.groupby(['ID']).agg(first(col('Type')).alias('Type'),firs‌​t(col('Value')).alia‌​s('Value'),collect_l‌​ist('Type').alias('T‌​ype_Arr'))

cph_sto 09.04.2019 17:54
df = df.where(array_contains(col('Type_Arr'),'A') & (size(col('Type_Arr'))==1)).drop('Type_Arr')
cph_sto 09.04.2019 17:54

@cph_sto: Ваш ответ правильный, не могли бы вы написать это как ответ, я приму его

Hardik Gupta 10.04.2019 04:31

@Hardikgupta Только что сделал это, с небольшими изменениями, чтобы решить проблему нескольких A под одним ID.

cph_sto 10.04.2019 08:48
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
1
5
5 150
3
Перейти к ответу Данный вопрос помечен как решенный

Ответы 3

К нему можно применить фильтр.

import pyspark.sql.functions as f

data = [(("ID1", "A", 1)), (("ID1", "B", 5)), (("ID2", "A", 12)), 
       (("ID3", "A", 3)), (("ID3", "B", 3)), (("ID3", "C", 5)), (("ID4", "A", 10))]
df = spark.createDataFrame(data, ["ID", "Type", "Value"])
df.show()

+---+----+-----+
| ID|Type|Value|
+---+----+-----+
|ID1|   A|    1|
|ID1|   B|    5|
|ID2|   A|   12|
|ID3|   A|    3|
|ID3|   B|    3|
|ID3|   C|    5|
|ID4|   A|   10|
+---+----+-----+

x= df.filter(f.col('Type')=='A')

x.show()

Если нам нужно отфильтровать все идентификаторы, которые имеют только одну запись, и это тоже с типом «A», то нижеприведенный код может быть решением.


df.registerTempTable('table1')


sqlContext.sql('select a.ID, a.Type,a.Value from table1 as a, (select ID, count(*) as cnt_val from table1 group by ID) b where a.ID = b.ID and (a.Type= = "A" and b.cnt_val ==1)').show()


+---+----+-----+
| ID|Type|Value|
+---+----+-----+
|ID2|   A|   12|
|ID4|   A|   10|
+---+----+-----+


Там будут лучшие альтернативные способы найти то же самое.

Я получаю сообщение об ошибке keyword can't be an expression

Hardik Gupta 09.04.2019 16:47

x = df.filter(f.col('Тип')=='A'). Должно быть два = символ. Обновил ответ.

rightjoin 09.04.2019 17:39

Прошу прощения, это не то, о чем спрашивает ОП. Вы просто отфильтровываете строки, в которых есть A. В то время как OP запрашивает те IDs, в которых есть только AAND ни один из других алфавитов.

cph_sto 09.04.2019 17:42

@cph_sto, о, да. Обновил ответ. Спасибо за выделение

rightjoin 09.04.2019 18:50

Я не владею Python, вот возможное решение в Scala:

df.groupBy("ID").agg(collect_set("Type").as("Types"))
  .select("ID").where((size($"Types")===1).and(array_contains($"Types", "A"))).show()
+---+
| ID|
+---+
|ID2|
|ID4|
+---+

Идея заключается в группировке по ID и фильтрации только Types размера 1, содержащего значение A.

Агрегация должна быть collect_set вместо collect_list на тот случай, если у ID будет несколько экземпляров "A"

D3V 10.04.2019 11:24
Ответ принят как подходящий

По просьбе OP я записываю ответ, который написал в комментариях.

Цель рассматриваемой задачи состоит в том, чтобы отфильтровать DataFrameгде каждый конкретный ID имеет только элемент TypeA и ничего другого.

# Loading the requisite packages
from pyspark.sql.functions import col, collect_set, array_contains, size, first

Идея состоит в том, чтобы сначала aggregate()DataFrame на ID сгруппировать все unique элементы Type, используя collect_set() в массиве. Важно иметь элементы unique, потому что может случиться так, что для конкретного ID может быть две строки, причем обе строки имеют Type как A. Вот почему мы должны использовать collect_set(), а не collect_list(), потому что последний будет возвращать не уникальные элементы, а все элементы.

Затем мы должны использовать first(), чтобы получить первое значение Type и Value в группе. В случае, если A является единственным uniqueType возможным для конкретного ID, то first() вернет единственное значение A в случае, если A встречается один раз, и верхнее значение, если есть дубликаты A.

df = df = df.groupby(['ID']).agg(first(col('Type')).alias('Type'),
                                 first(col('Value')).alias('Value'),
                                 collect_set('Type').alias('Type_Arr'))
df.show()
+---+----+-----+---------+
| ID|Type|Value| Type_Arr|
+---+----+-----+---------+
|ID2|   A|   12|      [A]|
|ID3|   A|    3|[A, B, C]|
|ID1|   A|    1|   [A, B]|
|ID4|   A|   10|      [A]|
+---+----+-----+---------+

Наконец, мы поставим 2 условия одновременно, чтобы отфильтровать нужный набор данных.

Условие 1: Проверяет наличие A в массиве Type, используя array_contains().

Условие 2: Проверяет size массива. Если размер больше 1, то Types должно быть несколько.

df = df.where(array_contains(col('Type_Arr'),'A') & (size(col('Type_Arr'))==1)).drop('Type_Arr')
df.show()
+---+----+-----+
| ID|Type|Value|
+---+----+-----+
|ID2|   A|   12|
|ID4|   A|   10|
+---+----+-----+

Умное использование первого.

D3V 10.04.2019 11:27

Другие вопросы по теме