本文檔旨在解決 python Pub/Sub 訂閱者客戶端在應用訂閱過濾器后無法拉取消息的問題。通過分析問題原因,提供了一種簡單有效的解決方案,即在創建訂閱后添加短暫的延遲,確保訂閱完全生效后再創建訂閱者客戶端。
問題分析
在使用 Google Cloud Pub/Sub 的 Python 客戶端時,有時會遇到這樣的情況:當未對訂閱應用任何過濾器時,訂閱者客戶端能夠成功地從訂閱中拉取消息;然而,一旦應用了訂閱過濾器,客戶端就無法再拉取消息。盡管手動在 Pub/Sub 控制臺中拉取消息可以正常工作,表明消息確實符合過濾器條件并存在于訂閱中,但客戶端卻無法接收它們。
原因探究
根本原因在于,當使用過濾器創建訂閱時,可能需要一些時間才能使訂閱完全生效并正確注冊訂閱者客戶端。如果在訂閱創建后立即創建訂閱者客戶端,客戶端可能無法正確地與已過濾的訂閱關聯,導致無法拉取消息。
解決方案
解決此問題的最簡單有效的方法是在創建訂閱后添加一個短暫的延遲,然后再創建訂閱者客戶端。這可以通過在代碼中添加 time.sleep() 函數來實現。
立即學習“Python免費學習筆記(深入)”;
以下是一個示例代碼片段,演示了如何添加延遲:
import time from google.cloud import pubsub_v1 # 創建訂閱 (假設 subscription_path 已經定義) # subscriber.create_subscription(request={"name": subscription_path, "topic": topic_path, "filter": "attributes.key = 'value'"}) # 添加延遲,例如 5 秒 time.sleep(5) # 創建訂閱者客戶端 subscriber = pubsub_v1.SubscriberClient() # ... 訂閱和消息處理邏輯 ...
解釋:
- time.sleep(5):這行代碼會暫停程序的執行 5 秒鐘。您可以根據實際情況調整延遲時間。
完整代碼示例
下面是一個完整的示例,展示了如何在訂閱創建后添加延遲,以確保訂閱者客戶端能夠正常拉取消息:
import os import time from google.cloud import pubsub_v1 from app.services.subscription_service import save_bill_events from app.utils.constants import BILL_SUBSCRIPTION_GCP_PROJECT_ID, BILL_EVENT_SUBSCRIPTION_ID from app.utils.logging_tracing_manager import get_logger logger = get_logger(__file__) def callback(message: pubsub_v1.subscriber.message.Message) -> None: save_bill_events(message.data) message.ack() # 在創建訂閱后添加延遲 # 假設以下代碼在創建訂閱之后執行 time.sleep(5) subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path(os.environ.get(BILL_SUBSCRIPTION_GCP_PROJECT_ID), BILL_EVENT_SUBSCRIPTION_ID) # Limit the subscriber to only have fixed number of outstanding messages at a time. flow_control = pubsub_v1.types.FlowControl(max_messages=50) streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback, flow_control=flow_control) async def poll_bill_subscription(): with subscriber: try: # When `timeout` is not set, result() will block indefinitely, # unless an exception is encountered first. streaming_pull_future.result() except Exception as e: # Even in case of an exception, subscriber should keep listening logger.error( f"An error occurred while pulling message from subscription {BILL_EVENT_SUBSCRIPTION_ID}", exc_info=True) pass
注意事項:
- 延遲時間的選擇取決于您的具體環境和 Pub/Sub 服務的響應速度。建議根據實際情況進行調整。
- 在某些情況下,可能需要更長的延遲時間才能確保訂閱完全生效。
- 除了添加延遲之外,您還可以嘗試在創建訂閱者客戶端之前顯式地檢查訂閱的狀態,以確保訂閱已成功創建并已啟用過濾器。
總結
當 Python Pub/Sub 訂閱者客戶端在應用訂閱過濾器后無法拉取消息時,通常是因為訂閱尚未完全生效。通過在創建訂閱后添加一個短暫的延遲,可以確保訂閱者客戶端能夠正確地與已過濾的訂閱關聯,從而解決此問題。這種方法簡單有效,可以避免不必要的錯誤和麻煩。在實際應用中,請根據您的具體環境和 Pub/Sub 服務的響應速度調整延遲時間,以達到最佳效果。