Я буквально новичок в разработке данных, где я использую Python, PySpark и Pandas для создания фрейма данных, и с тех пор я очень долго был заблокирован и не мог вникнуть в это. Это простая проблема, но я застрял здесь.
Вот мой фрагмент кода, который работает нормально (при условии, что существует только один первичный ключ) без каких-либо итераций цикла и в конце генерирует фрейм данных.
primary_keys.append(primary_key)
primary_keys = [primary_key]
df = pd.DataFrame({'primary_key': primary_keys, mapped_column[0]: value})
# Define the schema for the Spark DataFrame
schema = T.StructType([
T.StructField("primary_key", T.IntegerType(), True), # Integer primary key
T.StructField(mapped_column[0], T.StringType(), True) # Integer primary key
])
self.logger.info("$$$$$$$$$$$$$$$ Creating DF $$$$$$$$$$$$$$$$$$$$$")
# Create the Spark DataFrame from the Pandas DataFrame
spark_df = spark.createDataFrame(df, schema)
# (Optional) Verify the Spark DataFrame
spark_df.printSchema()
spark_df.show()
Однако мое требование — получить это primary_keys из цикла, в котором каждый первичный ключ генерируется с динамическим значением для каждой итерации. Я пытался сделать следующее, но он сохраняет только последний объект, и в итоге я получил те же значения (значения для первичного ключа и сопоставленного_столбца) в окончательном сгенерированном фрейме данных.
primary_keys = []
# Iterate over the list and access values directly
for row in column_values:
### some logic to generate the primary key from the loop iteration
primary_keys.append(primary_key)
df = pd.DataFrame({'primary_key': primary_keys, mapped_column[0]: value})
self.logger.info("$$$$$$$$$$$$$$$ For Loop Completed $$$$$$$$$$$$$$$$$$$$$")
# Define the schema for the Spark DataFrame
schema = T.StructType([
T.StructField("primary_key", T.IntegerType(), True), # Integer primary key
T.StructField(mapped_column[0], T.StringType(), True) # Integer primary key
])
self.logger.info("$$$$$$$$$$$$$$$ Creating DF $$$$$$$$$$$$$$$$$$$$$")
# Create the Spark DataFrame from the Pandas DataFrame
spark_df = spark.createDataFrame(df, schema)
# (Optional) Verify the Spark DataFrame
spark_df.printSchema()
spark_df.show()
Я подозреваю, что проблема здесь, в объекте df в этом операторе df = pd.DataFrame({'primary_key': primary_keys, mapped_column[0]: value}), поскольку он заменяет его, а не добавляет к существующему.
Я был бы очень признателен, если бы кто-нибудь мог мне помочь здесь, спасибо
Я думаю, вам нужно удалить отступ в строке df = pd.DataFrame...? Вы воссоздаете фрейм данных после каждой итерации.






Ваше подозрение верно. Проблема заключается в переопределении DataFrame df на каждой итерации цикла. Следовательно, сохраняются только значения последней итерации. Чтобы обеспечить включение данных из каждой итерации, вам следует добавить данные в DataFrame, а не переопределять их. Вот как можно решить эту проблему:
# Initialize an empty list to store dictionaries
data = []
# Iterate over the list and access values directly
for row in column_values:
# Generate primary key and value dynamically
primary_key = generate_primary_key() # replace generate_primary_key() with your logic
value = generate_value() # replace generate_value() with your logic
# Append data to the list as a dictionary
data.append({'primary_key': primary_key, mapped_column[0]: value})
# Create a DataFrame from the list of dictionaries
df = pd.DataFrame(data)
# Define the schema for the Spark DataFrame
schema = T.StructType([
T.StructField("primary_key", T.IntegerType(), True), # Integer primary key
T.StructField(mapped_column[0], T.StringType(), True) # Integer primary key
])
# Create the Spark DataFrame from the Pandas DataFrame
spark_df = spark.createDataFrame(df, schema)
# (Optional) Verify the Spark DataFrame
spark_df.printSchema()
spark_df.show()
Это гарантирует, что все данные каждой итерации сохраняются в DataFrame.
каков ваш ожидаемый ввод/вывод?