У меня есть кластер Azure Databricks, который обрабатывает различные таблицы, а затем в качестве последнего шага я помещаю эти таблицы в Azure SQL Server для использования некоторыми другими процессами. У меня есть ячейка в databricks, которая выглядит примерно так:
def generate_connection():
jdbcUsername = dbutils.secrets.get(scope = "Azure-Key-Vault-Scope", key = "AzureSqlUserName")
jdbcPassword = dbutils.secrets.get(scope = "Azure-Key-Vault-Scope", key = "AzureSqlPassword")
connectionProperties = {
"user" : jdbcUsername,
"password" : jdbcPassword,
"driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}
return connectionProperties
def generate_url():
jdbcHostname = dbutils.secrets.get(scope = "Azure-Key-Vault-Scope", key = "AzureSqlHostName")
jdbcDatabase = dbutils.secrets.get(scope = "Azure-Key-Vault-Scope", key = "AzureSqlDatabase")
jdbcPort = 1433
return "jdbc:sqlserver://{0}:{1};database = {2}".format(jdbcHostname, jdbcPort, jdbcDatabase)
def persist_table(table, sql_table, mode):
jdbcUrl = generate_url();
connectionProperties = generate_connection()
table.write.jdbc(jdbcUrl, sql_table, properties=connectionProperties, mode=mode)
persist_table(spark.table("Sales.OpenOrders"), "Sales.OpenOrders", "overwrite")
persist_table(spark.table("Sales.Orders"), "Sales.Orders", "overwrite")
Это работает, как и ожидалось. Проблема, с которой я сталкиваюсь, заключается в том, что таблица «Заказы» очень велика, и только небольшая часть строк может меняться каждый день, поэтому я хочу изменить режим перезаписи на режим добавления и изменить фрейм данных. всю таблицу только к тем строкам, которые могли измениться. Все это я знаю, как сделать достаточно легко, но я хочу запустить простую инструкцию SQL для базы данных Azure SQL, чтобы удалить строки, которые уже будут там, чтобы они, возможно, изменили строки, были вставлены обратно .
Я хочу запустить оператор SQL для базы данных Azure SQL, например
Delete From Sales.Orders Where CreateDate >= '01/01/2019'
Нет, я не хочу запускать этот sql для кластера, я хочу запускать его для удаленного сервера sql
Вам нужно использовать библиотеку pyodbc. Вы можете подключиться и использовать операторы sql.
import pyodbc
conn = pyodbc.connect( 'DRIVER = {ODBC Driver 17 for SQL Server};'
'SERVER=mydatabe.database.azure.net;'
'DATABASE=AdventureWorks;UID=jonnyFast;'
'PWD=MyPassword')
# Example doing a simple execute
conn.execute('INSERT INTO Bob (Bob1, Bob2) VALUES (?, ?)', ('A', 'B'))
К сожалению, заставить его работать с блоками данных довольно сложно. Некоторое время назад я написал сообщение в блоге, которое должно помочь. https://datathirst.net/blog/2018/10/12/executing-sql-server-stored-procedures-on-databricks-pyspark
привет, приятель, попробовал это, но получил ошибку file not found
- не знаю, в чем проблема, поскольку я могу подключиться к своему дБ с помощью JDBC?
Хочу поделиться своими находками.
1) pyodbc - я обратился в службу технической поддержки Microsoft и получил следующий ответ:
####========================================================
### cell 1: install pyodbc
####========================================================
%sh
curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add -
curl https://packages.microsoft.com/config/ubuntu/16.04/prod.list > /etc/apt/sources.list.d/mssql-release.list
apt-get update
ACCEPT_EULA=Y apt-get install msodbcsql17
apt-get -y install unixodbc-dev
sudo apt-get install python3-pip -y
pip3 install --upgrade pyodbc
####========================================================
### cell 2: connect
####========================================================
import pyodbc
conn = pyodbc.connect( 'DRIVER = {ODBC Driver 17 for SQL Server};'
'SERVER=xxxxx.database.windows.net;'
'DATABASE=xxxxx;UID=xxxxx;'
'PWD=xxxxx')
####========================================================
### cell 3: create table
####========================================================
conn.execute('CREATE TABLE dbo.Bob (Bob1 VARCHAR(30), Bob2 VARCHAR(30))')
####========================================================
### cell 4: insert into table
####========================================================
conn.execute('INSERT INTO dbo.Bob (Bob1, Bob2) VALUES (?, ?)', ('A', 'B'))
Примечания: а) Время выполнения: 6.2 (Scala 2.11, Spark 2.4.4) б) Эта версия среды выполнения поддерживает только Python 3.
2) Соединитель Spark для базы данных SQL Azure и SQL Server - Пока гуглил решение по установке pyodbc, нашел вот это. Этот вариант мне больше нравится и я его попробую.
Вы должны иметь возможность запускать оператор SQL в ячейке с помощью магии %sql.