■24/6/2 1:06PM
Cloud SQL
■Cloud SQL Python Connector (Cloud SQL language Connector)CloudSQL auth proxyのバイナリインストールでないやり方
Cloud SQL Python Connector自体は暗号化しないが、内部IPならサーバレスVPCコネクタで暗号化された通信が使え安全になっている。外部IPアドレスの場合はCloud SQL Auth Proxyで通信を暗号化。Cloud SQL 言語コネクタの概要 | Cloud SQL for MySQL | Google Cloud
GitHub - GoogleCloudPlatform/cloud-sql-python-connector: A Python library for connecting securely to your Cloud SQL instances.
事前必要(pip install>requirements.txt)
Flask==3.0.3gunicorn==22.0.0Werkzeug==3.0.3google-cloud-bigquery==3.25.0google-cloud-logging==3.11.1google-cloud-secret-manager==2.20.2google-api-python-client==2.141.0google-auth-httplib2==0.2.0google-auth-oauthlib==1.2.1websocket-client==1.8.0google-cloud-resource-manager==1.12.5Flask-WTF==1.2.1cloud-sql-python-connector==1.16.0pymysql==1.0.3
from flask import Flask, jsonifyfrom google.cloud.sql.connector import Connectorfrom google.cloud import secretmanagerimport pymysql
# 環境変数の定義PW_NAME = "sql-pw"PROJECT_NUM = "1234567890"DB_INSTANCE = "prj:asia-northeast1:db_instance"DB_USER = "db-user"DB_NAME = "db001"
# Secret Manager からパスワードを取得する関数def get_pw(pw_name, project_num): client = secretmanager.SecretManagerServiceClient() resource_name = f"projects/{project_num}/secrets/{pw_name}/versions/latest" res = client.access_secret_version(name=resource_name) credentials = res.payload.data.decode("utf-8") return credentials
# Cloud SQL接続def sql_getconn(connector): pw = get_pw(PW_NAME, PROJECT_NUM) conn = connector.connect( DB_INSTANCE, "pymysql", user=DB_USER, password=pw, db=DB_NAME, ip_type="private", ) return conn
app = Flask(__name__)
@app.route('/test', methods=['GET'])def get_table_data(): try: connector = Connector() conn = sql_getconn(connector) cursor = conn.cursor()
# SQLを実行して結果を取得 cursor.execute("SELECT no, name, targetDate FROM test") rows = cursor.fetchall()
# 結果をJSON形式に変換 result = [ { "no": row[0], "name": row[1], "targetDate": row[2].strftime("%Y-%m-%d %H:%M:%S") if row[2] else None } for row in rows ]
cursor.close() conn.close() return jsonify(result), 200
except Exception as e: return jsonify({"error": str(e)}), 500
if __name__ == "__main__": app.run(host="0.0.0.0", port=8080)
=============
# 追加オプションを使った接続も可 connector = Connector( ip_type="public", # "private" または "psc" も使用可能 enable_iam_auth=False, timeout=30, credentials=None, # 必要ならGoogle認証情報を渡す refresh_strategy="lazy", # "lazy" または "background" )
#トランザクション
try: conn = sql_getconn(connector) conn.autocommit = False # トランザクション開始、あるいは conn.begin() cursor = conn.cursor() # 挿入するデータを準備 new_data = [ {"no": 4, "name": "新しい名前4", "targetDate": "2024-05-01"}, {"no": 5, "name": "新しい名前5", "targetDate": "2024-05-02"}, ] # INSERT文を構築して実行 for data in new_data: sql = "INSERT INTO test (no, name, targetDate) VALUES (%s, %s, %s)" values = (data["no"], data["name"], data["targetDate"]) cursor.execute(sql, values) conn.commit() # トランザクションをコミット print("Data inserted successfully.") except Exception as e: conn.rollback() # エラーが発生した場合はロールバック print(f"Transaction rolled back due to an error: {e}") finally: cursor.close() conn.close()
#カーソル
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) params: dict形式で取得#[{'no': 1, 'name': 'Alice',...}, ...]cursor = conn.cursor(cursor=pymysql.cursors.SSCursor) 大量のデータを効率的に取得するためにストリーミングで結果を処理cursor.execute(query, params=None) cursor.execute("SELECT * FROM test WHERE no = %s", (1,)) params: プレースホルダーに対応する値のタブルまたはリストcursor.executemany(query, param list) cursor.executemany("INSERT INTO test (no, name) VALUES (%s, %s)", [(1, 'Alice'), (2, 'Bob')]) param list:繰り返し実行するパラメータのリストまたはタブルのリストcursor.fetchone() row = cursor.fetchone() #結果があれば (1, 'Alice', "2025-01-01") のような形式で1行のみ取得cursor.rowcount print(cursor.rowcount) #影響を受けた行数を返す
■接続検証用コンテナをビルド (内部IPを使うrun用)gcloud builds submit --tag asia-northeast1-docker.pkg.dev/prj/artifact_reg_name/app_name
■IAM?
Cloud SQL設定にCloud SQL 管理者 (roles/cloudsql.admin)、Cloud SQL インスタンス ユーザー (roles/cloudsql.instanceUser)等のIAMが要る?IAMユーザならいる、ローカルUserなら不要と思われる、ローカルでもCloud SQL Client (roles/cloudsql.client)等は要る
■Cloud SQL MySQL設定【開発環境】db_instance01Enterprise / Sandbox / AsiaNorthEast1 (Tokyo) / Single zoneMySQL ver 8.4Shared core/1cpu 0.6GB/HDD/10GB(auto increase)PrivateIP/設定にnwが必要(下記)/Enable private pathAuto daily backup 7days (1-5AM) / Enable point-in-time recoveryWeek1 sun 0-1am/Enable query insightsroot PW: 69696969【本番環境】Enterprise plus? キャッシュ使う?※CloudSQLはTFファイルに記載がなくてもTFステートファイルにPWを含めてしまうためTF化しない
- NW: projects/prj/global/networks/sql-vpc-nw- Connection name: prj:asia-northeast1 db_instance01
ユーザの作成 sql-user/82828282 PWをコードに入れない、シクレMgrに保存
■MySQLutf8mb4_ja_0900_as_ci_ksを使う?MySQL 8.0のCharset utf8mb4での日本語環境で使うCollationで文字比較をしてみる - CLOVER🍀_ai... アクセントを区別しない (Accent Insensitive)_as... アクセントを区別する (Accent Sensitive)_ci... 大文字・小文字を区別しない (Case Insensitive)_cs... 大文字・小文字を区別する (Case Sensitive)_ks... カナを区別する (Kana Sensitive)_bin... バイナリ
utf8mb4_unicode_ciでは"ア”と“あ”は同じものとして扱われるutf8mb4_ja_0900_as_ci_ks では"ア"≠”あ”となりカタカナとひらがなを明確に区別できるutf8mb4_ja_0900_as_ci_ks ならふりがなを使った並び替えで有効日本語のデータがメインで検索やソートでひらがな・カタカナ・濁点の区別が必要なら utf8mb4_ja_0900_as_ci_ks が適日本語と英語を混ぜたデータや広く互換性を持たせたいなら utf8mb4_unicode_ci の方が無難
インデックスをどのカラムに作ればいいか迷った時の3つの設計方針 #DB - Qiita
テーブルレコード数が1万件以上全体のレコード数の5%程度に絞り込めるカーディナリティ高さWHERE句の選択条件、または結合条件主キーおよびユニークの列には作成が不要インデックスは更新性能を劣化させる
データベースとテーブルの作成
CREATE DATABASE db;USE db;CREATE TABLE test ( no INT(10) UNSIGNED NOT NULL AUTO_INCREMENT, name VARCHAR(8) NOT NULL, targetDate TIMESTAMP NOT NULL, PRIMARY KEY (no), INDEX index_name (name), INDEX index_targetDate (targetDate))ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_ja_0900_as_ci_ks;
ENUM型は選択肢で早いがALTERが面倒なのでvarcharaにするinquiry_type ENUM('bq', 'pii') NOT NULL↓inquiry_type VARCHAR(255) NOT NULL,
VARCHAR(255) (よく使われる最大サイズ)VARCHAR(1024) (長めの文字列)VARCHAR(4096) (長文向け)長いテキストを扱うならTEXT型InnoDB の1行の最大サイズは約8KB (8126/バイト)長さは?メールは255で良い
サンプルデータINSERT INTO `table` (`name`, `date`) VALUES ('aaa', '2002-02-23');
ORMapperは面倒なのでSQLを使う
ORM Quick Start — SQLAlchemy 2.0 Documentation
【SQLAlchemy】Generic Typesと各種DBの型 対応表
SQLAlchemyでのテーブル定義 #Python - Qiita
■データベースフラグ
confが直接変更できなためフラグとしてパラメータを渡せるデータベース フラグを構成する | Cloud SQL for MySQL | Google Cloud Cloud SQL studio (コンソールでMySQLが使える) MySQLクライアントを使いたいならAuth proxyが必要 HA構成だとフェールオーバーやリードレプリカ等が使える
■NW
Cloud SQLを徹底解説! - G-gen Tech BlogCloud SQLが内部IPだとサーバレスVPCコネクタ、or 外部IPならSQL + auth proxy内部IPで良いのでVPCを作る、CloudSQLを内部IPで作るサーバレスVPCアクセスコネクタを作る vpc: sql-vpc-nw, subnet: sql-vpc-subnet 192.168.77.0/24 Gateway 192.168.77.1, Private Google Access On sql-vpc-nw-ip-range 192.168.78.0/24 on cloudSQL run-serverless-vpc-ac 192.168.79.0/28 on Runファイアウォールルールでポート (デフォルトで3306など) を開放Cloud Run のNW設定で、サーバーレス VPCコネクタを選択、ルートオプションとしてすべてのトラフィックを VPC コネクタ経由で送信を選択CloudSQLを30分程度掛けて起動、接続>接続テスト
VPC(例: 10.0.0.0/16)サブネット(Cloud SQL 用): 10.10.0.0/24(例: us-central1、VPC内)サブネット(VPCコネクタ用): 10.8.0.0/28(RunからVPCへ通信用、VPC外) VPC コネクタのサブネットは 10.8.0.0/28 のような小さな範囲を使用、VPC外だがrun自体がVPC外だから? VPC コネクタはリージョン単位なので、Cloud Run と Cloud SQL を同じリージョンに配置するのが望ましいGoogle Cloudの内部NW設計によりVPC内の異なるサブネット間でも通信可能 VPC内なら異なるリージョンのサブネットでもOK(VPC自体には範囲を設定なしでサブネットでIPが被らなければOKかと 追加の設定なしで、例えば us-central1 の VM から asia-northeast1 の Cloud SQLに直接アクセス可
外部IPの場合:
アプリがrunならサイドカーコンテナとしてAuth Proxyを追加できる サイドカーは同Pod内なのでループバックアドレス127.0.0.1あるいはlocalhost:5432 (Auth Proxy起動時に指定したポート) に通信しCloudSQLに接続するGCEにDLしてAuth proxyインストールでもいい アプリのコネクタはAuth Proxy動いているGCEのIP:ポート番号を指定に通信しCloudSQLに接続する FWでポートも開けること
■run サービスアカウント
run-sql@prj.iam.gserviceaccount.com に必要な権限 Cloud SQL Client (roles/cloudsql.client) Run Invoker (roles/run.invoker) Compute Network User (roles/compute.networkUser) -VPCコネクタを使用する
runを建てるが、InternalIPのため同プロジェクト同VPCのGCE を作成し移動してCURLでテストcurl -H "Authorization: Bearer $(gcloud auth print-identity-token)" "https://run-sql-test-1212124.asia-northeast1.run.app/test"
■MySQLでUUIDを使うか、連番を使うか? > ULIDを使うUUIDは連番に対し セキュリティ上より安全、サーバが異なってもユニーク パフォーマンスが悪い (UUIDをプライマリキーにすると速度が落ちる場合がある)連番とUUIDの両方を振り出しておく? > ULIDを使うことにするMySQLでUUIDv4をプライマリキーにするとパフォーマンス問題が起きるのはなぜ?(N回目)
Comment (0)
■24/6/1 3:24PM
GCP hands-off 3
■VPC(例: 10.0.0.0/16)サブネット(Cloud SQL 用): 10.10.0.0/24(例: us-central1、VPC内)サブネット(VPCコネクタ用): 10.8.0.0/28(RunからVPCへ通信用、VPC外) VPC コネクタのサブネットは 10.8.0.0/28 のような小さな範囲を使用、VPC外だがrun自体がVPC外だから? VPC コネクタはリージョン単位なので、Cloud Run と Cloud SQL を同じリージョンに配置するのが望ましいGoogle Cloudの内部NW設計によりVPC内の異なるサブネット間でも通信可能 VPC内なら異なるリージョンのサブネットでもOK(VPC自体には範囲を設定なしでサブネットでIPが被らなければOKかと 追加の設定なしで、例えば us-central1 の VM から asia-northeast1 の Cloud SQLに直接アクセス可
■対象アセットに対する付与可能なロールの一覧表示リソースに対して付与可能なロールの表示 | IAM Documentation | Google CloudFull Resource Name(フルでのアセット名を探せる)Asset names | Cloud Asset Inventory Documentation | Google Cloud
import google.authimport googleapiclient.discovery
def view_grantable_roles(full_resource_name: str) -> None: credentials.google.auth.default( scopes=["https://www.googleapis.com/auth/cloud-platform"] ) service = googleapiclient.discovery.build('iam', 'v1', credentials credentials) roles = ( service roles() queryGrantableRoles (body=["fullResourceName": full_resource_name}).execute() ) for role in roles["roles"] if "title" in role: print("Title: role["title"]) print("Name: role["name"]) if "description" in role: print("Description:" + role["description"]) print("")
project_id = "prj"#resource = f"//bigquery.googleapis.com/projects/prj/datasets/ds"#resource + f"//bigquery googleapis.com/projects/prj/datasets/ds/tables/tbl"resource = f"//cloudresourcemanager.googleapis.com/projects/{project_id}"view_grantable_roles(resource)
■ロールの一覧表示https://cloud.google.com/iam/docs/roles-overview?hl=ja#role-types1)事前定義ロールの場合は roles.get() を使用します。2)プロジェクトレベルのカスタムロールの場合は、projects.roles.get() を使用します。3)組織レベルのカスタムロールの場合は、organizations.roles.get() を使用します。 これら3種類で全てを網羅すると思われます projectIDがsys-のものはGAS、lifecycleStateがACTIVE以外のものも含まれるので注意
■bqへの書き込み
export GOOGLE_APPLICATION_CREDENTIALS="path/to/your-service-account-key.json"pip install google-cloud-bigquery
from google.cloud import bigqueryclient = bigquery Client()#書き込み先のテーブル情報table_ref = f"{project_id}.{dataset_id}.{table_id}"
#サンプルデータの生成def generate_sample_data(num_rows) data = [ { "organization": f"org_(num_rows)", "permission". "view", } for _ in range(num_rows) ] return data
data_to_insert = generate_sample_data(5000)errors = client.insert_rows_json(table_ref, data_to_insert)
if errors: print("Errors occurred: {errors}")else: print("Data successfully written to BigQuery!")
■データカタログデータアセットを検索する | Data Catalog Documentation | Google Cloud
Class SearchCatalogRequest (3.23.0) | Python client library | Google Cloud
サンプルで仕様書のAPIを使っているがqueryが空白刻みで入れる等の使い方が分かる
■BQスキーマ+ポリシータグ取得
from google.cloud import bigquerydef get_policy_tags_from_bq_table(project_id, dataset_id, table_id): print("################ bigquery.Client.get_table().schema start ################") print(f"Target table: {project_id}.{dataset_id}.{table_id}") bq_client = bigquery.Client() table = bq_client.get_table(f"{project_id}.{dataset_id}.{table_id}") schema = table.schema policy_tags = [] for field in schema: print(f"Column: {field.name}") if field.policy_tags: tags = [tag for tag in field.policy_tags.names] policy_tags.extend(tags) print(f"Policy Tags: {tags}") else: print("> No Policy Tags assigned.") return policy_tags
PROJECT_ID = "prj"DATASET_ID = "ds"TABLE_ID = "test001"
policy_tags = get_policy_tags_from_bq_table(PROJECT_ID, DATASET_ID, TABLE_ID)print("Collected Policy Tags:", policy_tags)
■ポリシータグ設定
from google.cloud import datacatalog_v1from google.cloud import bigquery
PROJECT_ID = "prj"DATASET_ID = "ds"TABLE_ID = "tbl01"COLUMN_NAME = "aaa"POLICY_TAG_PROJECT = "prj"POLICY_TAG_NAME = "projects/prj/locations/us/taxonomies/83893110/policyTags/11089383"
def list_taxonomy_and_policy_tag(): print("############# Start #############") list_policy_tags = [] client = datacatalog_v1.PolicyTagManagerClient() request = datacatalog_v1.ListTaxonomiesRequest( parent=f"projects/{POLICY_TAG_PROJECT}/locations/us" ) try: page_result = client.list_taxonomies(request=request) except google.api_core.exceptions.PermissionDenied as e: print(f"Skipping project {POLICY_TAG_PROJECT} due to PermissionDenied error: {e}") return [] except Exception as e: print(f"An error occurred for project {POLICY_TAG_PROJECT}: {e}") return []
for taxonomy in page_result: print(f"############ Taxonomy display_name: {taxonomy.display_name} #############") print(f"############ Taxonomy name: {taxonomy.name} #############") request_tag = datacatalog_v1.ListPolicyTagsRequest(parent=taxonomy.name) try: page_result_tag = client.list_policy_tags(request=request_tag) except Exception as e: print(f"Error on {request_tag}: {e}") break for policy_tag in page_result_tag: print("Policy tag:") print(policy_tag) list_policy_tags.append({ "project_id": POLICY_TAG_PROJECT, "taxonomy_display_name": taxonomy.display_name, "taxonomy_name": taxonomy.name, "policy_tag_name": policy_tag.name, "policy_tag_display_name": policy_tag.display_name, }) return list_policy_tags
def update_table_schema_with_policy_tag(list_policy_tags): for policy_tag in list_policy_tags: if policy_tag['policy_tag_name'] == POLICY_TAG_NAME: print( f"Target policy tag:\n" f" Project ID: {policy_tag['project_id']}\n" f" Taxonomy Display Name: {policy_tag['taxonomy_display_name']}\n" f" Taxonomy Name: {policy_tag['taxonomy_name']}\n" f" Policy Tag Name: {policy_tag['policy_tag_name']}\n" f" Policy Tag Display Name: {policy_tag['policy_tag_display_name']}" ) client = bigquery.Client() table_ref = f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}" table = client.get_table(table_ref) new_schema = [] for field in table.schema: if field.name == COLUMN_NAME: new_schema.append( bigquery.SchemaField( name=field.name, field_type=field.field_type, # Keep original field type mode=field.mode, # Keep original mode description=field.description, policy_tags=bigquery.PolicyTagList([POLICY_TAG_NAME]), ) ) else: new_schema.append(field) table.schema = new_schema updated_table = client.update_table(table, ["schema"]) print( f"Updated table {updated_table.project}.{updated_table.dataset_id}.{updated_table.table_id} schema\n" f"with policy_tag {POLICY_TAG_NAME} on the column {COLUMN_NAME} successfully." )if __name__ == "__main__": list_policy_tags = list_taxonomy_and_policy_tag() update_table_schema_with_policy_tag(list_policy_tags)
■KSA問題ブログ内で情報が分散、まとめたい
ワークロード毎にKSA1つ
ksaのtokenはk8s api用でgcp apiに使えない、exprireしない問題がある> Workload identity で解決する
Workload Identity は KSAとGSAの紐づけで、Workload Identity Federationとは違う
workloads がk8sの用語でリソースの総称で、そのidentityであり権限管理、でFederationは更に外部連携 ワークロードは、pod, deplyment, StatefulSet, DaemonSet, job, CronJob, ReplicationController, ReplicaSet
[Click for image]
Workload Identity がGKE クラスタで有効化されると、gke-metadata-server という DaemonSet がデプロイgke-metadata-server は Workload Identity を利用する上で必要な手続きを実行
SAの紐づけ/// 現行【Workload Identity】 GKE で Google Service Account を利用する正しいやり方 #GoogleCloud - QiitaWorkload identityを有効にして(autopilot でデフォルト有効) GCP側でKSAとGSAをIAM policy binding k8s側でKSAとGSAをkubectl annotate podでKSAを設定↓/// 新型のKSA直接bind新しくなった Workload Identity Federation for GKE を試してみるworkload identity federation ならGSAがなくなりKSAを直接bindできるWorkload identityを有効にして(autopilot でデフォルト有効)GCP側でKSAにIAM policy binding
※混在するので現行のままが良いようです
■Workload identity federation(GCP外との連携)
Workload Identity Federation の仕組みまずWIF用のSAを作成する>SAに権限を付与する>1)Workload identity provider+SAの情報をgithub actionに埋めて使う
GitHub Actions から GCP リソースにアクセスする用途2)Workload identity poolから構成情報をDLしAWSアプリに埋めて使う
AWSからGCP リソースにアクセスする用途 gcloud auth login-cred-file=構成情報ファイルパス gcloud CLI を承認する | Google Cloud CLI Documentation3)Workload identity poolから構成情報をEKSのOIDC ID token のパスを指定しDL
EKS から GCP リソースにアクセスする用途- EKSのマニフェストのサービスアカウントのアノテーションにIAMロールを記載- EKSのサービスアカウントを使用したい Podのアノテーションに追加- マウント先のパスを環境変数 GOOGLE APPLICATION_CREDENTIALS に設定- Pod内でSDK またはコマンドにてGCP リソースヘアクセス可能か確認
Comment (0)
■24/5/9 12:00AM
Pubsub
■pubsubPublisher app → |GCPの壁| Topic(Schema) → Subscription 1や2 |GCPの壁| → Subscriber app
サブスクライバーappにPull/PushさせるPull/Pushのサブスクリプションをトピックに紐づける設定をしておく
[Click for image]
【図解付き】Cloud Pub/Subに概要や使い方についてわかりやすく解説 - KIYONO Engineer Blog (kiyono-co.jp)Pub/Sub サービスの概要 | Pub/Sub ドキュメント | Google CloudGCPのCloud PubSubで考慮すること - Carpe Diem (hatenablog.com)Pub/Sub の割り当てと上限 | Pub/Sub ドキュメント | Google Cloudアプリで簡単にPubsubにパブリッシュや、サブスクもできるので、アプリ間の連携にPubsubが使える • 非同期処理(画像処理とか重めのもの • IDの種類 (message id, subscription id, topic id, ack id, project idあたりがアプリでは使われるっぽい ※ack idはpull時のみでPushのときはhttpステータスコードが200でackとなるGCP - Pub/Sub サービス概要 #GoogleCloud - QiitaPub/Sub メッセージの作成とレスポンス | Python 用 App Engine フレキシブル環境に関するドキュメント | Google Cloudトピック(メッセージのパブリッシュ先)
• スキーマ/外部アクセス許可/リテンション/GCS/バックアップの設定がある (Push/Pullの設定はない) • パブリッシュ側のベストプラクティス (JWT) Pub/Sub トピックにパブリッシュするためのベスト プラクティス | Pub/Sub ドキュメント | Google CloudサブスクライバのPushとPull (PushはEndpointが必要、デフォルトはpull) GCP - Pub/Sub サービス概要 #GoogleCloud - Qiita • at-least-once (少なくとも1回) 配信を提供します • 同じ順序指定キーを持ち、同じリージョンに存在している場合は、メッセージの順序指定を有効にできます • サブスクライバーが31日間未使用、またはサブスクリプションが未更新の場合、サブスクリプションは期限切れ • メッセージ数が多いとpull向き サブスクリプション タイプを選択する | Pub/Sub ドキュメント | Google Cloudpushはhttpsが必要? push サブスクリプションを作成する | Pub/Sub ドキュメント | Google Cloud • push エンドポイントのサーバーには、認証局が署名した有効な SSL証明書が必要でhttps • Cloud run でEvent Arcを設定するとサブスクが自動作成されrunのデフォルトhttpsのURLが使われるが、これはPullよりPushで安定した • CronバッチならPullで安定するのでは?大量リクエストはPull向きとある(Pullは失敗処理込みの話かも知れん)トピックのリテンション:デフォルトなし、最小値:10分、最大値:31日サブスクのリテンション:デフォルト値:7日、最小值:10分、最大値:7日 サブスクリプション プロパティ | Pub/Sub ドキュメント | Google Cloudpubsub ack期限(Ack Deadline)
•デフォルト60秒> 設定10分>ack延長で最大1時間まで伸ばせると思われる リース管理で確認時間を延長する | Pub/Sub ドキュメント | Google Cloud •exactly onceを設定しなければ期限の延長は保証されない •ack期限を過ぎる、あるいはNackを返す場合、メッセージは再配送される •ack応答期限の延長は99パーセンタイル(上位1%の値よりも小さい値のうち最大の値)で modifyAckDeadlineを返し、延長してもMaxExtension (ack期限を延長 する最大値) 60minまで?
modifyAckDeadlineリクエストを定期的に発行すればよいらしいメッセージの再試行を強制するには
•nack リクエストを送信 •高レベルのクライアント ライブラリを使用していない場合は、ackDeadlineSeconds を0に設定して modifyAckDeadline リクエストを送信するexactly once
1 回限りの配信 | Pub/Sub ドキュメント | Google Cloud •pullなら設定できる。他には、Cloud Dataflowを組み合わせる(プログラムコードでDataflowを使う感じかり、あるいはmessageについているunique idを利用して、KVS を用いたステート管理をして自前で重複を排除する •再配信は、メッセージに対してクライアントによる否定確認応答が行われた場合、または確認応答期限が切れる前にクライアントが確認応答期限を延長しな かった場合のいずれかか原因で発生することがある。 ※exactly onceはエラーでも再配信でPubsubパニックしないようにしたいために使うものではない?pubsubはトピックにPublishされたメッセージをDataflowに引き継げる
Dataflow (Apache Beam) を大量のメッセージをバッチ処理する場合に使える Pub/Sub→Dataflow→処理 •Apache Beamのウィンドウ処理とセッション分析とコネクタのエコシスエムがある •メッセージ重複の削除ができる •pubsub>dataflow>BQやGCS: この流れでログ等をストーリミングで入れ込めるBQサブスクリプション (PubSubはBigQuery Storage Write API を使用してデータを BigQueryテーブルに送信、GCSサブスクもある) Langganan BigQuery | Dokumentasi Pub/Sub | Google Cloud BigQuery サブスクリプションの作成 | Pub/Sub ドキュメント | Google CloudサブスクライバーApp側のコードでのフロー制御によりちょっと待てよのトラフィック急増対応 フロー制御を使用して一時的な急増を処理する | Pub/Sub ドキュメント | Google Cloudデッドレタートピック (配信試行回数が見れる)やエラーでの再配信 メッセージ エラーの処理 | Pub/Sub ドキュメント | Google Cloud • Pub/Subサブスクリプションにデッドレタートピックを設定しておくと、一定の回数再送信が失敗したメッセージの宛先がデッドレタートピックに変更され貯められるメッセージのフィルタ、同時実行制御により多いメッセージに対応 サブスクリプションからのメッセージをフィルタする | Pub/Sub ドキュメント | Google CloudPubsubをローカルでエミュレートする エミュレータを使用したローカルでのアプリのテスト | Pub/Sub ドキュメント | Google Cloudpubsubのスナップショットやリテンションクイックスタート: スナップショットまたはタイムスタンプまでシークして Pub/Sub でメッセージを再生する | Pub/Sub ドキュメント | Google Cloudトピックにリテンションを設定しスナップショット作成> 過去のサブスクしたメッセは見えなさそうサブスクにリテンションを設定しスナップショット作成> 過去のAckしたメッセは見えなさそうスナップショットでどう使うのか? cloud pubsubで配信済みのメッセージを再送する #PubSub - Qiita キューがたまっているときに撮るものと思われる。またシーク時間のポイントを設定する意味がある スナップショットとシークを使いこなして特定期間の再実行を行う機能 スナップショットで再実行する シークは指定時間か最後のスナップショット以降のサブスク再実行(実際pushでrunが再実行された)Pubsubにどんなメッセージが入ってきているか確認する方法 pull形式ならAckしなければpullボタンで拾い見れる (トピックでパブリッシュしてサブスクでPull し見る) トラブルシュートはログを見るかデッドレタートピックかGCSバックアップを見る?デッドレターキュー(ドロップしたものの確認と救済?) サブスクでDLQのONしデッドレタートピックを設定し転送する>GCSにもバックアップできる DLTでメッセージ(実行済みOR未実行)の再生データ形式:スキーマを使うか、スキーマなしならdataで取得できる トピックのスキーマを作成する | Pub/Sub ドキュメント | Google Cloud Cloud Pub/Subの概要とPythonでの実践 - case-kの備忘録from google cloud import pubsub_v1from avro.io import DatumReader, BinaryDecoderfrom avro schema import Parseproject_id="your-project-id"subscription id="your-subscription-id"subscriber pubsub_v1.SubscriberClient()subscription_path = subscriber.subscription_path(project_id, subscription_id)avro_schema = Parse("""{"type": "record","name": "Avro"."fields": [{"name": "ProductName","type": "string","default":""},{"name": "SKU","type": "int","default": 0}}def callback(message): print(f"Received message: {message}") reader = DatumReader(avro_schema) decoder = Binary Decoder (message.data) avro_record = reader.read(decoder) message_id=message.message id message.ack() print("Message ID: (message_id}") product_name = avro_record['ProductName'] sku= avro_record['SKU'] print("Product Name: (product_name}") print("SKU: (sku}")subscriber.subscribe(subscription_path, callback=callback)
def callback(message): print("Received message: (message)") data message data message_id=message.message_id message.ack() print("Date (data)") print("Message ID: (message_id)")
Pub/SubでStreamingPull APIを使用してメッセージをリアルタイムで処理する - G-gen Tech Blog
StreamingPull API を使用するとアプリとの間で永続的な双方向接続が維持され、Pub/Sub でメッセージが利用可能になるとすぐに pullされる。1 つの pull リクエストで 1 つの pull レスポンスが返る通常の 単項 Pull と比較すると、高スループット・低レイテンシ。必要なメッセージを残す処理をしたりも?GCP側の問題であっても通信が切れた場合は別サーバに繋ぎなおすためmodifyAckDeadlineも切れ再配信されるバグがある
+++メッセージのTTL (Time-To-Live) はメッセージ保持期間(message retention duration) に依存メッセージが TTLを超えると、自動的に削除され、Subscriberが受信できなくなるackDeadlineSeconds (デフォルトは10秒、最大600秒) を超えたACKのメッセージは再配信されますが、TTL期限を超えた場合は消える#TTLを最大7日間に設定gcloud pubsub subscriptions update my-subscription message-retention-duration=604800s
DLQ (Dead Letter Queue)
Subscriberが指定回数(最大100回) メッセージのACKを行わなかった場合に、メッセージを隔離する仕組みDLQもサブスクなので期間やTTL設定方法は同じ
#DLQ topic 作成gcloud pubsub topics create my-dlq-topic
#5回失敗したらDLQへgcloud pubsub subscriptions update my-subscription dead-letter-topic=projects/my-project/topics/my-diq-topic max-delivery-attempts=5
#DLQ subsc作成gcloud pubsub subscriptions create my-diq-subscription--topic-my-diq-topic
#サブスクの詳細確認gcloud pubsub subscriptions describe my-diq-subscription
#DLQメッセージの確認、-auto-ackも付けられるが、gcloud pubsub subscriptions pull my-dlq-subscription -limit=10 [{ "ackld": "Y3g49NfY...=", "message": { "data": "SGVsbG8gd29ybGQ=", #Base64 エンコードされたデータ "messageld": "1234567890", "publish Time": "2024-02-18T12:34:56.789Z" } }]
#base64のでコードが必要echo "SGVsbG8gd29ybGQ=" | base64-decode
#ack-idによりackを返しDLQメッセージを削除gcloud pubsub subscriptions acknowledge my-diq-subscription--ack-ids=Y3g49NfFY
モニタリング > アラートポリシーから新しいアラートを作成しpubsub.subscription.outstanding_messages を監視対象に選択し、閾値を設定するとよい
#DLQ メッセージの再処理をfunctionsに設定 (トピックに入れなおす)from google.cloud import pubsub_v1publisher = pubsub_v1.PublisherClient()topic_path = publisher.topic_path("my-project", "my-topic")
def republish_message(message): future = publisher.publish(topic_path, message.data) print(f"Republished message ID: {future.result()}")
subscriber = pubsub_v1.SubscriberClient()subscription_path = subscriber.subscription_path("my-project", "my-dlq-subscription")
def callback(message): print(f"Received message: {message.data}") republish_message(message) message.ack()
subscriber.subscribe(subscription_path, callback=callback)
/// BANGBOO BLOG /// - GCP runs off functions pubsub on scheduler
Comment (0)
Navi: < 1 | 2 | 3 | 4 >
-Home
-Column [133]
-Europe [9]
-Gadget [77]
-Web [136]
-Bike [4]
@/// BANGBOO BLOG ///

