Я применяю словари на двух датафреймах dask, а затем объединяю их между собой — это делается без compute().
Позже я использую to_csv, которая является единственной точкой, где вычисляются мои кадры данных.
Я хочу иметь возможность обнаруживать KeyErrors и вести их журнал - отдельные журналы для двух фреймов данных.
Есть ли способ, какой фрейм данных в настоящее время вычисляется?
Суть моего кода
def convert(identifier):
try:
return alias_dict[str(identifier)]
except KeyError as err:
if current_df_converting == 'pileup':
conversion_error_handler(err.args[0], 'pileup_log')
elif current_df_converting == 'lists':
conversion_error_handler(err.args[0], 'lists_log')
return identifier
pileup_df = dd.read_csv(pileup, sep='\t', header=None, quoting=csv.QUOTE_NONE, encoding='utf-8')
lists_df = dd.read_csv(lists, sep='\t', header=None, quoting=csv.QUOTE_NONE, encoding='utf-8')
pileup_df['identifier'] = pileup_df.identifier.map(convert, meta=('identifier', str))
lists_df['identifier'] = lists_df.identifier.map(convert, meta=('identifier', str))
intersection_df = pileup_df.merge(lists_df, on=['identifier', 'position'])
dd.to_csv(intersection_df, output, header=None, index=None, single_file=True, sep='\t')






Вы можете использовать partial из functools
from functools import partial
def convert(identifier, current_df_converting):
...
conv1 = partial(convert, current_df_converting='pileup')
conv2 = partial(convert, current_df_converting='lists')
...
pileup_df['identifier'] = pileup_df.identifier.map(conv1, meta=('identifier', str))
lists_df['identifier'] = lists_df.identifier.map(conv2, meta=('identifier', str))
...