Skip to content

Commit

Permalink
initial files from bigquery-samples-python
Browse files Browse the repository at this point in the history
  • Loading branch information
elibixby committed May 5, 2015
1 parent ec2f327 commit 8e97091
Show file tree
Hide file tree
Showing 35 changed files with 772 additions and 0 deletions.
Empty file added bigquery/dump/__init__.py
Empty file.
104 changes: 104 additions & 0 deletions bigquery/dump/load-data-by-POST.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import sys
import json

from apiclient.discovery import build
from oauth2client.file import Storage
from oauth2client.client import OAuth2WebServerFlow
from oauth2client.tools import run
import httplib2

# for python3 compat
# NOTE: ``__builtins__`` is a module only in ``__main__``; in an imported
# module it is a plain dict, so ``vars(__builtins__)`` raises TypeError there.
# Probing the name directly works in both situations and on both Pythons.
try:
    raw_input = raw_input
except NameError:
    raw_input = input

# OAuth 2.0 web-server flow requesting the BigQuery scope. main() hands it to
# the auth helper when no valid stored credentials are found.
# NOTE(review): client_id and client_secret look like placeholders --
# presumably they must be replaced with the values from your own Google API
# project before this script can authenticate (TODO confirm).
FLOW = OAuth2WebServerFlow(
client_id='xxxxxxx.apps.googleusercontent.com',
client_secret='shhhhhhhhhhhh',
scope='https://www.googleapis.com/auth/bigquery',
user_agent='my-program-name/1.0')


def loadTable(http, service):
    """Interactively load a local data file into a BigQuery table.

    Prompts for the project, dataset, table, schema file and data file,
    uploads everything in a single ``multipart/related`` POST, then polls
    the resulting job every 10 seconds until it reaches the DONE state.

    Args:
        http: Authorized HTTP object exposing
            ``request(url, method=..., body=..., headers=...)`` and
            returning a ``(response, content)`` pair.
        service: BigQuery API client exposing ``jobs().get(...).execute()``.

    Returns:
        None. Progress and the final outcome are printed.
    """
    import time

    projectId = raw_input('Choose your project ID: ')
    datasetId = raw_input('Choose a dataset ID: ')
    tableId = raw_input('Choose a table name to load the data to: ')

    url = ('https://www.googleapis.com/upload/bigquery/v2/projects/' +
           projectId + '/jobs')
    newSchemaFile = raw_input('What is your schema? ')
    # Read inside a context manager so the handle is always closed.
    with open(newSchemaFile, 'r') as schema:
        schemaFields = schema.read()

    # Create the body of the request, separated by a boundary of xxx.
    # The IDs go through json.dumps so quotes/backslashes typed by the user
    # cannot break (or inject into) the JSON document.
    newresource = ('--xxx\n' +
                   'Content-Type: application/json; charset=UTF-8\n' + '\n' +
                   '{\n' +
                   ' "configuration": {\n' +
                   ' "load": {\n' +
                   ' "schema": {\n' +
                   ' "fields": ' + schemaFields + '\n' +
                   ' },\n' +
                   ' "destinationTable": {\n' +
                   ' "projectId": ' + json.dumps(projectId) + ',\n' +
                   ' "datasetId": ' + json.dumps(datasetId) + ',\n' +
                   ' "tableId": ' + json.dumps(tableId) + '\n' +
                   ' }\n' +
                   ' }\n' +
                   ' }\n' +
                   '}\n' +
                   '--xxx\n' +
                   'Content-Type: application/octet-stream\n' +
                   '\n')
    newDataFile = raw_input('What is your data? ')

    # Append data from the specified file to the request body
    with open(newDataFile, 'r') as f:
        data = f.read()
    # The closing boundary must start on its own line; without this a data
    # file lacking a trailing newline would swallow the delimiter.
    if data and not data.endswith('\n'):
        data += '\n'
    newresource += data

    # Signify the end of the body
    newresource += '--xxx--\n'

    headers = {'Content-Type': 'multipart/related; boundary=xxx'}
    resp, content = http.request(url, method='POST',
                                 body=newresource, headers=headers)

    if resp.status != 200:
        # Previously a failed upload returned silently.
        print('Http error code: {}'.format(resp.status))
        return

    jsonResponse = json.loads(content)
    jobReference = jsonResponse['jobReference']['jobId']
    jobCollection = service.jobs()
    while True:
        getJob = jobCollection.get(projectId=projectId,
                                   jobId=jobReference).execute()
        status = getJob['status']
        currentStatus = status['state']

        if 'DONE' == currentStatus:
            # DONE only means the job finished; a failed job carries an
            # errorResult entry in its status.
            if 'errorResult' in status:
                print('Load failed: ' + str(status['errorResult']))
            else:
                print('Done Loading!')
            return

        print('Waiting to load...')
        print('Current status: ' + currentStatus)
        print(time.ctime())
        time.sleep(10)


