■Cloud SQL Python Connector (Cloud SQL language Connector)
CloudSQL proxyでないやり方、簡単
Cloud SQL 言語コネクタの概要 | Cloud SQL for MySQL | Google Cloud
GitHub - GoogleCloudPlatform/cloud-sql-python-connector: A Python library for connecting securely to your Cloud SQL instances.
GitHub - GoogleCloudPlatform/cloud-sql-python-connector: A Python library for connecting securely to your Cloud SQL instances.
事前必要 pip install Flask mysql-connector-python
import mysql.connector
db_config = {
'host': 'localhost',
'user': 'your_username',
'password': 'your_password',
'database': 'your_database'
}
def items():
#データベースの返りをdictで取得
connection = mysql.connector.connect(**db_config)
cursor = connection.cursor(dictionary=True)
cursor.execute("SELECT COUNT(*) AS total FROM item")
#単一カラムのとき
total_items = cursor.fetchone()['total']
cursor.execute("SELECT FROM item")
items = cursor.fetchall()
cursor.close()
connection.close()
↓
↓
コネクションプールを使うSQLAlchemy が良い?
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from google.cloud.sql.connector import Connector
# initialize Python Connector object
connector = Connector()
#Python Connector database connection function
def getconn():
conn = connector.connect(
"project region instance-name", # Cloud SQL Instance Connection Name
"pymysql", user="my-user", password="my-password", db="my-database",
ip_type="public" # "private" for private IP
)
return conn
app Flask(name)
#configure Flask-SQLAlchemy to use Python Connector
app.config['SQLALCHEMY_DATABASE_URI'] = "mysql+pymysql://"
app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {"creator": getconn}
# initialize the app with the extension
db = SQLAlchemy()
db.init_app(app)
下記のオプションも使える
connector = Connector(
ip_type="public", # can also be "private" or "psc"
enable_iam_auth=False,
timeout=30,
credentials=custom_creds, #google.auth credentials.Credentials
refresh_strategy="lazy", # can be "lazy" or "background"
)
■Cloud SQL MySQL設定
ロールは Cloud SQL 管理者 (roles/cloudsql.admin)、Cloud SQL インスタンスユーザー (roles/cloudsql.instance User)等のIAM?
【開発環境】mysql_dbso
Enterprise/Sandbox/AsiaNorthEast1 (Tokyo) / Single zone
MySQL ver 8.4
Shared core / 1cpu 0.6GB/HDD/10GB(auto increase)
PrivatelP/設定にvpcnwが必要/Enable private path
Auto daily backup 7days (1-5AM) / Enable point-in-time recovery
Week1 sun 0-1am/ Enable query insights
PW: x
【本番環境】
Enterprise plus? キャッシュ使う?
データベースフラグ (confが直接変更できなためフラグとしてパラメータを渡せる)
Cloud SQL studio (コンソールでMySQLが使える)
MySQLクライアントを使いたいならAuth proxyが必要
HA構成だとフェールオーバーやリードレプリカ等が使える
●Cloud SQLが内部IPだとサーパレスVPCコネクタ、or 外部IPならSQL+auth proxy
内部IPで良いのでVPCを作る、CloudSQLを内部IPで作る
サーバレスVPCコネクタを作る
ファイアウォールルールでポート (デフォルトで3306など)を開放
Cloud Run のNW設定で、サーバーレス VPC コネクタを選択、ルートオプションとしてすべてのトラフィックをVPC コネクタ経由で送信を選択
■対象アセットに対する付与可能なロールの一覧表示
■対象アセットに対する付与可能なロールの一覧表示
Full Resource Name(フルでのアセット名を探せる)
import google.auth
import googleapiclient.discovery
def view_grantable_roles(full_resource_name: str) -> None:
credentials.google.auth.default(
scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
service = googleapiclient.discovery.build('iam', 'v1', credentials credentials)
roles = (
service roles()
queryGrantableRoles (body=["fullResourceName": full_resource_name}).execute()
)
for role in roles["roles"]
if "title" in role:
print("Title: role["title"])
print("Name: role["name"])
if "description" in role:
print("Description:" + role["description"])
print("")
project_id = "prj"
#resource = f"//bigquery.googleapis.com/projects/prj/datasets/ds"
#resource + f"//bigquery googleapis.com/projects/prj/datasets/ds/tables/tbl"
resource = f"//cloudresourcemanager.googleapis.com/projects/{project_id}"
view_grantable_roles(resource)
■ロールの一覧表示
■ロールの一覧表示
https://cloud.google.com/iam/docs/roles-overview?hl=ja#role-types
1)事前定義ロールの場合は roles.get() を使用します。
2)プロジェクトレベルのカスタムロールの場合は、projects.roles.get() を使用します。
3)組織レベルのカスタムロールの場合は、organizations.roles.get() を使用します。
これら3種類で全てを網羅すると思われます
projectIDがsys-のものはGAS、lifecycleStateがACTIVE以外のものも含まれるので注意
■bqへの書き込み
■bqへの書き込み
export GOOGLE_APPLICATION_CREDENTIALS="path/to/your-service-account-key.json"
pip install google-cloud-bigquery
from google.cloud import bigquery
client = bigquery Client()
#書き込み先のテーブル情報
table_ref = f"{project_id}.{dataset_id}.{table_id}"
#サンプルデータの生成
def generate_sample_data(num_rows)
data = [
{
"organization": f"org_(num_rows)",
"permission". "view",
}
for _ in range(num_rows)
]
return data
data_to_insert = generate_sample_data(5000)
errors = client.insert_rows_json(table_ref, data_to_insert)
if errors:
print("Errors occurred: {errors}")
else:
print("Data successfully written to BigQuery!")
■データカタログ
データアセットを検索する | Data Catalog Documentation | Google Cloud
Class SearchCatalogRequest (3.23.0) | Python client library | Google Cloud
サンプルで仕様書のAPIを使っているがqueryが空白刻みで入れる等の使い方が分かる
■BQスキーマ+ポリシータグ取得
■ポリシータグ設定
Class SearchCatalogRequest (3.23.0) | Python client library | Google Cloud
サンプルで仕様書のAPIを使っているがqueryが空白刻みで入れる等の使い方が分かる
■BQスキーマ+ポリシータグ取得
from google.cloud import bigquery
def get_policy_tags_from_bq_table(project_id, dataset_id, table_id):
print("################ bigquery.Client.get_table().schema start ################")
print(f"Target table: {project_id}.{dataset_id}.{table_id}")
bq_client = bigquery.Client()
table = bq_client.get_table(f"{project_id}.{dataset_id}.{table_id}")
schema = table.schema
policy_tags = []
for field in schema:
print(f"Column: {field.name}")
if field.policy_tags:
tags = [tag for tag in field.policy_tags.names]
policy_tags.extend(tags)
print(f"Policy Tags: {tags}")
else:
print("> No Policy Tags assigned.")
return policy_tags
PROJECT_ID = "prj"
DATASET_ID = "ds"
TABLE_ID = "test001"
policy_tags = get_policy_tags_from_bq_table(PROJECT_ID, DATASET_ID, TABLE_ID)
print("Collected Policy Tags:", policy_tags)
■ポリシータグ設定
from google.cloud import datacatalog_v1
from google.cloud import bigquery
PROJECT_ID = "prj"
DATASET_ID = "ds"
TABLE_ID = "tbl01"
COLUMN_NAME = "aaa"
POLICY_TAG_PROJECT = "prj"
POLICY_TAG_NAME = "projects/prj/locations/us/taxonomies/83893110/policyTags/11089383"
def list_taxonomy_and_policy_tag():
print("############# Start #############")
list_policy_tags = []
client = datacatalog_v1.PolicyTagManagerClient()
request = datacatalog_v1.ListTaxonomiesRequest(
parent=f"projects/{POLICY_TAG_PROJECT}/locations/us"
)
try:
page_result = client.list_taxonomies(request=request)
except google.api_core.exceptions.PermissionDenied as e:
print(f"Skipping project {POLICY_TAG_PROJECT} due to PermissionDenied error: {e}")
return []
except Exception as e:
print(f"An error occurred for project {POLICY_TAG_PROJECT}: {e}")
return []
for taxonomy in page_result:
print(f"############ Taxonomy display_name: {taxonomy.display_name} #############")
print(f"############ Taxonomy name: {taxonomy.name} #############")
request_tag = datacatalog_v1.ListPolicyTagsRequest(parent=taxonomy.name)
try:
page_result_tag = client.list_policy_tags(request=request_tag)
except Exception as e:
print(f"Error on {request_tag}: {e}")
break
for policy_tag in page_result_tag:
print("Policy tag:")
print(policy_tag)
list_policy_tags.append({
"project_id": POLICY_TAG_PROJECT,
"taxonomy_display_name": taxonomy.display_name,
"taxonomy_name": taxonomy.name,
"policy_tag_name": policy_tag.name,
"policy_tag_display_name": policy_tag.display_name,
})
return list_policy_tags
def update_table_schema_with_policy_tag(list_policy_tags):
for policy_tag in list_policy_tags:
if policy_tag['policy_tag_name'] == POLICY_TAG_NAME:
print(
f"Target policy tag:\n"
f" Project ID: {policy_tag['project_id']}\n"
f" Taxonomy Display Name: {policy_tag['taxonomy_display_name']}\n"
f" Taxonomy Name: {policy_tag['taxonomy_name']}\n"
f" Policy Tag Name: {policy_tag['policy_tag_name']}\n"
f" Policy Tag Display Name: {policy_tag['policy_tag_display_name']}"
)
client = bigquery.Client()
table_ref = f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}"
table = client.get_table(table_ref)
new_schema = []
for field in table.schema:
if field.name == COLUMN_NAME:
new_schema.append(
bigquery.SchemaField(
name=field.name,
field_type=field.field_type, # Keep original field type
mode=field.mode, # Keep original mode
description=field.description,
policy_tags=bigquery.PolicyTagList([POLICY_TAG_NAME]),
)
)
else:
new_schema.append(field)
table.schema = new_schema
updated_table = client.update_table(table, ["schema"])
print(
f"Updated table {updated_table.project}.{updated_table.dataset_id}.{updated_table.table_id} schema\n"
f"with policy_tag {POLICY_TAG_NAME} on the column {COLUMN_NAME} successfully."
)
if __name__ == "__main__":
list_policy_tags = list_taxonomy_and_policy_tag()
update_table_schema_with_policy_tag(list_policy_tags)
■Workload identity federation(GCP外との連携)
まずWIF用のSAを作成する>SAに権限を付与する>
1)Workload identity provider+SAの情報をgithub actionに埋めて使う
GitHub Actions から GCP リソースにアクセスする用途
GitHub Actions から GCP リソースにアクセスする用途
2)Workload identity poolから構成情報をDLしAWSアプリに埋めて使う
AWSからGCP リソースにアクセスする用途
AWSからGCP リソースにアクセスする用途
gcloud auth login-cred-file=構成情報ファイルパス
3)Workload identity poolから構成情報をEKSのOIDC ID token のパスを指定しDL
EKS から GCP リソースにアクセスする用途
EKS から GCP リソースにアクセスする用途
- EKSのマニフェストのサービスアカウントのアノテーションにIAMロールを記載
- EKSのサービスアカウントを使用したい Podのアノテーションに追加
- マウント先のパスを環境変数 GOOGLE APPLICATION_CREDENTIALS に設定
- Pod内でSDK またはコマンドにてGCP リソースヘアクセス可能か確認