# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

import argparse
import json
import uuid

from apiclient import discovery
from apiclient.errors import HttpError
import httplib2
from oauth2client.client import GoogleCredentials

# 30 days in milliseconds
_EXPIRATION_MS = 30 * 24 * 60 * 60 * 1000
NUM_RETRIES = 3


def create_big_query():
    """Authenticates with the cloud platform and returns a BigQuery service object."""
    creds = GoogleCredentials.get_application_default()
    return discovery.build(
        "bigquery", "v2", credentials=creds, cache_discovery=False
    )
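# Example usage (a minimal sketch; assumes Application Default Credentials are
# available in the environment, e.g. via `gcloud auth application-default login`):
#
#   big_query = create_big_query()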


def create_dataset(big_query, project_id, dataset_id):
    is_success = True
    body = {
        "datasetReference": {"projectId": project_id, "datasetId": dataset_id}
    }

    try:
        dataset_req = big_query.datasets().insert(
            projectId=project_id, body=body
        )
        dataset_req.execute(num_retries=NUM_RETRIES)
    except HttpError as http_error:
        if http_error.resp.status == 409:
            print("Warning: The dataset %s already exists" % dataset_id)
        else:
            # Note: For more debugging info, print "http_error.content"
            print(
                "Error in creating dataset: %s. Err: %s"
                % (dataset_id, http_error)
            )
            is_success = False
    return is_success


def create_table(
    big_query, project_id, dataset_id, table_id, table_schema, description
):
    fields = [
        {
            "name": field_name,
            "type": field_type,
            "description": field_description,
        }
        for (field_name, field_type, field_description) in table_schema
    ]
    return create_table2(
        big_query, project_id, dataset_id, table_id, fields, description
    )
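# Example usage (a minimal sketch; the project, dataset, table and schema below
# are illustrative placeholders). table_schema is a list of
# (column name, column type, description) tuples:
#
#   schema = [
#       ("job_name", "STRING", "Name of the test job"),
#       ("duration", "FLOAT", "Run time in seconds"),
#   ]
#   create_table(big_query, "my-project", "my_dataset", "my_table", schema,
#                "Example results table")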


def create_partitioned_table(
    big_query,
    project_id,
    dataset_id,
    table_id,
    table_schema,
    description,
    partition_type="DAY",
    expiration_ms=_EXPIRATION_MS,
):
    """Creates a partitioned table.

    By default, a date-partitioned table is created, with each partition
    lasting 30 days after it was last modified.
    """
    fields = [
        {
            "name": field_name,
            "type": field_type,
            "description": field_description,
        }
        for (field_name, field_type, field_description) in table_schema
    ]
    return create_table2(
        big_query,
        project_id,
        dataset_id,
        table_id,
        fields,
        description,
        partition_type,
        expiration_ms,
    )
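# Example usage (a minimal sketch with placeholder identifiers; reuses the
# `schema` tuple list sketched above and the module defaults of daily
# partitions expiring after _EXPIRATION_MS):
#
#   create_partitioned_table(big_query, "my-project", "my_dataset",
#                            "my_partitioned_table", schema,
#                            "Example partitioned table")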


def create_table2(
    big_query,
    project_id,
    dataset_id,
    table_id,
    fields_schema,
    description,
    partition_type=None,
    expiration_ms=None,
):
    is_success = True

    body = {
        "description": description,
        "schema": {"fields": fields_schema},
        "tableReference": {
            "datasetId": dataset_id,
            "projectId": project_id,
            "tableId": table_id,
        },
    }

    if partition_type and expiration_ms:
        body["timePartitioning"] = {
            "type": partition_type,
            "expirationMs": expiration_ms,
        }

    try:
        table_req = big_query.tables().insert(
            projectId=project_id, datasetId=dataset_id, body=body
        )
        res = table_req.execute(num_retries=NUM_RETRIES)
        print('Successfully created %s "%s"' % (res["kind"], res["id"]))
    except HttpError as http_error:
        if http_error.resp.status == 409:
            print("Warning: Table %s already exists" % table_id)
        else:
            print(
                "Error in creating table: %s. Err: %s" % (table_id, http_error)
            )
            is_success = False
    return is_success
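# Example usage (a minimal sketch; unlike create_table(), fields_schema is
# passed as the already-expanded field dicts rather than tuples):
#
#   fields = [
#       {"name": "job_name", "type": "STRING", "description": "Test job name"},
#   ]
#   create_table2(big_query, "my-project", "my_dataset", "my_table", fields,
#                 "Example table", partition_type="DAY",
#                 expiration_ms=_EXPIRATION_MS)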


def patch_table(big_query, project_id, dataset_id, table_id, fields_schema):
    is_success = True

    body = {
        "schema": {"fields": fields_schema},
        "tableReference": {
            "datasetId": dataset_id,
            "projectId": project_id,
            "tableId": table_id,
        },
    }

    try:
        table_req = big_query.tables().patch(
            projectId=project_id,
            datasetId=dataset_id,
            tableId=table_id,
            body=body,
        )
        res = table_req.execute(num_retries=NUM_RETRIES)
        print('Successfully patched %s "%s"' % (res["kind"], res["id"]))
    except HttpError as http_error:
        print("Error in patching table: %s. Err: %s" % (table_id, http_error))
        is_success = False
    return is_success


def insert_rows(big_query, project_id, dataset_id, table_id, rows_list):
    is_success = True
    body = {"rows": rows_list}
    try:
        insert_req = big_query.tabledata().insertAll(
            projectId=project_id,
            datasetId=dataset_id,
            tableId=table_id,
            body=body,
        )
        res = insert_req.execute(num_retries=NUM_RETRIES)
        if res.get("insertErrors", None):
            print("Error inserting rows! Response: %s" % res)
            is_success = False
    except HttpError as http_error:
        print("Error inserting rows into the table %s" % table_id)
        print("Error message: %s" % http_error)
        is_success = False

    return is_success
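# Example usage (a minimal sketch; pairs with make_row() below to build the
# insertAll payload):
#
#   rows = [make_row(str(uuid.uuid4()),
#                    {"job_name": "build", "duration": 12.3})]
#   insert_rows(big_query, "my-project", "my_dataset", "my_table", rows)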


def sync_query_job(big_query, project_id, query, timeout=5000):
    query_data = {"query": query, "timeoutMs": timeout}
    query_job = None
    try:
        query_job = (
            big_query.jobs()
            .query(projectId=project_id, body=query_data)
            .execute(num_retries=NUM_RETRIES)
        )
    except HttpError as http_error:
        print("Query execute job failed with error: %s" % http_error)
        print(http_error.content)
    return query_job
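# Example usage (a minimal sketch; on success the jobs().query() response
# typically carries result rows under "rows", with each cell encoded as
# {"f": [{"v": ...}, ...]}):
#
#   job = sync_query_job(big_query, "my-project",
#                        "SELECT job_name FROM [my_dataset.my_table] LIMIT 10")
#   if job and job.get("jobComplete"):
#       for row in job.get("rows", []):
#           print([cell["v"] for cell in row["f"]])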

# List of (column name, column type, description) tuples


def make_row(unique_row_id, row_values_dict):
    """row_values_dict is a dictionary mapping column names to column values."""
    return {"insertId": unique_row_id, "json": row_values_dict}
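# Example (a minimal sketch): each row passed to insert_rows() is a dict with a
# client-supplied "insertId" (used by insertAll for best-effort de-duplication)
# and the row payload under "json":
#
#   make_row(str(uuid.uuid4()), {"job_name": "build", "duration": 12.3})
#   # -> {"insertId": "<uuid>", "json": {"job_name": "build", "duration": 12.3}}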