Предположим, у меня есть каналы nextflow:
Channel.fromFilePairs( "test/read*_R{1,2}.fa" )
.set{ reads }
reads.view()
Channel.fromPath(['test/lib_R1.fa','test/lib_R2.fa'] )
.set{ libs }
libs.view()
Что приводит к:
// reads channel
[read_b, [<path>/test/read_b_R1.fa, <path>/test/read_b_R2.fa]]
[read_a, [<path>/test/read_a_R1.fa, <path>/test/read_a_R2.fa]]
// libs channel
<path>/test/lib_R1.fa
<path>/test/lib_R2.fa
Как мне запустить процесс foo
, который выполняет соответствующую пару чтения-библиотеки, где одна и та же библиотека используется для всех пар чтения? Итак, в основном я хочу выполнить foo
4 раза:
foo(test/read_b_R1.fa, test/lib_R1.fa)
foo(test/read_b_R2.fa, test/lib_R2.fa)
foo(test/read_a_R1.fa, test/lib_R1.fa)
foo(test/read_a_R2.fa, test/lib_R2.fa)
process FOO {
debug true
input:
tuple val(files), path(lib)
output:
stdout
script:
file_a = files[0]
file_b = files[1]
"""
echo $file_a with $lib
echo $file_b with $lib
"""
}
workflow {
Channel
.of(['read_b', [file('/test/read_b_R1.fa'), file('/test/read_b_R2.fa')]],
['read_a', [file('/test/read_a_R1.fa'), file('/test/read_a_R2.fa')]]
)
.set { reads }
Channel
.of(file('/test/lib_R1.fa'),
file('/test/lib_R2.fa')
)
.set { libs }
reads
.map { sample, files -> files }
.flatten()
.map { file -> [file.name.split('_')[2].split('.fa')[0], file]}
.groupTuple()
.set { reads }
libs
.map { file -> [file.name.split('_')[1].split('.fa')[0], file]}
.set { libs }
reads
.join(libs)
.map { Rx, path, lib -> [path, lib] }
| FOO
}
Вывод скрипта выше:
N E X T F L O W ~ version 22.10.4
Launching `ex.nf` [elegant_wiles] DSL2 - revision: 00862286fd
executor > local (2)
[58/9b3cf1] process > FOO (2) [100%] 2 of 2 ✔
/test/read_b_R1.fa with lib_R1.fa
/test/read_a_R1.fa with lib_R1.fa
/test/read_b_R2.fa with lib_R2.fa
/test/read_a_R2.fa with lib_R2.fa
РЕДАКТИРОВАТЬ в ответ на комментарий ниже. Если вы хотите, чтобы процесс запускался один раз для каждого элемента в канале, проверьте измененную версию ниже:
process FOO {
debug true
input:
tuple val(file), path(lib)
output:
stdout
script:
"""
echo $file with $lib
"""
}
workflow {
Channel
.of(['read_b', [file('/test/read_b_R1.fa'), file('/test/read_b_R2.fa')]],
['read_a', [file('/test/read_a_R1.fa'), file('/test/read_a_R2.fa')]]
)
.set { reads }
Channel
.of(file('/test/lib_R1.fa'),
file('/test/lib_R2.fa')
)
.set { libs }
reads
.map { sample, files -> files }
.flatten()
.map { file -> [file.name.split('_')[2].split('.fa')[0], file]}
.groupTuple()
.set { reads }
libs
.map { file -> [file.name.split('_')[1].split('.fa')[0], file]}
.set { libs }
reads
.join(libs)
.map { Rx, path, lib -> [path, lib] }
.map { x, y -> [[x[0], y], [x[1], y]] }
.flatMap()
| FOO
}
Выход:
N E X T F L O W ~ version 22.10.4
Launching `ex.nf` [sharp_ekeblad] DSL2 - revision: 1412af632e
executor > local (4)
[a0/416f59] process > FOO (1) [100%] 4 of 4 ✔
/test/read_b_R2.fa with lib_R2.fa
/test/read_a_R2.fa with lib_R2.fa
/test/read_a_R1.fa with lib_R1.fa
/test/read_b_R1.fa with lib_R1.fa
только что узнал, кажется, я могу использовать транспонирование после присоединения
Он дважды вызывает foo
, потому что в предоставленном вами канале есть два элемента. Если бы на вашем канале было 4 элемента (4 файла вместо тех 2 кортежей), foo
вызывался бы 4 раза. Вы можете использовать операторы для разделения канала по-разному, если вам нужна одна задача для каждого файла.
Если вы хотите использовать одну и ту же библиотеку для всех пар чтения, вам действительно нужен канал значений , который можно читать неограниченное количество раз без использования. Обратите внимание, что канал значений неявно создается процессом, когда он вызывается с простым значением. Это действительно может быть список файлов, но похоже, что вам нужен только один из них, соответствующий каждому чтению R1 или R2. Я думаю, что самое простое решение здесь — просто включить ваш процесс, используя псевдоним , чтобы вы могли передавать необходимые каналы/файлы без особых усилий:
params.reads = 'test/read*_R{1,2}.fa'
include { foo as foo_r1 } from './modules/foo.nf'
include { foo as foo_r2 } from './modules/foo.nf'
workflow {
Channel
.fromFilePairs( params.reads )
.multiMap { sample, reads ->
def (r1, r2) = reads
read1:
tuple(sample, r1)
read2:
tuple(sample, r2)
}
.set { reads }
lib_r1 = file('test/lib_R1.fa')
lib_r2 = file('test/lib_R2.fa')
foo_r1(reads.read1, lib_r1)
foo_r2(reads.read2, lib_r2)
}
Содержимое ./modules/foo.nf
:
process foo {
debug true
input:
tuple val(sample), path(fasta)
path(lib)
"""
echo $sample, $fasta, $lib
"""
}
Полученные результаты:
$ nextflow run main.nf
N E X T F L O W ~ version 22.10.0
Launching `main.nf` [confident_boyd] DSL2 - revision: 8c81e2d743
executor > local (6)
[a8/e8a752] process > foo_r1 (2) [100%] 3 of 3 ✔
[75/2b32f5] process > foo_r2 (3) [100%] 3 of 3 ✔
readC, readC_R2.fa, lib_R2.fa
readA, readA_R1.fa, lib_R1.fa
readC, readC_R1.fa, lib_R1.fa
readB, readB_R2.fa, lib_R2.fa
readA, readA_R2.fa, lib_R2.fa
readB, readB_R1.fa, lib_R1.fa
поэтому кажется, что foo вызывается дважды, один раз для R1 и один раз для R2. Как сделать так, чтобы вместо этого foo вызывался 4 раза? например Я хочу иметь отдельные вызовы для foo(read_a_R1.fa, lib_R1.fa) и foo(read_b_R1.fa, lib_R1.fa)