Работа с Python 3.7.9, Spark 2.4.5 Я пытаюсь вручную «попробовать проанализировать» заданное подмножество столбцов из строки в целое число, а затем добавить два дополнительных столбца в фреймворк данных:
Мне удалось рассчитать "_num_invalid_columns", но у меня возникли проблемы с "_invalid_columns_list". Код для воспроизведения ниже, я уменьшил его насколько это возможно.
'''
Uncomment these 2 lines if using Jupyter Notebook
import findspark
findspark.init()
'''
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql import functions as F
def tryparse_integer(integer_str):
'''
if integer_str is None returns None as IntegerType()
if integer_str is not None and cannot be parsed to integer, returns -9999 as IntegerType()
if integer_str is not None and can be parsed to integer, returns integer_str as IntegerType()
(that way we can tell between null data and invalid data)
'''
return F.when(F.isnull(integer_str), F.lit(None).cast(IntegerType())) \
.otherwise( \
F.when(F.isnull(integer_str.cast(IntegerType())), F.lit(-9999).cast(IntegerType())) \
.otherwise(integer_str.cast(IntegerType())) \
)
def is_invalid_number(col):
return F.when(col == -9999, 1).otherwise(0)
spark = SparkSession.builder.appName("RandallTest").getOrCreate()
data = [('1', '2','hello'), ('error','error','hello'), ('error','2','hello')]
schema = StructType([
StructField('column1', StringType()),
StructField('column2', StringType()),
StructField('column3', StringType())
])
df = spark.createDataFrame(data, schema = schema)
df.printSchema()
integerColumns = ['column1','column2']
df_parsed = df.select(*[
tryparse_integer(F.col(colName)).alias(colName) if (colName in integerColumns)
else colName
for colName in df.columns])
df_parsed.printSchema()
df_parsed_with_errorcount = df_parsed \
.withColumn('_num_invalid_columns', sum(
is_invalid_number(F.col(colName)) if (colName in integerColumns)
else 0
for colName in df_parsed.columns)) \
.withColumn('_invalid_columns_list', F.lit('--'.join(filter(None, (
##Not what I need, but works:
colName if (colName in integerColumns)
##Not working if I uncomment the actual logic I want. Something like any of these lines (59, 60 or 61 all produce errors)
##colName if (colName in integerColumns and F.col(colName) == -9999)
##colName if (colName in integerColumns & F.col(colName) == -9999)
##colName if (colName in integerColumns & is_invalid_number(F.col(colName)) == 1)
else None
for colName in df_parsed.columns)))))
df_parsed_with_errorcount.show()
df_parsed_with_errorcount.take(10)
Пример ввода:
column1 column2 column3
'1' '2' 'hello'
'error' 'error' 'hello'
'error' '2' 'hello'
Столбцы для "попробовать проанализировать": столбец1, столбец2
Ожидаемый результат:
column1 column2 column3 _num_invalid_columns _invalid_columns_list
1 2 'hello' 0
-9999 -9999 'hello' 2 column1,column2
-9999 2 'hello' 1 column1
Используйте F.lit(colName)
, чтобы поместить имена столбцов в фрейм данных:
df_parsed_with_errorcount = df_parsed.withColumn(
'_invalid_columns_list',
F.concat_ws(
',',
*[F.when(is_invalid_number(F.col(colName)) == 1, F.lit(colName)) for colName in df_parsed.columns]
)
)
df_parsed_with_errorcount.show()
+-------+-------+-------+---------------------+
|column1|column2|column3|_invalid_columns_list|
+-------+-------+-------+---------------------+
| 1| 2| hello| |
| -9999| -9999| hello| column1,column2|
| -9999| 2| hello| column1|
+-------+-------+-------+---------------------+
Расширяя ввод от mck, мой окончательный код выглядит так:
Сначала модификация вспомогательной функции, чтобы сделать ее, так сказать, "безопасной для типов".
def is_invalid_number(col):
##return F.when(col == -9999, 1).otherwise(0) --> does not work, no type safety
return F.when(col.cast(StringType()) == '-9999', 1).otherwise(0)
А затем фактическое вычисление столбца с использованием concat_ws() + array_contains() + моя вспомогательная функция
df_parsed_with_errorcount = df_parsed \
.withColumn('_num_invalid_columns', sum(
is_invalid_number(F.col(colName)) if (colName in integerColumns)
else 0
for colName in df_parsed.columns)) \
.withColumn('_invalid_columns_list', F.concat_ws(',', *[ \
(F.when(F.array_contains(F.array([F.lit(x) for x in integerColumns]), colName), F.when(is_invalid_number(F.col(colName)) == 1, F.lit(colName))) \
.otherwise(F.lit(None))
) for colName in df_parsed.columns]))
Из того, что я могу сказать из сообщений об ошибках, показанных перед тем, как сделать тип вспомогательной функции is_invalid_number безопасным; Spark не гарантирует порядок выполнения условий внутри функции when() даже после вложения одной функции when() в другую, в отличие от использования одной единственной функции when() с двумя условиями, разделенными символами & (и)
спасибо, есть ли способ использовать F.when, что-то похожее на «обложку», которую я использовал для «суммы»? Мой реальный вариант использования будет иметь больше, чем целочисленные «пробные синтаксические анализы», мои фактические _num_invalid_columns, например, выглядят так: df_parsed_with_errorcount = df_parsed.withColumn('_num_invalid_columns', sum( is_invalid_date(F.col(colName)) if (colName in dateColumns. keys()) else is_invalid_number(F.col(colName)) if (colName в списке(set().union(decimal164Columns, decimal96Columns, decimal84Columns, integerColumns))) else 0 для colName в df_parsed.columns))