Как сделать CNN инвариантным к положению паттерна в последовательности ДНК?

Я пытаюсь провести бинарную классификацию, находя шаблон (скажем, «CTCATGTCA») в последовательности ДНК с использованием CNN. Я написал модель в Pytorch. Когда шаблон находится в центре последовательности, модель обнаруживает его. Но если узор расположен в случайных местах, модель не работает. Как сделать CNN инвариантным к положению шаблона?

Это мой код:

import logging

import torch
import torch.nn as nn
import torch.optim as optim
import pandas as pd
from sklearn import metrics
from skorch import NeuralNetClassifier
from skorch.callbacks import EpochScoring
from torch.utils.data import DataLoader, Dataset
import numpy as np

import constants

timber = logging.getLogger()
logging.basicConfig(level=logging.INFO)  # change to level=logging.DEBUG to print more logs...


# utils

def one_hot_e(dna_seq: str) -> np.ndarray:
  mydict = {'A': np.asarray([1.0, 0.0, 0.0, 0.0]), 'C': np.asarray([0.0, 1.0, 0.0, 0.0]),
            'G': np.asarray([0.0, 0.0, 1.0, 0.0]), 'T': np.asarray([0.0, 0.0, 0.0, 1.0]),
            'N': np.asarray([0.0, 0.0, 0.0, 0.0]), 'H': np.asarray([0.0, 0.0, 0.0, 0.0]),
            'a': np.asarray([1.0, 0.0, 0.0, 0.0]), 'c': np.asarray([0.0, 1.0, 0.0, 0.0]),
            'g': np.asarray([0.0, 0.0, 1.0, 0.0]), 't': np.asarray([0.0, 0.0, 0.0, 1.0]),
            'n': np.asarray([0.0, 0.0, 0.0, 0.0]), '-': np.asarray([0.0, 0.0, 0.0, 0.0])}

  size_of_a_seq: int = len(dna_seq)

  # forward = np.zeros(shape=(size_of_a_seq, 4))

  forward_list: list = [mydict[dna_seq[i]] for i in range(0, size_of_a_seq)]
  encoded = np.asarray(forward_list)
  return encoded


def one_hot_e_column(column: pd.Series) -> np.ndarray:
  tmp_list: list = [one_hot_e(seq) for seq in column]
  encoded_column = np.asarray(tmp_list)
  return encoded_column


def reverse_dna_seq(dna_seq: str) -> str:
  # m_reversed = ""
  # for i in range(0, len(dna_seq)):
  #     m_reversed = dna_seq[i] + m_reversed
  # return m_reversed
  return dna_seq[::-1]


def complement_dna_seq(dna_seq: str) -> str:
  comp_map = {"A": "T", "C": "G", "T": "A", "G": "C",
              "a": "t", "c": "g", "t": "a", "g": "c",
              "N": "N", "H": "H", "-": "-",
              "n": "n", "h": "h"
              }

  comp_dna_seq_list: list = [comp_map[nucleotide] for nucleotide in dna_seq]
  comp_dna_seq: str = "".join(comp_dna_seq_list)
  return comp_dna_seq


def reverse_complement_dna_seq(dna_seq: str) -> str:
  return reverse_dna_seq(complement_dna_seq(dna_seq))


def reverse_complement_dna_seqs(column: pd.Series) -> pd.Series:
  tmp_list: list = [reverse_complement_dna_seq(seq) for seq in column]
  rc_column = pd.Series(tmp_list)
  return rc_column


