Набор данных, который я сохранил, представляет собой просто координаты последовательности ДНК.
дф:
chr start stop label
chr1 9000 9100 1
chr1 8803 8903 1
chr1 8903 9000 0
Моя цель — расширить исходный набор данных, создав скользящее окно вокруг каждой координаты для захвата контекстных последовательностей.
новый_дф:
chr start stop label
chr1 9000-5000 9000+5000 1
chr1 9001-5000 9001+5000 1
chr1 9002-5000 9002+5000 1
...
chr1 9100-5000 9100+5000 1
...
используя эту функцию:
def expand_coordinates(element_locs, context=3):
# Vectorized expansion of coordinates
start = element_locs['Start'].astype(int)
end = element_locs['End'].astype(int)
expanded_data = []
for idx, row in element_locs.iterrows():
chr_name = row['Chromosome']
chr_start = start[idx]
chr_end = end[idx]
for i in range(chr_start, chr_end + 1):
expanded_data.append({
'Chromosome': chr_name,
'Start': max((i - 1) - context, 0),
'End': min(i + context, max_sizes[chr_name])
})
expanded_df = pd.DataFrame(expanded_data)
return expanded_df
def get_element_seqs(element_locs, context=3):
expanded_df = expand_coordinates(element_locs, context=context)
# Optimize genome fetching
genome = pysam.Fastafile(ref_genome)
def fetch_sequences(row):
return genome.fetch(row['Chromosome'], row['Start'], row['End'])
# Fetch sequences in a vectorized way
expanded_df['sequence'] = expanded_df.apply(fetch_sequences, axis=1)
return element_seqs
dataset = Dataset.from_pandas(element_final[['Chromosome', 'sequence', 'label']])
dataset = dataset.shuffle(seed=42)
tokenizer = AutoTokenizer.from_pretrained(f"InstaDeepAI/nucleotide-transformer-500m-human-ref")
def tokenize_function(examples):
outputs = tokenizer.batch_encode_plus(examples["sequence"], return_tensors = "pt", truncation=False, padding=False, max_length=80)
return outputs
# Creating tokenized dataset
tokenized_dataset = dataset.map(
tokenize_function,
batched=True, batch_size=2000)
input_file = f"tokenized_elements/tokenized_{ELEMENT_LABEL}/{filename}.arrow"
# Load input data
d1 = Dataset.from_file(input_file)
def embed_function(examples):
torch.cuda.empty_cache()
gc.collect()
inputs = torch.tensor(examples['input_ids']) # Convert to tensor
inputs = inputs.to(device)
with torch.no_grad():
outputs = model(input_ids=inputs, output_hidden_states=True)
# Step 3: Extract the embeddings
hidden_states = outputs.hidden_states # List of hidden states from all layers
embeddings = hidden_states[-1] # Assuming you want embeddings from the last layer
averaged_embeddings = torch.mean(embeddings, dim=1) # Calculate mean along dimension 1 (the dimension with size 86)
averaged_embeddings = averaged_embeddings.to(torch.float32) # Ensure float32 data type
return {'embeddings': averaged_embeddings}
# Map embeddings function to input data
embeddings = d1.map(embed_function, batched=True, batch_size=1550)
embeddings = embeddings.remove_columns(["input_ids", "attention_mask"])
# Save embeddings to disk
output_dir = f"embedded_elements/embeddings_{ELEMENT_LABEL}/{filename}" # Assuming ELEMENT_LABEL is defined elsewhere
В результате я получаю огромные наборы данных, что приводит к сбою моего кода (например, я начинаю с 700 тысяч строк, а они расширяются до 1000 миллионов строк). Я использую панд, так что, может быть, в этом тоже проблема? Другая проблема в том, что я, кажется, не использую пакетную обработку? К сожалению, мой код продолжает давать сбой между шагами 2+3. Я думаю, что мне нужно реализовать пакетную обработку, но я не уверен, как все будет работать, поскольку в конечном итоге мне нужно будет передать выходные данные в LLM.
Перепишите функцию expand_coordinates
, так как ваш процесс завершается сбоем между шагами 2 и 3. На шаге 3 следует заменить expanded_df['sequence'] = expanded_df.apply(fetch_sequences, axis=1)
чем-то вроде expanded_df.merge(fetch_sequences: pd.DataFrame, ...)
, поскольку слияние векторизовано. Это заблуждение, что размещение ЛЮБОЙ функции внутри приложения — это векторизованный подход!
def expand_coordinates(element_locs: pd.DataFrame, context: int = 3):
# create a column of ranges (memory efficient since ranges are lazy)
element_locs['range'] = element_locs.apply(lambda row: range(row['start'], row['stop'] + 1), axis=1)
# explode is a vectorized operation
element_locs = element_locs.explode('range')
element_locs['start'] = np.maximum(element_locs['start']-context, 0)
element_locs['stop'] = np.minimum(element_locs['stop']+context, 5000) # <-- 5000 is an arbitrary maximum for demo
return element_locs
Я провел несколько нагрузочных тестов, и все прошло хорошо (хотя это далеко не тот масштаб, с которым вы имеете дело, поскольку я тестирую это на своем ноутбуке - Ubuntu 16 ГБ). Результаты ниже. Обратите внимание: в тестировании существует значительная вариативность, поскольку я генерирую данные с помощью генераторов случайных чисел. Важнейшим моментом здесь является коэффициент взрыва (target rows/initial rows
), который зависит исключительно от значений запуска и остановки (в моем случае генерируемых случайным образом).
Initial rows: 100
Generation time: 0.0 sec
Target rows: 511,628
Explosion time: 0.1 sec
Initial rows: 1,000
Generation time: 0.0 sec
Target rows: 4,965,974
Explosion time: 0.77 sec
Initial rows: 10,000
Generation time: 0.0 sec
Target rows: 50,074,976
Explosion time: 7.32 sec
Initial rows: 11,000
Generation time: 0.0 sec
Target rows: 54,922,952
Explosion time: 8.1 sec
Initial rows: 12,000
Generation time: 0.0 sec
Target rows: 59,966,220
Explosion time: 8.98 sec
Initial rows: 15,000
Generation time: 0.0 sec
Target rows: 75,115,987
Explosion time: 11.51 sec
Initial rows: 20,000
Generation time: 0.0 sec
Target rows: 100,010,662
Process finished with exit code 137 (interrupted by signal 9:SIGKILL)
В том масштабе, с которым вы имеете дело, PANDAS — определенно плохая идея. Посмотрите в Spark, если у вас есть доступ к соответствующей инфе.