Я хотел бы преобразовать линейный список в фрейм данных. т. е. учитывая следующий список,
a = ["a1", "a2", "a3", b1", "b2", "b3", "c1", "c2", "c3"]
Ожидаемый результат,
+--------------------+
| col1 | col2 | col3 |
+--------------------+
| a1 | a2 | a3 |
| b1 | b2 | b3 |
| c1 | c2 | c3 |
+--------------------+
Я попробовал следующее, но получил ошибку.
from pyspark.sql.types import *
a = ["a1", "a2", "a3", "b1", "b2", "b3", "c1", "c2", "c3"]
rdd = sc.parallelize(a)
schema = StructType([
StructField("a", StringType(), True),
StructField("b", StringType(), True),
StructField("c", StringType(), True)
])
df = sqlContext.createDataFrame(rdd, schema)
df.show()
Последний оператор show() получает ошибку «Задание прервано из-за сбоя этапа». Пожалуйста, кто-нибудь скажите мне решение? Спасибо.
Список дан, поэтому переписать его нельзя. Ищите что-то похожее на «изменить форму» в NumPy.
Вот способ, который, надеюсь, соответствует вашим критериям
# First get a 1 column DF
df = sql.createDataFrame(sc.parallelize(a).map(lambda x: [x]), schema=['col'])
# split each value into a number and letter e.g. 'a1' --> ['a','1'])
df = df.withColumn('letter', f.split('col', '').getItem(0))
df = df.withColumn('number', f.split('col', '').getItem(1))
# Now pivot to get what you want (dropping extraneous columns and ordering
# to get exact output
output = (df.groupBy('letter')
.pivot('number')
.agg(f.first('col'))
.select([f.col(column).alias('col%s'%(column)) for column in ['1','2','3']])
.orderBy('col1')
.drop('letter'))
Я вполне уверен, что a1
, a2
, ... и т. д. были примерами и не должны восприниматься как буквальные значения.
на самом деле я думаю, что это немного двусмысленно, я интерпретировал это как последовательность из 4 типов (a, b и c) с показанными только 3 лучшими примерами каждого (т.е. подмножество гораздо большего списка a, b и c с соответствующим индексом числа). Однако на самом деле ни один из нас не может сказать, что правильно.
Согласитесь, двусмысленно. Я исхожу из комментария, что OP ищет что-то похожее на изменение формы numpy.
Спасибо ages29 и pault, извините за двусмысленность. Я подтвердил, что решение pault было тем, что я ожидал. Спасибо еще раз!
Основываясь на вашем комментарий, я предполагаю, что вы начинаете с rdd
, а не со списка.
Я также предполагаю, что вы определяете порядок на основе индекса rdd
. Если эти предположения верны, вы можете использовать zipWithIndex()
, чтобы добавить номер строки к каждой записи.
Затем разделите номер строки на 3 (используйте целочисленное деление), чтобы сгруппировать каждые 3 последовательные записи. Затем используйте groupByKey()
, чтобы объединить записи с одним и тем же key
в кортеж.
Наконец, опустите ключ и позвоните toDF()
rdd.zipWithIndex()\
.map(lambda row: (row[1]//3, row[0]))\
.groupByKey()\
.map(lambda row: tuple(row[1]))\
.toDF(["a", "b", "c"])\
.show()
#+---+---+---+
#| a| b| c|
#+---+---+---+
#| a1| a2| a3|
#| c1| c2| c3|
#| b1| b2| b3|
#+---+---+---+
Вам нужно преобразовать
a
в форму:[('a1', 'a2', 'a3'), ('b1', 'b2', 'b3'), ('c1', 'c2', 'c3')]