У меня есть функция Airflow, которая отправляет уведомление о сбое в группу Teams. В соответствии с новой политикой Microsoft мой старый вызов веб-перехватчика больше не будет работать через несколько месяцев, поэтому я хочу прямо сейчас поработать над адаптацией кода. Вот мой текущий код:
from airflow.models import Variable
import logging
import requests
def failed_task_notify_teams(context):
logging.info("Send notification to the Teams Group")
payload = {
"@type": "MessageCard",
"@context": "http://schema.org/extensions",
"title": "Airflow Task Error",
"summary": f"Task {context['task_instance_key_str']} : Failed",
"themeColor": "F44336",
"sections": [
{
"activityTitle": f"Task {context['task_instance_key_str']} : Failed",
"activitySubtitle": f"DAG: {context['dag'].dag_id}",
"facts": [
{
"name": "Date",
"value": context['ds']
},
{
"name": "Log URL",
"value": context['task_instance'].log_url
}
]
}
],
"potentialAction": [{
"@type": "OpenUri",
"name": "See Logs",
"targets": [{
"os": "default",
"uri": context['task_instance'].log_url
}]
}]
}
headers = {"content-type": "application/json"}
requests.post(Variable.get('teams_webhook_secret'), json=payload, headers=headers)
logging.info("Teams notification sent to the group!")
На мой взгляд, код должен выглядеть примерно так же, но сейчас он не работает с Microsoft 365 Workflow.
Посмотрев немного дальше, я смог найти решение. Код выглядит практически так же, мне нужно только изменить «параметры» полезной нагрузки. Вот код, работающий с модификациями:
from airflow.models import Variable
import logging
import requests
def failed_task_notify_teams(context):
logging.info("Send notification to the Teams Group")
payload = {
"type": "message",
"attachments" : [{
"contentType": "application/vdn.microsoft.card.adaptive",
"contentUrl": None,
"content": {
"$schema": "http://adaptivecard.io/schemas/adaptive-card.json",
"type": "AdaptiveCard",
"version": "1.2",
"body": [
{
"type": "TextBlock",
"text": f"Task {context['task_instance_key_str']} : Failed",
"weight": "bolder",
"size": "medium",
"color": "attention",
},
{
"type": "TextBlock",
"text": f"DAG : {context['dag'].dag_id}",
"spacing": "none"
},
{
"type": "FactSet",
"facts": [
{
"title": "Date :",
"value": context['ds']
},
{
"title": "Log URL :",
"value": f"[Click here]({context['task_instance'].log_url})"
}
]
}
],
"actions": [
{
"type": "Action.OpenUrl",
"title": "See logs",
"url": context['task_instance'].log_url
}
]
}
}]
}
headers = {"content-type": "application/json"}
response = requests.post(Variable.get('teams_azure_webook_secret'), json=payload, headers=headers)
if response.status_code == 200:
logging.info("Notification sent to Teams Group")
else:
logging.error(f"Failed to sent notification to Teams Group: {response.status_code} - {response.text}")