def main(argv):
    """Authenticate, build the BigQuery client and run the interactive load.

    Args:
        argv: Command-line arguments; accepted but not used.
    """
    # Credentials are cached in this file. When the cache is empty or holds
    # invalid credentials, the auth flow is run and is handed the same
    # Storage object so the fresh credentials can be saved back.
    credential_store = Storage('bigquery2.dat')
    creds = credential_store.get()
    if creds is None or creds.invalid:
        creds = run(FLOW, credential_store)

    # Wrap a plain httplib2 client so every request carries the credentials.
    authorized_http = creds.authorize(httplib2.Http())

    bigquery = build('bigquery', 'v2', http=authorized_http)

    loadTable(authorized_http, bigquery)


if __name__ == '__main__':
    main(sys.argv)
14 changes: 14 additions & 0 deletions bigquery/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
argparse==1.2.1
google-api-python-client==1.3.2
httplib2==0.9
oauth2client==1.4.6
py==1.4.26
pyasn1==0.1.7
pyasn1-modules==0.0.5
rsa==3.1.4
simplejson==3.6.5
six==1.9.0
tox==1.9.0
uritemplate==0.6
virtualenv==12.0.7
wsgiref==0.1.2
Empty file added bigquery/samples/__init__.py
Empty file.
Binary file added bigquery/samples/__init__.pyc
Binary file not shown.
74 changes: 74 additions & 0 deletions bigquery/samples/async_query.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
from __future__ import print_function # For python 2/3 interoperability
from samples.utils import get_service, paging, poll_job
import uuid
import json


# [START async_query]
def async_query(service, project_id, query, batch=False, num_retries=5):
    """Insert an asynchronous BigQuery query job and return the job resource.

    Args:
        service: BigQuery API client exposing ``jobs().insert(...)``.
        project_id: ID of the project that runs the query.
        query: SQL text of the query.
        batch: When true the job is submitted with BATCH priority,
            otherwise INTERACTIVE.
        num_retries: Passed through to ``execute(num_retries=...)``.

    Returns:
        Whatever ``execute()`` returns for the inserted job.
    """
    # Generate a unique job id so retries
    # don't accidentally duplicate query.
    # The API field is camelCase ``jobId``; the former ``job_id`` key is not
    # the job-reference field, so the generated id was never actually used.
    job_data = {
        'jobReference': {
            'projectId': project_id,
            'jobId': str(uuid.uuid4())
        },
        'configuration': {
            'query': {
                'query': query,
                'priority': 'BATCH' if batch else 'INTERACTIVE',
            },
        }
    }
    return service.jobs().insert(
        projectId=project_id,
        body=job_data).execute(num_retries=num_retries)
# [END async_query]


# [START run]
def run(project_id, query_string, batch, num_retries, interval):
    """Run a query asynchronously and yield each result page as JSON text.

    Submits the query, polls the job via ``poll_job``, then iterates the
    pages produced by ``paging`` over ``getQueryResults`` and yields
    ``json.dumps`` of every page's ``rows`` entry.
    """
    bigquery = get_service()

    submitted = async_query(bigquery,
                            project_id,
                            query_string,
                            batch,
                            num_retries)
    job_ref = submitted['jobReference']

    poll_job(bigquery,
             job_ref['projectId'],
             job_ref['jobId'],
             interval,
             num_retries)

    result_pages = paging(bigquery,
                          bigquery.jobs().getQueryResults,
                          num_retries=num_retries,
                          **job_ref)
    for result_page in result_pages:
        yield json.dumps(result_page['rows'])
# [END run]


# [START main]
def main():
    """Prompt for the query parameters, run the query and print each page."""
    # raw_input only exists on Python 2; fall back to input on Python 3 so
    # the file's stated 2/3 interoperability actually holds.
    try:
        prompt = raw_input
    except NameError:
        prompt = input

    project_id = prompt("Enter the project ID: ")
    query_string = prompt("Enter the Bigquery SQL Query: ")
    batch = prompt("Run query as batch (y/n)?: ") in ('True',
                                                      'true',
                                                      'y',
                                                      'Y',
                                                      'yes',
                                                      'Yes')

    # Prompts return text; the retry count and polling interval are used
    # numerically further down, so convert them once here.
    num_retries = int(prompt(
        "Enter number of times to retry in case of 500 error: "))
    interval = float(prompt(
        "Enter how often to poll the query for completion (seconds): "))

    for result in run(project_id, query_string, batch, num_retries, interval):
        print(result)
# [END main]
Binary file added bigquery/samples/async_query.pyc
Binary file not shown.
52 changes: 52 additions & 0 deletions bigquery/samples/discovery_doc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import os
import json
import httplib2
import time

# [START build_and_update]

