у меня есть код:
list_files = glob.glob("/t/main_folder/*/file_*[0-9].csv")
test = sorted(list_files, key = lambda x:x[-5:])
поэтому этот код помог мне найти файлы, с которыми мне нужно работать. Я нашел 5 файлов csv в разных папках. следующий шаг - я использую код ниже, чтобы работать с каждым файлом, который я нашел, мне нужно использовать полное внешнее соединение для каждого файла, сначала для main_folder/folder1/file1.csv, затем для main_folder/folder2/file2 и т. д. и т. д. до последнего файла, который был найден один за другим. вот почему мне нужна петля
df_deltas = spark.read.format("csv").schema(schema).option("header","true")\
.option("delimiter",";").load(test)
df_mirror = spark.read.format("csv").schema(schema).option("header","true")\
.option("delimiter",",").load("/t/org_file.csv").cache()
df_deltas.createOrReplaceTempView("deltas")
df_mirror.createOrReplaceTempView("mirror")
df_mir2=spark.sql("""select
coalesce (deltas.DATA_ACTUAL_DATE,mirror.DATA_ACTUAL_DATE) as DATA_ACTUAL_DATE,
coalesce (deltas.DATA_ACTUAL_END_DATE,mirror.DATA_ACTUAL_END_DATE) as DATA_ACTUAL_END_DATE,
coalesce (deltas.ACCOUNT_RK,mirror.ACCOUNT_RK) as ACCOUNT_RK,
coalesce (deltas.ACCOUNT_NUMBER,mirror.ACCOUNT_NUMBER) as ACCOUNT_NUMBER,
coalesce (deltas.CHAR_TYPE,mirror.CHAR_TYPE) as CHAR_TYPE,
coalesce (deltas.CURRENCY_RK,mirror.CURRENCY_RK) as CURRENCY_RK,
coalesce (deltas.CURRENCY_CODE,mirror.CURRENCY_CODE) as CURRENCY_CODE,
coalesce (deltas.CLIENT_ID,mirror.CLIENT_ID) as CLIENT_ID,
coalesce (deltas.BRANCH_ID,mirror.BRANCH_ID) as BRANCH_ID,
coalesce (deltas.OPEN_IN_INTERNET,mirror.OPEN_IN_INTERNET) as OPEN_IN_INTERNET
from mirror
full outer join deltas on
deltas.ACCOUNT_RK=mirror.ACCOUNT_RK
""")
df_deltas = spark.read.format("csv").schema(schema).option("header","true")\
.option("delimiter",";").load(test)--HERE I'M USING MY CODE TO FILL THE .LOAD WITH FILES
как можно сделать цикл для первого найденного файла, потом для второго и так далее?
Вы можете использовать цикл for для этого,
for idx, file in enumerate(test):
globals()[f"df_{idx}"] = spark.read.format("csv").schema(schema).option("header","true").option("delimiter",";").load(file)
Это создаст DF в глобальном пространстве имен с именами df_0
для первого файла, df_1
для второго файла и так далее. Затем вы можете использовать этот DF, чтобы делать все, что хотите.
Хм, вы можете использовать время создания файла для достижения этого, например, у вас вчера было 3 файла, и вы их обработали, после этого сегодня у вас есть еще 2 файла, затем вы применяете фильтр, говорящий, если время создания больше, чем или равным сегодняшнему дню, тогда только вы сохраните его, иначе пропустите. Вы можете обратиться к этому, чтобы узнать время создания файла stackoverflow.com/a/39501288/11713502
но, например, если я использую этот скрипт, после того, как скрипт будет выполнен, я немедленно создам новую папку с новым файлом и снова запущу скрипт? это не будет работать, как я вижу? но если мой df уже находится в log.csv, он не будет использовать этот df в скрипте. Является ли это возможным?
Если имя файла не существует в столбце имя файла в log.csv, используйте его в цикле, иначе пропустите, возможно ли что-то подобное? :)
Я не могу понять ваш поток, как приходят файлы и как вы их обрабатываете и как вы сохраняете результаты? и какая частота файлов?
У меня есть файлы в папках, и мне нужно использовать соединения для всех файлов, которые существуют в пути, как я сделал с вашей помощью, но теперь, если я снова запущу скрипт, он сделает то же самое, но мне нужно, например, просто сообщение из лута, если все файлы были использованы лутом, что больше нет файлов для добавления в скрипт, или если я создам новый файл и начну цикл - цикл будет делать только созданный файл, а не все файлы снова. Поэтому я думаю, что я могу сделать как дополнительный файл csv, где я буду писать все используемые пути, и по этому файлу скрипт должен понимать, что я уже использовал этот файл, и его нужно пропустить.
Да, тогда вы можете использовать log.csv
, о котором вы говорили. После выполнения всей обработки вы можете сохранить имена обрабатываемых файлов в log.csv
и в следующий раз перед чтением проверить, присутствует ли имя файла в log.csv
это помогло мне, большое спасибо. может быть, вы знаете, как я могу избежать использования фреймов данных, которые уже использовались? например, если я использовал этот скрипт в первый раз, я заполнил свою таблицу данными из своих файлов, как я могу дать своему коду понять, что мне не нужно использовать, например, эти 3 файла, которые уже использовались, а только новые файлы, которые пришли например сегодня или вчера? может быть, мне нужно создать новый файл как log.csv и поместить туда все мои используемые кадры данных с именем файла? возможно ли это сделать?