Как можно использовать преимущества многопроцессорности и многопоточности в глубоком обучении с помощью Keras?

Я предполагаю, что большинство фреймворков, таких как keras/tensorflow/..., автоматически используют все ядра ЦП, но на практике это не так. Я просто смог найти несколько источников, которые могут привести нас к использованию всей мощности ЦП в процессе глубокого обучения. Я нашел статья, в котором написано об использовании

from multiprocessing import Pool 
import psutil
import ray 

с другой стороны, на основе этого отвечать для использования модели keras в нескольких процессах нет отслеживания вышеупомянутых библиотек. Есть ли более элегантный способ использовать Многопроцессорность для Keras, поскольку он очень популярен для реализации.

  • Например, как можно изменить следующую простую реализацию RNN, чтобы достичь не менее 50% мощности ЦП в процессе обучения?

  • Должен ли я использовать 2-ю модель в качестве многозадачной, такой как LSTM, которую я комментирую ниже? Я имею в виду, можем ли мы одновременно запускать несколько моделей, используя больше ресурсов ЦП?

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from keras.layers.normalization import BatchNormalization
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from keras.layers import Dense
from keras.layers import Dropout
from keras.layers import LSTM,SimpleRNN
from keras.models import Sequential
from keras.optimizers import Adam, RMSprop

df = pd.read_csv("D:\Train.csv", header=None)

index = [i for i in list(range(1440)) if i%3==2]

Y_train= df[index]
df = df.values

#making history by using look-back to prediction next
def create_dataset(dataset,data_train,look_back=1):
    dataX,dataY = [],[]
    print("Len:",len(dataset)-look_back-1)
    for i in range(len(dataset)-look_back-1):
        a = dataset[i:(i+look_back), :]
        dataX.append(a)
        dataY.append(data_train[i + look_back,  :])
    return np.array(dataX), np.array(dataY)

Y_train=np.array(Y_train)
df=np.array(df)

look_back = 10
trainX,trainY = create_dataset(df,Y_train, look_back=look_back)

#Split data into train & test
trainX, testX, trainY, testY = train_test_split(trainX,trainY, test_size=0.2 , shuffle=False)

#Shape of train and test data
trainX, testX, trainY, testY = train_test_split(trainX,trainY, test_size=0.2 , shuffle=False)
print("train size: {}".format(trainX.shape))
print("train Label size: {}".format(trainY.shape))
print("test size: {}".format(testX.shape))
print("test Label size: {}".format(testY.shape))
#train size: (23, 10, 1440)
#train Label size: (23, 960)
#test size: (6, 10, 1440)
#test Label size: (6, 960)


model_RNN = Sequential()
model_RNN.add(SimpleRNN(units=1440, input_shape=(trainX.shape[1], trainX.shape[2])))
model_RNN.add(Dense(960))
model_RNN.add(BatchNormalization())
model_RNN.add(Activation('tanh'))
# Compile model
model_RNN.compile(loss='mean_squared_error', optimizer='adam')
callbacks = [
    EarlyStopping(patience=10, verbose=1),
    ReduceLROnPlateau(factor=0.1, patience=3, min_lr=0.00001, verbose=1)]
# Fit the model
hist_RNN=model_RNN.fit(trainX, trainY, epochs =50, batch_size =20,validation_data=(testX,testY),verbose=1, callbacks=callbacks)


#predict

Y_train=np.array(trainY)
Y_test=np.array(testX)

Y_RNN_Train_pred=model_RNN.predict(trainX)
Y_RNN_Test_pred=model_RNN.predict(testX)

train_MSE=mean_squared_error(trainY, Y_RNN_Train_pred)
test_MSE=mean_squared_error(testY, Y_RNN_Test_pred)

# create and fit the Simple LSTM model as 2nd model for multi-tasking