class CNN1D(nn.Module):
  def __init__(self,
               in_channel_num_of_nucleotides=4,
               kernel_size_k_mer_motif=4,
               dnn_size=256,
               num_filters=1,
               lstm_hidden_size=128,
               *args, **kwargs):
    super().__init__(*args, **kwargs)
    self.conv1d = nn.Conv1d(in_channels=in_channel_num_of_nucleotides, out_channels=num_filters,
                            kernel_size=kernel_size_k_mer_motif, stride=2)
    self.activation = nn.ReLU()
    self.pooling = nn.MaxPool1d(kernel_size=kernel_size_k_mer_motif, stride=2)

    self.flatten = nn.Flatten()
    # linear layer

    self.dnn2 = nn.Linear(in_features=14 * num_filters, out_features=dnn_size)
    self.act2 = nn.Sigmoid()
    self.dropout2 = nn.Dropout(p=0.2)

    self.out = nn.Linear(in_features=dnn_size, out_features=1)
    self.out_act = nn.Sigmoid()

    pass

  def forward(self, x):
    timber.debug(constants.magenta + f"h0: {x}")
    h = self.conv1d(x)
    timber.debug(constants.green + f"h1: {h}")
    h = self.activation(h)
    timber.debug(constants.magenta + f"h2: {h}")
    h = self.pooling(h)
    timber.debug(constants.blue + f"h3: {h}")
    timber.debug(constants.cyan + f"h4: {h}")

    h = self.flatten(h)
    timber.debug(constants.magenta + f"h5: {h},\n shape {h.shape}, size {h.size}")
    h = self.dnn2(h)
    timber.debug(constants.green + f"h6: {h}")

    h = self.act2(h)
    timber.debug(constants.blue + f"h7: {h}")

    h = self.dropout2(h)
    timber.debug(constants.cyan + f"h8: {h}")

    h = self.out(h)
    timber.debug(constants.magenta + f"h9: {h}")

    h = self.out_act(h)
    timber.debug(constants.green + f"h10: {h}")
    # h = (h > 0.5).float()  # <---- should this go here?
    # timber.debug(constants.green + f"h11: {h}")

    return h


class CustomDataset(Dataset):
  def __init__(self, dataframe):
    self.x = dataframe["Sequence"]
    self.y = dataframe["class"]

  def __len__(self):
    return len(self.y)

  def preprocessing(self, x1, y1) -> (torch.Tensor, torch.Tensor, torch.Tensor):
    forward_col = x1

    backward_col = reverse_complement_dna_seqs(forward_col)

    forward_one_hot_e_col: np.ndarray = one_hot_e_column(forward_col)
    backward_one_hot_e_col: np.ndarray = one_hot_e_column(backward_col)

    tr_xf_tensor = torch.Tensor(forward_one_hot_e_col).permute(1, 2, 0)
    tr_xb_tensor = torch.Tensor(backward_one_hot_e_col).permute(1, 2, 0)
    # timber.debug(f"y1 {y1}")
    tr_y1 = np.array([y1])  # <--- need to put it inside brackets

    return tr_xf_tensor, tr_xb_tensor, tr_y1

  def __getitem__(self, idx):
    m_seq = self.x.iloc[idx]
    labels = self.y.iloc[idx]
    xf, xb, y = self.preprocessing(m_seq, labels)
    timber.debug(f"xf -> {xf.shape}, xb -> {xb.shape}, y -> {y}")
    return xf, xb, y


def test_dataloader():
  df = pd.read_csv("todo.csv")
  X = df["Sequence"]
  y = df["class"]

  ds = CustomDataset(df)
  loader = DataLoader(ds, shuffle=True, batch_size=16)

  train_loader = loader

  for data in train_loader:
    timber.debug(data)
    # xf, xb, y = data[0], data[1], data[2]
    # timber.debug(f"xf -> {xf.shape}, xb -> {xb.shape}, y -> {y.shape}")
  pass


