У меня есть конвейер DSL2 Nextflow, который разветвляется на 2 FILTER
процесса. Затем в процессе CONCAT
я повторно использую два предыдущих вывода процесса в качестве входных данных. Также в процессе SUMMARIZE
я повторно использую выходные данные предыдущего процесса в качестве входных данных.
Я обнаружил, что когда я запускаю конвейер с 2 или более парами сэмплов fastq, входные данные перепутаны.
Например, на шаге CONCAT
я в конечном итоге объединяю bwa_2_ch
вывод одной пары сэмплов fastq с filter_1_ch
другой пары сэмплов fastq вместо сэмплов с одним и тем же идентификатором пары.
Я считаю, что не совсем правильно пишу workflow { }
каналы и ввод, рабочий процесс правильно проходит шаги без смешивания сэмплов. Но я не уверен, как определить входы, чтобы не было путаницы.
//trimmomatic read trimming
process TRIM {
tag "trim ${pair_id}"
publishDir "${params.outdir}/$pair_id/trim_results"
input:
tuple val(pair_id), path(reads)
output:
tuple val(pair_id), path("trimmed_${pair_id}_...")
script:
"""
"""
}
//bwa alignment
process BWA_1 {
tag "align-1 ${pair_id}f"
publishDir "${params.outdir}/$pair_id/..."
input:
tuple val(pair_id), path(reads)
path index
output:
tuple val(pair_id), path("${pair_id}_...}")
script:
"""
"""
}
process FILTER_1 {
tag "filter ${pair_id}"
publishDir "${params.outdir}/$pair_id/filter_results"
input:
tuple val(pair_id), path(reads)
output:
tuple val(pair_id),
path("${pair_id}_...")
script:
"""
"""
}
process FILTER_2 {
tag "filter ${pair_id}"
publishDir "${params.outdir}/$pair_id/filter_results"
input:
tuple val(pair_id), path(reads)
output:
tuple val(pair_id),
path("${pair_id}_...")
script:
"""
"""
}
//bwa alignment
process BWA_2 {
tag "align-2 ${pair_id}"
publishDir "${params.outdir}/$pair_id/bwa_2_results"
input:
tuple val(pair_id), path(reads)
path index
output:
tuple val(pair_id), path("${pair_id}_...}")
script:
"""
"""
}
//concatenate pf and non_human reads
process CONCAT{
tag "concat ${pair_id}"
publishDir "${params.outdir}/$pair_id"
input:
tuple val(pair_id), path(program_reads)
tuple val(pair_id), path(pf_reads)
output:
tuple val(pair_id), path("${pair_id}_...")
script:
"""
"""
}
//summary
process SUMMARY{
tag "summary ${pair_id}"
publishDir "${params.outdir}/$pair_id"
input:
tuple val(pair_id), path(trim_reads)
tuple val(pair_id), path(non_human_reads)
output:
file("summary_${pair_id}.csv")
script:
"""
"""
}
workflow {
Channel
.fromFilePairs(params.reads, checkIfExists: true)
.set {read_pairs_ch}
// trim reads
trim_ch = TRIM(read_pairs_ch)
// map to pf genome
bwa_1_ch = BWA_1(trim_ch, params.pf_index)
// filter mapped reads
filter_1_ch = FILTER_1(bwa_1_ch)
filter_2_ch = FILTER_2(bwa_1_ch)
// map to pf and human genome
bwa_2_ch = BWA_2(filter_2_ch, params.index)
// concatenate non human reads
concat_ch = CONCAT(bwa_2_ch,filter_1_ch)
// summarize
summary_ch = SUMMARY(trim_ch,concat_ch)
}
Подобная путаница обычно возникает, когда процесс ошибочно получает два или более каналов очереди . В большинстве случаев вам нужен один канал очереди и один или несколько значений каналов, когда вам требуется несколько входных каналов. Здесь я точно не уверен, к чему будет привязано имя пара_id, но, скорее всего, это будет не то, что вы ожидаете:
input:
tuple val(pair_id), path(program_reads)
tuple val(pair_id), path(pf_reads)
Что вы хотите сделать, это заменить вышеуказанное на:
input:
tuple val(pair_id), path(program_reads), path(pf_reads)
А затем используйте оператор join для создания необходимых входных данных. Например:
workflow {
Channel
.fromFilePairs( params.reads, checkIfExists: true )
.set { read_pairs_ch }
pf_index = file( params.pf_index )
bwa_index = file( params.bwa_index )
// trim reads
trim_ch = TRIM( read_pairs_ch )
// map to pf genome
bwa_1_ch = BWA_1( trim_ch, pf_index)
// filter mapped reads
filter_1_ch = FILTER_1(bwa_1_ch)
filter_2_ch = FILTER_2(bwa_1_ch)
// map to pf and human genome
bwa_2_ch = BWA_2(filter_2_ch, bwa_index)
// concatenate non human reads
concat_ch = bwa_2_ch \
| join( filter_1_ch ) \
| CONCAT
// summarize
summary_ch = trim_ch \
| join( concat_ch ) \
| SUMMARY
}