#model_LSTM = Sequential()
#model_LSTM.add(LSTM(units = 1440, input_shape=(trainX.shape[1], trainX.shape[2])))
#model_LSTM.add(Dense(units = 960))
#model_LSTM.add(BatchNormalization())
#model_LSTM.add(Activation('tanh'))
#model_LSTM.compile(loss='mean_squared_error', optimizer='adam')
#hist_LSTM=model_LSTM.fit(trainX, trainY, epochs =50, batch_size =20,validation_data=(testX,testY),verbose=1, callbacks=callbacks)

#Y_train=np.array(trainY)
#Y_test=np.array(testX)

#Y_LSTM_Train_pred=model_LSTM.predict(trainX)
#Y_LSTM_Test_pred=model_LSTM.predict(testX)

#train_MSE=mean_squared_error(trainY, Y_LSTM_Train_pred)
#test_MSE=mean_squared_error(testY, Y_LSTM_Test_pred)

#plot losses for RNN + LSTM
f, ax = plt.subplots(figsize=(20, 15))
    plt.subplot(1, 2, 1)
    ax=plt.plot(hist_RNN.history['loss']    ,label='Train loss')
    ax=plt.plot(hist_RNN.history['val_loss'],label='Test/Validation/Prediction loss')
    plt.xlabel('Training steps (Epochs = 50)')
    plt.ylabel('Loss (MSE) for Sx-Sy & Sxy')
    plt.title(' RNN Loss on Train and Test data')
    plt.legend()
    plt.subplot(1, 2, 2)
    ax=plt.plot(hist_LSTM.history['loss']    ,label='Train loss')
    ax=plt.plot(hist_LSTM.history['val_loss'],label='Test/Validation/Prediction loss')
    plt.xlabel('Training steps (Epochs = 50)')
    plt.ylabel('Loss (MSE) for Sx-Sy & Sxy')
    plt.title('LSTM Loss on Train and Test data')
    plt.legend()

    plt.subplots_adjust(top=0.80, bottom=0.38, left=0.12, right=0.90, hspace=0.37, wspace=0.28)
    #plt.savefig('All_Losses_history_.png')
    plt.show()

Примечание У меня нет доступа к CUDA только у меня есть доступ к мощному серверу без VGA. Моя цель - использовать преимущества многопроцессорности и многопоточности для использования максимальной мощности ЦП вместо 30%, что означает только одно ядро, в то время как у меня четырехъядерный процессор! Мы будем очень признательны за любые советы. Я загрузил отформатированный набор данных CSV.

Обновлять: моя аппаратная конфигурация следующая:

  • Процессор: AMD A8-7650K Radeon R7, 10 вычислительных ядер, 4 ядра + 6 ГБ, 3,30 ГГц.
  • Оперативная память: 16 ГБ
  • ОС: Вин 7
  • Python версии 3.6.6
  • Тензорный поток версии 1.8.0
  • Керас версия 2.2.4

Может быть, это это полезно для вашей проблемы?

Baptiste Pouthier 28.05.2019 16:41

@BaptistePouthier спасибо за совет, но мне действительно нужен кто-то, кто сформулирует ответ для простого вышеупомянутого примера, чтобы я мог понять концепцию и развить ее для других моих сложных моделей NN.

Mario 28.05.2019 16:53

@BaptistePouthier, у тебя есть идеи?

Mario 31.05.2019 15:10

Можете ли вы опубликовать вывод «grep cores /proc/cpuinfo»? также версия TF и ​​Keras

Salih Karagoz 01.06.2019 00:54

Keras по умолчанию использует все ядра процессора. Я протестировал ваш код на своем 8-ядерном сервере, и он дает 86-процентную производительность процессора. Я думаю, у вас может быть другая проблема, вы проверяли htop/top? и опубликуйте свои результаты на основе rosettacode.org/wiki/Linux_CPU_utilization

Salih Karagoz 01.06.2019 01:06

Я думаю, что одна вещь, которую вы можете сделать, это увеличить размер партии.

Salih Karagoz 01.06.2019 01:08

