Я запускаю функцию для нескольких процессов, которая принимает в качестве входных данных большие кадры данных pandas. При запуске процессов дикт копируется в каждый процесс, но насколько я понимаю, дикт содержит только ссылки на датафреймы, поэтому сами датафреймы в каждый процесс не копируются. Это правильно, или каждый процесс получает глубокую копию dict?
import numpy as np
from multiprocessing import Pool, Process, Manager
def process_dataframe(df_dict, task_queue, result_queue):
while True:
try:
test_value = task_queue.get(block=False)
except:
break
else:
result = {df_name: df[df==test_value].sum() for df_name, df in df_dict.items()}
result_queue.put(result)
if __name__ == '__main__':
manager = Manager()
task_queue = manager.Queue()
result_queue = manager.Queue()
df_dict = {'df_1': some_large_df1, 'df_2': some_large_df2, ...}
test_values = np.random.rand(1000)
for val in test_values:
task_queue.put(val)
with Pool(processes=4) as pool:
processes = []
for _ in range(4):
# Is df_dict copied shallow or deep to each process?
p = pool.Process(target=process_dataframe, args=(df_dict,task_queue,result_queue))
processes.append(p)
p.start()
for p in processes:
p.join()
results = [result_queue.get(block=False) for _ in range(result_queue.qsize())]
По этой причине вы также должны сделать свой поток dict
безопасным.
@Error-SyntacticalRemorse Я где-то читал, что аргументы маринуются перед отправкой в каждый процесс. Это только указатель, который тогда маринуется?
TLDR: Он действительно передает достаточно интересную копию. Но не в обычном режиме. Дочерний и родительский процессы совместно используют одну и ту же память, если только один из них не изменяет объект (в системах, которые реализуют копирование при записи [это есть как в Windows, так и в Linux]). В этом случае память выделяется для объекта, который был изменен.
Я твердо верю в то, что лучше увидеть что-то в действии, чем просто услышать, мол, давай займемся этим.
Для этого я взял пример кода multiprocessing
из Интернета. Пример кода подходит для ответа на этот вопрос, но он не соответствует коду из вашего вопроса.
Весь следующий код представляет собой один сценарий, но я собираюсь разбить его на части, чтобы объяснить каждую часть.
Сначала давайте создадим dictionary
. Я буду использовать это вместо DataFrame
, так как они действуют одинаково, но мне не нужно устанавливать пакет, чтобы использовать его.
Примечание: Синтаксис id()
, он возвращает уникальный идентификатор объекта.
# importing the multiprocessing module
import multiprocessing
import sys # So we can see the memory we are using
myDict = dict()
print("My dict ID is:", id(myDict))
myList = [0 for _ in range(10000)] # Just a massive list of all 0s
print('My list size in bytes:', sys.getsizeof(myList))
myDict[0] = myList
print("My dict size with the list:", sys.getsizeof(myDict))
print("My dict ID is still:", id(myDict))
print("But if I copied my dic it would be:", id(myDict.copy()))
Для меня это выведено:
My dict ID is: 139687265270016
My list size in bytes: 87624
My dict size with the list: 240
My dict ID is still: 139687265270016
But if I copied my dic it would be: 139687339197496
Круто, поэтому мы видим, что id
изменится, если мы скопируем объект, и мы увидим, что dictionary
просто содержит указатель на list
(таким образом, dict
значительно меньше по размеру памяти).
Теперь давайте посмотрим, копирует ли Process
словарь.
def method1(var):
print("method1 dict id is:", str(id(var)))
def method2(var):
print("method2 dict id is:", str(id(var)))
if __name__ == "__main__":
# creating processes
p1 = multiprocessing.Process(target=method2, args=(myDict, ))
p2 = multiprocessing.Process(target=method1, args=(myDict, ))
# starting process 1
p1.start()
# starting process 2
p2.start()
# wait until process 1 is finished
p1.join()
# wait until process 2 is finished
p2.join()
# both processes finished
print("Done!")
Здесь я передаю myDict
как arg
обеим моим функциям подпроцесса.
Вот что я получаю на выходе:
method2 dict id is: 139687265270016
method1 dict id is: 139687265270016
Done!
Примечание:id
такой же, как когда мы определяли словарь ранее в коде.
Если id
никогда не меняется, значит, мы используем один и тот же объект во всех случаях. Так что теоретически, если я внесу изменения в Process
, это должно изменить основной объект. Но это происходит не так, как мы ожидаем.
Например: Давайте изменим наш method1
.
def method1(var):
print("method1 dict id is:", str(id(var)))
var[0][0] = 1
print("The first five elements of the list in the dict are:", var[0][:5])
А ТАКЖЕ
добавьте пару print
s после нашего p2.join()
:
p2.join()
print("The first five elements of the list in the dict are:", myDict[0][:5])
print("The first five elements of the list are:", myList[:5])
My dict ID is: 140077406931128
My list size in bytes: 87624
My dict size with the list: 240
My dict ID is still: 140077406931128
But if I copied my dic it would be: 140077455160376
method1 dict id is: 140077406931128
The first five elements of the list in the dict are: [1, 0, 0, 0, 0]
method2 dict id is: 140077406931128
The first five elements of the list in the dict are: [0, 0, 0, 0, 0]
The first five elements of the list are: [0, 0, 0, 0, 0]
Done!
Что ж, интересно... id
одинаковые, и я могу изменить объект в функции, но dict
не меняется в основном процессе...
Ну, после некоторого дальнейшего изучения я нашел этот вопрос/ответ: https://stackoverflow.com/a/14750086/8150685
При создании дочернего процесса дочерний процесс наследует копию родительского процесса (включая копию id
s!); однако (при условии, что используемая ОС поддерживает COW (копирование при записи), ребенок и родители будут использовать одну и ту же память, если только ребенок или родитель не внесет изменения в данные, и в этом случае память будет выделена только для переменная, которую вы изменили (в вашем случае она сделает копию DataFrame
, которую вы изменили).
Извините за длинный пост, но я подумал, что рабочий процесс будет хорош.
Надеюсь, это помогло. Не забудьте проголосовать за вопрос и ответить на https://stackoverflow.com/a/14750086/8150685, если он вам помог.
@connesy не забудьте отметить это как ответ, если он решил ваш вопрос.
Большое спасибо за обстоятельный ответ, это было поучительно! Большое спасибо за обстоятельный ответ, это было поучительно! У меня есть пара дополнительных вопросов: 1. Если я правильно понял ваш пример, id
из dict
копируется в дочерний процесс, то есть id
из dict
в дочернем процессе будет таким же, как id
в дочернем процессе. родительский процесс, даже если Я вношу изменения в dict
в одном из процессов? 2: Согласно [этому комментарию] (bit.ly/2U8U5IX) в вопросе, на который вы ссылаетесь, DataFrame
могут быть скопированы при их чтении?
Только если вы измените данные. Чтение не считается модификацией. Другими словами, если все, что вы делаете, это читаете фрейм данных, то дополнительная память использоваться не будет; однако, если вы внесете изменения, весь фрейм данных будет скопирован в фоновом режиме. Однако идентификатор фрейма данных по-прежнему не будет изменен. (ID является локальным для этого процесса, поэтому в ходе моего тестирования он не изменился, даже если я изменил объект в отдельном процессе).
Где вы его копируете?
df_dict
никогда не копируется. Я вижу, что он передается как аргументpool.Process
... Поскольку он передается как аргумент, весьdict
передается как указатель. Таким образом, даже поверхностная копия не возникает.