Как сделать цикл в pyspark

у меня есть код:

list_files = glob.glob("/t/main_folder/*/file_*[0-9].csv")
test = sorted(list_files, key = lambda x:x[-5:])

поэтому этот код помог мне найти файлы, с которыми мне нужно работать. Я нашел 5 файлов csv в разных папках. следующий шаг - я использую код ниже, чтобы работать с каждым файлом, который я нашел, мне нужно использовать полное внешнее соединение для каждого файла, сначала для main_folder/folder1/file1.csv, затем для main_folder/folder2/file2 и т. д. и т. д. до последнего файла, который был найден один за другим. вот почему мне нужна петля

  
df_deltas = spark.read.format("csv").schema(schema).option("header","true")\
.option("delimiter",";").load(test)
df_mirror = spark.read.format("csv").schema(schema).option("header","true")\
.option("delimiter",",").load("/t/org_file.csv").cache()
df_deltas.createOrReplaceTempView("deltas")
df_mirror.createOrReplaceTempView("mirror")
df_mir2=spark.sql("""select 
coalesce (deltas.DATA_ACTUAL_DATE,mirror.DATA_ACTUAL_DATE) as DATA_ACTUAL_DATE,
coalesce (deltas.DATA_ACTUAL_END_DATE,mirror.DATA_ACTUAL_END_DATE) as DATA_ACTUAL_END_DATE,
coalesce (deltas.ACCOUNT_RK,mirror.ACCOUNT_RK) as ACCOUNT_RK,
coalesce (deltas.ACCOUNT_NUMBER,mirror.ACCOUNT_NUMBER) as ACCOUNT_NUMBER,
coalesce (deltas.CHAR_TYPE,mirror.CHAR_TYPE) as CHAR_TYPE,
coalesce (deltas.CURRENCY_RK,mirror.CURRENCY_RK) as CURRENCY_RK,
coalesce (deltas.CURRENCY_CODE,mirror.CURRENCY_CODE) as CURRENCY_CODE,
coalesce (deltas.CLIENT_ID,mirror.CLIENT_ID) as CLIENT_ID,
coalesce (deltas.BRANCH_ID,mirror.BRANCH_ID) as BRANCH_ID,
coalesce (deltas.OPEN_IN_INTERNET,mirror.OPEN_IN_INTERNET) as OPEN_IN_INTERNET
from mirror
full outer join deltas on
deltas.ACCOUNT_RK=mirror.ACCOUNT_RK

                """)

df_deltas = spark.read.format("csv").schema(schema).option("header","true")\
.option("delimiter",";").load(test)--HERE I'M USING MY CODE TO FILL THE .LOAD WITH FILES

как можно сделать цикл для первого найденного файла, потом для второго и так далее?

Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
1
0
68
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Вы можете использовать цикл for для этого,

for idx, file in enumerate(test):
    globals()[f"df_{idx}"] = spark.read.format("csv").schema(schema).option("header","true").option("delimiter",";").load(file)

Это создаст DF в глобальном пространстве имен с именами df_0 для первого файла, df_1 для второго файла и так далее. Затем вы можете использовать этот DF, чтобы делать все, что хотите.

это помогло мне, большое спасибо. может быть, вы знаете, как я могу избежать использования фреймов данных, которые уже использовались? например, если я использовал этот скрипт в первый раз, я заполнил свою таблицу данными из своих файлов, как я могу дать своему коду понять, что мне не нужно использовать, например, эти 3 файла, которые уже использовались, а только новые файлы, которые пришли например сегодня или вчера? может быть, мне нужно создать новый файл как log.csv и поместить туда все мои используемые кадры данных с именем файла? возможно ли это сделать?

nox8315 20.02.2023 09:04

Хм, вы можете использовать время создания файла для достижения этого, например, у вас вчера было 3 файла, и вы их обработали, после этого сегодня у вас есть еще 2 файла, затем вы применяете фильтр, говорящий, если время создания больше, чем или равным сегодняшнему дню, тогда только вы сохраните его, иначе пропустите. Вы можете обратиться к этому, чтобы узнать время создания файла stackoverflow.com/a/39501288/11713502

Tushar Patil 20.02.2023 09:13

но, например, если я использую этот скрипт, после того, как скрипт будет выполнен, я немедленно создам новую папку с новым файлом и снова запущу скрипт? это не будет работать, как я вижу? но если мой df уже находится в log.csv, он не будет использовать этот df в скрипте. Является ли это возможным?

nox8315 20.02.2023 09:27

Если имя файла не существует в столбце имя файла в log.csv, используйте его в цикле, иначе пропустите, возможно ли что-то подобное? :)

nox8315 20.02.2023 10:41

Я не могу понять ваш поток, как приходят файлы и как вы их обрабатываете и как вы сохраняете результаты? и какая частота файлов?

Tushar Patil 20.02.2023 11:00

У меня есть файлы в папках, и мне нужно использовать соединения для всех файлов, которые существуют в пути, как я сделал с вашей помощью, но теперь, если я снова запущу скрипт, он сделает то же самое, но мне нужно, например, просто сообщение из лута, если все файлы были использованы лутом, что больше нет файлов для добавления в скрипт, или если я создам новый файл и начну цикл - цикл будет делать только созданный файл, а не все файлы снова. Поэтому я думаю, что я могу сделать как дополнительный файл csv, где я буду писать все используемые пути, и по этому файлу скрипт должен понимать, что я уже использовал этот файл, и его нужно пропустить.

nox8315 20.02.2023 12:24

Да, тогда вы можете использовать log.csv, о котором вы говорили. После выполнения всей обработки вы можете сохранить имена обрабатываемых файлов в log.csv и в следующий раз перед чтением проверить, присутствует ли имя файла в log.csv

Tushar Patil 20.02.2023 13:54

Другие вопросы по теме