Я борюсь с проблемой, которая, вероятно, имеет очень простое решение. В моем (dsl2) рабочем процессе nextflow у меня есть несколько процессов, которые выводят кортеж файла и значение, которое указывает группу входного элемента. Затем у меня есть последний процесс, который для каждой группы собирает все сгенерированные файлы и работает с ними. Например:
workflow {
generatePDF(input_channel)
generateCSV(input_channel)
generateTXT(input_channel)
mergeprocess( /*grouped input files from previous 3 processes */ )
}
И, как я уже упоминал, вывод для каждого процесса generate*
tuple file(output_${samplename}.${extension}) val(${group})
Например, если у меня есть образцы 1, 2 и 3, принадлежащие к группе A, 4 к группе B и 5 и 6 к группе C, я хотел бы передать в качестве входных данных для последнего процесса
output_sample1.pdf output_sample2.pdf output_sample3.pdf output_sample1.csv output_sample2.csv output_sample3.csv output_sample1.txt output_sample2.txt output_sample3.txt
output_sample4.pdf output_sample4.csv output_sample4.txt
output_sample5.pdf output_sample6.pdf output_sample5.csv output_sample6.csv output_sample5.txt output_sample6.txt
Я протестировал комбинацию collect(), groupTuple и даже join(), но ничего не дало мне нужный мне канал.
Спасибо за ваше время.
Одно решение, которое, возможно, является «базовым» решением, над которым вы, возможно, работали, состоит в том, чтобы смешивание выводить ваш процесс, а затем вызывать оператор группаКортеж, указывая индекс элемента, который будет использоваться в качестве ключа группировки, используя параметр by
:
generatePDF.out
| mix( generateCSV.out, generateTXT.out )
| groupTuple( by: 1 )
| view()
Однако это решение будет «ждать» завершения всех процессов генерации (generatePDF, generateCSV, generateTXT), прежде чем приступить к объединению Любые сгруппированных выходных данных. Это связано с тем, что группаКортеж в идеале необходимо знать количество элементов, которые должны содержать сгруппированные списки:
You should always specify the number of expected elements in each tuple using the
size
attribute to allow thegroupTuple
operator to stream the collected values as soon as possible.
Если все ваши процессы генерации определяют одну и ту же группу для данной выборки (что они, вероятно, и делают), вы должны иметь возможность отделить эту логику таким образом, чтобы вы могли заранее определить размер каждой группы. Затем вы можете создать специальный ключ группировки (т. е. с помощью функции группаКлюч) и использовать его для группировки выходных данных процесса. Следующий пример просто предопределяет группу из родительского каталога, но, надеюсь, должен быть применим к вашему варианту использования:
Во-первых, давайте создадим некоторые тестовые данные:
mkdir -p input_files/{A,B,C}
touch input_files/A/sample{1,2,3}.ext
touch input_files/B/sample4.ext
touch input_files/C/sample{5,6}.ext
Затем поместите следующее в файл с именем script.nf
:
nextflow.enable.dsl=2
process generatePDF {
input:
tuple val(sample_name), path(input_file)
output:
tuple val(sample_name), path("output_${sample_name}.pdf")
"""
touch "output_${sample_name}.pdf"
"""
}
process generateCSV {
input:
tuple val(sample_name), path(input_file)
output:
tuple val(sample_name), path("output_${sample_name}.csv")
"""
touch "output_${sample_name}.csv"
"""
}
process generateTXT {
input:
tuple val(sample_name), path(input_file)
output:
tuple val(sample_name), path("output_${sample_name}.txt")
"""
touch "output_${sample_name}.txt"
"""
}
process merge_process {
tag { group }
echo true
input:
tuple val(group), path(input_files)
"""
echo "group: ${group}"
ls ${input_files}
"""
}
workflow {
Channel.fromPath( './input_files/*/*.ext' )
| map { infile -> tuple( infile.parent.name, infile ) }
| groupTuple()
| map { group, files -> tuple( groupKey(group, files.size()), files) }
| transpose()
| set { input_ch }
input_ch
| map { key, infile -> tuple( infile.baseName, infile ) }
| ( generatePDF & generateCSV & generateTXT )
| mix
| groupTuple( size: 3 )
| set { outputs_ch }
input_ch
| map { key, infile -> tuple( infile.baseName, key ) }
| join( outputs_ch )
| map { sample, key, files -> tuple( key, files ) }
| groupTuple()
| map { key, grp_files -> tuple( key.toString(), grp_files.flatten() ) }
| merge_process
}
Полученные результаты:
$ nextflow run script.nf
N E X T F L O W ~ version 20.10.0
Launching `script.nf` [irreverent_lavoisier] - revision: aba248d32e
executor > local (21)
[5d/5cd70d] process > generatePDF (1) [100%] 6 of 6 ✔
[4c/c58256] process > generateCSV (2) [100%] 6 of 6 ✔
[d2/93402c] process > generateTXT (5) [100%] 6 of 6 ✔
[9b/04ea07] process > merge_process (C) [100%] 3 of 3 ✔
group: B
output_sample4.csv
output_sample4.pdf
output_sample4.txt
group: A
output_sample1.csv
output_sample1.pdf
output_sample1.txt
output_sample2.csv
output_sample2.pdf
output_sample2.txt
output_sample3.csv
output_sample3.pdf
output_sample3.txt
group: C
output_sample5.csv
output_sample5.pdf
output_sample5.txt
output_sample6.csv
output_sample6.pdf
output_sample6.txt
Я действительно не против дождаться завершения всех процессов перед запуском последнего, поэтому первое решение кажется наиболее простым. Но я определенно хочу протестировать и второе, хотя в моем случае я не могу воспользоваться идеей «отдельных входных папок», я вижу, что не так уж сложно найти другую отправную точку для использования нисходящего подхода. Спасибо вам за помощь!