def get_callbacks() -> list:
  # metric.auc ( uses trapezoidal rule) gave an error: x is neither increasing, nor decreasing. so I had to remove it
  return [
    ("tr_acc", EpochScoring(
      metrics.accuracy_score,
      lower_is_better=False,
      on_train=True,
      name = "train_acc",
    )),

    ("tr_recall", EpochScoring(
      metrics.recall_score,
      lower_is_better=False,
      on_train=True,
      name = "train_recall",
    )),
    ("tr_precision", EpochScoring(
      metrics.precision_score,
      lower_is_better=False,
      on_train=True,
      name = "train_precision",
    )),
    ("tr_roc_auc", EpochScoring(
      metrics.roc_auc_score,
      lower_is_better=False,
      on_train=False,
      name = "tr_auc"
    )),
    ("tr_f1", EpochScoring(
      metrics.f1_score,
      lower_is_better=False,
      on_train=False,
      name = "tr_f1"
    )),
    # ("valid_acc1", EpochScoring(
    #   metrics.accuracy_score,
    #   lower_is_better=False,
    #   on_train=False,
    #   name = "valid_acc1",
    # )),
    ("valid_recall", EpochScoring(
      metrics.recall_score,
      lower_is_better=False,
      on_train=False,
      name = "valid_recall",
    )),
    ("valid_precision", EpochScoring(
      metrics.precision_score,
      lower_is_better=False,
      on_train=False,
      name = "valid_precision",
    )),
    ("valid_roc_auc", EpochScoring(
      metrics.roc_auc_score,
      lower_is_better=False,
      on_train=False,
      name = "valid_auc"
    )),
    ("valid_f1", EpochScoring(
      metrics.f1_score,
      lower_is_better=False,
      on_train=False,
      name = "valid_f1"
    ))
  ]


def start():

  # df = pd.read_csv("data64.csv")  # use this line
  df = pd.read_csv("data64random.csv")
  X = df["Sequence"]
  y = df["class"]

  npa = np.array([y.values])

  torch_tensor = torch.tensor(npa)  # [0, 1, 1, 0, ... ... ] a simple list
  print(f"torch_tensor: {torch_tensor}")
  # need to transpose it!

  yt = torch.transpose(torch_tensor, 0, 1)

  ds = CustomDataset(df)
  loader = DataLoader(ds, shuffle=True)

  # train_loader = loader
  # test_loader = loader  # todo: load another dataset later

  device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
  model = CNN1D().to(device)
  m_criterion = nn.BCEWithLogitsLoss
  # optimizer = optim.Adam(model.parameters(), lr=0.001)
  m_optimizer = optim.Adam

  net = NeuralNetClassifier(
    model,
    max_epochs=200,
    criterion=m_criterion,
    optimizer=m_optimizer,
    lr=0.01,
    # decay=0.01,
    # momentum=0.9,

    device=device,
    classes=["no_mqtl", "yes_mqtl"],
    verbose=True,
    callbacks=get_callbacks()
  )

  ohe_c = one_hot_e_column(X)
  print(f"ohe_c shape {ohe_c.shape}")
  ohe_c = torch.Tensor(ohe_c)
  ohe_c = ohe_c.permute(0, 2, 1)
  ohe_c = ohe_c.to(device)
  print(f"ohe_c shape {ohe_c.shape}")

  net.fit(X=ohe_c, y=yt)
  y_proba = net.predict_proba(ohe_c)
  # timber.info(f"y_proba = {y_proba}")
  pass


if __name__ == '__main__':
  start()
  # test_dataloader()
  pass

И вы можете найти 2 набора данных

  1. dna64random.csv (модель с этим не работает)
  2. dna64.csv (модель работает с ним)

Вы можете быстро скачать все, воспользовавшись этой основной ссылкой

Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
1
0
70
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

В приведенной ниже модели используются слои Conv1d, а точность проверки составляет от 99% до 100%. Чтобы добиться этого, не потребовалось много настроек.

Я тренировался на 60% данных, из которых 30% использовалось для проверки. Модель относительно небольшая, около 700 параметров.

Модель имеет широкое рецептивное поле на входе и соответствует длине паттерна. Производительность модели была весьма чувствительна к этой начальной конфигурации. Последующие свёрточные слои удваивают размер объекта до 8 и слегка обрезают последовательность. Последний слой maxpool уменьшает вдвое оставшуюся длину последовательности, прежде чем она сглаживается и сопоставляется со скаляром через плотный слой.

Я обнаружил, что NAdam работает лучше, чем Adam, и больше не изучал оптимизаторы. Размер партии 8 и ниже работал хорошо.

