I'm new to Python, and this is my first DQN network. I'm using Gym, and the agent learns which actions to take to make the most profit from trading.
My code runs on the CPU and uses about 500 MB of RAM per agent; when run on the GPU it can take about 200 MB of GPU VRAM.
MY QUESTION: how can I compute all of this on the GPU and use parallel computing? Or should I just use CPU cores, since they are faster without the parallel computing that I believe this code currently does?
If you use the GPU, you can get this error: RuntimeError: Cannot re-initialize CUDA in forked subprocess. To use CUDA with multiprocessing, you must use the 'spawn' start method.
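For reference, a minimal sketch of the 'spawn' start method that the error message asks for, assuming train_agent and train_worker are the top-level functions from the listing below; the pool is created from a 'spawn' context so each worker starts as a fresh interpreter instead of forking the CUDA-initialized parent:

import multiprocessing as mp

if __name__ == '__main__':
    num_agents = 2
    # 'spawn' launches each worker as a fresh process, which is what CUDA requires
    ctx = mp.get_context('spawn')
    with ctx.Pool(processes=num_agents) as pool:
        pool.map(train_worker, [train_agent] * num_agents)

Calling mp.set_start_method('spawn', force=True) at the top of the __main__ block should have the same effect.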
Does anyone have any possible speed improvements?
import os
import pandas as pd
import numpy as np
import gym
from gym import spaces
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torch.utils.data.dataset import IterableDataset
from pytorch_lightning import LightningModule, Trainer
from typing import List, Tuple, Generator
import random
import sqlite3
import time
import subprocess as sp
import psutil
import multiprocessing as mp
avg_reward_list = [] # List to store average rewards
avg_total_actions = [] # List to store average total actions
episode_times = []
num_episodes = 5000
if torch.cuda.is_available():
gpu_true = 1
else:
gpu_true = 0
# Define the device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
if gpu_true == 0:
# Create or connect to the SQLite database
conn = sqlite3.connect('trading_data.db')
cursor = conn.cursor()
# Create a table to store episode data if it doesn't exist
cursor.execute('''
CREATE TABLE IF NOT EXISTS data (
episode REAL,
net_worth REAL,
steps INTEGER,
epsilon REAL,
avg_reward REAL,
avg_total_actions REAL,
profit_factor REAL
)
''')
# Clear existing data in the table
cursor.execute('DELETE FROM data')
# Commit changes and close the connection
conn.commit()
conn.close()
def get_usage():
# Get the system load averages over the last 1, 5 and 15 minutes
load1, load5, load15 = psutil.getloadavg()
cpu_usage = psutil.cpu_percent(interval=1)
if gpu_true == 1:
output_to_list = lambda x: x.decode('ascii').split('\n')[:-1]
COMMAND = "nvidia-smi --query-gpu=utilization.gpu --format=csv"
try:
gpu_usage_info = output_to_list(sp.check_output(COMMAND.split(), stderr=sp.STDOUT))[1:]
except sp.CalledProcessError as e:
raise RuntimeError("command '{}' returned with error (code {}): {}".format(e.cmd, e.returncode, e.output))
gpu_usage_values = [int(x.split()[0]) for i, x in enumerate(gpu_usage_info)]
print("The CPU usage is : ", cpu_usage, 'GPU',gpu_usage_values)
else:
print("The CPU usage is : ", cpu_usage)
# Add CUDA-related environment variable settings
os.environ["CUDA_LAUNCH_BLOCKING"] = "1"
os.environ["TORCH_USE_CUDA_DSA"] = "1"
#print(torch.cuda.is_available())
# Define the calculate_profit function
def calculate_profit(next_price, current_price, amount_held):
if amount_held > 0:
return (current_price - next_price) * amount_held
elif amount_held < 0:
return (next_price - current_price) * abs(amount_held)
else:
return 0
class DQN(nn.Module):
def __init__(self, in_states, h1_nodes, out_actions, dropout_prob=0.0):
super().__init__()
self.fc1 = nn.Linear(in_states, h1_nodes)
self.dropout = nn.Dropout(dropout_prob)
self.fc2 = nn.Linear(h1_nodes, out_actions)
def forward(self, x):
x = x.to(next(self.parameters()).device)
x = torch.relu(self.fc1(x.float()))
x = self.dropout(x)
x = self.fc2(x)
return x
class CustomEnv(gym.Env):
def __init__(self, df, initial_balance=10000, lookback_window_size=1):
super(CustomEnv, self).__init__()
self.df = df
self.df_total_steps = 10000
self.initial_balance = initial_balance
self.lookback_window_size = lookback_window_size
self.action_space = spaces.Discrete(3)
self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(self.lookback_window_size,), dtype=np.float32)
self.current_step = None
self.total_actions = 0
self.step_count = 0
self.current_price = 0
self.net_worth = 0
self.gamma = 0.90
self.epsilon = 1.0
self.epsilon_min = 0.01
self.epsilon_decay = 0.999998
self.model = DQN(1, 80, 3).to(device)
self.target_model = DQN(1, 80, 3).to(device)
self.optimizer = optim.Adam(self.model.parameters(), lr=0.001)
self.batch_size = 3000
self.training_data = []
self.episode_rewards_array = [] # List to store rewards for this episode
self.action_total_array = [] # List to store total actions for this episode
self.net_worth_array = [] # List to store net worth for this episode
self.not_profitable_profits = 0
self.profitable_profits = 0
def reset(self):
self.net_worth = self.initial_balance
self.current_step = self.lookback_window_size
self.total_actions = random.uniform(-10, 10)
self.current_price = self.df[self.current_step, 0] # Assuming 'close' is the first column
return self._next_observation()
def _next_observation(self):
close_prices = self.df[self.current_step - self.lookback_window_size:self.current_step, 1] # Assuming 'Normalized_SMA_Slope' is the second column
return close_prices.clone().to(device)
def step(self, action):
self.current_step += 1
self.step_count += 1
if self.current_step >= len(self.df):
return self._next_observation(), 0, True, {}
if self.epsilon > self.epsilon_min:
self.epsilon *= self.epsilon_decay
obs = self._next_observation()
self.total_actions = (action - 1) * 10 # Convert action index to total actions
next_price = self.df[self.current_step + 1, 0] if self.current_step + 1 < len(self.df) else self.current_price
self.current_price = self.df[self.current_step, 0] # Assuming 'close' is the first column
holding = self.total_actions / 10
profit = calculate_profit(next_price, self.current_price, holding)
self.net_worth += profit
reward = profit
if self.current_step >= self.df_total_steps or self.net_worth < 100:
done = 1
else:
done = 0
if self.step_count > self.batch_size:
#self.target_model.load_state_dict(self.model.state_dict())
self.step_count = 0
get_usage()
self.episode_rewards_array.append(reward)
self.action_total_array.append(self.total_actions)
self.net_worth_array.append(self.net_worth)
if profit > 0: #and avg_action < 1 and avg_action > -1:
#self.profitable_profits.append(profit)
self.profitable_profits += profit
#profit = profit*2
else:
self.not_profitable_profits -= profit
self.profit_factor = ((self.profitable_profits / self.not_profitable_profits) if self.not_profitable_profits != 0 else 0.0) - 1
# Append data for training
self.training_data.append((obs, action, reward))
if len(self.training_data) >= 100:
self.train_dqn_batch()
self.training_data = []
return obs, reward, done, {}
def train_dqn_batch(self):
if len(self.training_data) == 0:
return
obs_batch, action_batch, reward_batch = zip(*self.training_data)
obs_batch = torch.stack(obs_batch)
action_batch = torch.tensor(action_batch, dtype=torch.int64).unsqueeze(1).to(device)
reward_batch = torch.tensor(reward_batch, dtype=torch.float32).unsqueeze(1).to(device)
q_values = self.model(obs_batch)
q_value = q_values.gather(1, action_batch).squeeze(1)
expected_q_values = reward_batch
# Ensure expected_q_values has the same shape as q_value
expected_q_values = expected_q_values.view(-1)
# Compute MSE loss
loss = nn.functional.mse_loss(q_value, expected_q_values)
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
class RLDataset(IterableDataset):
def __init__(self, env: CustomEnv, sample_size: int = 200) -> None:
self.epoch_count = 0
self.env = env
self.sample_size = sample_size
def __iter__(self) -> Generator[Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor], None, None]:
obs = self.env.reset().to(device) # Move initial observation to the correct device
for _ in range(self.env.df_total_steps):
if random.random() < self.env.epsilon:
action = self.env.action_space.sample()
else:
with torch.no_grad(): # Ensure no gradient tracking for inference
action = self.env.model(obs).argmax().item()
next_obs, reward, done, _ = self.env.step(action)
reward = torch.tensor([reward], dtype=torch.float32).to(device) # Ensure reward tensor has correct shape and device
next_obs = next_obs.to(device) # Move next observation to the correct device
yield obs.view(1), torch.tensor([action]).to(device), reward.squeeze(), next_obs.view(1), done
if done == 1:
if gpu_true ==0:
self.epoch_count += 1
# Calculate average reward for this episode
avg_reward = np.mean(self.env.episode_rewards_array) # No need to move to CPU if it's a list
avg_reward_list.append(avg_reward) # Append average reward to list
avg_total_actions = np.mean(self.env.action_total_array) # No need to move to CPU if it's a list
profit_factor = self.env.profit_factor.item()
# Save episode data to the database
conn = sqlite3.connect('trading_data.db')
cursor = conn.cursor()
rounded_net_worth = self.env.net_worth.item()
cursor.execute('''
INSERT INTO data (episode, net_worth, steps, epsilon, avg_reward, avg_total_actions, profit_factor)
VALUES (?, ?, ?, ?, ?, ?, ?)
''', (self.epoch_count, rounded_net_worth, self.env.current_step, self.env.epsilon, avg_reward, avg_total_actions, profit_factor))
conn.commit()
conn.close()
obs = self.env.reset().to(device) # Move reset observation to the correct device
else:
obs = next_obs
class DQNLit(LightningModule):
def __init__(self, env: CustomEnv, batch_size) -> None:
super().__init__()
self.env = env
self.model = self.env.model
self.target_net = self.env.target_model
self.batch_size = batch_size
def forward(self, x):
return self.model(x.float())
def dqn_mse_loss(self, batch: Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]) -> torch.Tensor:
obs, action, reward, next_obs, done = batch
# Move tensors to GPU and convert to float32
obs = obs.to(device).float()
action = action.squeeze(1).to(device) # Squeeze to remove extra dimension
reward = reward.to(device).float()
next_obs = next_obs.to(device).float()
done = done.to(device).float()
# Calculate Q-values
q_values = self.model(obs)
next_q_values = self.target_net(next_obs).max(1)[0]
# Calculate expected Q-values
expected_q_values = reward + (1 - done) * self.env.gamma * next_q_values
# Ensure expected_q_values has the same shape as q_values
expected_q_values = expected_q_values.view(-1, 1)
# Compute MSE loss
return nn.MSELoss()(q_values.gather(1, action.unsqueeze(1)), expected_q_values.detach())
def training_step(self, batch: Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor], batch_idx: int):
loss = self.dqn_mse_loss(batch)
return loss
def configure_optimizers(self):
return optim.Adam(self.model.parameters(), lr=0.001)
def train_dataloader(self):
return DataLoader(RLDataset(self.env), batch_size=self.batch_size)
def train_agent():
try:
df = pd.read_pickle("/content/drive/MyDrive/Copy of binance-BTCUSDT-1m.pkl")
df = df.sort_values('date_open')
print('started')
except Exception as e:
df = pd.read_pickle("data/binance-BTCUSDT-1m.pkl")
df = df.sort_values('date_open')
window_size = 10
df['SMA'] = df['close'].rolling(window=window_size).mean()
sma_slope = df['SMA'] - df['SMA'].shift(1)
window_long = window_size * 10
sma_slope_long = df['SMA'] - df['SMA'].shift(window_long)
stdev_sma_slope_long = sma_slope_long.rolling(window=window_long, min_periods=1).std()
normalized_sma_slope = sma_slope / stdev_sma_slope_long
df['Normalized_SMA_Slope'] = normalized_sma_slope
# Keep only 'close' and 'Normalized_SMA_Slope' columns
df = df[['close', 'Normalized_SMA_Slope']]
# Drop all NaN values
df.dropna(inplace=True)
# Convert DataFrame columns to float32
df = df.astype(np.float32)
lookback_window_size = 1
train_df = df[:-lookback_window_size]
# Convert DataFrame to PyTorch tensor and move to GPU
df = torch.tensor(df.values, dtype=torch.float32).to(device)
# Pass the PyTorch tensor to CustomEnv
train_env = CustomEnv(df, lookback_window_size=lookback_window_size)
batch_size = 1000
model = DQNLit(train_env, batch_size)
trainer = Trainer(accelerator='cpu', max_epochs=1000)
trainer.fit(model)
def train_worker(train_func):
train_func()
if __name__ == '__main__':
# Number of training agents to run in parallel
num_agents = 1 # more than 1 can bug out when using the GPU; increase it to use multiple CPU cores
# Create a pool of worker processes
pool = mp.Pool(processes=num_agents)
# Map the train_worker function to the pool
pool.map(train_worker, [train_agent] * num_agents)
# Close the pool of worker processes
pool.close()
#pool.join()
I profiled the code manually to see what takes the most time to compute; I measure the time spent in each function with code like the snippet below. I tried import line_profiler, but I couldn't get it, or any alternative modules, to work.
dqn_mse_loss_timer = []
start_time = time.time()
# ... the code being timed (e.g. the body of dqn_mse_loss) goes here ...
end_time = time.time()
elapsed_time = end_time - start_time
dqn_mse_loss_timer.append(elapsed_time)
sum_dqn_mse_loss_timer = sum(dqn_mse_loss_timer)
print("sum_dqn_mse_loss_timer:", sum_dqn_mse_loss_timer, "seconds")
The first thing you need to do is profile your code to see where the actual bottlenecks are. For example, you won't gain much from moving to the GPU if your bottleneck is data generation or I/O. For running agents in parallel, have a look at the PyTorch documentation on parallel environments.
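Since line_profiler would not cooperate, a sketch of whole-program profiling with the standard-library cProfile, assuming train_agent is the entry point from the question (the sort key and line count are only examples):

import cProfile
import pstats

if __name__ == '__main__':
    profiler = cProfile.Profile()
    profiler.enable()
    train_agent()  # the training entry point from the question
    profiler.disable()
    # Print the 30 functions with the largest cumulative time
    stats = pstats.Stats(profiler).sort_stats('cumulative')
    stats.print_stats(30)

Running the script as python -m cProfile -s cumulative your_script.py gives a similar report without changing the code, though with multiprocessing it only profiles the parent process.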