Python -> Gcloud sdk -> gcloud auth -> GCP(Bigquery etc.) <-> federation query/Connected sheet
Python(Oauth key) -> GCP認証情報(Oauth Key) + API -> GoogleWorkspace
GCPのクレデンシャルページでOauth2.0 client IDと鍵が発行でき、鍵でイケる
Pythonコードで鍵を指定すると実行時にログインを求められ、client IDとユーザIDを紐づけして実行することになる
Python -> local csv/tsvが基本
●Python(SA key) -> GCP認証情報(SA Key) + API -> GoogleWorkspace
サービスアカウントでGWSにアクセスできないのでダメ
信頼しているドメインとのみ外部共有を許可する - Google Workspace 管理者 ヘルプ サービス アカウント(ドメイン名の末尾が「gserviceaccount.com」)を信頼しているドメインにすることはできません
OUで許可するとイケるはずだが、、
●Python でGoogle docをイジる
下記のURLの内容を検証すればよい
Google Cloudコンソールで
Oauth同意画面を設定
Google Docs APIを有効化
OAuth クライアントIDを作成
シークレットJson ファイルができるのでDL(リネーム)
コードにクレデンシャルJSONファイルとDocのURLに含まれるdocumentIDを記述
→Python実行するとDocのデータが取れる(ローカルの場合は楽)
/// runのデプロイ時に設定を入れる方法について
1)環境変数を(コンソール/cmd/コンテナymiのどれかで)設定:
env=os environ.get("ENV") で使う。ログに出やすく非推奨
2)シークレットマネージャ保存分を設定
環境変数+コードでやるのと同じ?
3)ボリュームを使う:
クレデンを入れ、トークンの一時保存ができる?
Cloud RunでSecret Managerを使いたい #Python - Qiita※サービスアカウントでGWSを扱うにはGWSのOUで受け入れる設定が必要な場合がある
■secret managerに保存してコードで呼び出して使う
Secret Managerのシークレットアクセサー権限
(checksumをかけている)
from google.cloud import secretmanager
import google.cloud.logging
import logging
def get_url(secret_key, project_num)
logging.warning('####### secret_key' + str(secret_key) + '######')
client = secretmanager.SecretManagerServiceClient()
resource_name = "projects/()/secrets/()/versions/latest.format(project_num, secret_key)
res = client.access_secret_version(resource_name)
slack_url = res.payload.data.decode("utf-8")
return slack_url
■API等でデータを取った時中身が分からない場合
pramsが何かわからん時
print(params)
print(type(params))
#<class 'proto.marshal.collections.maps.MapComposite'>
#よくわからんクラスでもdirで保持するAttributeが分かる
attributes = dir(params)
print(attributes)
#そこに含まれるメソッドも確認できるのでhelpする
help(params.get)
#prams.get('query')すると含まれるSQLが分かりこれで進める等
■Protocol buffers
APIの返りはGoogleは自社で開発したProtocol buffersを使っていようだ
たとえば下記が返る
name: "projects/98765"
parent: "folders/12345"
project_id: "aaaaaa-bbbb-market"
state: "ACTIVE"
display_name: "aaaaaa-bbbb-market"
create_time{
seconds: 1601250933
nanos: 820000000
}
update_time{
seconds: 1632826231
nanos: 634000000
}
etag: "W/a06910d9093db111"
labels{
key: "budget_group"
value: "cccc"
}
これは
print (type(response))すると下記であり
<class "google.cloud. resourcemanager v3.types.projects.Project">
print (response.project_id) で簡単にデコードし値取得できることが分かる
APIからの値を取るときのコード
from google.cloud import resourcemanager_v3
client = resourcemanager_v3.ProjectsClient()
request resourcemanager v3.ListProjectsRequest{
#組織の場合、現状は権限がGOP側で用意がなく無理だった
#parent organizations/12345678.
parent="folders/1122233344"
}
page_result = client.list_projects(request=request)
for response in page result:
print(type(response))
print (response.project_id)
エンコードする場合 https://blog.imind.jp/entry/2019/12/28/124728
pip install googleapis-common-protos でインスコ?
sudo apt install protobuf-compiler でインスコ
sudo apt-get install protobuf-compiler でインスコ
※google提供のフォルダごと使用しようとして失敗した方法
※にprotoファイルがあるが丸々必要なので下記でDL
※git clone https://github.com/googleapis/googleapis.git
※バスを合わせてprojects.protoを使うが失敗
※たとえば protoc python_out=. --proto_path=googleapis ./googleapis/google/cloud/resourcemanager/v3/projects.proto
projects.proto を下記の内容で一から作成することが必要だった
syntax proto3;
message Resource{
string name = 1;
string parent = 2;
string project_id = 3;
string state = 4;
string display_name = 5;
map<string, string> create_time = 6;
map<string, string> update_time = 7;
string etag = 8;
map<string, string> labels = 9;
}
そして下記を実行しコンパイル
protoc --python_out=. ./projects.proto
projects_pb2.pyが生成されるため、パッケージとして読みこみprotocol buffersを実行できるようになる
import projects_pb2.py
※なおエラーで pip install U protobuf=3.20.0でダウングレードした
注意点としては、pythonとprotocol bufferとBigqueryの型合わせが必要
DateやTimestampはUNIXエポックからの日数や秒数に変換する必要がある
Noneをstr 'None'や、int -1や、bool FalseにPythonで調整をする
//UNIXエポックからの日数
current_date = datetime.now()
epoch = datetime(1970, 1, 1)
record_date = (current_date - epoch).days
//UNIXエポックからの秒数
data_string = str(date_v)
dt_obj = datetime.fromisoformat(date_string.replace("Z","+00:00"))
epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
seconds_since_epoch = (dt_obj - epoch).tatal_seconds()
microseconds_since_epoch = int(seconds_since_epoch * 1e6)
date_v = microseconds_since_epoch
■BQ APIクォータ割り当て超過(1000件insertしようとした)
テーブル変更1日1500件まで
テーブルメタデータ変更は10sあたり5回まで
テーブル当たりDMLの実行待ちキューは20件まで
テーブル当たり10sあたり25のDMLまで
→各insertでスリープを5秒入れた
import time
time.sleep(5)
↓
上限がテーブル単位のためテーブル名を分けると回避できるらしい
■BQ streaming insert->BQ storage read/write APIの上限はDMLと別で、閾値が大きい
streaming insert -> Bigquery storage write API を使う
■Python/Client libraryの値をBQに入れるにあたり
仕様書で型を調べる。STRUCTやクラスは紐解いて通常のカラムでBQに挿入
timestampやboolやint64はそのままの形でBQに挿入
BQ SQL:日付は値なしならNULLを入れる、数値やBool値はクォートで囲まない
PythonでSQLインサート文を作るとき改行コードが含まれるものをセットするとSyntax errror:Unclosed string literal
q = q.replace('//', '////') バックスラッシュをエスケープ、あるとイリーガルエスケープシーケンスとなる、raw文字列にしたい?
q = q.replace('/n', '//n') 改行をエスケープ
q = q.replace("'", "\\'") SQLが途切れないようシングルクォートをエスケープ
q = q.replace('/n', ' ') 改行を空白で置き換える
■変更の判断
変更で問題がでないか→PCにマウスと付けて問題が起こらないかという問題と相似、最終的に経験で判断するしか
■監査ログからSetIamのメソッドを取りBQ権限付与を検知するクエリ
WITH source AS(
SELECT
*
FROM `project-logging.organization_audit_log_v2.cloudaudit_googleapis_com_activity_20*`
WHERE_TABLE_SUFFIX = format_date('%y%m%d', current_date("Asia/Tokyo"))
),
project source AS(
SELECT
ROW_NUMBER() OVER (ORDER BY timestamp) as id,
*
FROM source
WHERE
protopayload_auditlog.methodName = 'SetlamPolicy'
project_authorizationinfo AS(
SELECT
DISTINCT
id,
__ori.resource.type as type,
__ori.resource.labels.project_id as project_id,
__ori.resource.labels.dataset_id as dataset_id,
protopayload_auditlog.methodName as method_name
protopayload_auditlog.resourceName as resource_name,
protopayload_auditlog.authenticationInfo.principal Email as email_manipulator,
authorizationInfo.resource as request_resource,
authorizationInfo.permission as request_permission,
authorizationInfo.granted as request_granted,
protopayload_auditlog.requestMetadata.callerlp as callerlp,
protopayload_auditlog.requestMetadata.callerSuppliedUserAgent as callerSuppliedUserAgent,
FROM project_source AS __ori
). UNNEST (protopayload_auditlog.authorizationInfo) AS authorizationInfo
project_bindingdeltas AS(
SELECT
id,
--array_binding Deltas_project as binding Deltas_project,
array_binding Deltas_project.action as action_project,
array_binding Deltas_project.member as member_project,
array_binding Deltas_project.role as role_project,
timestamp
FROM project_source AS_ori
,UNNEST (protopayload_auditlog.servicedata_v1_iam.policyDelta.bindingDeltas) AS array_binding Deltas_project
),
project_setiam AS(
SELECT
--*, except(id)
type,
project_id,
dataset_id,
method_name,
resource_name,
email_manipulator,
request_resource,
request_permission,
request_granted,
callerip,
callerSuppliedUserAgent.
action_project,
member_project,
role project.
CAST(NULL AS STRING) AS metadataJson,
CAST(NULL AS STRING) AS bindingDeltas_dataset,
CASTINULLAS STRING AS action_dataset,
CAST(NULL AS STRING) AS member_dataset,
CAST(NULL AS STRING) AS role_dataset,
CAST(NULL AS STRING) AS bindingDeltas_table,
CAST(NULL AS STRING) AS action_table,
CAST(NULL AS STRING) AS member_table,
CAST(NULL AS STRING) AS role_table,
timestamp
FROM project_authorizationinfo
LEFT JOIN project_bindingdeltas ON project_authorizationinfo.id = project_bindingdeltas.id
WHERE role_project LIKE 'roles/bigquery%
),
resource_source AS (
SELECT
__ori.resource.type as type,
__ori.resource.labels.project_id as project id,
__ori.resource.labels.dataset_id as dataset_id,
protopayload_auditlog.methodName as method_name,
protopayload_auditlog.resourceName as resource_name,
protopayload_auditlog.authenticationInfo.principalEmail as email_manipulator,
authorizationInfo.resource as request_resource,
authorizationInfo.permission as request_permission,
authorizationInfo.granted as request_granted,
protopayload_auditlog.requestMetadata.callerlp as callerlp,
protopayload_auditlog.requestMetadata.callerSuppliedUserAgent as callerSuppliedUserAgent,
protopayload_auditiog.metadataJson,
timestamp
FROM source AS __ori
,UNNEST(protopayload_auditlog.authorizationInfo) AS authorizationInfo
WHERE
protopayload_auditlog.methodName = 'google.iam.v1.IAMPolicy.SetlamPolicy'
--AND timestamp= "2024-03-11 04:11:30.885258 UTC"
),
resource_id AS (
SELECT
ROW_NUMBER() OVER (ORDER BY timestamp) as id,
*
FROM resource_source
),
resource_bq_dataset AS (
SELECT
id as id_dataset,
json_extract(metadataJson, '$.datasetChange bindingDeltas') as bindingDeltas_dataset,
json_extract(array_bindingDeltas_dataset, '$action') as action_dataset,
json_extract(array_bindingDeltas_dataset, $.member') as member_dataset,
json_extract(array_bindingDeltas_dataset, '$.role') as role_dataset,
FROM resource_id
,UNNEST(json query_array(metadataJson, '$.datasetChange.bindingDeltas')) AS array_bindingDeltas_dataset
),
resource_bq_table AS (
SELECT
id as id table,
json_extract(metadataJson, '$.tableChange.bindingDeltas') as bindingDeltas_table,
json extract(array_bindingDeltas_table, '$.action') as action table,
json_extract(array_bindingDeltas_table. '$.member') as member table,
json_extract(array_bindingDeltas_table, '$.role') as role_table,
FROM resource_id
,UNNEST(json query_array(metadataJson, '$.tableChange.bindingDeltas')) AS array_bindingDeltas_table
),
resource_setiam AS (
SELECT
--*except(id, id_dataset, id_table)
type,
project_id,
dataset_id,
method_name,
resource_name,
email_manipulator,
request_resource,
request_permission,
request_granted,
callerlp,
callerSuppliedUserAgent,
CAST(NULL AS STRING) AS action_project,
CAST(NULL AS STRING) AS member_project,
CAST(NULL AS STRING) AS role_project,
metadataJson,
bindingDeltas_dataset,
action_dataset,
member_dataset,
role_dataset,
bindingDeltas_table,
action_table,
member_table,
role_table,
timestamp
FROM resource_id
LEFT JOIN resource_bq_dataset ON resource_id.id = resource_bq_dataset.id_dataset
LEFT JOIN resource_bq_table ON resource_id.id = resource_bq_table.id_table
)
SELECT * FROM project_setiam
UNION ALL
SELECT * FROM resource_setiam
■BQからCloudSQLにデータを入れる (GCSを経由する、コマンドやPythonがある
bq query --use_legacy_sql=false 'CREATE OR REPLACE TABLE `prj.ds._table` AS SELECT FROM `prj.ds.view`';
bq extract -destination_format CSV 'prj.ds._table' gs://bucket/tbl.csv
gcloud sql import csv インスタンス名
gs://bucket/tbl.csv --database=データベース名 --table=テーブル名
■ログの重複をなくす
import google.cloud.logging
import logging
# クライアントの作成
client = google.cloud.logging.Client()
# Cloud Logging ハンドラを追加
client.get_default_handler()
client.setup_logging()
# 既存のハンドラをすべて削除
for handler in logging.root.handlers[:]:
logging.root.removeHandler(handler)
# 新しいハンドラを追加
logging.basicConfig(level=logging.INFO)
# logging.basicConfig(level=logging.DEBUG) # DEBUG レベルからすべてのレベルを記録
# propagate を無効にして重複を防ぐ
logger = logging.getLogger()
logger.propagate = False
# 各ログレベルでテスト
logging.debug('This is a DEBUG log')
logging.info('This is an INFO log')
logging.warning('This is a WARNING log')
logging.error('This is an ERROR log')
logging.critical('This is a CRITICAL log')
■何度かAPIコールを繰り返す
def safe_replace_text(document_id, old_text, new_text, max_attempts=3):
for attempt in range(max_attempts):
try:
replace_text(document_id, old_text, new_text)
break # 成功した場合はループを抜ける
except Exception as e:
print(f"Attempt {attempt + 1} failed: {e}")
if attempt == max_attempts - 1:
print("Reached maximum attempts.")