# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

import argparse
import json
import uuid

from apiclient import discovery
from apiclient.errors import HttpError
import httplib2
from oauth2client.client import GoogleCredentials

# 30 days in milliseconds
_EXPIRATION_MS = 30 * 24 * 60 * 60 * 1000
# Number of retries for each BigQuery API request.
NUM_RETRIES = 3


def create_big_query():
    """Authenticates with cloud platform and gets a BigQuery service object."""
    creds = GoogleCredentials.get_application_default()
    return discovery.build('bigquery',
                           'v2',
                           credentials=creds,
                           cache_discovery=False)


def create_dataset(big_query, project_id, dataset_id):
    is_success = True
    body = {
        'datasetReference': {
            'projectId': project_id,
            'datasetId': dataset_id
        }
    }

    try:
        dataset_req = big_query.datasets().insert(projectId=project_id,
                                                  body=body)
        dataset_req.execute(num_retries=NUM_RETRIES)
    except HttpError as http_error:
        if http_error.resp.status == 409:
            print('Warning: The dataset %s already exists' % dataset_id)
        else:
            # Note: For more debugging info, print "http_error.content"
            print('Error in creating dataset: %s. Err: %s' %
                  (dataset_id, http_error))
            is_success = False
    return is_success


def create_table(big_query, project_id, dataset_id, table_id, table_schema,
                 description):
    # table_schema is a list of (column name, column type, description) tuples.
    fields = [{
        'name': field_name,
        'type': field_type,
        'description': field_description
    } for (field_name, field_type, field_description) in table_schema]
    return create_table2(big_query, project_id, dataset_id, table_id, fields,
                         description)


def create_partitioned_table(big_query,
                             project_id,
                             dataset_id,
                             table_id,
                             table_schema,
                             description,
                             partition_type='DAY',
                             expiration_ms=_EXPIRATION_MS):
    """Creates a partitioned table. By default a date-partitioned table is
    created, and each partition expires 30 days after it was last modified.
    """
    fields = [{
        'name': field_name,
        'type': field_type,
        'description': field_description
    } for (field_name, field_type, field_description) in table_schema]
    return create_table2(big_query, project_id, dataset_id, table_id, fields,
                         description, partition_type, expiration_ms)


def create_table2(big_query,
                  project_id,
                  dataset_id,
                  table_id,
                  fields_schema,
                  description,
                  partition_type=None,
                  expiration_ms=None):
    is_success = True

    body = {
        'description': description,
        'schema': {
            'fields': fields_schema
        },
        'tableReference': {
            'datasetId': dataset_id,
            'projectId': project_id,
            'tableId': table_id
        }
    }

    # Only configure time-based partitioning when both settings are provided.
    if partition_type and expiration_ms:
        body['timePartitioning'] = {
            'type': partition_type,
            'expirationMs': expiration_ms
        }

    try:
        table_req = big_query.tables().insert(projectId=project_id,
                                              datasetId=dataset_id,
                                              body=body)
        res = table_req.execute(num_retries=NUM_RETRIES)
        print('Successfully created %s "%s"' % (res['kind'], res['id']))
    except HttpError as http_error:
        if http_error.resp.status == 409:
            print('Warning: Table %s already exists' % table_id)
        else:
            print('Error in creating table: %s. Err: %s' %
                  (table_id, http_error))
            is_success = False
    return is_success


def patch_table(big_query, project_id, dataset_id, table_id, fields_schema):
    is_success = True

    body = {
        'schema': {
            'fields': fields_schema
        },
        'tableReference': {
            'datasetId': dataset_id,
            'projectId': project_id,
            'tableId': table_id
        }
    }

    try:
        table_req = big_query.tables().patch(projectId=project_id,
                                             datasetId=dataset_id,
                                             tableId=table_id,
                                             body=body)
        res = table_req.execute(num_retries=NUM_RETRIES)
        print('Successfully patched %s "%s"' % (res['kind'], res['id']))
    except HttpError as http_error:
        print('Error in patching table: %s. Err: %s' % (table_id, http_error))
        is_success = False
    return is_success


def insert_rows(big_query, project_id, dataset_id, table_id, rows_list):
    is_success = True
    body = {'rows': rows_list}
    try:
        insert_req = big_query.tabledata().insertAll(projectId=project_id,
                                                     datasetId=dataset_id,
                                                     tableId=table_id,
                                                     body=body)
        res = insert_req.execute(num_retries=NUM_RETRIES)
        if res.get('insertErrors', None):
            print('Error inserting rows! Response: %s' % res)
            is_success = False
    except HttpError as http_error:
        print('Error inserting rows to the table %s' % table_id)
        print('Error message: %s' % http_error)
        is_success = False

    return is_success


def sync_query_job(big_query, project_id, query, timeout=5000):
    query_data = {'query': query, 'timeoutMs': timeout}
    query_job = None
    try:
        query_job = big_query.jobs().query(
            projectId=project_id,
            body=query_data).execute(num_retries=NUM_RETRIES)
    except HttpError as http_error:
        print('Query execute job failed with error: %s' % http_error)
        print(http_error.content)
    return query_job


def make_row(unique_row_id, row_values_dict):
    """Builds a row for insertAll; row_values_dict maps column names to
    column values.
    """
    return {'insertId': unique_row_id, 'json': row_values_dict}