У меня есть фрейм данных (скажем, ac_df), в котором 32 разных столбца. Я хочу получить конкретный столбец и разделить значения на кусок по 3 как одно новое значение и создать из него еще один df.
ac_df['payment_history_1']
дает результат ниже
Мне нужен новый df со структурой ниже.
Например: если я возьму первую строку «000000000000», она будет сгруппирована как
«000», «000», «000», «000»
и это создает первую строку нового df.
Эквивалентный код Python для выполнения этой задачи выглядит следующим образом:
temp1 = ac_df['payment_history_1'].str.split(r'(...)', expand=True)
В искре я попробовал что-то ниже:
temp1 = ac_df.select(ac_df['payment_history_1']).rdd.map(lambda each_row: str(each_row[0])).map(lambda y: y.split(r'(...)')).collect()
Выход:
[['000000000000'], ['000000000003000000000'], ['000000000003000000000000000']]
Однако я не могу двигаться вперед и добиться желаемого результата. Может кто подскажет?
Попробуйте это, вы сможете построить над этим:
df = spark.createDataFrame(
[
[1, '000000000000'],
[2, '000000000003000000000'],
[3, '000000000003000000000000000']
]
, ["id", "numbers"]
)
df.show()
Должно получиться что-то похожее на фрейм данных, с которого вы начинаете:
+---+--------------------+
| id| numbers|
+---+--------------------+
| 1| 000000000000|
| 2|00000000000300000...|
| 3|00000000000300000...|
+---+--------------------+
взяв столбец чисел, вы сможете проанализировать его в строку, разделенную ",", откуда мы можем применить: Posexplode (expr) - разделяет элементы массива expr на несколько строк с позициями или элементы map expr на несколько строк и столбцов с позициями.
from pyspark.sql.functions import posexplode
df.select(
"id",
f.split("numbers", ",").alias("numbers"),
f.posexplode(f.split("numbers", ",")).alias("pos", "val")
).show()
что должно привести к:
+---+--------------------+---+---+
| id| numbers|pos|val|
+---+--------------------+---+---+
| 1|[000, 000, 000, 000]| 0|000|
| 1|[000, 000, 000, 000]| 1|000|
| 1|[000, 000, 000, 000]| 2|000|
| 1|[000, 000, 000, 000]| 3|000|
| 2|[000, 000, 000, 0...| 0|000|
| 2|[000, 000, 000, 0...| 1|000|
| 2|[000, 000, 000, 0...| 2|000|
| 2|[000, 000, 000, 0...| 3|003|
| 2|[000, 000, 000, 0...| 4|000|
| 2|[000, 000, 000, 0...| 5|000|
| 2|[000, 000, 000, 0...| 6|000|
| 3|[000, 000, 000, 0...| 0|000|
| 3|[000, 000, 000, 0...| 1|000|
| 3|[000, 000, 000, 0...| 2|000|
| 3|[000, 000, 000, 0...| 3|003|
| 3|[000, 000, 000, 0...| 4|000|
| 3|[000, 000, 000, 0...| 5|000|
| 3|[000, 000, 000, 0...| 6|000|
| 3|[000, 000, 000, 0...| 7|000|
| 3|[000, 000, 000, 0...| 8|000|
+---+--------------------+---+---+
Затем мы используем: pyspark.sql.functions.expr, чтобы захватить элемент с индексом pos в этом массиве.
Первый - это имя нашего нового столбца, который будет объединением числа и индекса в массиве. Второй столбец будет значением соответствующего индекса в массиве. Мы получаем последнее, используя функциональность pyspark.sql.functions.expr, которая позволяет нам использовать значения столбцов в качестве параметров.
df.select(
"id",
f.split("numbers", ",").alias("numbers"),
f.posexplode(f.split("numbers", ",")).alias("pos", "val")
)\
.drop("val")\
.select(
"id",
f.concat(f.lit("numbers"),f.col("pos").cast("string")).alias("number"),
f.expr("numbers[pos]").alias("val")
)\
.show()
что приводит к:
+---+--------+---+
| id| number|val|
+---+--------+---+
| 1|numbers0|000|
| 1|numbers1|000|
| 1|numbers2|000|
| 1|numbers3|000|
| 2|numbers0|000|
| 2|numbers1|000|
| 2|numbers2|000|
| 2|numbers3|003|
| 2|numbers4|000|
| 2|numbers5|000|
| 2|numbers6|000|
| 3|numbers0|000|
| 3|numbers1|000|
| 3|numbers2|000|
| 3|numbers3|003|
| 3|numbers4|000|
| 3|numbers5|000|
| 3|numbers6|000|
| 3|numbers7|000|
| 3|numbers8|000|
+---+--------+---+
Наконец, мы можем просто сгруппировать по идентификатору и развернуть DataFrame.
df.select(
"id",
f.split("numbers", ",").alias("numbers"),
f.posexplode(f.split("numbers", ",")).alias("pos", "val")
)\
.drop("val")\
.select(
"id",
f.concat(f.lit("numbers"),f.col("pos").cast("string")).alias("number"),
f.expr("numbers[pos]").alias("val")
)\
.groupBy("id").pivot("number").agg(f.first("val"))\
.show()
давая окончательный фрейм данных:
взял детали из: Разделить строковый столбец Spark Dataframe на несколько столбцов
Вы можете использовать pyspark.sql.functions.split в столбце фрейма данных аналогично тому, как вы используете
str.split
в python.