forked from Azure/azure-sdk-for-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcosmos_db.py
89 lines (78 loc) · 2.94 KB
/
cosmos_db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
import os
import uuid
from azure.cosmos import CosmosClient
from azure.cosmos.partition_key import PartitionKey
class CosmosDB:
    """Small end-to-end Cosmos DB demo.

    Creates a uniquely named database, adds a "Planets" container, inserts
    two items, runs a query over them and finally deletes the database.
    """

    def __init__(self):
        """Build the client from environment settings and pick a database name.

        Reads ``COSMOS_ENDPOINT`` and ``COSMOS_KEY`` from the environment; a
        missing variable raises ``KeyError``.
        """
        endpoint = os.environ["COSMOS_ENDPOINT"]
        key = os.environ["COSMOS_KEY"]
        self.client = CosmosClient(endpoint, {"masterKey": key})
        # A per-instance suffix gives every instance its own database name.
        self.dbName = "pySolarSystem-" + uuid.uuid1().hex

    def create_database(self):
        """Create the database named ``self.dbName`` and return the result."""
        print("Creating '{0}' database...".format(self.dbName))
        return self.client.create_database(self.dbName)

    def create_container(self, db):
        """Create the "Planets" container in ``db`` and return the result.

        The container is partitioned on the item ``/id`` path (hash kind).
        """
        collection_name = "Planets"
        print("Creating '{0}' collection...".format(collection_name))
        partition_key = PartitionKey(path="/id", kind="Hash")
        # Use the same name that was just announced instead of repeating the
        # literal, so the log line and the created container cannot diverge.
        return db.create_container(id=collection_name, partition_key=partition_key)

    def create_documents(self, container):
        """Insert the sample planet items into ``container`` one at a time."""
        # Every item carries an explicit 'id'.
        # NOTE(review): the original sample states that Cosmos assigns a
        # random key when 'id' is omitted - confirm against the installed
        # azure-cosmos version before relying on that.
        planets = [
            {
                "id": "Earth",
                "HasRings": False,
                "Radius": 3959,
                "Moons": [{"Name": "Moon"}],
            },
            {
                "id": "Mars",
                "HasRings": False,
                "Radius": 2106,
                "Moons": [{"Name": "Phobos"}, {"Name": "Deimos"}],
            },
        ]
        print("Inserting items in the collection...")
        for planet in planets:
            container.create_item(planet)
            print("\t'{0}' created".format(planet["id"]))
        print("\tdone")

    def simple_query(self, container):
        """Query the ``id`` of every item in ``container`` and print the list."""
        print("Quering the container...")
        items = list(
            container.query_items(
                query="SELECT c.id FROM c", enable_cross_partition_query=True
            )
        )
        print("\tdone: {0}".format(items))

    def delete_database(self):
        """Delete the database named ``self.dbName``.

        Any error raised by the client call propagates to the caller.
        """
        print("Cleaning up the resource...")
        self.client.delete_database(self.dbName)
        print("\tdone")

    def run(self):
        """Run the whole demo: pre-clean, create, insert, query, clean up."""
        print("")
        print("------------------------")
        print("Cosmos DB")
        print("------------------------")
        print("1) Create a Database")
        print("2) Create a Container in the database")
        print("3) Insert Documents (items) into the Container")
        print("4) Delete Database (Clean up the resource)")
        print("")

        # Ensure that the database does not exist. This is best-effort, so
        # ordinary failures are deliberately ignored. Catching Exception
        # rather than using a bare ``except`` lets KeyboardInterrupt and
        # SystemExit still stop the program.
        try:
            self.delete_database()
        except Exception:
            pass

        try:
            db = self.create_database()
            container = self.create_container(db=db)
            self.create_documents(container=container)
            self.simple_query(container=container)
        finally:
            # If something goes wrong, the resource should be cleaned anyway.
            self.delete_database()