RESOURCE_PATH = '..'  # Directory searched for discovery docs (the parent folder).
MAX_AGE = 24 * 60 * 60  # Seconds; discovery docs older than one day are updated.

# A module that takes care of caching and updating discovery docs
# for google-api-python-clients (until such a feature is integrated)


def build_and_update(api, version):
    """Build an API client from a locally cached discovery document.

    The document is stored at ``RESOURCE_PATH/<api>.<version>``. It is
    downloaded again when the file cannot be stat'ed or when it is older
    than ``MAX_AGE`` seconds.

    Args:
        api: API name, e.g. ``'bigquery'``.
        version: API version, e.g. ``'v2'``.

    Returns:
        The object returned by ``build_from_document`` using application
        default credentials.
    """
    from oauth2client.client import GoogleCredentials
    from googleapiclient.discovery import build_from_document

    path = os.path.join(RESOURCE_PATH, '{}.{}'.format(api, version))

    # Keep only the mtime lookup inside the try. Previously the refresh ran
    # inside it as well, so an os.error raised while refreshing was swallowed
    # and the refresh was blindly executed a second time.
    try:
        needs_update = time.time() - os.path.getmtime(path) > MAX_AGE
    except os.error:
        # No readable cached copy.
        needs_update = True

    if needs_update:
        _update_discovery_doc(api, version, path)

    with open(path, 'r') as discovery_doc:
        return build_from_document(discovery_doc.read(),
                                   http=httplib2.Http(),
                                   credentials=GoogleCredentials
                                   .get_application_default())

def _update_discovery_doc(api, version, path):
    """Download the discovery document for an API and cache it at ``path``.

    Args:
        api: API name substituted into ``DISCOVERY_URI``.
        version: API version substituted into ``DISCOVERY_URI``.
        path: File that receives the JSON document.

    Raises:
        HttpError: The discovery endpoint answered with status >= 400.
        InvalidJsonError: The response body is not valid JSON.
    """
    from apiclient.discovery import DISCOVERY_URI
    from apiclient.errors import HttpError
    from apiclient.errors import InvalidJsonError
    import uritemplate

    requested_url = uritemplate.expand(DISCOVERY_URI,
                                       {'api': api, 'apiVersion': version})
    resp, content = httplib2.Http().request(requested_url)
    if resp.status >= 400:
        raise HttpError(resp, content, uri=requested_url)

    # Validate before touching the cache: opening the file for writing
    # truncates it, so parsing afterwards would wipe a good cached document
    # whenever the server returned malformed JSON.
    try:
        discovery_json = json.loads(content)
    except ValueError:
        raise InvalidJsonError(
            'Bad JSON: %s from %s.' % (content, requested_url))

    with open(path, 'w') as discovery_doc:
        json.dump(discovery_json, discovery_doc)
# [END build_and_update]
Binary file added bigquery/samples/discovery_doc.pyc
Binary file not shown.
66 changes: 66 additions & 0 deletions bigquery/samples/export_data_to_cloud_storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
from samples.utils import get_service, poll_job
import uuid


# [START export_table]
def export_table(service, cloud_storage_path,
                 projectId, datasetId, tableId,
                 num_retries=5):
    """Insert a BigQuery extract job for one table and return its resource.

    Args:
        service: BigQuery API client exposing ``jobs().insert(...)``.
        cloud_storage_path: Destination URI, sent as the only entry of
            ``destinationUris``.
        projectId: Project owning both the job and the source table.
        datasetId: Dataset of the source table.
        tableId: Name of the source table.
        num_retries: Passed through to ``execute(num_retries=...)``.

    Returns:
        Whatever ``execute()`` returns for the inserted job.
    """
    # A freshly generated job id keeps retries from
    # accidentally duplicating the export.
    job_reference = {
        'projectId': projectId,
        'jobId': str(uuid.uuid4())
    }
    source_table = {
        'projectId': projectId,
        'datasetId': datasetId,
        'tableId': tableId,
    }
    extract_config = {
        'sourceTable': source_table,
        'destinationUris': [cloud_storage_path],
    }
    job_body = {
        'jobReference': job_reference,
        'configuration': {'extract': extract_config},
    }

    insert_request = service.jobs().insert(projectId=projectId,
                                           body=job_body)
    return insert_request.execute(num_retries=num_retries)
# [END export_table]


# [START run]
def run(cloud_storage_path,
        projectId, datasetId, tableId,
        num_retries, interval):
    """Start a table export and poll the resulting job via ``poll_job``."""
    service = get_service()
    export_job = export_table(service, cloud_storage_path,
                              projectId, datasetId, tableId, num_retries)
    job_ref = export_job['jobReference']
    poll_job(service,
             job_ref['projectId'],
             job_ref['jobId'],
             interval,
             num_retries)
# [END run]


