# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

import argparse
import json
import uuid

from apiclient import discovery
from apiclient.errors import HttpError
import httplib2
from oauth2client.client import GoogleCredentials

# 30 days in milliseconds
_EXPIRATION_MS = 30 * 24 * 60 * 60 * 1000
NUM_RETRIES = 3


def create_big_query():
    """Authenticates with Cloud Platform and returns a BigQuery service object."""
    creds = GoogleCredentials.get_application_default()
    return discovery.build('bigquery',
                           'v2',
                           credentials=creds,
                           cache_discovery=False)
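# Example use of create_big_query (a minimal sketch): the call assumes
# Application Default Credentials are discoverable, e.g. via the
# GOOGLE_APPLICATION_CREDENTIALS environment variable.
#
#   big_query = create_big_query()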


def create_dataset(big_query, project_id, dataset_id):
    """Creates a BigQuery dataset; returns True on success or if it already exists."""
    is_success = True
    body = {
        'datasetReference': {
            'projectId': project_id,
            'datasetId': dataset_id
        }
    }

    try:
        dataset_req = big_query.datasets().insert(projectId=project_id,
                                                  body=body)
        dataset_req.execute(num_retries=NUM_RETRIES)
    except HttpError as http_error:
        if http_error.resp.status == 409:
            print('Warning: The dataset %s already exists' % dataset_id)
        else:
            # Note: For more debugging info, print "http_error.content"
            print('Error in creating dataset: %s. Err: %s' %
                  (dataset_id, http_error))
            is_success = False
    return is_success
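
# Example use of create_dataset (a sketch; the project and dataset ids are
# placeholders, not values this module defines):
#
#   big_query = create_big_query()
#   if not create_dataset(big_query, 'my-gcp-project', 'perf_results'):
#       print('Failed to create dataset')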


def create_table(big_query, project_id, dataset_id, table_id, table_schema,
                 description):
    """Creates a table; table_schema is a list of (name, type, description) tuples."""
    fields = [{
        'name': field_name,
        'type': field_type,
        'description': field_description
    } for (field_name, field_type, field_description) in table_schema]
    return create_table2(big_query, project_id, dataset_id, table_id, fields,
                         description)
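
# Example use of create_table (a sketch; the schema tuples and ids are
# placeholders):
#
#   schema = [('job', 'STRING', 'Name of the job'),
#             ('latency_ms', 'FLOAT', 'Median latency in milliseconds')]
#   create_table(big_query, 'my-gcp-project', 'perf_results', 'latencies',
#                schema, 'Benchmark latency results')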


def create_partitioned_table(big_query,
                             project_id,
                             dataset_id,
                             table_id,
                             table_schema,
                             description,
                             partition_type='DAY',
                             expiration_ms=_EXPIRATION_MS):
    """Creates a partitioned table. By default, a date-partitioned table is
  created and each partition expires 30 days after it was last modified.
  """
    fields = [{
        'name': field_name,
        'type': field_type,
        'description': field_description
    } for (field_name, field_type, field_description) in table_schema]
    return create_table2(big_query, project_id, dataset_id, table_id, fields,
                         description, partition_type, expiration_ms)
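
# Example use of create_partitioned_table (a sketch; by default partitions are
# day-based and expire after _EXPIRATION_MS, i.e. 30 days):
#
#   create_partitioned_table(big_query, 'my-gcp-project', 'perf_results',
#                            'latencies_by_day', schema,
#                            'Benchmark latency results, partitioned by day')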


def create_table2(big_query,
                  project_id,
                  dataset_id,
                  table_id,
                  fields_schema,
                  description,
                  partition_type=None,
                  expiration_ms=None):
    """Creates a table from a prebuilt fields_schema, optionally time-partitioned."""
    is_success = True

    body = {
        'description': description,
        'schema': {
            'fields': fields_schema
        },
        'tableReference': {
            'datasetId': dataset_id,
            'projectId': project_id,
            'tableId': table_id
        }
    }

    if partition_type and expiration_ms:
        body['timePartitioning'] = {
            'type': partition_type,
            'expirationMs': expiration_ms
        }

    try:
        table_req = big_query.tables().insert(projectId=project_id,
                                              datasetId=dataset_id,
                                              body=body)
        res = table_req.execute(num_retries=NUM_RETRIES)
        print('Successfully created %s "%s"' % (res['kind'], res['id']))
    except HttpError as http_error:
        if http_error.resp.status == 409:
            print('Warning: Table %s already exists' % table_id)
        else:
            print('Error in creating table: %s. Err: %s' %
                  (table_id, http_error))
            is_success = False
    return is_success
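
# Example use of create_table2 (a sketch of the raw fields_schema format this
# lower-level helper expects; ids are placeholders):
#
#   fields = [{'name': 'job', 'type': 'STRING', 'description': 'Name of the job'}]
#   create_table2(big_query, 'my-gcp-project', 'perf_results', 'jobs', fields,
#                 'Job metadata')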


def patch_table(big_query, project_id, dataset_id, table_id, fields_schema):
    """Patches an existing table with an updated schema."""
    is_success = True

    body = {
        'schema': {
            'fields': fields_schema
        },
        'tableReference': {
            'datasetId': dataset_id,
            'projectId': project_id,
            'tableId': table_id
        }
    }

    try:
        table_req = big_query.tables().patch(projectId=project_id,
                                             datasetId=dataset_id,
                                             tableId=table_id,
                                             body=body)
        res = table_req.execute(num_retries=NUM_RETRIES)
        print('Successfully patched %s "%s"' % (res['kind'], res['id']))
    except HttpError as http_error:
        print('Error in patching table: %s. Err: %s' % (table_id, http_error))
        is_success = False
    return is_success
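
# Example use of patch_table (a sketch; patching is typically used to add new
# columns to an existing schema):
#
#   new_fields = fields + [{'name': 'qps', 'type': 'FLOAT',
#                           'description': 'Queries per second'}]
#   patch_table(big_query, 'my-gcp-project', 'perf_results', 'jobs', new_fields)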


def insert_rows(big_query, project_id, dataset_id, table_id, rows_list):
    """Streams rows into a table; each entry in rows_list comes from make_row()."""
    is_success = True
    body = {'rows': rows_list}
    try:
        insert_req = big_query.tabledata().insertAll(projectId=project_id,
                                                     datasetId=dataset_id,
                                                     tableId=table_id,
                                                     body=body)
        res = insert_req.execute(num_retries=NUM_RETRIES)
        if res.get('insertErrors', None):
            print('Error inserting rows! Response: %s' % res)
            is_success = False
    except HttpError as http_error:
        print('Error inserting rows into table %s' % table_id)
        print('Error message: %s' % http_error)
        is_success = False

    return is_success
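
# Example use of insert_rows together with make_row (a sketch; uses the uuid
# module imported above to generate unique insert ids):
#
#   rows = [make_row(str(uuid.uuid4()), {'job': 'qps_test', 'latency_ms': 3.2})]
#   insert_rows(big_query, 'my-gcp-project', 'perf_results', 'latencies', rows)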


def sync_query_job(big_query, project_id, query, timeout=5000):
    """Runs a synchronous query and returns the response, or None on error."""
    query_data = {'query': query, 'timeoutMs': timeout}
    query_job = None
    try:
        query_job = big_query.jobs().query(
            projectId=project_id,
            body=query_data).execute(num_retries=NUM_RETRIES)
    except HttpError as http_error:
        print('Query execute job failed with error: %s' % http_error)
        print(http_error.content)
    return query_job
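
# Example use of sync_query_job (a sketch; rows may be absent if the response's
# 'jobComplete' flag is False, in which case callers would need to poll):
#
#   res = sync_query_job(big_query, 'my-gcp-project', 'SELECT 1')
#   if res and res.get('jobComplete'):
#       for row in res.get('rows', []):
#           print([field['v'] for field in row['f']])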


def make_row(unique_row_id, row_values_dict):
    """Maps a dict of column name -> column value to the insertAll row format."""
    return {'insertId': unique_row_id, 'json': row_values_dict}