Как разбить большой файл (5 ГБ) на маленькие файлы в соответствии с его содержимым?

У меня 100 больших файлов, каждый размером около 5 ГБ. Мне нужно разбить их на файлы в зависимости от их содержимого. В больших файлах много строк, каждая строка выглядит так

{"task_op_id": 143677789, "task_op_time": 1530927931, "task_op_tag": 1, "create_time": 1530923701, "status": 2}

и мне нужно разделить контент на основе task_op_id, каждый большой файл имеет 350 разных task_op_id, поэтому каждый должен сгенерировать 350 разных маленьких файлов, каждый из которых имеет одинаковое содержимое task_op_id.

Мой проверенный метод:

def split_to_id_file(original_file):
    destination_file = 'processed_data2/data_over_one_id/break_into_ids/'
    with open(original_file) as f1:
        for line in f1:
            data_dict = json.loads(line)
            task_op_id = data_dict['task_op_id']
            with open(destination_file+str(task_op_id), 'a+') as f2:
                json.dump(data_dict, f2, ensure_ascii=False)
                f2.write('\n')
# multiprocessing with pool
def multiprocessing_pool(workers_number, job, files_list):
    p = Pool(workers_number)
    p.map(job, files_list)


def main():
    input_path = 'processed_data2/data_over_one_id'
    files_list = [join(input_path, f) for f in listdir(input_path)
              if isfile(join(input_path, f))
              and join(input_path, f).split('/')[-1].startswith('uegaudit')]
    multiprocessing_pool(80, split_to_id_file, files_list)


if __name__ == '__main__':
    main()

Но скорость слишком мала, для обработки 10 ГБ данных требуется 2 часа.

Так есть ли лучший способ обработки данных?

Большое спасибо за помощь.

По крайней мере, если не переформатировать строку из JSON, это, вероятно, ускорит процесс. Просто запишите line в новый файл.

Sami Kuhmonen 18.12.2018 13:48

Вы можете попробовать использовать библиотеку Pandas, чтобы читать фрагменты данных, а затем организовывать их. Кстати, если ваш код уже работает, и вы хотите его улучшить, правильное место - Проверка кода (codereview.stackexchange.com). Задача Stack Overflow - исправить неработающий код :)

Pedro Martins de Souza 18.12.2018 13:50

Каждую строку вы открываете и закрываете целевой файл. Пишите партиями, это ускорит процесс.

Aaron_ab 18.12.2018 13:52
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
1
3
70
2

Ответы 2

Я предполагаю, что основной процесс, требующий времени, - это операции ввода-вывода файлов. Можете ли вы разбить время работы и проверить это?

Другой причиной может быть парсер JSON. Загляните в ветку это для получения дополнительной информации.

Вы можете отсортировать эти файлы? Если да, попробуйте не разбирать каждую строку как JSON, а только с новым идентификатором.

Что-то вроде этого?

def get_id(json_line): 
  data_dict = json.loads(json_line)
  return data_dict['task_op_id']

def split_to_id_file(original_file):
  current_id = 'blabla_xxxxxxxx'
  destination_file = 'processed_data2/data_over_one_id/break_into_ids/'
  with open(original_file) as f1:
    for line in f1:
        if current_id not in line:
          if not f2.closed:
            f2.close()
          task_op_id = get_id(line)
          current_id = "\"task_op_id\": " + task_op_id
          f2 = open(destination_file+str(task_op_id), 'a+')
        f2.write(line+'\n')

Другие вопросы по теме