@SalihKaragoz Я проверил производительность процессора на своем настольном компьютере в лаборатории, конфигурация которого, насколько я помню, четырехъядерный процессор с 16 ГБ ОЗУ, но я проверю и обновлю его завтра, версия Keras 2.2.4 и TF 1.10.0 Производительность процессора была 30% мощности!! Я также мог бы получить доступ к мощному серверному компьютеру без VGA, но кажется, что Keras должен автоматически использовать как минимум 80% мощности ЦП, на практике для меня это 30% !! когда я увеличиваю batch_size на своем настольном компьютере, он падает! Кто-нибудь порекомендуйте мне использовать multiprocessing при использовании серверного компьютера!

Mario 01.06.2019 01:20

@SalihKaragoz можно ли изменить код таким образом, чтобы я мог одновременно обучать несколько моделей? например, RNN и LSTM одновременно с использованием библиотек с многопроцессорной обработкой? Можно ли выделить 1 или более ядер в процессоре для обработки/обучения моделей? Каково ваше мнение?

Mario 01.06.2019 01:28

@SalihKaragoz Я обновил конфигурацию своего оборудования. Я был бы рад, если бы у вас есть какие-либо идеи, чтобы получить правильные результаты, а также.

Mario 04.06.2019 18:33

@Mario, на самом деле я не играл на уровнях процессора. возможно, проблема связана с адаптацией framework-OS. обычно Keras обрабатывает его по умолчанию. но idw в вашем случае удачи.

Salih Karagoz 05.06.2019 00:01

Что касается многопроцессорности, это также может быть вам интересно, хотя это связано только с вашим вопросом: stackoverflow.com/questions/56441216/…

Markus 05.06.2019 19:27
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
10
11
7 658
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Хорошо, что обучение одной модели не использует все 100% вашего процессора! Теперь у нас есть место для параллельного обучения нескольких моделей и ускорения общего времени обучения.

NB: если вы хотите просто ускорить эту модель, изучите графические процессоры или измените гиперпараметры, такие как размер пакета и количество нейронов (размер слоя).

Вот как вы можете использовать multiprocessing для одновременного обучения нескольких моделей (используя процессы, работающие параллельно на каждом отдельном ядре ЦП вашей машины).

multiprocessing.Pool в основном создает пул рабочих мест, которые необходимо выполнить. Процессы подберут эти задания и запустят их. Когда задание завершено, процесс выберет другое задание из пула.

import time
import signal
import multiprocessing

def init_worker():
    ''' Add KeyboardInterrupt exception to mutliprocessing workers '''
    signal.signal(signal.SIGINT, signal.SIG_IGN)


def train_model(layer_size):
    '''
    This code is parallelised and runs on each process
    It trains a model with different layer sizes (hyperparameters)
    It saves the model and returns the score (error)
    '''
    import keras
    from keras.models import Sequential
    from keras.layers import Dense

    print(f'Training a model with layer size {layer_size}')

    # build your model here
    model_RNN = Sequential()
    model_RNN.add(Dense(layer_size))

    # fit the model (the bit that takes time!)
    model_RNN.fit(...)

    # lets demonstrate with a sleep timer
    time.sleep(5)

    # save trained model to a file
    model_RNN.save(...)

    # you can also return values eg. the eval score
    return model_RNN.evaluate(...)


num_workers = 4
hyperparams = [800, 960, 1100]

pool = multiprocessing.Pool(num_workers, init_worker)

scores = pool.map(train_model, hyperparams)

print(scores)

Выход:

Training a model with layer size 800
Training a model with layer size 960
Training a model with layer size 1100
[{'size':960,'score':1.0}, {'size':800,'score':1.2}, {'size':1100,'score':0.7}]

Это легко продемонстрировать с помощью time.sleep в коде. Вы увидите, что все 3 процесса начинают обучающую работу, а затем завершаются примерно в одно и то же время. Если бы это было однократно обработано, вам пришлось бы ждать завершения каждого из них, прежде чем начинать следующий (зевок!).

