Merge pull request #2 from elibixby/bigquery
Samples from bigquery-samples-python
elibixby committed May 8, 2015
2 parents 3bf742d + 2e85600 commit e6bd547
Showing 17 changed files with 835 additions and 0 deletions.
14 changes: 14 additions & 0 deletions bigquery/requirements.txt
@@ -0,0 +1,14 @@
argparse==1.2.1
google-api-python-client==1.3.2
httplib2==0.9
oauth2client==1.4.6
py==1.4.26
pyasn1==0.1.7
pyasn1-modules==0.0.5
rsa==3.1.4
simplejson==3.6.5
six==1.9.0
tox==1.9.0
uritemplate==0.6
virtualenv==12.0.7
wsgiref==0.1.2
13 changes: 13 additions & 0 deletions bigquery/samples/__init__.py
@@ -0,0 +1,13 @@
# Copyright 2015, Google, Inc.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
87 changes: 87 additions & 0 deletions bigquery/samples/async_query.py
@@ -0,0 +1,87 @@
# Copyright 2015, Google, Inc.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import print_function  # For Python 2/3 interoperability
from samples.utils import get_service, paging, poll_job
import uuid
import json


# [START async_query]
def async_query(service, project_id, query, batch=False, num_retries=5):
    # Generate a unique job ID so retries
    # don't accidentally duplicate the query.
    job_data = {
        'jobReference': {
            'projectId': project_id,
            'jobId': str(uuid.uuid4())
        },
        'configuration': {
            'query': {
                'query': query,
                'priority': 'BATCH' if batch else 'INTERACTIVE',
            },
        }
    }
    return service.jobs().insert(
        projectId=project_id,
        body=job_data).execute(num_retries=num_retries)
# [END async_query]


# [START run]
def run(project_id, query_string, batch, num_retries, interval):
    service = get_service()

    query_job = async_query(service,
                            project_id,
                            query_string,
                            batch,
                            num_retries)

    poll_job(service,
             query_job['jobReference']['projectId'],
             query_job['jobReference']['jobId'],
             interval,
             num_retries)

    for page in paging(service,
                       service.jobs().getQueryResults,
                       num_retries=num_retries,
                       **query_job['jobReference']):
        yield json.dumps(page['rows'])
# [END run]


# [START main]
def main():
    project_id = raw_input("Enter the project ID: ")
    query_string = raw_input("Enter the BigQuery SQL query: ")
    batch = raw_input("Run query as batch (y/n)?: ") in (
        'True', 'true', 'y', 'Y', 'yes', 'Yes')

    num_retries = int(raw_input(
        "Enter the number of times to retry in case of 500 error: "))
    interval = float(raw_input(
        "Enter how often to poll the query for completion (seconds): "))

    for result in run(project_id, query_string, batch, num_retries, interval):
        print(result)
# [END main]
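As a usage sketch (not part of this commit), run() can also be driven
non-interactively; the project ID and query below are placeholders:

# Hypothetical, non-interactive use of run(); replace the project ID
# and query with real values before running.
for rows_json in run('my-project-id',
                     'SELECT word FROM [publicdata:samples.shakespeare] LIMIT 10',
                     batch=False, num_retries=5, interval=1):
    print(rows_json)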
65 changes: 65 additions & 0 deletions bigquery/samples/discovery_doc.py
@@ -0,0 +1,65 @@
# Copyright 2015, Google, Inc.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import json
import httplib2
import time

# [START build_and_update]

RESOURCE_PATH = '..'  # Look for discovery docs in the parent folder
MAX_AGE = 86400  # Update discovery docs older than a day

# A module that takes care of caching and updating discovery docs
# for google-api-python-clients (until such a feature is integrated)


def build_and_update(api, version):
    from oauth2client.client import GoogleCredentials
    from googleapiclient.discovery import build_from_document

    path = os.path.join(RESOURCE_PATH, '{}.{}'.format(api, version))
    try:
        age = time.time() - os.path.getmtime(path)
        if age > MAX_AGE:
            _update_discovery_doc(api, version, path)
    except os.error:
        _update_discovery_doc(api, version, path)

    with open(path, 'r') as discovery_doc:
        return build_from_document(discovery_doc.read(),
                                   http=httplib2.Http(),
                                   credentials=GoogleCredentials
                                   .get_application_default())

def _update_discovery_doc(api, version, path):
    from apiclient.discovery import DISCOVERY_URI
    from apiclient.errors import HttpError
    from apiclient.errors import InvalidJsonError
    import uritemplate

    requested_url = uritemplate.expand(DISCOVERY_URI,
                                       {'api': api, 'apiVersion': version})
    resp, content = httplib2.Http().request(requested_url)
    if resp.status >= 400:
        raise HttpError(resp, content, uri=requested_url)
    try:
        with open(path, 'w') as discovery_doc:
            discovery_json = json.loads(content)
            json.dump(discovery_json, discovery_doc)
    except ValueError:
        raise InvalidJsonError(
            'Bad JSON: %s from %s.' % (content, requested_url))
# [END build_and_update]
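A minimal usage sketch (not part of this commit); it assumes the
discovery document is cached as '<api>.<version>' under RESOURCE_PATH
and that application default credentials are available:

# Hypothetical usage: build a BigQuery client from a cached (or freshly
# downloaded) discovery document.
bigquery_service = build_and_update('bigquery', 'v2')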
66 changes: 66 additions & 0 deletions bigquery/samples/export_data_to_cloud_storage.py
@@ -0,0 +1,66 @@
from samples.utils import get_service, poll_job
import uuid


# [START export_table]
def export_table(service, cloud_storage_path,
                 projectId, datasetId, tableId,
                 num_retries=5):
    # Generate a unique job ID so retries
    # don't accidentally duplicate the export
    job_data = {
        'jobReference': {
            'projectId': projectId,
            'jobId': str(uuid.uuid4())
        },
        'configuration': {
            'extract': {
                'sourceTable': {
                    'projectId': projectId,
                    'datasetId': datasetId,
                    'tableId': tableId,
                },
                'destinationUris': [cloud_storage_path],
            }
        }
    }
    return service.jobs().insert(
        projectId=projectId,
        body=job_data).execute(num_retries=num_retries)
# [END export_table]


# [START run]
def run(cloud_storage_path,
        projectId, datasetId, tableId,
        num_retries, interval):

    bigquery = get_service()
    resource = export_table(bigquery, cloud_storage_path,
                            projectId, datasetId, tableId, num_retries)
    poll_job(bigquery,
             resource['jobReference']['projectId'],
             resource['jobReference']['jobId'],
             interval,
             num_retries)
# [END run]


# [START main]
def main():
    projectId = raw_input("Enter the project ID: ")
    datasetId = raw_input("Enter a dataset ID: ")
    tableId = raw_input("Enter a table name to copy: ")
    cloud_storage_path = raw_input(
        "Enter a Google Cloud Storage URI: ")
    interval = float(raw_input(
        "Enter how often to poll the job (in seconds): "))
    num_retries = int(raw_input(
        "Enter the number of retries in case of 500 error: "))

    run(cloud_storage_path,
        projectId, datasetId, tableId,
        num_retries, interval)

    print('Done exporting!')
# [END main]
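A hypothetical, non-interactive invocation of run() (not part of this
commit); every identifier below is a placeholder, and the '*' wildcard
lets BigQuery shard a large export across multiple objects:

# Hypothetical call: export my_dataset.my_table to Cloud Storage,
# retrying up to 5 times and polling the job every second.
run('gs://my-bucket/my_table-export-*.csv',
    'my-project-id', 'my_dataset', 'my_table',
    num_retries=5, interval=1)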
94 changes: 94 additions & 0 deletions bigquery/samples/load_data_by_post.py
@@ -0,0 +1,94 @@
# Copyright 2015, Google, Inc.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import json
import httplib2
from samples.utils import get_service, poll_job
from oauth2client.client import GoogleCredentials


# [START make_post]
def make_post(http, schema, data, projectId, datasetId, tableId):
    url = ('https://www.googleapis.com/upload/bigquery/v2/projects/' +
           projectId + '/jobs')
    # Create the body of the request, separated by a boundary of xxx
    resource = ('--xxx\n' +
                'Content-Type: application/json; charset=UTF-8\n' + '\n' +
                '{\n' +
                '  "configuration": {\n' +
                '    "load": {\n' +
                '      "schema": {\n'
                '        "fields": ' + str(schema) + '\n' +
                '      },\n' +
                '      "destinationTable": {\n' +
                '        "projectId": "' + projectId + '",\n' +
                '        "datasetId": "' + datasetId + '",\n' +
                '        "tableId": "' + tableId + '"\n' +
                '      }\n' +
                '    }\n' +
                '  }\n' +
                '}\n' +
                '--xxx\n' +
                'Content-Type: application/octet-stream\n' +
                '\n')
    # Append data to the request body
    resource += data

    # Signify the end of the body
    resource += ('--xxx--\n')

    headers = {'Content-Type': 'multipart/related; boundary=xxx'}

    return http.request(url,
                        method='POST',
                        body=resource,
                        headers=headers)
# [END make_post]


# [START main]
def main():
    credentials = GoogleCredentials.get_application_default()
    http = credentials.authorize(httplib2.Http())
    projectId = raw_input('Enter the project ID: ')
    datasetId = raw_input('Enter a dataset ID: ')
    tableId = raw_input('Enter a table name to load the data to: ')
    schema_path = raw_input(
        'Enter the path to the schema file for the table: ')

    with open(schema_path, 'r') as schema_file:
        schema = schema_file.read()

    data_path = raw_input('Enter the path to the data file: ')

    with open(data_path, 'r') as data_file:
        data = data_file.read()

    resp, content = make_post(http,
                              schema,
                              data,
                              projectId,
                              datasetId,
                              tableId)

    if resp.status == 200:
        job_resource = json.loads(content)
        service = get_service(credentials)
        poll_job(service, **job_resource['jobReference'])
        print("Success!")
    else:
        print("Http error code: {}".format(resp.status))
# [END main]

if __name__ == '__main__':
    main()
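For reference, a hypothetical schema file for this sample (not part of
this commit) is just a JSON array of BigQuery field definitions, for
example written out with the standard library; the field names below
are placeholders:

# Hypothetical helper: write a two-column schema to schema.json so it
# can be passed to main() above.
import json

schema = [
    {'name': 'name', 'type': 'STRING'},
    {'name': 'age', 'type': 'INTEGER'},
]
with open('schema.json', 'w') as schema_file:
    json.dump(schema, schema_file)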