Недавно я отключил устаревший API Cloud Messaging и переключился на новый API Firebase Cloud Messaging (V1), как показано на снимке экрана ниже:
Ранее я использовал следующий код, чтобы подписаться на тему, а затем отправлять сообщения в эту тему, и это работало хорошо:
...
@classmethod
def subscribe_to_topic(cls, device_id, topics: list):
success = []
for topic in topics:
try:
url = f'https://iid.googleapis.com/iid/v1/{device_id}/rel/topics/{topic}'
headers = _get_header()
result = requests.post(url, headers=headers)
success.append(True) if result.status_code == 200 else success.append(False)
logger.info(f'subscribe user <{device_id}> in topic <{topic}> with status code: {result.status_code}')
except Exception as e:
logger.error(f'{e}')
return all(success)
После перехода на новый API я попытался обновить свой код следующим образом, но это не сработало:
...
@classmethod
def subscribe_to_topic(cls, device_id, topics: list):
"""Subscribes a device to a topic using FCM v1 API."""
payload = {"registrationTokens": [device_id]}
success = []
for topic in topics:
try:
url = (f"https://fcm.googleapis.com/v1/"
f"projects/{environ.get('FCM_PROJECT_NAME')}/topics/{topic}:subscribe")
headers = _get_header()
result = requests.post(url, headers=headers, json=payload)
success.append(True) if result.status_code == 200 else success.append(False)
logger.info(f'subscribe user <{device_id}> in topic <{topic}> with status code: {result.status_code}')
except Exception as e:
logger.error(f'{e}')
return all(success)
Я прочитал документацию Firebase, но до сих пор не знаю, как правильно перенести подписку на темы и функции отправки сообщений в новый API.
Может ли кто-нибудь предоставить руководство или пример того, как добиться этого с помощью нового API Firebase Cloud Messaging (V1)?
Любая помощь или примеры будут очень признательны!
Пожалуйста, обратитесь к этому.
Недавно я также перешел с устаревшей версии на v1, я использовал admin sdk для добавления/отмены подписки. Вот ссылка для установки admin sdk.
Вот как я решил эту проблему (шаг за шагом) с помощью Firebase Admin SDK.
Установите Firebase Admin SDK:
pip install firebase-admin
Аутентификация с использованием учетной записи службы:
Убедитесь, что у вас есть JSON-файл сервисной учетной записи для аутентификации. Установите переменную среды GOOGLE_APPLICATION_CREDENTIALS
так, чтобы она указывала на этот файл (добавьте ее в файл .env (переменные среды)):
export GOOGLE_APPLICATION_CREDENTIALS = "/path/to/serviceAccountKey.json" # got this from your firebase console project.
Обновите код для использования Firebase Admin SDK:
Вместо создания HTTP-запросов вручную используйте Firebase Admin SDK для обработки подписок на темы и отправки сообщений.
Вот обновленный код:
import logging
from os import environ
from firebase_admin import messaging, initialize_app, credentials
logger = logging.getLogger('firebase')
class Firebase:
_firebase_app = None
@classmethod
def _get_app(cls):
"""Initializes the Firebase app instance."""
if not cls._firebase_app:
cred = credentials.Certificate(environ.get("GOOGLE_APPLICATION_CREDENTIALS"))
cls._firebase_app = initialize_app(cred)
return cls._firebase_app
@classmethod
def subscribe_to_topic(cls, device_id, topics: list):
"""Subscribes a device to a topic using FCM v1 API."""
_ = cls._get_app()
for topic in topics:
response = messaging.subscribe_to_topic(device_id, topic)
if response.failure_count > 0: # mean an error occured
logger.info("Error") # log its reason
@classmethod
def unsubscribe_to_topic(cls, device_id, topics: list):
"""Unsubscribes a device from a topic."""
for topic in topics:
response = messaging.unsubscribe_from_topic(device_id, topic)
@classmethod
def send_to_topic(cls, topic, message, priority='normal'):
"""Sending message to a topic by topic name."""
try:
_ = cls._get_app()
message = messaging.Message(
data=message,
topic=topic,
)
response = messaging.send(message)
return response
except Exception as e:
logger.error(f"Error sending message to topic '{topic}': {e}")
raise # Re-raise the exception for handling
Вот пример использования класса Firebase
:
device_id = "your_device_registration_token"
topics = ["news", "updates", "sports"]
result = Firebase.subscribe_to_topic(device_id, topics) # First of all subscribe device_id to the topics...
topic = "news" # Our desired topic
message = {
"title": "Breaking News",
"body": "Stay informed with the latest updates."
}
response = Firebase.send_to_topic(topic, message) # Send message to the news topic
print(f"Message sent. Response: {response}")