Список столбцов, отвечающих определенному условию

Работа с Python 3.7.9, Spark 2.4.5 Я пытаюсь вручную «попробовать проанализировать» заданное подмножество столбцов из строки в целое число, а затем добавить два дополнительных столбца в фреймворк данных:

  • _num_invalid_columns: с количеством столбцов, которые не удалось проанализировать (обозначается как -9999).
  • _invalid_colums_list: список столбцов, разделенных запятыми или вертикальной чертой, которые не удалось проанализировать.

Мне удалось рассчитать "_num_invalid_columns", но у меня возникли проблемы с "_invalid_columns_list". Код для воспроизведения ниже, я уменьшил его насколько это возможно.

'''
Uncomment these 2 lines if using Jupyter Notebook
import findspark
findspark.init()
'''
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql import functions as F

def tryparse_integer(integer_str):    
    '''
    if integer_str is None returns None as IntegerType()
    if integer_str is not None and cannot be parsed to integer, returns -9999 as IntegerType()
    if integer_str is not None and can be parsed to integer, returns integer_str as IntegerType()
    (that way we can tell between null data and invalid data)
    '''
    return F.when(F.isnull(integer_str), F.lit(None).cast(IntegerType())) \
        .otherwise( \
            F.when(F.isnull(integer_str.cast(IntegerType())), F.lit(-9999).cast(IntegerType())) \
            .otherwise(integer_str.cast(IntegerType())) \
        ) 
        
def is_invalid_number(col):
    return F.when(col == -9999, 1).otherwise(0)

spark = SparkSession.builder.appName("RandallTest").getOrCreate()

data = [('1', '2','hello'), ('error','error','hello'), ('error','2','hello')]

schema = StructType([
  StructField('column1', StringType()),
  StructField('column2', StringType()),
  StructField('column3', StringType())
  ])

df = spark.createDataFrame(data, schema = schema)

df.printSchema()

integerColumns = ['column1','column2']

df_parsed = df.select(*[
    tryparse_integer(F.col(colName)).alias(colName) if (colName in integerColumns)
    else colName
    for colName in df.columns])  

df_parsed.printSchema()

df_parsed_with_errorcount = df_parsed \
    .withColumn('_num_invalid_columns', sum(
    is_invalid_number(F.col(colName)) if (colName in integerColumns) 
    else 0
    for colName in df_parsed.columns)) \
    .withColumn('_invalid_columns_list', F.lit('--'.join(filter(None, (
    ##Not what I need, but works:
    colName if (colName in integerColumns)  
    ##Not working if I uncomment the actual logic I want. Something like any of these lines (59, 60 or 61 all produce errors)
    ##colName if (colName in integerColumns and F.col(colName) == -9999)
    ##colName if (colName in integerColumns & F.col(colName) == -9999)
    ##colName if (colName in integerColumns & is_invalid_number(F.col(colName)) == 1)
    else None
    for colName in df_parsed.columns)))))   

df_parsed_with_errorcount.show()
df_parsed_with_errorcount.take(10)

Пример ввода:

column1     column2     column3     
'1'         '2'         'hello'        
'error'     'error'     'hello'     
'error'     '2'         'hello'     

Столбцы для "попробовать проанализировать": столбец1, столбец2

Ожидаемый результат:

column1     column2     column3     _num_invalid_columns    _invalid_columns_list
1           2           'hello'     0   
-9999       -9999       'hello'     2                       column1,column2
-9999       2           'hello'     1                       column1
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
0
0
110
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

Используйте F.lit(colName), чтобы поместить имена столбцов в фрейм данных:

df_parsed_with_errorcount = df_parsed.withColumn(
    '_invalid_columns_list',
    F.concat_ws(
        ',',
        *[F.when(is_invalid_number(F.col(colName)) == 1, F.lit(colName)) for colName in df_parsed.columns]
    )
)

df_parsed_with_errorcount.show()
+-------+-------+-------+---------------------+
|column1|column2|column3|_invalid_columns_list|
+-------+-------+-------+---------------------+
|      1|      2|  hello|                     |
|  -9999|  -9999|  hello|      column1,column2|
|  -9999|      2|  hello|              column1|
+-------+-------+-------+---------------------+

спасибо, есть ли способ использовать F.when, что-то похожее на «обложку», которую я использовал для «суммы»? Мой реальный вариант использования будет иметь больше, чем целочисленные «пробные синтаксические анализы», мои фактические _num_invalid_columns, например, выглядят так: df_parsed_with_errorcount = df_parsed.withColumn('_num_invalid_columns', sum( is_invalid_date(F.col(colName)) if (colName in dateColumns. keys()) else is_invalid_number(F.col(colName)) if (colName в списке(set().union(decimal164Columns, decimal96Columns, decimal84Columns, integerColumns))) else 0 для colName в df_parsed.columns))

Randall Gonzalez 09.12.2020 22:07

Расширяя ввод от mck, мой окончательный код выглядит так:

Сначала модификация вспомогательной функции, чтобы сделать ее, так сказать, "безопасной для типов".

def is_invalid_number(col):
    ##return F.when(col == -9999, 1).otherwise(0) --> does not work, no type safety
    return F.when(col.cast(StringType()) == '-9999', 1).otherwise(0)

А затем фактическое вычисление столбца с использованием concat_ws() + array_contains() + моя вспомогательная функция

df_parsed_with_errorcount = df_parsed \
    .withColumn('_num_invalid_columns', sum(
    is_invalid_number(F.col(colName)) if (colName in integerColumns) 
    else 0
    for colName in df_parsed.columns)) \
    .withColumn('_invalid_columns_list', F.concat_ws(',', *[ \
    (F.when(F.array_contains(F.array([F.lit(x) for x in integerColumns]), colName), F.when(is_invalid_number(F.col(colName)) == 1, F.lit(colName))) \
    .otherwise(F.lit(None))
    ) for colName in df_parsed.columns]))

Из того, что я могу сказать из сообщений об ошибках, показанных перед тем, как сделать тип вспомогательной функции is_invalid_number безопасным; Spark не гарантирует порядок выполнения условий внутри функции when() даже после вложения одной функции when() в другую, в отличие от использования одной единственной функции when() с двумя условиями, разделенными символами & (и)

Другие вопросы по теме