# [START main]
def main():
    """Prompt for the export parameters, run the export and report completion."""
    # raw_input only exists on Python 2; fall back to input on Python 3.
    try:
        prompt = raw_input
    except NameError:
        prompt = input

    projectId = prompt("Enter the project ID: ")
    datasetId = prompt("Enter a dataset ID: ")
    tableId = prompt("Enter a table name to copy: ")
    cloud_storage_path = prompt(
        "Enter a Google Cloud Storage URI: ")
    # Prompts return text; the polling interval and retry count are used
    # numerically further down, so convert them once here.
    interval = float(prompt(
        "Enter how often to poll the job (in seconds): "))
    num_retries = int(prompt(
        "Enter the number of retries in case of 500 error: "))

    run(cloud_storage_path,
        projectId, datasetId, tableId,
        num_retries, interval)

    # Call form prints the same text on Python 2 and is valid on Python 3,
    # where the old print statement is a syntax error.
    print('Done exporting!')
# [END main]
Binary file added bigquery/samples/export_data_to_cloud_storage.pyc
Binary file not shown.
81 changes: 81 additions & 0 deletions bigquery/samples/load_data_by_post.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import json
import httplib2
from samples.utils import get_service, poll_job
from oauth2client.client import GoogleCredentials


# [START make_post]
def make_post(http, schema, data, projectId, datasetId, tableId):
    """POST a BigQuery load job and its data as one multipart/related request.

    Args:
        http: Authorized HTTP object exposing
            ``request(url, method=..., body=..., headers=...)``.
        schema: The table's field list, either as JSON text (for example
            the contents of a schema file) or as an already-parsed list.
        data: Text of the data to load.
        projectId: Project that owns the job and the destination table.
        datasetId: Dataset of the destination table.
        tableId: Name of the destination table.

    Returns:
        Whatever ``http.request`` returns.

    Raises:
        ValueError: ``schema`` is text that is not valid JSON.
    """
    url = ('https://www.googleapis.com/upload/bigquery/v2/projects/' +
           projectId + '/jobs')

    # ``str()`` of a parsed schema is a Python repr, not JSON, so normalise
    # to a Python object first and let json.dumps do the serialisation.
    if isinstance(schema, (bytes, type(u''))):
        fields = json.loads(schema)
    else:
        fields = schema

    # Building the configuration as a dict (rather than concatenating
    # strings) guarantees the IDs are properly escaped.
    job_config = {
        'configuration': {
            'load': {
                'schema': {'fields': fields},
                'destinationTable': {
                    'projectId': projectId,
                    'datasetId': datasetId,
                    'tableId': tableId,
                },
            },
        },
    }

    # The closing boundary must start on its own line; without this, data
    # lacking a trailing newline would swallow the delimiter.
    if data and not data.endswith('\n'):
        data += '\n'

    # Create the body of the request, separated by a boundary of xxx
    resource = ''.join([
        '--xxx\n',
        'Content-Type: application/json; charset=UTF-8\n',
        '\n',
        json.dumps(job_config), '\n',
        '--xxx\n',
        'Content-Type: application/octet-stream\n',
        '\n',
        data,
        # Signify the end of the body
        '--xxx--\n',
    ])

    headers = {'Content-Type': 'multipart/related; boundary=xxx'}

    return http.request(url,
                        method='POST',
                        body=resource,
                        headers=headers)
# [END make_post]


# [START main]
def main():
    """Prompt for a table, schema file and data file, then load the data.

    Uses application default credentials, posts the load job through
    ``make_post`` and, on an HTTP 200 answer, polls the job via
    ``poll_job`` before printing a success message. Any other status is
    printed as an error code.
    """
    credentials = GoogleCredentials.get_application_default()
    authorized_http = credentials.authorize(httplib2.Http())

    projectId = raw_input('Enter the project ID: ')
    datasetId = raw_input('Enter a dataset ID: ')
    tableId = raw_input('Enter a table name to load the data to: ')

    schema_path = raw_input(
        'Enter the path to the schema file for the table: ')
    with open(schema_path, 'r') as schema_file:
        schema_text = schema_file.read()

    data_path = raw_input('Enter the path to the data file: ')
    with open(data_path, 'r') as data_file:
        data_text = data_file.read()

    response, payload = make_post(authorized_http,
                                  schema_text,
                                  data_text,
                                  projectId,
                                  datasetId,
                                  tableId)

    # Guard clause: anything but a 200 is reported and ends the run.
    if response.status != 200:
        print("Http error code: {}".format(response.status))
        return

    inserted_job = json.loads(payload)
    bigquery = get_service(credentials)
    poll_job(bigquery, **inserted_job['jobReference'])
    print("Success!")
# [END main]

if __name__ == '__main__':
    main()
Loading

0 comments on commit 8e97091

Please sign in to comment.