У меня есть группа обеспечения доступности баз данных, которая проверяет, был ли файл загружен в Azure DataLake в определенном каталоге. Если это так, он разрешает запуск других DAG.
Я думал об использовании FileSensor, но полагаю, что параметра fsconnid недостаточно для аутентификации в DataLake.
В Поставщик Azure нет AzureDataLakeSensor
, но вы можете легко реализовать его, поскольку AzureDataLakeHook
имеет функцию check_for_file
, поэтому все, что нужно, — это обернуть эту функцию классом Sensor, реализующим функцию poke()
BaseSensorOperator
. Таким образом, вы можете напрямую использовать Подключение к озеру данных Microsoft Azure.
Я не проверял это, но это должно работать:
from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeHook
from airflow.sensors.base import BaseSensorOperator
class MyAzureDataLakeSensor(BaseSensorOperator):
"""
Sense for files in Azure Data Lake
:param path: The Azure Data Lake path to find the objects. Supports glob
strings (templated)
:param azure_data_lake_conn_id: The Azure Data Lake conn
"""
template_fields: Sequence[str] = ('path',)
ui_color = '#901dd2'
def __init__(
self, *, path: str, azure_data_lake_conn_id: str = 'azure_data_lake_default', **kwargs
) -> None:
super().__init__(**kwargs)
self.path = path
self.azure_data_lake_conn_id = azure_data_lake_conn_id
def poke(self, context: "Context") -> bool:
hook = AzureDataLakeHook(azure_data_lake_conn_id=self.azure_data_lake_conn_id)
self.log.info('Poking for file in path: %s', self.path)
try:
hook.check_for_file(file_path=self.path)
return True
except FileNotFoundError:
pass
return False
Пример использования:
MyAzureDataLakeSensor(
task_id='adls_sense',
path='folder/file.csv',
azure_data_lake_conn_id='azure_data_lake_default',
mode='reschedule'
)
Поскольку OP спросил о FileSensor, я предположил, что вариант использования — дождаться файла, поэтому Sensor — это способ сделать это. Если вариант использования состоит в том, чтобы просто проверить файл один раз, то датчик не нужен, и его можно упростить до простого пользовательского оператора.
Я пытаюсь проверить это решение. Я сталкиваюсь с этой ошибкой: stackoverflow.com/questions/72144463/…
Нет, я использовал другое решение
Прежде всего, взгляните на официальные операторы Microsoft для Airflow.
Мы видим, что для Хранилище Azure DataLake есть специальные операторы, к сожалению, на данный момент доступны только ADLSDeleteOperator
.
Этот ADLSDeleteOperator
использует Азуредаталейкхук, который вы должны повторно использовать в своем собственном пользовательском операторе для проверки наличия файла.
Мой совет для вас — создать дочерний класс CheckOperator, используя хук ADLS, чтобы проверить, существует ли файл, указанный во входных данных, с функцией check_for_file
хука.
ОБНОВЛЕНИЕ: как указано в комментариях, CheckOperator, по-видимому, привязан к SQL-запросам и устарел. Лучше всего использовать собственный настраиваемый датчик или настраиваемый оператор.
CheckOperator
устарел. Теперь он называется SQLCheckOperator
, чтобы лучше отражать то, что он делает. Озеро данных Azure не связано с SQL, поэтому создание подклассов SQLCheckOperator
не имеет смысла. Вы можете создать собственный оператор, просто не создавая подклассы того, о котором вы упомянули - вероятно, достаточно подкласса BaseOperator
.
У меня были серьезные проблемы с использованием предложенного API. Поэтому я внедрил Microsoft API в Airflow. Это работало нормально. Все, что вам нужно сделать, это использовать этот оператор и передать account_url и access_token.
from azure.storage.filedatalake import DataLakeServiceClient
from airflow.sensors.base import BaseSensorOperator
class AzureDataLakeSensor(BaseSensorOperator):
def __init__(self, path, filename, account_url, access_token, **kwargs):
super().__init__(**kwargs)
self._client = DataLakeServiceClient(
account_url=account_url,
credential=access_token
)
self.path = path
self.filename = filename
def poke(self, context):
container = self._client.get_file_system_client(file_system = "raw")
dir_client = container.get_directory_client(self.path)
file = dir_client.get_file_client(self.filename)
return file.exists()
Почти тот же ответ, ты опередил меня вовремя. Разница только в том, что Sensor vs CheckOperator.