У меня есть такой фрейм данных
data = [(("ID1", "A", 1)), (("ID1", "B", 5)), (("ID2", "A", 12)),
(("ID3", "A", 3)), (("ID3", "B", 3)), (("ID3", "C", 5)), (("ID4", "A", 10))]
df = spark.createDataFrame(data, ["ID", "Type", "Value"])
df.show()
+---+----+-----+
| ID|Type|Value|
+---+----+-----+
|ID1| A| 1|
|ID1| B| 5|
|ID2| A| 12|
|ID3| A| 3|
|ID3| B| 3|
|ID3| C| 5|
|ID4| A| 10|
+---+----+-----+
Я хочу извлечь только те строки (или идентификаторы), которые содержат только один конкретный тип - "A"
Следовательно, мой ожидаемый результат будет содержать следующие строки
+---+----+-----+
| ID|Type|Value|
+---+----+-----+
|ID2| A| 1|
|ID4| A| 10|
+---+----+-----+
Для каждого идентификатора может быть любой тип - A, B, C и т. д. Я хочу извлечь те идентификаторы, которые содержат один и только один тип - «A».
Как я могу добиться этого в PySpark
Это должно сделать -from pyspark.sql.functions import col, when, collect_list, array_contains, size, first
, а затем df = df.groupby(['ID']).agg(first(col('Type')).alias('Type'),first(col('Value')).alias('Value'),collect_list('Type').alias('Type_Arr'))
df = df.where(array_contains(col('Type_Arr'),'A') & (size(col('Type_Arr'))==1)).drop('Type_Arr')
@cph_sto: Ваш ответ правильный, не могли бы вы написать это как ответ, я приму его
@Hardikgupta Только что сделал это, с небольшими изменениями, чтобы решить проблему нескольких A
под одним ID
.
К нему можно применить фильтр.
import pyspark.sql.functions as f
data = [(("ID1", "A", 1)), (("ID1", "B", 5)), (("ID2", "A", 12)),
(("ID3", "A", 3)), (("ID3", "B", 3)), (("ID3", "C", 5)), (("ID4", "A", 10))]
df = spark.createDataFrame(data, ["ID", "Type", "Value"])
df.show()
+---+----+-----+
| ID|Type|Value|
+---+----+-----+
|ID1| A| 1|
|ID1| B| 5|
|ID2| A| 12|
|ID3| A| 3|
|ID3| B| 3|
|ID3| C| 5|
|ID4| A| 10|
+---+----+-----+
x= df.filter(f.col('Type')=='A')
x.show()
Если нам нужно отфильтровать все идентификаторы, которые имеют только одну запись, и это тоже с типом «A», то нижеприведенный код может быть решением.
df.registerTempTable('table1')
sqlContext.sql('select a.ID, a.Type,a.Value from table1 as a, (select ID, count(*) as cnt_val from table1 group by ID) b where a.ID = b.ID and (a.Type= = "A" and b.cnt_val ==1)').show()
+---+----+-----+
| ID|Type|Value|
+---+----+-----+
|ID2| A| 12|
|ID4| A| 10|
+---+----+-----+
Там будут лучшие альтернативные способы найти то же самое.
Я получаю сообщение об ошибке keyword can't be an expression
x = df.filter(f.col('Тип')=='A'). Должно быть два = символ. Обновил ответ.
Прошу прощения, это не то, о чем спрашивает ОП. Вы просто отфильтровываете строки, в которых есть A
. В то время как OP запрашивает те IDs
, в которых есть только A
AND
ни один из других алфавитов.
@cph_sto, о, да. Обновил ответ. Спасибо за выделение
Я не владею Python, вот возможное решение в Scala:
df.groupBy("ID").agg(collect_set("Type").as("Types"))
.select("ID").where((size($"Types")===1).and(array_contains($"Types", "A"))).show()
+---+
| ID|
+---+
|ID2|
|ID4|
+---+
Идея заключается в группировке по ID
и фильтрации только Types
размера 1, содержащего значение A
.
Агрегация должна быть collect_set
вместо collect_list
на тот случай, если у ID будет несколько экземпляров "A"
По просьбе OP я записываю ответ, который написал в комментариях.
Цель рассматриваемой задачи состоит в том, чтобы отфильтровать DataFrame
где каждый конкретный ID
имеет только элемент Type
A
и ничего другого.
# Loading the requisite packages
from pyspark.sql.functions import col, collect_set, array_contains, size, first
Идея состоит в том, чтобы сначала aggregate()
DataFrame
на ID
сгруппировать все unique
элементы Type
, используя collect_set()
в массиве. Важно иметь элементы unique
, потому что может случиться так, что для конкретного ID
может быть две строки, причем обе строки имеют Type
как A
. Вот почему мы должны использовать collect_set()
, а не collect_list()
, потому что последний будет возвращать не уникальные элементы, а все элементы.
Затем мы должны использовать first()
, чтобы получить первое значение Type
и Value
в группе. В случае, если A
является единственным unique
Type
возможным для конкретного ID
, то first()
вернет единственное значение A
в случае, если A
встречается один раз, и верхнее значение, если есть дубликаты A
.
df = df = df.groupby(['ID']).agg(first(col('Type')).alias('Type'),
first(col('Value')).alias('Value'),
collect_set('Type').alias('Type_Arr'))
df.show()
+---+----+-----+---------+
| ID|Type|Value| Type_Arr|
+---+----+-----+---------+
|ID2| A| 12| [A]|
|ID3| A| 3|[A, B, C]|
|ID1| A| 1| [A, B]|
|ID4| A| 10| [A]|
+---+----+-----+---------+
Наконец, мы поставим 2 условия одновременно, чтобы отфильтровать нужный набор данных.
Условие 1: Проверяет наличие A
в массиве Type
, используя array_contains()
.
Условие 2: Проверяет size
массива. Если размер больше 1, то Types
должно быть несколько.
df = df.where(array_contains(col('Type_Arr'),'A') & (size(col('Type_Arr'))==1)).drop('Type_Arr')
df.show()
+---+----+-----+
| ID|Type|Value|
+---+----+-----+
|ID2| A| 12|
|ID4| A| 10|
+---+----+-----+
Умное использование первого.
Добавьте столбец подсчета групп и отфильтруйте, где количество равно 1.