Чтение бинарного файла в PySpark

У меня есть двоичный файл, который я могу прочитать с помощью numpy и pandas, используя:

dt = numpy.dtype([('col1', numpy.int64), ('col2', numpy.float32), ('col3', numpy.int32)])

df = pandas.from_array(
    numpy.fromfile(file, dtype=dt),
    columns=data.dtype.names)
)

Вместо этого я хочу использовать PySpark без предварительного создания кадра данных pandas, поскольку файл может быть больше, чем память.

Я видел, что один из рекомендуемых способов:

df = spark.read.format("binaryFile").load(file)
df.printSchema()
df.show()

Но это не позволяет мне указывать типы каждого столбца. Кроме того, даже с этим тестовым файлом я получил java.lang.OutOfMemoryError.

Итак, теперь я пытаюсь загрузить его в RDD:

rdd = spark.sparkContext.binaryFiles(file)

А затем примените карту, как предложено здесь со Scala:

import java.nio.ByteBuffer

val result = YourRDD.map(x=>(ByteBuffer.wrap(x.take(4)).getInt,
             ByteBuffer.wrap(x.drop(4).take(2)).getShort,
             ByteBuffer.wrap(x.drop(6)).getLong))

Но у меня проблемы с его работой. Например, когда я пытаюсь rdd.first(), я получаю файл целиком. Вот что я пробовал:

rdd = spark.sparkContext.binaryFiles(file)

def func1(x):
    
    dt = numpy.dtype([('time', numpy.int64), ('e', numpy.float32), ('id', numpy.int32)])

    df = pandas.DataFrame(
        numpy.frombuffer(x, dtype=dt),
        columns=dt.names
    )

    return (df.col1,df.col2,df.col3)

result = rdd.mapValues(lambda x: func1(x))

result.first()

Но это дает мне одну запись с полным столбцом:

('file',
 (0          2317613314222
                 ...      
  4026940    7317606063913
  Name: col1, Length: 4026941, dtype: int64,
  0          1.551823
               ...   
  4026940    2.379845
  Name: col2, Length: 4026941, dtype: float32,
  0             556
              ...  
  4026940    131336
  Name: col3, Length: 4026941, dtype: int32))

Как я могу загрузить этот файл?

Обновлено: Небольшой отрывок из файла:

with open(file, mode = "rb") as open_file:
    contents = open_file.readlines()
    
contents[0:5]

Результат:

[b'\xae\xb0\x84\x9c\x1b\x02\x00\x00 \xa2\xc6?,\x02\x00\x00\x0cB\x95\x9c\x1b\x02\x00\x00\xe0a\x9a?\x19\x02\x02\x00\x0f\xf7\xa4\x9c\x1b\x02\x00\x00`\xe9\x82?0\x03\x02\x00\x96@\x03\x9d\x1b\x02\x00\x00\xd0H\x05@;\x02\x02\x00\xd5\n',
 b'\n',
 b'\x9d\x1b\x02\x00\x00\x00^\xa1?\x0f\x01\x00\x00nq,\x9d\x1b\x02\x00\x00\xe0\x89\xad?\xae\x03\x02\x00F\x8e\xb6\x9d\x1b\x02\x00\x00@U\xd1?<\x03\x02\x00\xc3_\xfa\x9d\x1b\x02\x00\x00@}\x87?)\x02\x02\x00\xac\x92K\x9e\x1b\x02\x00\x00P/\x1f@\n',
 b"\x02\x04\x00\x07Q\x9a\x9e\x1b\x02\x00\x00PI\x04@,\x01\x02\x00\x04-\xb2\x9e\x1b\x02\x00\x00\x80\xdc\xf0?\x1d\x00\x04\x00\x0cw\xbd\x9e\x1b\x02\x00\x00\xa0-\xef?\x0c\x02\x02\x00\xb0\x86\xcf\x9e\x1b\x02\x00\x00 \xc2\xb4?,\x02\x00\x00\x12\x03\x1e\x9f\x1b\x02\x00\x00\x80\xb6\x85?)\x02\x02\x00E\xc9w\x9f\x1b\x02\x00\x000\xf3\x03@\x13\x00\x04\x00P\x1b\x91\x9f\x1b\x02\x00\x00\x00\xea\x06@%\x00\x00\x00\x9b:\x9c\x9f\x1b\x02\x00\x00\xe0T\x0b@\x06\x03\x00\x00\x9b\x9f\xa4\x9f\x1b\x02\x00\x00\xc0/\xf4?\x06\x03\x00\x00Z\xcb\xb8\x9f\x1b\x02\x00\x00\x00A\xe1?!\x02\x02\x00\xbcJ\xbd\x9f\x1b\x02\x00\x00\xe0\xc9\xd2?!\x02\x04\x00\x06]\xd0\x9f\x1b\x02\x00\x00`D\xda?\x1d\x00\x04\x00hB\xde\x9f\x1b\x02\x00\x00\xe0\x10\xff?\x1d\x01\x02\x00\x9fi0\xa0\x1b\x02\x00\x00\xa0f\xec?\x86\x03\x00\x00\xf5Ws\xa0\x1b\x02\x00\x00 \xd5\xca?\x1d\x00\x04\x00L\xa0\x8d\xa0\x1b\x02\x00\x00\xc0#\xda?\x1d\x00\x04\x00|<,\xa1\x1b\x02\x00\x00 \xbc\xd1?\x1d\x00\x04\x00\x8b\xfb2\xa1\x1b\x02\x00\x00\xa0\xbf\xcb?\x08\x02\x02\x00d\xd2X\xa1\x1b\x02\x00\x00 \xc6\xb4?5\x00\x04\x00\xae\x1fc\xa1\x1b\x02\x00\x00@\x07\x90?1\x03\x02\x00\xf3\x80g\xa1\x1b\x02\x00\x00`\xbd\xde?4\x00\x04\x00g\x1dm\xa1\x1b\x02\x00\x00@7\x96?\x98\x03\x00\x00\xb8@|\xa1\x1b\x02\x00\x00PK\x11@\x06\x03\x00\x00\xedj\x83\xa1\x1b\x02\x00\x00\xc0\x11\xdd?,\x02\x00\x00\xb1\xbd\x8e\xa1\x1b\x02\x00\x00\xa0\xc7\xc5?\r\x02\x02\x00\xbd\x0f\xba\xa1\x1b\x02\x00\x00 \xe3\xc1?\x1f\x01\x02\x00\xf5\xa6\xf5\xa1\x1b\x02\x00\x00\x80\xdf\xcd?\x06\x01\x00\x00'\xb5 \xa2\x1b\x02\x00\x00\x00\x02\xb6?\x1d\x00\x04\x00\xfas/\xa2\x1b\x02\x00\x00\xc0\xb2\xbb?\x98\x03\x02\x00=\xaan\xa2\x1b\x02\x00\x00`\xaf\xe8?\x08\x02\x02\x00\xa2\x83\x8f\xa2\x1b\x02\x00\x00\x00\x02\xcd?\x1d\x00\x04\x00\xb2\xce\xcb\xa2\x1b\x02\x00\x00`\x9e\xc1?\x1a\x03\x00\x00\x95\x9f\xef\xa2\x1b\x02\x00\x00\xe0\xa4\x8c?)\x02\x02\x005\x86\xfa\xa2\x1b\x02\x00\x00 \x86\xc8?\x98\x03\x02\x00bH\x12\xa3\x1b\x02\x00\x00\xf0\x1b\x1e@\x8c\x02\x02\x00\xa6\xfa\x1b\xa3\x1b\x02\x00\x00\xe0\n",
 b'\xaf?\x1d\x00\x04\x00\xfb\xcd3\xa3\x1b\x02\x00\x00`\xd2\xde?\x84\x03\x00\x00\x81\xcaQ\xa3\x1b\x02\x00\x00 \xc0\xdb?8\x01\x02\x00t\x01\x9a\xa3\x1b\x02\x00\x00\x803\xf8?#\x01\x02\x00@\xdb\xa8\xa3\x1b\x02\x00\x00@k\x02@\x84\x03\x00\x00r\x8e&\xa4\x1b\x02\x00\x00\xe0\x96\xc8?\x1d\x01\x02\x00\xa3\x05?\xa4\x1b\x02\x00\x00\xa0v\xd2?\x1d\x00\x04\x00E\xc3\x8c\xa4\x1b\x02\x00\x000#\x0c@\x02\x01\x02\x00\xf3n\x9f\xa4\x1b\x02\x00\x00\xf0\x06\x13@8\x01\x02\x00\xac<\xc5\xa4\x1b\x02\x00\x00\x80A\x9f?\x8a\x03\x00\x00}\xfc\xe5\xa4\x1b\x02\x00\x00\x80\x00\xc4?\x19\x02\x02\x00\x126\xff\xa4\x1b\x02\x00\x00 \x1d\xac?\n']

