■pubsub
アプリで簡単にPubsubにパブリッシュや、サブスクもできるので、アプリ間の連携にPubsubが使える
• 非同期処理(画像処理とか重めのもの
• IDの種類 (message id, subscription id, topic id, ack id, project idあたりがアプリでは使われるっぽい
※ack idはpull時のみでPushのときはhttpステータスコードが200でackとなる
トピック(メッセージのパブリッシュ先)
• スキーマ/外部アクセス許可/リテンション/GCS/バックアップの設定がある (Push/Pullの設定はない)
• パブリッシュ側のベストプラクティス (JWT)
サブスクライバのPushとPull (PushはEndpointが必要、デフォルトはpull)
• at-least-once (少なくとも1回) 配信を提供します
• 同じ順序指定キーを持ち、同じリージョンに存在している場合は、メッセージの順序指定を有効にできます
• サブスクライバーが31日間未使用、またはサブスクリプションが未更新の場合、サブスクリプションは期限切れ
pushはhttpsが必要?
• push エンドポイントのサーバーには、認証局が署名した有効な SSL証明書が必要でhttps
• Cloud run でEvent Arcを設定するとサブスクが自動作成されrunのデフォルトhttpsのURLが使われるが、これはPullよりPushで安定した
• CronバッチならPullで安定するのでは?大量リクエストはPull向きとある(Pullは失敗処理込みの話かも知れん)
トピックのリテンション:デフォルトなし、最小値:10分、最大値:31日
サブスクのリテンション:デフォルト値:7日、最小值:10分、最大値:7日
pubsub ack期限(Ack Deadline)
•デフォルト60秒> 設定10分>ack延長で最大1時間まで伸ばせると思われる
•exactly onceを設定しなければ期限の延長は保証されない
•ack期限を過ぎる、あるいはNackを返す場合、メッセージは再配送される
•ack応答期限の延長は99パーセンタイル(上位1%の値よりも小さい値のうち最大の値)で
modifyAckDeadlineを返し、延長してもMaxExtension (ack期限を延長 する最大値) 60minまで?
modifyAckDeadlineリクエストを定期的に発行すればよいらしい
メッセージの再試行を強制するには
•nack リクエストを送信
•高レベルのクライアント ライブラリを使用していない場合は、ackDeadlineSeconds を0に設定して modifyAckDeadline リクエストを送信する
•pullなら設定できる。他には、Cloud Dataflowを組み合わせる(プログラムコードでDataflowを使う感じかり、あるいはmessageについているunique idを利用して、KVS を用いたステート管理をして自前で重複を排除する
•再配信は、メッセージに対してクライアントによる否定確認応答が行われた場合、または確認応答期限が切れる前にクライアントが確認応答期限を延長しな かった場合のいずれかか原因で発生することがある。
※exactly onceはエラーでも再配信でPubsubパニックしないようにしたいために使うものではない?
pubsubはトピックにPublishされたメッセージをDataflowに引き継げる
Dataflow (Apache Beam) を大量のメッセージをバッチ処理する場合に使える
Pub/Sub→Dataflow→処理
•Apache Beamのウィンドウ処理とセッション分析とコネクタのエコシスエムがある
•メッセージ重複の削除ができる
•pubsub>dataflow>BQやGCS: この流れでログ等をストーリミングで入れ込める
BQサブスクリプション (PubSubはBigQuery Storage Write API を使用してデータを BigQueryテーブルに送信、GCSサブスクもある)
サブスクライバーApp側のコードでのフロー制御によりちょっと待てよのトラフィック急増対応
デッドレタートピック (配信試行回数が見れる)やエラーでの再配信
• Pub/Subサブスクリプションにデッドレタートピックを設定しておくと、一定の回数再送信が失敗したメッセージの宛先がデッドレタートピックに変更され貯められる
メッセージのフィルタ、同時実行制御により多いメッセージに対応
Pubsubをローカルでエミュレートする
pubsubのスナップショットやリテンション
トピックにリテンションを設定しスナップショット作成> 過去のサブスクしたメッセは見えなさそう
サブスクにリテンションを設定しスナップショット作成> 過去のAckしたメッセは見えなさそう
スナップショットでどう使うのか?
キューがたまっているときに撮るものと思われる。またシーク時間のポイントを設定する意味がある
スナップショットとシークを使いこなして特定期間の再実行を行う機能
スナップショットで再実行する
シークは指定時間か最後のスナップショット以降のサブスク再実行(実際pushでrunが再実行された)
Pubsubにどんなメッセージが入ってきているか確認する方法
pull形式ならAckしなければpullボタンで拾い見れる (トピックでパブリッシュしてサブスクでPull し見る)
トラブルシュートはログを見るかデッドレタートピックかGCSバックアップを見る?
デッドレターキュー(ドロップしたものの確認と救済?)
サブスクでDLQのONしデッドレタートピックを設定し転送する>GCSにもバックアップできる
DLTでメッセージ(実行済みOR未実行)の再生
データ形式:スキーマを使うか、スキーマなしならdataで取得できる
from google cloud import pubsub_v1
from avro.io import DatumReader, BinaryDecoder
from avro schema import Parse
project_id="your-project-id"
subscription id="your-subscription-id"
subscriber pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)
avro_schema = Parse("""
{
"type": "record",
"name": "Avro".
"fields": [
{
"name": "ProductName",
"type": "string",
"default":""
},
{
"name": "SKU",
"type": "int",
"default": 0
}
}
def callback(message):
print(f"Received message: {message}")
reader = DatumReader(avro_schema)
decoder = Binary Decoder (message.data)
avro_record = reader.read(decoder)
message_id=message.message id
message.ack()
print("Message ID: (message_id}")
product_name = avro_record['ProductName']
sku= avro_record['SKU']
print("Product Name: (product_name}")
print("SKU: (sku}")
subscriber.subscribe(subscription_path, callback=callback)
def callback(message):
print("Received message: (message)")
data message data
message_id=message.message_id
message.ack()
print("Date (data)")
print("Message ID: (message_id)")
Pub/SubでStreamingPull APIを使用してメッセージをリアルタイムで処理する - G-gen Tech BlogStreamingPull API を使用するとアプリとの間で永続的な双方向接続が維持され、Pub/Sub でメッセージが利用可能になるとすぐに pullされる。1 つの pull リクエストで 1 つの pull レスポンスが返る通常の 単項 Pull と比較すると、高スループット・低レイテンシ。必要なメッセージを残す処理をしたりも?GCP側の問題であっても通信が切れた場合は別サーバに繋ぎなおすためmodifyAckDeadlineも切れ再配信されるバグがある
+++
メッセージのTTL (Time-To-Live) はメッセージ保持期間(message retention duration) に依存
メッセージが TTLを超えると、自動的に削除され、Subscriberが受信できなくなる
ackDeadlineSeconds (デフォルトは10秒、最大600秒) を超えたACKのメッセージは再配信されますが、TTL期限を超えた場合は消える
#TTLを最大7日間に設定
gcloud pubsub subscriptions update my-subscription message-retention-duration=604800s
DLQ (Dead Letter Queue)
Subscriberが指定回数(最大100回) メッセージのACKを行わなかった場合に、メッセージを隔離する仕組み
DLQもサブスクなので期間やTTL設定方法は同じ
#DLQ topic 作成
gcloud pubsub topics create my-dlq-topic
#5回失敗したらDLQへ
gcloud pubsub subscriptions update my-subscription dead-letter-topic=projects/my-project/topics/my-diq-topic max-delivery-attempts=5
#DLQ subsc作成
gcloud pubsub subscriptions create my-diq-subscription--topic-my-diq-topic
#サブスクの詳細確認
gcloud pubsub subscriptions describe my-diq-subscription
#DLQメッセージの確認、-auto-ackも付けられるが、
gcloud pubsub subscriptions pull my-dlq-subscription -limit=10
[{
"ackld": "Y3g49NfY...=",
"message": {
"data": "SGVsbG8gd29ybGQ=", #Base64 エンコードされたデータ
"messageld": "1234567890",
"publish Time": "2024-02-18T12:34:56.789Z"
}
}]
#base64のでコードが必要
echo "SGVsbG8gd29ybGQ=" | base64-decode
#ack-idによりackを返しDLQメッセージを削除
gcloud pubsub subscriptions acknowledge my-diq-subscription--ack-ids=Y3g49NfFY
モニタリング > アラートポリシーから新しいアラートを作成し
pubsub.subscription.outstanding_messages を監視対象に選択し、閾値を設定するとよい
#DLQ メッセージの再処理をfunctionsに設定 (トピックに入れなおす)
from google.cloud import pubsub_v1
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project", "my-topic")
def republish_message(message):
future = publisher.publish(topic_path, message.data)
print(f"Republished message ID: {future.result()}")
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("my-project", "my-dlq-subscription")
def callback(message):
print(f"Received message: {message.data}")
republish_message(message)
message.ack()
subscriber.subscribe(subscription_path, callback=callback)