...
[epoch  13] trn loss: 0.011 [acc: 100.000%] | val loss: 0.011 [acc: 100.000%]
[epoch  14] trn loss: 0.014 [acc: 100.000%] | val loss: 0.009 [acc:  99.667%]
[epoch  15] trn loss: 0.008 [acc: 100.000%] | val loss: 0.006 [acc: 100.000%]

Я попробовал конвальные слои, а не повторяющиеся ячейки, поскольку они, как правило, работают быстрее и работают лучше. Поскольку производительность была хорошей, я не рассматривал LSTM. Ваша модель кажется довольно большой — попробуйте уменьшить ее и посмотрите, улучшится ли это.


Код для подготовки данных, определения и обучения модели, а также печати/построения результатов.

import numpy as np
from matplotlib import pyplot as plt
import pandas as pd

import torch
from torch.utils.data import DataLoader
from torch import nn

np.random.seed(0)

def one_hot_e(dna_seq: str) -> np.ndarray:
  mydict = {'A': np.asarray([1.0, 0.0, 0.0, 0.0]), 'C': np.asarray([0.0, 1.0, 0.0, 0.0]),
            'G': np.asarray([0.0, 0.0, 1.0, 0.0]), 'T': np.asarray([0.0, 0.0, 0.0, 1.0]),
            'N': np.asarray([0.0, 0.0, 0.0, 0.0]), 'H': np.asarray([0.0, 0.0, 0.0, 0.0]),
            'a': np.asarray([1.0, 0.0, 0.0, 0.0]), 'c': np.asarray([0.0, 1.0, 0.0, 0.0]),
            'g': np.asarray([0.0, 0.0, 1.0, 0.0]), 't': np.asarray([0.0, 0.0, 0.0, 1.0]),
            'n': np.asarray([0.0, 0.0, 0.0, 0.0]), '-': np.asarray([0.0, 0.0, 0.0, 0.0])}

  size_of_a_seq: int = len(dna_seq)

  # forward = np.zeros(shape=(size_of_a_seq, 4))

  forward_list: list = [mydict[dna_seq[i]] for i in range(0, size_of_a_seq)]
  encoded = np.asarray(forward_list)
  return encoded

#
#Load and prepare data    "CTCATGTCA"
#
df = pd.read_csv('data64random.csv')

#To numpy arrays, and encode X
X = np.stack([one_hot_e(row) for row in df.Sequence], axis=0)
y = df['class'].values

#Shuffle and split
train_size = int(0.6 * len(X))
val_size = int(0.3 * len(X))

shuffle_ixs = np.random.permutation(len(X))
X, y = [arr[shuffle_ixs] for arr in [X, y]]

X_train, y_train = [arr[:train_size] for arr in [X, y]]
X_val, y_val = [arr[train_size:train_size + val_size] for arr in [X, y]]

#As tensors. Useful for passing directly to model as a single large batch.
X_train_t, y_train_t = [torch.tensor(arr).float() for arr in [X_train, y_train]]
X_val_t, y_val_t = [torch.tensor(arr).float() for arr in [X_val, y_val]]


#
#Define the model
#

#Lambda layer useful for simple manipulations
class LambdaLayer(nn.Module):
    def __init__(self, func):
        super().__init__()
        self.func = func
    
    def forward(self, x):
        return self.func(x)

#batch, 64, chan

#The model
seq_len = X[0].shape[0]    #64 characters long
n_features = X[0].shape[1] #4-dim onehot encoding

