May 9, 2024 [ Web ]
■pubsub
アプリで簡単縺?Pubsubにパブリッシュや、サブスクもできるので、アプリ間の連携縺?Pubsubが使える
窶? 非同期処理(画蜒?処理とか重めのも縺?
窶? IDの種類 (message id, subscription id, topic id, ack id, project idあたりがアプリでは使繧?れるっぽい
窶?ack id縺?pull時のみ縺?Pushのとき縺?httpステータスコードが200縺?ackとなる
トピック・??メッセージのパブリッシュ先)
窶? スキーマ/外部アクセス許藹??/リテンショ繝?/GCS/バックアップの設定がある (Push/Pullの設定はない)
窶? パブリッシュ側のベストプラクティ繧? (JWT)
サブスクライバ縺?Push縺?Pull (Push縺?Endpointが必要、デフォルト縺?pull)
窶? at-least-once (少な縺?とも1回) 配信を觸??供します
窶? 同じ順蠎?指定キーを持ち、同じリージョンに藹??在している場合は、メッセージの順蠎?指定を有効にできます
窶? サブスクライバーが31日間未使用、またはサブスクリプションが未更新の場合、サブスクリプションは期限切れ
push縺?httpsが必要?
窶? push エンドポイントのサーバーには、認証藹??が署名した有効縺? SSL証譏?書が必要縺?https
窶? Cloud run 縺?Event Arcを設藹??するとサブスクが自動作成されrunのデフォルトhttps縺?URLが使繧?れるが、これ縺?PullよりPushで藹??定した
窶? CronバッチならPullで藹??定するので縺??大驥?リクエスト縺?Pull向きとある(Pullは失敗処理込みの話かも知れん)
トピックのリテンショ繝?:デフォルトなし、最蟆?蛟?:10分、最大蛟?:31譌?
サブスクのリテンショ繝?:デフォルト蛟?:7日、最蟆?蛟?:10分、最大蛟?:7譌?
pubsub ack期限(Ack Deadline)
•デフォルト60秒> 設藹??10分>ack延長で最螟?1時間まで伸ばせると思繧?れる
窶?exactly onceを設藹??しなければ期限の延長は臀??証されない
窶?ack期限を驕?縺?る、あるい縺?Nackを返す場合、メッセージは再配送される
窶?ack応答期限の延長縺?99パーセンタイ繝?(上位1%の値よりも蟆?さい値のうち最大の蛟?)縺?
modifyAckDeadlineを返し、延長してもMaxExtension (ack期限を延髟? する最大蛟?) 60minま縺??
modifyAckDeadlineリクエストを定期的に発鐔??すればよいらしい
メッセージの再試鐔??を強制するに縺?
窶?nack リクエストを送菫?
•饅??レベルのクライアント ライブラリを使用していない場合は、ackDeadlineSeconds を0に設定し縺? modifyAckDeadline リクエストを送信する
窶?pullなら設藹??できる。他には、Cloud Dataflowを組み合繧?せる(プログラムコード縺?Dataflowを使う感じかり、あるい縺?messageについているunique idを利用して、KVS を用いたステート管理をして自前で重複を觸??除する
•再配信は、メッセージに対してクライアントによる否藹??確鐔??応答が行繧?れた場合、または確認応答期限が切れる前にクライアントが確鐔??応答期限を延長し縺? かった場合のいずれかか藹??因で発生することがある。
窶?exactly onceはエラーでも再配信縺?Pubsubパニックしないようにしたいために使うものではない?
pubsubはトピック縺?PublishされたメッセージをDataflowに藹??き継げる
Dataflow (Apache Beam) を大驥?のメッセージをバッチ処理する場合に使える
Pub/Sub→Dataflow→処理
窶?Apache Beamのウィンドウ処理とセッション分析とコネクタのエコシスエムがある
•メッセージ重複の削除ができる
窶?pubsub>dataflow>BQやGCS: この觸??れでログ軆??をストーリミングで入れ込める
BQサブスクリプショ繝? (PubSub縺?BigQuery Storage Write API を使用してデータを BigQueryテーブルに送信、GCSサブスクもある)
サブスクライバ繝?App側のコードでのフロー制御によりちょっと藹??てよのトラフィック急藹??対藹??
デッドレタートピッ繧? (配信試行回数が見れる)やエラーでの再配菫?
窶? Pub/Subサブスクリプションにデッドレタートピックを設藹??してお縺?と、一定の回数再送信が失敗したメッセージの藹??先がデッドレタートピックに藹??更され貯められる
メッセージのフィルタ、同時実行制御により多いメッセージに対応
Pubsubをローカルでエミュレートする
pubsubのスナップショットやリテンショ繝?
トピックにリテンションを設藹??しスナップショット作成> 驕?去のサブスクしたメッセは鐔??えなさそう
サブスクにリテンションを設藹??しスナップショット作成> 驕?去縺?Ackしたメッセは鐔??えなさそう
スナップショットでどう使うのか?
キューがたまっているときに撮るものと思繧?れる。またシーク時間のポイントを設藹??する諢?味がある
スナップショットとシークを使いこなして特藹??期間の再実行を行う機閭?
スナップショットで再実行する
シークは指定時間か最後のスナップショット以降のサブスク再実行(実際push縺?runが再実行された)
Pubsubにどんなメッセージが入ってきているか確鐔??する方觸??
pull形藹??ならAckしなけれ縺?pullボタンで拾い見れる (トピックでパブリッシュしてサブスク縺?Pull し見る)
トラブルシュートはログを見るかデッドレタートピックかGCSバックアップを見る?
デッドレターキュ繝?(ドロップしたものの確認と救済?)
サブスク縺?DLQ縺?ONしデッドレタートピックを設藹??し転送する>GCSにもバックアップできる
DLTでメッセー繧?(実行済縺?OR未藹??行)の再生
データ形蠑?:スキーマを使うか、スキーマなしならdataで藹??得できる
from google cloud import pubsub_v1
from avro.io import DatumReader, BinaryDecoder
from avro schema import Parse
project_id="your-project-id"
subscription id="your-subscription-id"
subscriber pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)
avro_schema = Parse("""
{
"type": "record",
"name": "Avro".
"fields": [
{
"name": "ProductName",
"type": "string",
"default":""
},
{
"name": "SKU",
"type": "int",
"default": 0
}
}
def callback(message):
print(f"Received message: {message}")
reader = DatumReader(avro_schema)
decoder = Binary Decoder (message.data)
avro_record = reader.read(decoder)
message_id=message.message id
message.ack()
print("Message ID: (message_id}")
product_name = avro_record['ProductName']
sku= avro_record['SKU']
print("Product Name: (product_name}")
print("SKU: (sku}")
subscriber.subscribe(subscription_path, callback=callback)
def callback(message):
print("Received message: (message)")
data message data
message_id=message.message_id
message.ack()
print("Date (data)")
print("Message ID: (message_id)")
Pub/Sub縺?StreamingPull APIを使用してメッセージをリアルタイムで処理する - G-gen Tech BlogStreamingPull API を使用するとアプリとの間で永続的な藹??方向接続が維持され、Pub/Sub でメッセージが利用可能になるとすぐ縺? pullされる。1 つ縺? pull リクエスト縺? 1 つ縺? pull レスポンスが返る通常縺? 単項 Pull と觸??較すると、高スループット・臀??レイテンシ。必要なメッセージを残す処理をしたりも?GCP側の問題であっても通信が切れた場合は別サーバに軆??縺?なおすためmodifyAckDeadlineも切れ再配信されるバグがある
+++
メッセージ縺?TTL (Time-To-Live) はメッセージ臀??持期間(message retention duration) に臀??存
メッセージが TTLを超えると、自動的に削除され、Subscriberが藹??信できな縺?なる
ackDeadlineSeconds (デフォルト縺?10秒、最螟?600秒) を超えたACKのメッセージは再配信されますが、TTL期限を超えた場合は觸??える
#TTLを最螟?7日間に設定
gcloud pubsub subscriptions update my-subscription message-retention-duration=604800s
DLQ (Dead Letter Queue)
Subscriberが指定回謨?(最螟?100回) メッセージ縺?ACKを行繧?なかった場合に、メッセージを隔離する仕組縺?
DLQもサブスクなので期間やTTL設藹??方觸??は同じ
#DLQ topic 作成
gcloud pubsub topics create my-dlq-topic
#5回失敗したらDLQ縺?
gcloud pubsub subscriptions update my-subscription dead-letter-topic=projects/my-project/topics/my-diq-topic max-delivery-attempts=5
#DLQ subsc作成
gcloud pubsub subscriptions create my-diq-subscription--topic-my-diq-topic
#サブスクの詳細確認
gcloud pubsub subscriptions describe my-diq-subscription
#DLQメッセージの確認、-auto-ackも付けられるが、
gcloud pubsub subscriptions pull my-dlq-subscription -limit=10
[{
"ackld": "Y3g49NfY...=",
"message": {
"data": "SGVsbG8gd29ybGQ=", #Base64 エンコードされたデー繧?
"messageld": "1234567890",
"publish Time": "2024-02-18T12:34:56.789Z"
}
}]
#base64のでコードが必要
echo "SGVsbG8gd29ybGQ=" | base64-decode
#ack-idによりackを返しDLQメッセージを削髯?
gcloud pubsub subscriptions acknowledge my-diq-subscription--ack-ids=Y3g49NfFY
モニタリン繧? > アラートポリシーから新しいアラートを作成し
pubsub.subscription.outstanding_messages を監鐔??対象に選択し、閾値を設藹??するとよい
#DLQ メッセージの再処理をfunctionsに設定 (トピックに入れなおす)
from google.cloud import pubsub_v1
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project", "my-topic")
def republish_message(message):
future = publisher.publish(topic_path, message.data)
print(f"Republished message ID: {future.result()}")
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("my-project", "my-dlq-subscription")
def callback(message):
print(f"Received message: {message.data}")
republish_message(message)
message.ack()
subscriber.subscribe(subscription_path, callback=callback)
Posted by funa : 12:00 AM