# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

import argparse
import json
import uuid

from apiclient import discovery
from apiclient.errors import HttpError
import httplib2
from oauth2client.client import GoogleCredentials

# 30 days in milliseconds
_EXPIRATION_MS = 30 * 24 * 60 * 60 * 1000
NUM_RETRIES = 3


def create_big_query():
    """Authenticates with cloud platform and gets a BigQuery service object."""
    creds = GoogleCredentials.get_application_default()
    return discovery.build(
        "bigquery", "v2", credentials=creds, cache_discovery=False
    )


def create_dataset(big_query, project_id, dataset_id):
    is_success = True
    body = {
        "datasetReference": {"projectId": project_id, "datasetId": dataset_id}
    }

    try:
        dataset_req = big_query.datasets().insert(
            projectId=project_id, body=body
        )
        dataset_req.execute(num_retries=NUM_RETRIES)
    except HttpError as http_error:
        if http_error.resp.status == 409:
            print("Warning: The dataset %s already exists" % dataset_id)
        else:
            # Note: For more debugging info, print "http_error.content"
            print(
                "Error in creating dataset: %s. Err: %s"
                % (dataset_id, http_error)
            )
            is_success = False
    return is_success


def create_table(
    big_query, project_id, dataset_id, table_id, table_schema, description
):
    # table_schema is a list of (column name, column type, description) tuples.
    fields = [
        {
            "name": field_name,
            "type": field_type,
            "description": field_description,
        }
        for (field_name, field_type, field_description) in table_schema
    ]
    return create_table2(
        big_query, project_id, dataset_id, table_id, fields, description
    )


def create_partitioned_table(
    big_query,
    project_id,
    dataset_id,
    table_id,
    table_schema,
    description,
    partition_type="DAY",
    expiration_ms=_EXPIRATION_MS,
):
    """Creates a partitioned table.

    By default, a date-partitioned table is created and each partition
    expires 30 days after it was last modified.
    """
    fields = [
        {
            "name": field_name,
            "type": field_type,
            "description": field_description,
        }
        for (field_name, field_type, field_description) in table_schema
    ]
    return create_table2(
        big_query,
        project_id,
        dataset_id,
        table_id,
        fields,
        description,
        partition_type,
        expiration_ms,
    )


def create_table2(
    big_query,
    project_id,
    dataset_id,
    table_id,
    fields_schema,
    description,
    partition_type=None,
    expiration_ms=None,
):
    is_success = True

    body = {
        "description": description,
        "schema": {"fields": fields_schema},
        "tableReference": {
            "datasetId": dataset_id,
            "projectId": project_id,
            "tableId": table_id,
        },
    }

    if partition_type and expiration_ms:
        body["timePartitioning"] = {
            "type": partition_type,
            "expirationMs": expiration_ms,
        }

    try:
        table_req = big_query.tables().insert(
            projectId=project_id, datasetId=dataset_id, body=body
        )
        res = table_req.execute(num_retries=NUM_RETRIES)
        print('Successfully created %s "%s"' % (res["kind"], res["id"]))
    except HttpError as http_error:
        if http_error.resp.status == 409:
            print("Warning: Table %s already exists" % table_id)
        else:
            print(
                "Error in creating table: %s. Err: %s" % (table_id, http_error)
            )
            is_success = False
    return is_success


def patch_table(big_query, project_id, dataset_id, table_id, fields_schema):
    is_success = True

    body = {
        "schema": {"fields": fields_schema},
        "tableReference": {
            "datasetId": dataset_id,
            "projectId": project_id,
            "tableId": table_id,
        },
    }

    try:
        table_req = big_query.tables().patch(
            projectId=project_id,
            datasetId=dataset_id,
            tableId=table_id,
            body=body,
        )
        res = table_req.execute(num_retries=NUM_RETRIES)
        print('Successfully patched %s "%s"' % (res["kind"], res["id"]))
    except HttpError as http_error:
        print("Error in patching table: %s. Err: %s" % (table_id, http_error))
        is_success = False
    return is_success


def insert_rows(big_query, project_id, dataset_id, table_id, rows_list):
    is_success = True
    body = {"rows": rows_list}
    try:
        insert_req = big_query.tabledata().insertAll(
            projectId=project_id,
            datasetId=dataset_id,
            tableId=table_id,
            body=body,
        )
        res = insert_req.execute(num_retries=NUM_RETRIES)
        if res.get("insertErrors", None):
            print("Error inserting rows! Response: %s" % res)
            is_success = False
    except HttpError as http_error:
        print("Error inserting rows to the table %s" % table_id)
        print("Error message: %s" % http_error)
        is_success = False

    return is_success


def sync_query_job(big_query, project_id, query, timeout=5000):
    query_data = {"query": query, "timeoutMs": timeout}
    query_job = None
    try:
        query_job = (
            big_query.jobs()
            .query(projectId=project_id, body=query_data)
            .execute(num_retries=NUM_RETRIES)
        )
    except HttpError as http_error:
        print("Query execute job failed with error: %s" % http_error)
        print(http_error.content)
    return query_job


def make_row(unique_row_id, row_values_dict):
    """row_values_dict is a dictionary of column name and column value."""
    return {"insertId": unique_row_id, "json": row_values_dict}