torch.manual_seed(0)
model = nn.Sequential(
    #> (batch, seq_len, channels)
    
    LambdaLayer(lambda x: x.swapdims(1, 2)),
    #> (batch, channels, seq_len)
    
    #Initial wide receptive field (and it matches length of the pattern)
    nn.Conv1d(in_channels=n_features, out_channels=4, kernel_size=9, padding='same'),
    nn.ReLU(),
    nn.BatchNorm1d(num_features=4),
    
    #Conv block 1 doubles features
    nn.Conv1d(in_channels=4, out_channels=8, kernel_size=3),
    nn.ReLU(),
    nn.BatchNorm1d(num_features=8),
    
    #Conv block 2, then maxpool
    nn.Conv1d(in_channels=8, out_channels=8, kernel_size=3),
    nn.ReLU(),
    nn.BatchNorm1d(num_features=8),
    
    #Output layer: flatten, linear
    nn.MaxPool1d(kernel_size=2, stride=2), #batch, feat, seq
    nn.Flatten(start_dim=1), #batch, feat*seq
    nn.Linear(8 * 30, 1),
)
print(
    'Model size is',
    sum([p.numel() for p in model.parameters() if p.requires_grad]),
    'trainable parameters'
)

#Train loader for batchifying train data
train_loader = DataLoader(list(zip(X_train_t, y_train_t)), shuffle=True, batch_size=8)

optimiser = torch.optim.NAdam(model.parameters())
loss_fn = nn.BCEWithLogitsLoss()

from collections import defaultdict
metrics_dict = defaultdict(list)

for epoch in range(n_epochs := 15):
    model.train()
    cum_loss = 0
    
    for X_minibatch, y_minibatch in train_loader:
        logits = model(X_minibatch).ravel()
        loss = loss_fn(logits, y_minibatch)
        
        optimiser.zero_grad()
        loss.backward()
        optimiser.step()
        
        cum_loss += loss.item() * len(X_minibatch)
    #/end of epoch
    
    time_to_print = (epoch == 0) or ((epoch + 1) % 1) == 0
    if not time_to_print:
        continue
    
    model.eval()
    
    with torch.no_grad():
        val_logits = model(X_val_t).ravel()
        trn_logits = model(X_train_t).ravel()
    
    val_loss = loss_fn(val_logits, y_val_t).item()
    trn_loss = cum_loss / len(X_train_t)
    
    val_acc = ((nn.Sigmoid()(val_logits) > 0.5).int() == y_val_t.int()).float().mean().item()
    trn_acc = ((nn.Sigmoid()(trn_logits) > 0.5).int() == y_train_t.int()).float().mean().item()
    print(
        f'[epoch {epoch + 1:>3d}]',
        f'trn loss: {trn_loss:>5.3f} [acc: {trn_acc:>8.3%}] |',
        f'val loss: {val_loss:>5.3f} [acc: {val_acc:>8.3%}]'
    )
    
    #Record metrics
    metrics_dict['epoch'].append(epoch + 1)
    metrics_dict['trn_loss'].append(trn_loss)
    metrics_dict['val_loss'].append(val_loss)
    metrics_dict['trn_acc'].append(trn_acc)
    metrics_dict['val_acc'].append(val_acc)

#View training curves
metrics_df = pd.DataFrame(metrics_dict).set_index('epoch')
ax = metrics_df.plot(
    use_index=True, y=['trn_loss', 'val_loss'], ylabel='loss',
    figsize=(8, 4), legend=False, linewidth=3, marker='s', markersize=7
)

metrics_df.mul(100).plot(
    use_index=True, y=['trn_acc', 'val_acc'], ylabel='acc', 
    linestyle='--', marker='o', ax=ax.twinx(), legend=False
)
ax.figure.legend(ncol=2)
ax.set_title('training curves')

Спасибо за помощь мне. Я все еще новичок в машинном обучении. Как вы это поняли?

Qazi Fahim Farhan 05.05.2024 05:50

Без проблем. Это широко используемая архитектура, в которой вы используете уровни конверсии для увеличения количества каналов, а затем используете maxpooling для уменьшения длины последовательности/размера изображения. Часто за результат отвечает последний плотный слой. Этот общий подход принят большинством сетей, и здесь он хорошо сработал. Если бы это не сработало, мне пришлось бы экспериментировать с другими вариантами; в выяснении этих вещей часто приходится методом проб и ошибок.

MuhammedYunus 05.05.2024 11:09

Другие вопросы по теме