■24/6/21 11:00PM
BT
あそびはここで終わりにしようぜ~
Big TableCloud Bigtableを触ってみよう - Uzabase for Engineersでっかいテーブル、読み書き低レイテンシー、RDBは負荷高いときにレプ数位でスケールが難しいがBTはするので正規化せずに単一テーブルにしておく感じrow keyが主役データを追加するのに3パターンある(行追加、列追加、セル追加) 行に複数カラムファミリーにカラムが幾つか入れられるのでKVSだが結局Where句みたいに使う? 行キー「企業ID#日付」,COLUMN FAMILY「STOCK PRICE」,COLUMN「HI PRICE」「LO PRICE」に対してJSONデータを入れておく等 時間はバージョン管理として持っている 複雑な条件は無理でデータを事前整理して入れておき、JSONカラムを使ったりで一行にまとめスキャンを一発で済ます等で高スループットのみ Google検索のようにキーワードを入れると、検索結果が数多く一瞬で返る等 複雑な条件はDataprocを使うらしい
Big table構成Bigtableを徹底解説! - G-gen Tech Blogインスタンスの中に一つ以上のクラスタ(ゾーン別に設定しレプリケーション)> 各クラスタには1つ以上の同数のノード クラスタに table > 複数Column family > 複数Column > セルbigtable_app_profilesで転送クラスタ先の設定する(単一行トランザクション設定を含む) -マルチクラスタ(自動フェイルオーバ、単一行transaction不可でレプリケーションによる不整合あり) -シングルクラスタ(手動フェイルオーバ、一行transaction) デフォルトをマルチにして、通常のクラスタ転送をシングル、問題があるときだけアプリで判定しマルチに行くBigtableで複数クラスタ構成におけるデータ整合性の保証 - Carpe Diem (hatenablog.com)
スキーマ: テーブル 行キー(row key) カラムファミリー(カページコレクションポリシーを含む) カラム更新したデータはタイムスタンプによりセル内で保存される 解消するにはガベージコレクション 期限切れ値、バージョン数で設定する
仕様:KVS、行指向の行単位でスキャン各テーブルのインデックス (行キー)は1つのみで一意である必要がある行は、行キーの辞書順に並べ替えられます。列は、列ファミリー別にグループ化され、列ファミリー内で辞書順に並べ替えられます列ファミリーは特定の順序では保存されません集計列ファミリーには集計セルが含まれます行レベルでアトミック (複数行だと知らんという意) アトミック性:トランザクション整合性がある(一部の操作だけ実行した状態とならずに)
特定の行にread/writeが集中するより分散が良いBigtable のテーブルはスバース、空白行での消費はない
cbt CLI の概要 | Bigtable Documentation | Google Cloudcbt リファレンス | Bigtable Documentation | Google Cloudgcloud components updategcloud components install cbt(-/cbtrcに以下記載すれば-projectと-instance はデフォルト値で省略できる)cd ~echo project unco > ~/.cbtrcecho instance = chinco >> ~/.cbtrccbt -project unco listinstancescbt -instance chinco listclusterscbt -project unco -instance chinco ls | grep kuso-t テーブル名取得cht -project unco -instance chinco ls kuso-table カラムファミリやポリシー等取得cbt -project unco -instance chinco deletefamily kuso-table shikko-familycbt -project unco -instance chinco deletetable kuso-table テーブルを消せばカラムファミリも削除になる
Comment (0)
■24/6/1 3:24PM
GCP hands-off 3
■Cloud SQL Python Connector (Cloud SQL language Connector)CloudSQL proxyでないやり方、簡単Cloud SQL 言語コネクタの概要 | Cloud SQL for MySQL | Google Cloud
GitHub - GoogleCloudPlatform/cloud-sql-python-connector: A Python library for connecting securely to your Cloud SQL instances.
事前必要 pip install Flask mysql-connector-pythonimport mysql.connectordb_config = { 'host': 'localhost', 'user': 'your_username', 'password': 'your_password', 'database': 'your_database'}def items(): #データベースの返りをdictで取得 connection = mysql.connector.connect(**db_config) cursor = connection.cursor(dictionary=True) cursor.execute("SELECT COUNT(*) AS total FROM item") #単一カラムのとき total_items = cursor.fetchone()['total'] cursor.execute("SELECT FROM item") items = cursor.fetchall() cursor.close() connection.close()
↓コネクションプールを使うSQLAlchemy が良い?
from flask import Flask from flask_sqlalchemy import SQLAlchemy from google.cloud.sql.connector import Connector
# initialize Python Connector object connector = Connector()
#Python Connector database connection functiondef getconn(): conn = connector.connect( "project region instance-name", # Cloud SQL Instance Connection Name "pymysql", user="my-user", password="my-password", db="my-database", ip_type="public" # "private" for private IP ) return conn
app Flask(name)
#configure Flask-SQLAlchemy to use Python Connectorapp.config['SQLALCHEMY_DATABASE_URI'] = "mysql+pymysql://"app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {"creator": getconn}
# initialize the app with the extensiondb = SQLAlchemy()db.init_app(app)
下記のオプションも使えるconnector = Connector( ip_type="public", # can also be "private" or "psc" enable_iam_auth=False, timeout=30, credentials=custom_creds, #google.auth credentials.Credentials refresh_strategy="lazy", # can be "lazy" or "background")
■Cloud SQL MySQL設定ロールは Cloud SQL 管理者 (roles/cloudsql.admin)、Cloud SQL インスタンスユーザー (roles/cloudsql.instance User)等のIAM?Cloud SQLを徹底解説! - G-gen Tech Blog【開発環境】mysql_dbsoEnterprise/Sandbox/AsiaNorthEast1 (Tokyo) / Single zoneMySQL ver 8.4Shared core / 1cpu 0.6GB/HDD/10GB(auto increase) PrivatelP/設定にvpcnwが必要/Enable private pathAuto daily backup 7days (1-5AM) / Enable point-in-time recoveryWeek1 sun 0-1am/ Enable query insightsPW: x【本番環境】Enterprise plus? キャッシュ使う?
データベースフラグ (confが直接変更できなためフラグとしてパラメータを渡せる)データベース フラグを構成する | Cloud SQL for MySQL | Google CloudCloud SQL studio (コンソールでMySQLが使える) MySQLクライアントを使いたいならAuth proxyが必要HA構成だとフェールオーバーやリードレプリカ等が使える
●Cloud SQLが内部IPだとサーパレスVPCコネクタ、or 外部IPならSQL+auth proxy内部IPで良いのでVPCを作る、CloudSQLを内部IPで作るサーバレスVPCコネクタを作るファイアウォールルールでポート (デフォルトで3306など)を開放Cloud Run のNW設定で、サーバーレス VPC コネクタを選択、ルートオプションとしてすべてのトラフィックをVPC コネクタ経由で送信を選択
■対象アセットに対する付与可能なロールの一覧表示リソースに対して付与可能なロールの表示 | IAM Documentation | Google CloudFull Resource Name(フルでのアセット名を探せる)Asset names | Cloud Asset Inventory Documentation | Google Cloud
import google.authimport googleapiclient.discovery
def view_grantable_roles(full_resource_name: str) -> None: credentials.google.auth.default( scopes=["https://www.googleapis.com/auth/cloud-platform"] ) service = googleapiclient.discovery.build('iam', 'v1', credentials credentials) roles = ( service roles() queryGrantableRoles (body=["fullResourceName": full_resource_name}).execute() ) for role in roles["roles"] if "title" in role: print("Title: role["title"]) print("Name: role["name"]) if "description" in role: print("Description:" + role["description"]) print("")
project_id = "prj"#resource = f"//bigquery.googleapis.com/projects/prj/datasets/ds"#resource + f"//bigquery googleapis.com/projects/prj/datasets/ds/tables/tbl"resource = f"//cloudresourcemanager.googleapis.com/projects/{project_id}"view_grantable_roles(resource)
■ロールの一覧表示https://cloud.google.com/iam/docs/roles-overview?hl=ja#role-types1)事前定義ロールの場合は roles.get() を使用します。2)プロジェクトレベルのカスタムロールの場合は、projects.roles.get() を使用します。3)組織レベルのカスタムロールの場合は、organizations.roles.get() を使用します。 これら3種類で全てを網羅すると思われます projectIDがsys-のものはGAS、lifecycleStateがACTIVE以外のものも含まれるので注意
■bqへの書き込み
export GOOGLE_APPLICATION_CREDENTIALS="path/to/your-service-account-key.json"pip install google-cloud-bigquery
from google.cloud import bigqueryclient = bigquery Client()#書き込み先のテーブル情報table_ref = f"{project_id}.{dataset_id}.{table_id}"
#サンプルデータの生成def generate_sample_data(num_rows) data = [ { "organization": f"org_(num_rows)", "permission". "view", } for _ in range(num_rows) ] return data
data_to_insert = generate_sample_data(5000)errors = client.insert_rows_json(table_ref, data_to_insert)
if errors: print("Errors occurred: {errors}")else: print("Data successfully written to BigQuery!")
■データカタログデータアセットを検索する | Data Catalog Documentation | Google Cloud
Class SearchCatalogRequest (3.23.0) | Python client library | Google Cloud
サンプルで仕様書のAPIを使っているがqueryが空白刻みで入れる等の使い方が分かる
■BQスキーマ+ポリシータグ取得
from google.cloud import bigquerydef get_policy_tags_from_bq_table(project_id, dataset_id, table_id): print("################ bigquery.Client.get_table().schema start ################") print(f"Target table: {project_id}.{dataset_id}.{table_id}") bq_client = bigquery.Client() table = bq_client.get_table(f"{project_id}.{dataset_id}.{table_id}") schema = table.schema policy_tags = [] for field in schema: print(f"Column: {field.name}") if field.policy_tags: tags = [tag for tag in field.policy_tags.names] policy_tags.extend(tags) print(f"Policy Tags: {tags}") else: print("> No Policy Tags assigned.") return policy_tags
PROJECT_ID = "prj"DATASET_ID = "ds"TABLE_ID = "test001"
policy_tags = get_policy_tags_from_bq_table(PROJECT_ID, DATASET_ID, TABLE_ID)print("Collected Policy Tags:", policy_tags)
■ポリシータグ設定
from google.cloud import datacatalog_v1from google.cloud import bigquery
PROJECT_ID = "prj"DATASET_ID = "ds"TABLE_ID = "tbl01"COLUMN_NAME = "aaa"POLICY_TAG_PROJECT = "prj"POLICY_TAG_NAME = "projects/prj/locations/us/taxonomies/83893110/policyTags/11089383"
def list_taxonomy_and_policy_tag(): print("############# Start #############") list_policy_tags = [] client = datacatalog_v1.PolicyTagManagerClient() request = datacatalog_v1.ListTaxonomiesRequest( parent=f"projects/{POLICY_TAG_PROJECT}/locations/us" ) try: page_result = client.list_taxonomies(request=request) except google.api_core.exceptions.PermissionDenied as e: print(f"Skipping project {POLICY_TAG_PROJECT} due to PermissionDenied error: {e}") return [] except Exception as e: print(f"An error occurred for project {POLICY_TAG_PROJECT}: {e}") return []
for taxonomy in page_result: print(f"############ Taxonomy display_name: {taxonomy.display_name} #############") print(f"############ Taxonomy name: {taxonomy.name} #############") request_tag = datacatalog_v1.ListPolicyTagsRequest(parent=taxonomy.name) try: page_result_tag = client.list_policy_tags(request=request_tag) except Exception as e: print(f"Error on {request_tag}: {e}") break for policy_tag in page_result_tag: print("Policy tag:") print(policy_tag) list_policy_tags.append({ "project_id": POLICY_TAG_PROJECT, "taxonomy_display_name": taxonomy.display_name, "taxonomy_name": taxonomy.name, "policy_tag_name": policy_tag.name, "policy_tag_display_name": policy_tag.display_name, }) return list_policy_tags
def update_table_schema_with_policy_tag(list_policy_tags): for policy_tag in list_policy_tags: if policy_tag['policy_tag_name'] == POLICY_TAG_NAME: print( f"Target policy tag:\n" f" Project ID: {policy_tag['project_id']}\n" f" Taxonomy Display Name: {policy_tag['taxonomy_display_name']}\n" f" Taxonomy Name: {policy_tag['taxonomy_name']}\n" f" Policy Tag Name: {policy_tag['policy_tag_name']}\n" f" Policy Tag Display Name: {policy_tag['policy_tag_display_name']}" ) client = bigquery.Client() table_ref = f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}" table = client.get_table(table_ref) new_schema = [] for field in table.schema: if field.name == COLUMN_NAME: new_schema.append( bigquery.SchemaField( name=field.name, field_type=field.field_type, # Keep original field type mode=field.mode, # Keep original mode description=field.description, policy_tags=bigquery.PolicyTagList([POLICY_TAG_NAME]), ) ) else: new_schema.append(field) table.schema = new_schema updated_table = client.update_table(table, ["schema"]) print( f"Updated table {updated_table.project}.{updated_table.dataset_id}.{updated_table.table_id} schema\n" f"with policy_tag {POLICY_TAG_NAME} on the column {COLUMN_NAME} successfully." )if __name__ == "__main__": list_policy_tags = list_taxonomy_and_policy_tag() update_table_schema_with_policy_tag(list_policy_tags)
■Workload identity federation(GCP外との連携)まずWIF用のSAを作成する>SAに権限を付与する>1)Workload identity provider+SAの情報をgithub actionに埋めて使う
GitHub Actions から GCP リソースにアクセスする用途2)Workload identity poolから構成情報をDLしAWSアプリに埋めて使う
AWSからGCP リソースにアクセスする用途 gcloud auth login-cred-file=構成情報ファイルパス gcloud CLI を承認する | Google Cloud CLI Documentation3)Workload identity poolから構成情報をEKSのOIDC ID token のパスを指定しDL
EKS から GCP リソースにアクセスする用途- EKSのマニフェストのサービスアカウントのアノテーションにIAMロールを記載- EKSのサービスアカウントを使用したい Podのアノテーションに追加- マウント先のパスを環境変数 GOOGLE APPLICATION_CREDENTIALS に設定- Pod内でSDK またはコマンドにてGCP リソースヘアクセス可能か確認
Comment (0)
■24/5/9 12:00AM
Pubsub
■pubsubPublisher app → |GCPの壁| Topic(Schema) → Subscription 1や2 |GCPの壁| → Subscriber app
サブスクライバーappにPull/PushさせるPull/Pushのサブスクリプションをトピックに紐づける設定をしておく
[Click for image]
【図解付き】Cloud Pub/Subに概要や使い方についてわかりやすく解説 - KIYONO Engineer Blog (kiyono-co.jp)Pub/Sub サービスの概要 | Pub/Sub ドキュメント | Google CloudGCPのCloud PubSubで考慮すること - Carpe Diem (hatenablog.com)Pub/Sub の割り当てと上限 | Pub/Sub ドキュメント | Google Cloudアプリで簡単にPubsubにパブリッシュや、サブスクもできるので、アプリ間の連携にPubsubが使える • 非同期処理(画像処理とか重めのもの • IDの種類 (message id, subscription id, topic id, ack id, project idあたりがアプリでは使われるっぽい ※ack idはpull時のみでPushのときはhttpステータスコードが200でackとなるGCP - Pub/Sub サービス概要 #GoogleCloud - QiitaPub/Sub メッセージの作成とレスポンス | Python 用 App Engine フレキシブル環境に関するドキュメント | Google Cloudトピック(メッセージのパブリッシュ先)
• スキーマ/外部アクセス許可/リテンション/GCS/バックアップの設定がある (Push/Pullの設定はない) • パブリッシュ側のベストプラクティス (JWT) Pub/Sub トピックにパブリッシュするためのベスト プラクティス | Pub/Sub ドキュメント | Google CloudサブスクライバのPushとPull (PushはEndpointが必要、デフォルトはpull) GCP - Pub/Sub サービス概要 #GoogleCloud - Qiita • at-least-once (少なくとも1回) 配信を提供します • 同じ順序指定キーを持ち、同じリージョンに存在している場合は、メッセージの順序指定を有効にできます • サブスクライバーが31日間未使用、またはサブスクリプションが未更新の場合、サブスクリプションは期限切れ • メッセージ数が多いとpull向き サブスクリプション タイプを選択する | Pub/Sub ドキュメント | Google Cloudpushはhttpsが必要? push サブスクリプションを作成する | Pub/Sub ドキュメント | Google Cloud • push エンドポイントのサーバーには、認証局が署名した有効な SSL証明書が必要でhttps • Cloud run でEvent Arcを設定するとサブスクが自動作成されrunのデフォルトhttpsのURLが使われるが、これはPullよりPushで安定した • CronバッチならPullで安定するのでは?大量リクエストはPull向きとある(Pullは失敗処理込みの話かも知れん)トピックのリテンション:デフォルトなし、最小値:10分、最大値:31日サブスクのリテンション:デフォルト値:7日、最小值:10分、最大値:7日 サブスクリプション プロパティ | Pub/Sub ドキュメント | Google Cloudpubsub ack期限(Ack Deadline)
•デフォルト60秒> 設定10分>ack延長で最大1時間まで伸ばせると思われる リース管理で確認時間を延長する | Pub/Sub ドキュメント | Google Cloud •exactly onceを設定しなければ期限の延長は保証されない •ack期限を過ぎる、あるいはNackを返す場合、メッセージは再配送される •ack応答期限の延長は99パーセンタイル(上位1%の値よりも小さい値のうち最大の値)で modifyAckDeadlineを返し、延長してもMaxExtension (ack期限を延長 する最大値) 60minまで?
modifyAckDeadlineリクエストを定期的に発行すればよいらしいメッセージの再試行を強制するには
•nack リクエストを送信 •高レベルのクライアント ライブラリを使用していない場合は、ackDeadlineSeconds を0に設定して modifyAckDeadline リクエストを送信するexactly once
1 回限りの配信 | Pub/Sub ドキュメント | Google Cloud •pullなら設定できる。他には、Cloud Dataflowを組み合わせる(プログラムコードでDataflowを使う感じかり、あるいはmessageについているunique idを利用して、KVS を用いたステート管理をして自前で重複を排除する •再配信は、メッセージに対してクライアントによる否定確認応答が行われた場合、または確認応答期限が切れる前にクライアントが確認応答期限を延長しな かった場合のいずれかか原因で発生することがある。 ※exactly onceはエラーでも再配信でPubsubパニックしないようにしたいために使うものではない?pubsubはトピックにPublishされたメッセージをDataflowに引き継げる •Apache Beamのウィンドウ処理とセッション分析とコネクタのエコシスエムがある •メッセージ重複の削除ができる •pubsub>dataflow>BQやGCS: この流れでログ等をストーリミングで入れ込めるBQサブスクリプション (PubSubはBigQuery Storage Write API を使用してデータを BigQueryテーブルに送信、GCSサブスクもある) Langganan BigQuery | Dokumentasi Pub/Sub | Google Cloud BigQuery サブスクリプションの作成 | Pub/Sub ドキュメント | Google CloudサブスクライバーApp側のコードでのフロー制御によりちょっと待てよのトラフィック急増対応 フロー制御を使用して一時的な急増を処理する | Pub/Sub ドキュメント | Google Cloudデッドレタートピック (配信試行回数が見れる)やエラーでの再配信 メッセージ エラーの処理 | Pub/Sub ドキュメント | Google Cloud • Pub/Subサブスクリプションにデッドレタートピックを設定しておくと、一定の回数再送信が失敗したメッセージの宛先がデッドレタートピックに変更され貯められるメッセージのフィルタ、同時実行制御により多いメッセージに対応 サブスクリプションからのメッセージをフィルタする | Pub/Sub ドキュメント | Google CloudPubsubをローカルでエミュレートする エミュレータを使用したローカルでのアプリのテスト | Pub/Sub ドキュメント | Google Cloudpubsubのスナップショットやリテンションクイックスタート: スナップショットまたはタイムスタンプまでシークして Pub/Sub でメッセージを再生する | Pub/Sub ドキュメント | Google Cloudトピックにリテンションを設定しスナップショット作成> 過去のサブスクしたメッセは見えなさそうサブスクにリテンションを設定しスナップショット作成> 過去のAckしたメッセは見えなさそうスナップショットでどう使うのか? cloud pubsubで配信済みのメッセージを再送する #PubSub - Qiita キューがたまっているときに撮るものと思われる。またシーク時間のポイントを設定する意味がある スナップショットとシークを使いこなして特定期間の再実行を行う機能 スナップショットで再実行する シークは指定時間か最後のスナップショット以降のサブスク再実行(実際pushでrunが再実行された)Pubsubにどんなメッセージが入ってきているか確認する方法 pull形式ならAckしなければpullボタンで拾い見れる (トピックでパブリッシュしてサブスクでPull し見る) トラブルシュートはログを見るかデッドレタートピックかGCSバックアップを見る?デッドレターキュー(ドロップしたものの確認と救済?) サブスクでDLQのONしデッドレタートピックを設定し転送する>GCSにもバックアップできる DLTでメッセージ(実行済みOR未実行)の再生データ形式:スキーマを使うか、スキーマなしならdataで取得できる トピックのスキーマを作成する | Pub/Sub ドキュメント | Google Cloud Cloud Pub/Subの概要とPythonでの実践 - case-kの備忘録from google cloud import pubsub_v1from avro.io import DatumReader, BinaryDecoderfrom avro schema import Parseproject_id="your-project-id"subscription id="your-subscription-id"subscriber pubsub_v1.SubscriberClient()subscription_path = subscriber.subscription_path(project_id, subscription_id)avro_schema = Parse("""{"type": "record","name": "Avro"."fields": [{"name": "ProductName","type": "string","default":""},{"name": "SKU","type": "int","default": 0}}def callback(message): print(f"Received message: {message}") reader = DatumReader(avro_schema) decoder = Binary Decoder (message.data) avro_record = reader.read(decoder) message_id=message.message id message.ack() print("Message ID: (message_id}") product_name = avro_record['ProductName'] sku= avro_record['SKU'] print("Product Name: (product_name}") print("SKU: (sku}")subscriber.subscribe(subscription_path, callback=callback)
def callback(message): print("Received message: (message)") data message data message_id=message.message_id message.ack() print("Date (data)") print("Message ID: (message_id)")
Pub/SubでStreamingPull APIを使用してメッセージをリアルタイムで処理する - G-gen Tech Blog
StreamingPull API を使用するとアプリとの間で永続的な双方向接続が維持され、Pub/Sub でメッセージが利用可能になるとすぐに pullされる。1 つの pull リクエストで 1 つの pull レスポンスが返る通常の 単項 Pull と比較すると、高スループット・低レイテンシ。必要なメッセージを残す処理をしたりも?GCP側の問題であっても通信が切れた場合は別サーバに繋ぎなおすためmodifyAckDeadlineも切れ再配信されるバグがある
/// BANGBOO BLOG /// - GCP runs off functions pubsub on scheduler
Comment (0)
Navi: 1 | 2 | 3 | 4 >
-Home
-Column [128]
-Europe [9]
-Gadget [77]
-Web [133]
-Bike [4]
@/// BANGBOO BLOG ///