У меня есть простой csv, созданный как часть рабочего процесса, как показано ниже:
sample,value
A,1
B,0.5
Отдельно у меня есть еще один канал с именами файлов, совпадающими с именами образцов. Я хотел бы иметь возможность использовать значения, связанные с каждым именем образца, в новом процессе.
Я пытался разделить CSV с помощью .splitCsv, но (что неудивительно) иногда неправильное значение используется с образцом, хотя он выполняется правильное количество раз. Я также пытался просто использовать awk в сценарии, чтобы извлечь соответствующее значение и сохранить его в переменной, и это приводит к использованию правильного значения, но он использует файл CSV, поэтому обрабатывается только один образец.
Сверхупрощенный сценарий nextflow (DSL2):
#!/usr/bin/env nextflow
nextflow.enable.dsl=2
process foo {
input:
path input_file
output:
path 'file.csv', emit csv
"""
script that creates csv
"""
}
process bar {
input:
path input_file2
output:
path 'file.bam', emit bam
"""
script that creates bam files
"""
}
process help_me {
input:
path csv
path bam
output:
path 'result'
"""
script that uses value from csv on associated bam file
"""
}
workflow {
foo(params.input)
bar(params.input2)
help_me(foo.out.csv, bar.out.bam)
}
Спасибо!!
Обновлено: по сути, есть ли способ синхронизировать два канала, чтобы я мог использовать отдельные строки csv со связанными файлами?
Если у вас есть канал ценности, вы можете повторно использовать файл (например, CSV) неограниченное количество раз, не используя канал. Например:
workflow {
input1 = file( params.input1 )
input2 = file( params.input2 )
foo( input1 )
bar( input2 )
help_me(foo.out.csv, bar.out.bam)
}
Здесь и input1
, и input2
являются ценностными каналами. Кроме того, (выделено мной):
A value channel is implicitly created by a process when an input specifies a simple value in the
from
clause. Moreover, a value channel is also implicitly created as output for a process whose inputs are only value channels.
Означает, что и foo.out.csv
, и bar.out.bam
также являются каналами ценности. Кроме того, help_me.out
также является каналом ценности. Если вместо input2
было канал очереди, вы можете видеть, что input1
можно повторно использовать неограниченное количество раз:
$ mkdir -p ./path/to/bams
$ touch ./path/to/bams/{A,B,C}.bam
$ touch ./foo.txt
params.input1 = './foo.txt'
params.input2 = './path/to/bams/*.bam'
workflow {
input1 = file( params.input1 )
input2 = Channel.fromPath( params.input2 )
foo( input1 )
bar( input2 )
help_me(foo.out.csv, bar.out.bam)
}
Полученные результаты:
$ nextflow run script.nf
N E X T F L O W ~ version 22.04.0
Launching `script.nf` [trusting_allen] DSL2 - revision: 75209e4c85
executor > local (7)
[24/d459f7] process > foo [100%] 1 of 1 ✔
[04/a903e4] process > bar (2) [100%] 3 of 3 ✔
[24/7a9a1d] process > help_me (3) [100%] 3 of 3 ✔
Обратите внимание, что bar.out.bam
и help_me.out
теперь являются каналами очереди.
Если вместо этого у вас есть один CSV для каждого образца (или аналогичная конфигурация), вам потребуется каким-то образом предварительно присоединиться к этим каналам и соответствующим образом настроить входное объявление вашего нового процесса. Чего вы хотите избежать, так это объявления двух (или более) каналов очереди в вашем блоке ввода. Эта часть документации стоит потраченного времени: Понять, как работают несколько входных каналов, и она объяснит, почему вы увидели неправильное значение, связанное с конкретным образцом при использовании выходных данных splitCsv. Чтобы присоединиться к этим каналам, вы можете использовать оператор присоединиться. Например, учитывая ваш простой CSV (как «foo.csv») и ранее созданные тестовые блоки:
nextflow.enable.dsl=2
params.input1 = './foo.csv'
params.input2 = './path/to/bams/*.bam'
process help_me {
debug true
input:
tuple val(sample), val(myval), path(bam)
output:
path 'result'
"""
echo -n "sample: ${sample}, myval: ${myval}, bam: ${bam}"
touch result
"""
}
workflow {
Channel.fromPath( params.input1 ) \
| splitCsv( header:true ) \
| map { row -> tuple( row.sample, row.value ) } \
| set { rows_ch }
Channel.fromPath( params.input2 ) \
| map { bam -> tuple( bam.baseName, bam ) } \
| join( rows_ch ) \
| map { sample, bam, myval -> tuple( sample, myval, bam ) } \
| help_me
}
Полученные результаты:
$ nextflow run script.nf
N E X T F L O W ~ version 22.04.0
Launching `script.nf` [lethal_mayer] DSL2 - revision: 395732babc
executor > local (2)
[c5/e96085] process > help_me (1) [100%] 2 of 2 ✔
sample: B, myval: 0.5, bam: B.bam
sample: A, myval: 1, bam: A.bam
Если ваш CSV имеет более одного значения для конкретного образца, и они указаны в отдельных строках, вы, вероятно, захотите вместо этого использовать оператор комбинировать. Например, если ваш 'foo.csv' содержит:
sample,value
A,1
B,0.5
B,2
И замените join( rows_ch )
на combine( rows_ch, by:0 )
в приведенном выше примере. Полученные результаты:
nextflow run script.nf
N E X T F L O W ~ version 22.04.0
Launching `script.nf` [festering_miescher] DSL2 - revision: f8de1e0d20
executor > local (3)
[ee/8af543] process > help_me (3) [100%] 3 of 3 ✔
sample: A, myval: 1, bam: A.bam
sample: B, myval: 0.5, bam: B.bam
sample: B, myval: 2, bam: B.bam
@AnteMeta Это может быть связано с ошибкой присоединения. Есть ли общий элемент, к которому можно присоединиться? Если вы можете включить в свой вопрос Посмотреть каждого канала, я могу помочь в отладке, если это необходимо.
Спасибо! Используя .view, я смог это понять. Поскольку я использовал модуль nf-core, в канал были включены метаданные, которые я должен был учитывать. «map { meta, bam -> tuple( bam.name, bam ) }» позволяет завершить соединение, как и ожидалось, и теперь оно работает!
Огромное спасибо. В итоге мне нужно было просто отредактировать часть Channel.fromPath, поскольку каналы уже существовали. Однако теперь он по какой-то причине пропускает процесс, соответствующий help_me...