И head(5) кадра данных:

            col1      col2    col3
0  2317613314222  1.551823     556
1  2317614400012  1.206112  131609
2  2317615429391  1.022747  131888
3  2317621608598  2.082569  131643
4  2317622053589  1.260681     271

Можете ли вы предоставить небольшой образец сайта, чтобы помочь другим пользователям протестировать свои решения? С фиктивными данными, но в том же формате, если необходимо

Caridorc 04.04.2023 21:14

Что-то вроде этого? b'\xae\xb0\x84\x9c\x1b\x02\x00\x00 \xa2\xc6?,\x02\x00\x00\x0cB\x95\x9c\x1b\x02\x00\x00\xe0a\x9a‌​?\x19\x02\x02\x00\x0‌​f\xf7\xa4\x9c\x1b\x0‌​2\x00\x00`\xe9\x82?0‌​\x03\x02\x00\x96@\x0‌​3\x9d\x1b\x02\x00\x0‌​0\xd0H\x05@;\x02\x02‌​\x00\xd5\n\n\x9d\x1b‌​\x02\x00\x00\x00^\xa‌​1?\x0f\x01\x00\x00nq‌​'

BBG 04.04.2023 21:20

Пожалуйста, добавьте файл в тело вашего вопроса и используйте кнопку {} в редакторе, чтобы отформатировать его правильно, также убедитесь, что усеченный файл все еще читается вашей программой (даже если в неправильном формате)

Caridorc 04.04.2023 21:20

Я не могу предоставить весь файл. Я добавлю несколько строк, уже прочитанных как байты, к вопросу, если это нормально.

BBG 04.04.2023 21:22

конечно, делитесь только тем, что вам удобно. Это все равно поможет пользователям сайта помочь вам

Caridorc 04.04.2023 21:23

Хорошо, я добавил более полный пример

BBG 04.04.2023 21:32
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
0
6
134
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Одна проблема заключалась в том, что spark.sparkContext.binaryFiles() предназначен для нескольких файлов. В этом случае следует использовать spark.sparkContext.binaryRecord().

Для моего случая:

rdd = spark.sparkContext.binaryRecords(file,16)

def func1(x):

    dt = numpy.dtype([('col1', numpy.int64), ('col2', numpy.float32), ('col3', numpy.int32)])

    df = pandas.DataFrame(
        numpy.frombuffer(x, dtype=dt),
        columns=dt.names
    )

    return df.to_dict("index")[0]

result = rdd.map(func1)

columns = types.StructType([
         types.StructField('col1', types.LongType(), True),
         types.StructField('col2', types.FloatType(), True),
         types.StructField('col3', types.IntegerType(), True)
         ])

df = result.toDF(schema=columns)

Модуль struct также можно использовать вместо numpy.

Другие вопросы по теме