У меня есть два df -
first содержит начальную и конечную позицию, например
id start end
1 4 8
2 2 6
2 5 7
И второй df с id-строкой
id string
1 my beautiful data
2 lorem ipsum
Присоединитесь «как есть», а затем обрежьте строку до нужных позиций — не хватает памяти. В первом df около 1кк записей, во втором - около 10 записей, но каждая строка весит около 100Мб.
Итак, я думаю, можно ли разрезать строку во время соединения, и тогда каждая строка будет состоять из нескольких символов - и это приемлемый размер.
Результат должен быть таким:
идентификатор начала конца последовательности
1 | 4 | 8 | beaut
2 | 2 | 6 | orem i
2 | 5 | 7 | m i
Большое спасибо!
УПД:
В настоящее время я просто присоединяюсь 1 к 1, а затем получаю подстроку типа
df1 = df1.join(df2, "id")
df1.withColumn("substring" df1['string'].substr(df1.start, df1.end))).show()
Но соединение с OOM не удается, вот оптимизированный физический план (я пробовал трансляцию, перераспределение), но в любом случае такой размер для промежуточного результата неприемлем. Отдельно этот dfs отображается и обрабатывается без ошибок:
exonsDF = exons_raw.join(dnaDF, "seq_region_id").explain("cost")
== Optimized Logical Plan ==
Project [seq_region_id#63L, exon_id#62L, seq_region_start#64L, seq_region_end#65L, seq_region_strand#66, phase#67, end_phase#68, is_current#69, is_constitutive#70, stable_id#71, version#72, created_date#73, modified_date#74, transcript_id#75L, rank#76, sequence#148], Statistics(sizeInBytes=12.3 PiB)
+- Join Inner, (seq_region_id#63L = cast(seq_region_id#147 as bigint)), Statistics(sizeInBytes=14.4 PiB)
:- Repartition 10, true, Statistics(sizeInBytes=13.6 MiB)
: +- Filter isnotnull(seq_region_id#63L), Statistics(sizeInBytes=13.6 MiB)
: +- Relation [exon_id#62L,seq_region_id#63L,seq_region_start#64L,seq_region_end#65L,seq_region_strand#66,phase#67,end_phase#68,is_current#69,is_constitutive#70,stable_id#71,version#72,created_date#73,modified_date#74,transcript_id#75L,rank#76] orc, Statistics(sizeInBytes=13.6 MiB)
+- Repartition 10000, true, Statistics(sizeInBytes=1083.8 MiB)
+- Filter isnotnull(seq_region_id#147), Statistics(sizeInBytes=1083.8 MiB)
+- Relation [seq_region_id#147,sequence#148] orc, Statistics(sizeInBytes=1083.8 MiB)
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [seq_region_id#63L, exon_id#62L, seq_region_start#64L, seq_region_end#65L, seq_region_strand#66, phase#67, end_phase#68, is_current#69, is_constitutive#70, stable_id#71, version#72, created_date#73, modified_date#74, transcript_id#75L, rank#76, sequence#148]
+- SortMergeJoin [seq_region_id#63L], [cast(seq_region_id#147 as bigint)], Inner
:- Sort [seq_region_id#63L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(seq_region_id#63L, 200), ENSURE_REQUIREMENTS, [plan_id=335]
: +- Exchange RoundRobinPartitioning(10), REPARTITION_BY_NUM, [plan_id=330]
: +- Filter isnotnull(seq_region_id#63L)
: +- FileScan orc [exon_id#62L,seq_region_id#63L,seq_region_start#64L,seq_region_end#65L,seq_region_strand#66,phase#67,end_phase#68,is_current#69,is_constitutive#70,stable_id#71,version#72,created_date#73,modified_date#74,transcript_id#75L,rank#76] Batched: true, DataFilters: [isnotnull(seq_region_id#63L)], Format: ORC, Location: InMemoryFileIndex(1 paths)[file:/nfs/production/flicek/ensembl/infrastructure/mira/19tmp/exons], PartitionFilters: [], PushedFilters: [IsNotNull(seq_region_id)], ReadSchema: struct<exon_id:bigint,seq_region_id:bigint,seq_region_start:bigint,seq_region_end:bigint,seq_regi...
+- Sort [cast(seq_region_id#147 as bigint) ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(cast(seq_region_id#147 as bigint), 200), ENSURE_REQUIREMENTS, [plan_id=336]
+- Exchange RoundRobinPartitioning(10000), REPARTITION_BY_NUM, [plan_id=331]
+- Filter isnotnull(seq_region_id#147)
+- FileScan orc [seq_region_id#147,sequence#148] Batched: true, DataFilters: [isnotnull(seq_region_id#147)], Format: ORC, Location: InMemoryFileIndex(1 paths)[file:/nfs/production/flicek/ensembl/infrastructure/mira/6tmp/sequence], PartitionFilters: [], PushedFilters: [IsNotNull(seq_region_id)], ReadSchema: struct<seq_region_id:string,sequence:string>
UPD2 - с новым планом
== Optimized Logical Plan ==
Project [seq_region_id#63L, exon_id#62L, seq_region_start#64L, seq_region_end#65L, seq_region_strand#66, phase#67, end_phase#68, is_current#69, is_constitutive#70, stable_id#71, version#72, created_date#73, modified_date#74, transcript_id#75L, rank#76, sequence#148], Statistics(sizeInBytes=12.3 PiB)
+- Join Inner, (seq_region_id#63L = cast(seq_region_id#147 as bigint)), Statistics(sizeInBytes=14.4 PiB)
:- RepartitionByExpression [seq_region_id#63L], Statistics(sizeInBytes=13.6 MiB)
: +- Filter isnotnull(seq_region_id#63L), Statistics(sizeInBytes=13.6 MiB)
: +- Relation [exon_id#62L,seq_region_id#63L,seq_region_start#64L,seq_region_end#65L,seq_region_strand#66,phase#67,end_phase#68,is_current#69,is_constitutive#70,stable_id#71,version#72,created_date#73,modified_date#74,transcript_id#75L,rank#76] orc, Statistics(sizeInBytes=13.6 MiB)
+- RepartitionByExpression [cast(seq_region_id#147 as bigint)], Statistics(sizeInBytes=1083.8 MiB)
+- Filter isnotnull(seq_region_id#147), Statistics(sizeInBytes=1083.8 MiB)
+- Relation [seq_region_id#147,sequence#148] orc, Statistics(sizeInBytes=1083.8 MiB)
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [seq_region_id#63L, exon_id#62L, seq_region_start#64L, seq_region_end#65L, seq_region_strand#66, phase#67, end_phase#68, is_current#69, is_constitutive#70, stable_id#71, version#72, created_date#73, modified_date#74, transcript_id#75L, rank#76, sequence#148]
+- SortMergeJoin [seq_region_id#63L], [cast(seq_region_id#147 as bigint)], Inner
:- Sort [seq_region_id#63L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(seq_region_id#63L, 200), REPARTITION_BY_COL, [plan_id=330]
: +- Filter isnotnull(seq_region_id#63L)
: +- FileScan orc [exon_id#62L,seq_region_id#63L,seq_region_start#64L,seq_region_end#65L,seq_region_strand#66,phase#67,end_phase#68,is_current#69,is_constitutive#70,stable_id#71,version#72,created_date#73,modified_date#74,transcript_id#75L,rank#76] Batched: true, DataFilters: [isnotnull(seq_region_id#63L)], Format: ORC, Location: InMemoryFileIndex(1 paths)[file:/nfs/production/flicek/ensembl/infrastructure/mira/0tmp/exons], PartitionFilters: [], PushedFilters: [IsNotNull(seq_region_id)], ReadSchema: struct<exon_id:bigint,seq_region_id:bigint,seq_region_start:bigint,seq_region_end:bigint,seq_regi...
+- Sort [cast(seq_region_id#147 as bigint) ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(cast(seq_region_id#147 as bigint), 200), REPARTITION_BY_COL, [plan_id=331]
+- Filter isnotnull(seq_region_id#147)
+- FileScan orc [seq_region_id#147,sequence#148] Batched: true, DataFilters: [isnotnull(seq_region_id#147)], Format: ORC, Location: InMemoryFileIndex(1 paths)[file:/nfs/production/flicek/ensembl/infrastructure/mira/0tmp/sequence], PartitionFilters: [], PushedFilters: [IsNotNull(seq_region_id)], ReadSchema: struct<seq_region_id:string,sequence:string>
@Chris 40 г памяти — использование одного диска. извините, первый df равен 1кк, а не 100к. Поэтому я не думаю, что это хорошая идея объединять каждую строку длиной около 100 миллионов, если нам действительно нужно всего несколько символов из нее.
это полезно, покажите, пожалуйста, sql/код, который вы используете
@Крис, сейчас у меня нет хорошей идеи, просто df1 = df1.jon(df2, "id") df1.with columns("substring" df1['string'].substr(df1.start, df1.end))). show() и происходит сбой при соединении с Java Heap OOM
какая версия Spark и можете ли вы опубликовать план запроса?
@Chris, пожалуйста, посмотрите UPD, о котором идет речь, он оценивает размер соединения в Ptb, что ужасно
какая версия спарка используется? Я предполагаю, что это версия 3.5 без какой-либо дополнительной информации, но можете ли вы также подтвердить, используете ли вы reparition(100) и repartition(1000) в своем коде?
@Крис, да на все. Я использую перераспределение - и пробовал много вариантов - с перераспределением, без, разными значениями или перераспределением от 4 до 1000, широковещательно... это не имело никакого значения
Вместо перераспределения по номеру попытайтесь сделать это через поле seq_region_id:
exon = exon.repartition(exon['seq_region_id'])
sequence = sequence.repartition(sequence['seq_region_id'].cast("bigint"))
Актерский состав необходим, так как
приведение (seq_region_id#147 как bigint)
требуется из последовательности, которая имеет строку, экзон имеет bigint на диске. Возможно, это сделано намеренно, но, учитывая имена и этот пример, это выглядит как ошибка, это тоже ускорит исправление на диске.
Учитывая использование версии 3.5, сортированное объединение слиянием определенно является самым быстрым и эффективным способом справиться с этим, и Spark выберет его за вас.
Наконец, не забудьте удалить «последовательность» после операции подстроки. Это позволит искре узнать, что это не требуется в конечном выводе, а также есть вероятность того, что будет меньше использования памяти.
Большое спасибо! Я еще не пробовала - но это все лучшее, что можно сделать, это точно. Отвечу здесь и после попытки
На данный момент - это не дает сбоя, который обычно случался через 5 минут, но все еще работал (например, 8 часов уже на 30 ГБ памяти для драйвера, одиночный режим). Надеюсь, все будет в полном порядке.
не могли бы вы добавить новый план запроса после этого изменения? Хотя это может быть очень хорошо, вы также можете попробовать использовать group by (для seq_region_id и последовательности) и Collect_set для смещений, а затем использовать array_transform.
Все еще не закончил, был случайно прерван, и, пожалуйста, посмотрите обновление с новым планом - у меня в Pit он выглядит так же, - но определенно работает по-другому.
А также объяснение() не включает drop("sequence"), только join
вы используете «только присоединение» или после сброса? В любом случае стоит попробовать группу по подходу.
Пока ждал - я применил другой подход - в цикле: прочитайте одну последовательность из базы данных, отфильтруйте экзоны с этим seq_id - и примените udf() к экзонам, тогда как последовательность - это просто транслируемая переменная (не искровая), передаваемая в этот udf . В конце итерации — объединение группы экзонов с результатом. Закончил весь пробег примерно за 10 минут. Да, этот цикл не является параллельным и вечно одноузловым, но скорость непобедима.
пожалуйста, покажите, что вы сейчас используете, и размер памяти машины.