Я предполагаю, что большинство фреймворков, таких как keras/tensorflow/..., автоматически используют все ядра ЦП, но на практике это не так. Я просто смог найти несколько источников, которые могут привести нас к использованию всей мощности ЦП в процессе глубокого обучения. Я нашел статья, в котором написано об использовании
from multiprocessing import Pool
import psutil
import ray
с другой стороны, на основе этого отвечать для использования модели keras в нескольких процессах нет отслеживания вышеупомянутых библиотек. Есть ли более элегантный способ использовать Многопроцессорность для Keras, поскольку он очень популярен для реализации.
Например, как можно изменить следующую простую реализацию RNN, чтобы достичь не менее 50% мощности ЦП в процессе обучения?
Должен ли я использовать 2-ю модель в качестве многозадачной, такой как LSTM, которую я комментирую ниже? Я имею в виду, можем ли мы одновременно запускать несколько моделей, используя больше ресурсов ЦП?
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from keras.layers.normalization import BatchNormalization
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from keras.layers import Dense
from keras.layers import Dropout
from keras.layers import LSTM,SimpleRNN
from keras.models import Sequential
from keras.optimizers import Adam, RMSprop
df = pd.read_csv("D:\Train.csv", header=None)
index = [i for i in list(range(1440)) if i%3==2]
Y_train= df[index]
df = df.values
#making history by using look-back to prediction next
def create_dataset(dataset,data_train,look_back=1):
dataX,dataY = [],[]
print("Len:",len(dataset)-look_back-1)
for i in range(len(dataset)-look_back-1):
a = dataset[i:(i+look_back), :]
dataX.append(a)
dataY.append(data_train[i + look_back, :])
return np.array(dataX), np.array(dataY)
Y_train=np.array(Y_train)
df=np.array(df)
look_back = 10
trainX,trainY = create_dataset(df,Y_train, look_back=look_back)
#Split data into train & test
trainX, testX, trainY, testY = train_test_split(trainX,trainY, test_size=0.2 , shuffle=False)
#Shape of train and test data
trainX, testX, trainY, testY = train_test_split(trainX,trainY, test_size=0.2 , shuffle=False)
print("train size: {}".format(trainX.shape))
print("train Label size: {}".format(trainY.shape))
print("test size: {}".format(testX.shape))
print("test Label size: {}".format(testY.shape))
#train size: (23, 10, 1440)
#train Label size: (23, 960)
#test size: (6, 10, 1440)
#test Label size: (6, 960)
model_RNN = Sequential()
model_RNN.add(SimpleRNN(units=1440, input_shape=(trainX.shape[1], trainX.shape[2])))
model_RNN.add(Dense(960))
model_RNN.add(BatchNormalization())
model_RNN.add(Activation('tanh'))
# Compile model
model_RNN.compile(loss='mean_squared_error', optimizer='adam')
callbacks = [
EarlyStopping(patience=10, verbose=1),
ReduceLROnPlateau(factor=0.1, patience=3, min_lr=0.00001, verbose=1)]
# Fit the model
hist_RNN=model_RNN.fit(trainX, trainY, epochs =50, batch_size =20,validation_data=(testX,testY),verbose=1, callbacks=callbacks)
#predict
Y_train=np.array(trainY)
Y_test=np.array(testX)
Y_RNN_Train_pred=model_RNN.predict(trainX)
Y_RNN_Test_pred=model_RNN.predict(testX)
train_MSE=mean_squared_error(trainY, Y_RNN_Train_pred)
test_MSE=mean_squared_error(testY, Y_RNN_Test_pred)
# create and fit the Simple LSTM model as 2nd model for multi-tasking
#model_LSTM = Sequential()
#model_LSTM.add(LSTM(units = 1440, input_shape=(trainX.shape[1], trainX.shape[2])))
#model_LSTM.add(Dense(units = 960))
#model_LSTM.add(BatchNormalization())
#model_LSTM.add(Activation('tanh'))
#model_LSTM.compile(loss='mean_squared_error', optimizer='adam')
#hist_LSTM=model_LSTM.fit(trainX, trainY, epochs =50, batch_size =20,validation_data=(testX,testY),verbose=1, callbacks=callbacks)
#Y_train=np.array(trainY)
#Y_test=np.array(testX)
#Y_LSTM_Train_pred=model_LSTM.predict(trainX)
#Y_LSTM_Test_pred=model_LSTM.predict(testX)
#train_MSE=mean_squared_error(trainY, Y_LSTM_Train_pred)
#test_MSE=mean_squared_error(testY, Y_LSTM_Test_pred)
#plot losses for RNN + LSTM
f, ax = plt.subplots(figsize=(20, 15))
plt.subplot(1, 2, 1)
ax=plt.plot(hist_RNN.history['loss'] ,label='Train loss')
ax=plt.plot(hist_RNN.history['val_loss'],label='Test/Validation/Prediction loss')
plt.xlabel('Training steps (Epochs = 50)')
plt.ylabel('Loss (MSE) for Sx-Sy & Sxy')
plt.title(' RNN Loss on Train and Test data')
plt.legend()
plt.subplot(1, 2, 2)
ax=plt.plot(hist_LSTM.history['loss'] ,label='Train loss')
ax=plt.plot(hist_LSTM.history['val_loss'],label='Test/Validation/Prediction loss')
plt.xlabel('Training steps (Epochs = 50)')
plt.ylabel('Loss (MSE) for Sx-Sy & Sxy')
plt.title('LSTM Loss on Train and Test data')
plt.legend()
plt.subplots_adjust(top=0.80, bottom=0.38, left=0.12, right=0.90, hspace=0.37, wspace=0.28)
#plt.savefig('All_Losses_history_.png')
plt.show()
Примечание У меня нет доступа к CUDA только у меня есть доступ к мощному серверу без VGA. Моя цель - использовать преимущества многопроцессорности и многопоточности для использования максимальной мощности ЦП вместо 30%, что означает только одно ядро, в то время как у меня четырехъядерный процессор! Мы будем очень признательны за любые советы. Я загрузил отформатированный набор данных CSV.
Обновлять: моя аппаратная конфигурация следующая:
@BaptistePouthier спасибо за совет, но мне действительно нужен кто-то, кто сформулирует ответ для простого вышеупомянутого примера, чтобы я мог понять концепцию и развить ее для других моих сложных моделей NN.
@BaptistePouthier, у тебя есть идеи?
Можете ли вы опубликовать вывод «grep cores /proc/cpuinfo»? также версия TF и Keras
Keras по умолчанию использует все ядра процессора. Я протестировал ваш код на своем 8-ядерном сервере, и он дает 86-процентную производительность процессора. Я думаю, у вас может быть другая проблема, вы проверяли htop/top? и опубликуйте свои результаты на основе rosettacode.org/wiki/Linux_CPU_utilization
Я думаю, что одна вещь, которую вы можете сделать, это увеличить размер партии.
@SalihKaragoz Я проверил производительность процессора на своем настольном компьютере в лаборатории, конфигурация которого, насколько я помню, четырехъядерный процессор с 16 ГБ ОЗУ, но я проверю и обновлю его завтра, версия Keras 2.2.4
и TF 1.10.0
Производительность процессора была 30% мощности!! Я также мог бы получить доступ к мощному серверному компьютеру без VGA, но кажется, что Keras должен автоматически использовать как минимум 80% мощности ЦП, на практике для меня это 30% !! когда я увеличиваю batch_size
на своем настольном компьютере, он падает! Кто-нибудь порекомендуйте мне использовать multiprocessing
при использовании серверного компьютера!
@SalihKaragoz можно ли изменить код таким образом, чтобы я мог одновременно обучать несколько моделей? например, RNN и LSTM одновременно с использованием библиотек с многопроцессорной обработкой? Можно ли выделить 1 или более ядер в процессоре для обработки/обучения моделей? Каково ваше мнение?
@SalihKaragoz Я обновил конфигурацию своего оборудования. Я был бы рад, если бы у вас есть какие-либо идеи, чтобы получить правильные результаты, а также.
@Mario, на самом деле я не играл на уровнях процессора. возможно, проблема связана с адаптацией framework-OS. обычно Keras обрабатывает его по умолчанию. но idw в вашем случае удачи.
Что касается многопроцессорности, это также может быть вам интересно, хотя это связано только с вашим вопросом: stackoverflow.com/questions/56441216/…
Хорошо, что обучение одной модели не использует все 100% вашего процессора! Теперь у нас есть место для параллельного обучения нескольких моделей и ускорения общего времени обучения.
NB: если вы хотите просто ускорить эту модель, изучите графические процессоры или измените гиперпараметры, такие как размер пакета и количество нейронов (размер слоя).
Вот как вы можете использовать multiprocessing
для одновременного обучения нескольких моделей (используя процессы, работающие параллельно на каждом отдельном ядре ЦП вашей машины).
multiprocessing.Pool
в основном создает пул рабочих мест, которые необходимо выполнить. Процессы подберут эти задания и запустят их. Когда задание завершено, процесс выберет другое задание из пула.
import time
import signal
import multiprocessing
def init_worker():
''' Add KeyboardInterrupt exception to mutliprocessing workers '''
signal.signal(signal.SIGINT, signal.SIG_IGN)
def train_model(layer_size):
'''
This code is parallelised and runs on each process
It trains a model with different layer sizes (hyperparameters)
It saves the model and returns the score (error)
'''
import keras
from keras.models import Sequential
from keras.layers import Dense
print(f'Training a model with layer size {layer_size}')
# build your model here
model_RNN = Sequential()
model_RNN.add(Dense(layer_size))
# fit the model (the bit that takes time!)
model_RNN.fit(...)
# lets demonstrate with a sleep timer
time.sleep(5)
# save trained model to a file
model_RNN.save(...)
# you can also return values eg. the eval score
return model_RNN.evaluate(...)
num_workers = 4
hyperparams = [800, 960, 1100]
pool = multiprocessing.Pool(num_workers, init_worker)
scores = pool.map(train_model, hyperparams)
print(scores)
Выход:
Training a model with layer size 800
Training a model with layer size 960
Training a model with layer size 1100
[{'size':960,'score':1.0}, {'size':800,'score':1.2}, {'size':1100,'score':0.7}]
Это легко продемонстрировать с помощью time.sleep
в коде. Вы увидите, что все 3 процесса начинают обучающую работу, а затем завершаются примерно в одно и то же время. Если бы это было однократно обработано, вам пришлось бы ждать завершения каждого из них, прежде чем начинать следующий (зевок!).
РЕДАКТИРОВАТЬ ОП также хотел полный код. Это сложно в Stack Overflow, потому что я не могу тестировать в вашей среде и с вашим кодом. Я позволил себе скопировать и вставить ваш код в свой шаблон выше. Возможно, вам придется добавить некоторые импорты, но это максимально близко к «работоспособному» и «полному» коду.
import time
import signal
import numpy as np
import pandas as pd
import multiprocessing
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
from sklearn.metrics import accuracy_score
def init_worker():
''' Add KeyboardInterrupt exception to mutliprocessing workers '''
signal.signal(signal.SIGINT, signal.SIG_IGN)
def train_model(model_type):
'''
This code is parallelised and runs on each process
It trains a model with different layer sizes (hyperparameters)
It saves the model and returns the score (error)
'''
from keras.layers import LSTM, SimpleRNN, Dense, Activation
from keras.models import Sequential
from keras.callbacks import EarlyStopping, ReduceLROnPlateau
from keras.layers.normalization import BatchNormalization
print(f'Training a model: {model_type}')
callbacks = [
EarlyStopping(patience=10, verbose=1),
ReduceLROnPlateau(factor=0.1, patience=3, min_lr=0.00001, verbose=1),
]
model = Sequential()
if model_type == 'rnn':
model.add(SimpleRNN(units=1440, input_shape=(trainX.shape[1], trainX.shape[2])))
elif model_type == 'lstm':
model.add(LSTM(units=1440, input_shape=(trainX.shape[1], trainX.shape[2])))
model.add(Dense(480))
model.add(BatchNormalization())
model.add(Activation('tanh'))
model.compile(loss='mean_squared_error', optimizer='adam')
model.fit(
trainX,
trainY,
epochs=50,
batch_size=20,
validation_data=(testX, testY),
verbose=1,
callbacks=callbacks,
)
# predict
Y_Train_pred = model.predict(trainX)
Y_Test_pred = model.predict(testX)
train_MSE = mean_squared_error(trainY, Y_Train_pred)
test_MSE = mean_squared_error(testY, Y_Test_pred)
# you can also return values eg. the eval score
return {'type': model_type, 'train_MSE': train_MSE, 'test_MSE': test_MSE}
# Your code
# ---------
df = pd.read_csv("D:\Train.csv", header=None)
index = [i for i in list(range(1440)) if i % 3 == 2]
Y_train = df[index]
df = df.values
# making history by using look-back to prediction next
def create_dataset(dataset, data_train, look_back=1):
dataX, dataY = [], []
print("Len:", len(dataset) - look_back - 1)
for i in range(len(dataset) - look_back - 1):
a = dataset[i : (i + look_back), :]
dataX.append(a)
dataY.append(data_train[i + look_back, :])
return np.array(dataX), np.array(dataY)
Y_train = np.array(Y_train)
df = np.array(df)
look_back = 10
trainX, trainY = create_dataset(df, Y_train, look_back=look_back)
# Split data into train & test
trainX, testX, trainY, testY = train_test_split(
trainX, trainY, test_size=0.2, shuffle=False
)
# My Code
# -------
num_workers = 2
model_types = ['rnn', 'lstm']
pool = multiprocessing.Pool(num_workers, init_worker)
scores = pool.map(train_model, model_types)
print(scores)
Вывод программы:
[{'type': 'rnn', 'train_MSE': 0.06648435491248038, 'test_MSE': 0.062323388902691866},
{'type': 'lstm', 'train_MSE': 0.10114341514420684, 'test_MSE': 0.09998065769499974}]
Комментарии не для расширенного обсуждения; этот разговор был перешел в чат.
@GeorgeStocker, не мог бы ты закрыть чат? Готово! Я собираюсь принять ответ. Спасибо
Может быть, это это полезно для вашей проблемы?