Преобразование RDD в Dataframe в FPGrowth от Pyspark

У меня возникла ошибка, когда я сделал DataFrame из RDD.

from pyspark.ml.fpm import FPGrowth

sogou = sc.textFile("SogouQ.sample.utf8", use_unicode = False)

def parse(line):
    value = [ x for x in line.split(",") if x]
    return list(set(value))

rdd = sogou.map(parse)
df = sogou.toDF('items')

Я получаю следующую ошибку:

pyspark.sql.utils.ParseException: u"\nmismatched input '' expecting {'SELECT', 'FROM', 'ADD', 'AS', 'ALL', 'DISTINCT', 'WHERE', 'GROUP', 'BY', 'GROUPING', 'SETS', 'CUBE', 'ROLLUP', 'ORDER', 'HAVING', 'LIMIT', 'AT', 'OR', 'AND', 'IN', NOT, 'NO', 'EXISTS', 'BETWEEN', 'LIKE', RLIKE, 'IS', 'NULL', 'TRUE', 'FALSE', 'NULLS', 'ASC', 'DESC', 'FOR', 'INTERVAL', 'CASE', 'WHEN', 'THEN', 'ELSE', 'END', 'JOIN', 'CROSS', 'OUTER', 'INNER', 'LEFT', 'SEMI', 'RIGHT', 'FULL', 'NATURAL', 'ON', 'LATERAL', 'WINDOW', 'OVER', 'PARTITION', 'RANGE', 'ROWS', 'UNBOUNDED', 'PRECEDING', 'FOLLOWING', 'CURRENT', 'FIRST', 'AFTER', 'LAST', 'ROW', 'WITH', 'VALUES', 'CREATE', 'TABLE', 'DIRECTORY', 'VIEW', 'REPLACE', 'INSERT', 'DELETE', 'INTO', 'DESCRIBE', 'EXPLAIN', 'FORMAT', 'LOGICAL', 'CODEGEN', 'COST', 'CAST', 'SHOW', 'TABLES', 'COLUMNS', 'COLUMN', 'USE', 'PARTITIONS', 'FUNCTIONS', 'DROP', 'UNION', 'EXCEPT', 'MINUS', 'INTERSECT', 'TO', 'TABLESAMPLE', 'STRATIFY', 'ALTER', 'RENAME', 'ARRAY', 'MAP', 'STRUCT', 'COMMENT', 'SET', 'RESET', 'DATA', 'START', 'TRANSACTION', 'COMMIT', 'ROLLBACK', 'MACRO', 'IGNORE', 'BOTH', 'LEADING', 'TRAILING', 'IF', 'POSITION', 'DIV', 'PERCENT', 'BUCKET', 'OUT', 'OF', 'SORT', 'CLUSTER', 'DISTRIBUTE', 'OVERWRITE', 'TRANSFORM', 'REDUCE', 'SERDE', 'SERDEPROPERTIES', 'RECORDREADER', 'RECORDWRITER', 'DELIMITED', 'FIELDS', 'TERMINATED', 'COLLECTION', 'ITEMS', 'KEYS', 'ESCAPED', 'LINES', 'SEPARATED', 'FUNCTION', 'EXTENDED', 'REFRESH', 'CLEAR', 'CACHE', 'UNCACHE', 'LAZY', 'FORMATTED', 'GLOBAL', TEMPORARY, 'OPTIONS', 'UNSET', 'TBLPROPERTIES', 'DBPROPERTIES', 'BUCKETS', 'SKEWED', 'STORED', 'DIRECTORIES', 'LOCATION', 'EXCHANGE', 'ARCHIVE', 'UNARCHIVE', 'FILEFORMAT', 'TOUCH', 'COMPACT', 'CONCATENATE', 'CHANGE', 'CASCADE', 'RESTRICT', 'CLUSTERED', 'SORTED', 'PURGE', 'INPUTFORMAT', 'OUTPUTFORMAT', DATABASE, DATABASES, 'DFS', 'TRUNCATE', 'ANALYZE', 'COMPUTE', 'LIST', 'STATISTICS', 'PARTITIONED', 'EXTERNAL', 'DEFINED', 'REVOKE', 'GRANT', 'LOCK', 'UNLOCK', 'MSCK', 'REPAIR', 'RECOVER', 'EXPORT', 'IMPORT', 'LOAD', 'ROLE', 'ROLES', 'COMPACTIONS', 'PRINCIPALS', 'TRANSACTIONS', 'INDEX', 'INDEXES', 'LOCKS', 'OPTION', 'ANTI', 'LOCAL', 'INPATH', IDENTIFIER, BACKQUOTED_IDENTIFIER}(line 1, pos 5)\n\n== SQL ==\nitems\n-----^^^\n"

Текст содержит Chinese. Это имеет значение? Текст такой:

360,安全卫士,
123,123,范冰冰,

Когда я использую pyspark.mllib.fpgrowth, rdd работает нормально. Как я могу преобразовать его в фрейм данных?

Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
2
0
479
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Здесь есть две разные проблемы:

  • toDF звонок. RDD.toDF имеет следующую подпись:

    Signature: rdd.toDF(schema=None, sampleRatio=None)
    

    где должен быть schema

    param schema: pyspark.sql.types.StructType or list of names of columns

    Итак, в вашем случае это должно быть:

    sogou.toDF(["items"])
    
  • parse метод:

    Метод createDataFrame, вызываемый df, ожидает RDD[tuple] или эквивалент, который может быть сопоставлен с structs, если не предоставлена ​​схема. Если вы хотите использовать только имя, он должен вернуть tuple

    def parse(line):
        value = [ x for x in line.split(",") if x]
        return list(set(value)),  
    

Комбинированный:

>>> def parse(line):
...     value = [ x for x in line.split(",") if x]
...     return list(set(value)),  
... 
... 
>>> rdd = sc.parallelize(["360,安全卫士,", "123,123,范冰冰,"])
>>> rdd.map(parse).toDF(["items"]).show()
+--------------+
|         items|
+--------------+
|   [安全卫士, 360]|
|[123,123,范冰冰,]|
+--------------+

Альтернативой (сохраняя текущую реализацию синтаксического анализа) было бы

>>> from pyspark.sql.types import ArrayType, StringType
>>> def parse(line):
...     value = [ x for x in line.split(",") if x]
...     return list(set(value))
    >>> rdd.map(parse).toDF(ArrayType(StringType())).toDF("items").show()
+--------------+     
|         items|
+--------------+
|   [安全卫士, 360]|
|[123,123,范冰冰,]|
+--------------+

Другие вопросы по теме