РЕДАКТИРОВАТЬ ОП также хотел полный код. Это сложно в Stack Overflow, потому что я не могу тестировать в вашей среде и с вашим кодом. Я позволил себе скопировать и вставить ваш код в свой шаблон выше. Возможно, вам придется добавить некоторые импорты, но это максимально близко к «работоспособному» и «полному» коду.

import time
import signal
import numpy as np
import pandas as pd
import multiprocessing
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
from sklearn.metrics import accuracy_score


def init_worker():
    ''' Add KeyboardInterrupt exception to mutliprocessing workers '''
    signal.signal(signal.SIGINT, signal.SIG_IGN)


def train_model(model_type):
    '''
    This code is parallelised and runs on each process
    It trains a model with different layer sizes (hyperparameters)
    It saves the model and returns the score (error)
    '''
    from keras.layers import LSTM, SimpleRNN, Dense, Activation
    from keras.models import Sequential
    from keras.callbacks import EarlyStopping, ReduceLROnPlateau
    from keras.layers.normalization import BatchNormalization

    print(f'Training a model: {model_type}')

    callbacks = [
        EarlyStopping(patience=10, verbose=1),
        ReduceLROnPlateau(factor=0.1, patience=3, min_lr=0.00001, verbose=1),
    ]

    model = Sequential()

    if model_type == 'rnn':
        model.add(SimpleRNN(units=1440, input_shape=(trainX.shape[1], trainX.shape[2])))
    elif model_type == 'lstm':
        model.add(LSTM(units=1440, input_shape=(trainX.shape[1], trainX.shape[2])))

    model.add(Dense(480))
    model.add(BatchNormalization())
    model.add(Activation('tanh'))
    model.compile(loss='mean_squared_error', optimizer='adam')
    model.fit(
        trainX,
        trainY,
        epochs=50,
        batch_size=20,
        validation_data=(testX, testY),
        verbose=1,
        callbacks=callbacks,
    )

    # predict
    Y_Train_pred = model.predict(trainX)
    Y_Test_pred = model.predict(testX)

    train_MSE = mean_squared_error(trainY, Y_Train_pred)
    test_MSE = mean_squared_error(testY, Y_Test_pred)

    # you can also return values eg. the eval score
    return {'type': model_type, 'train_MSE': train_MSE, 'test_MSE': test_MSE}


# Your code
# ---------

df = pd.read_csv("D:\Train.csv", header=None)

index = [i for i in list(range(1440)) if i % 3 == 2]

Y_train = df[index]
df = df.values

# making history by using look-back to prediction next
def create_dataset(dataset, data_train, look_back=1):
    dataX, dataY = [], []
    print("Len:", len(dataset) - look_back - 1)
    for i in range(len(dataset) - look_back - 1):
        a = dataset[i : (i + look_back), :]
        dataX.append(a)
        dataY.append(data_train[i + look_back, :])
    return np.array(dataX), np.array(dataY)


Y_train = np.array(Y_train)
df = np.array(df)

look_back = 10
trainX, trainY = create_dataset(df, Y_train, look_back=look_back)

# Split data into train & test
trainX, testX, trainY, testY = train_test_split(
    trainX, trainY, test_size=0.2, shuffle=False
)

# My Code
# -------

num_workers = 2
model_types = ['rnn', 'lstm']

pool = multiprocessing.Pool(num_workers, init_worker)

scores = pool.map(train_model, model_types)

print(scores)

Вывод программы:

[{'type': 'rnn', 'train_MSE': 0.06648435491248038, 'test_MSE': 0.062323388902691866}, 
 {'type': 'lstm', 'train_MSE': 0.10114341514420684, 'test_MSE': 0.09998065769499974}]

Комментарии не для расширенного обсуждения; этот разговор был перешел в чат.

George Stocker 05.06.2019 19:35

@GeorgeStocker, не мог бы ты закрыть чат? Готово! Я собираюсь принять ответ. Спасибо

Mario 06.06.2019 12:44

Другие вопросы по теме