Большая часть кода написана в PySpark для выполнения на Databricks.
Я оцениваю SnowFlake с его способностью запускать Python с помощью Snowpark.
Может ли кто-нибудь сообщить мне, как я могу реорганизовать следующую функцию PySpark в Snowpark?
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
def checkHiveTableExists(tableName,stageName = "base"):
try:
spark.sql(f"describe {stageName}{tableName}")
return True
except Exception as e:
print(f"An error was thrown, <<{e}>>")
return False
Моя попытка заключается в следующем:
import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import col
from snowflake.snowpark import session
def checkHiveTableExists(tableName,stageName = "base"):
try:
spark.sql(f"describe {stageName}{tableName}")
return True
except Exception as e:
print(f"An error was thrown, <<{e}>>")
return False
def main(session: snowpark.Session):
return checkHiveTableExists(False)
Но это не удалось.
Есть предположения?





Оригинальный код:
def checkHiveTableExists(tableName,stageName = "base"):
try:
spark.sql(f"describe {stageName}{tableName}")
return True
except Exception as e:
print(f"An error was thrown, <<{e}>>")
return False
Подход «копировать-вставить» в Snowflake не сработает.
sparkDESC ... => DESC TABLE ...Сам исходный код какой-то хитрый, потому что для существования объекта используется запрос для описания объекта.
Во-вторых: describe {stageName}{tableName} интерполяция строк делает его подверженным SQL-инъекциям — кто-нибудь может попытаться назвать это как checkHivetableExists("someName'; DROP TABLE ...")
При переводе кода основное внимание следует уделять поведению.
Лично я бы выполнил проверку метаданных INFORMATION_SCHEMA.TABLES и посмотрел, существует ли строка — пользователь, вызывающий этот код, должен иметь разрешение на доступ к этой таблице.
В любом случае, пытаясь быть как можно ближе к исходному коду:
import snowflake.snowpark as snowpark
def checkTableExists(session: snowpark.Session, tableName, schemaName = "BASE"):
try:
session.sql(f"DESC TABLE IDENTIFIER('{schemaName}.{tableName}')").collect()
return True
except Exception as e:
#print(f"An error was thrown, <<{e}>>")
return False
def main(session: snowpark.Session):
return checkTableExists(session, 'TEST')
@Patterson Потому что он возвращает на рабочий лист логическое значение, а не фрейм данных. Ошибка не требует пояснений Settings -> Return Type -> String
Привет @Lukasz, спасибо за обращение. Я попробовал ваш код, но получил следующую ошибку
Handler did not return a Snowpark DataFrame. Return a DataFrame from the handler or change the return type for the Python worksheet.