Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 88 additions & 38 deletions lambda/python-crud-lambda/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,30 @@
import json
from pymongo import MongoClient
from bson import ObjectId
from pprint import pprint

# Environment variables
ATLAS_CONNECTION_STRING = os.environ['ATLAS_CONNECTION_STRING']
COLLECTION_NAME = os.environ['COLLECTION_NAME']
DB_NAME = os.environ['DB_NAME']


def connect_to_mongodb():
client = MongoClient(ATLAS_CONNECTION_STRING)
try:
client = MongoClient(ATLAS_CONNECTION_STRING)
except Exception as e:
print(e)
return client


def success_response(body):
return {
'statusCode': '200',
'body': json.dumps(body),
'headers': {
'Content-Type': 'application/json',
},
"statusCode": "200",
"body": body,
"headers": {
"Content-Type": "application/json"
}
}


def error_response(err):
error_message = str(err)
return {
Expand All @@ -32,42 +36,88 @@ def error_response(err):
},
}


def lambda_handler(event, context):
client = None
try:
# Connect to MongoDB
payload = json.loads(event['body'])
client = connect_to_mongodb()
db = client[DB_NAME]
collection = db[COLLECTION_NAME]

# CRUD operations
operation = event['httpMethod']
if operation == 'GET':
payload = event.get('queryStringParameters') or {}
if '_id' in payload:
payload['_id'] = ObjectId(payload['_id'])
response = list(collection.find(payload))
for doc in response:
doc['_id'] = str(doc['_id'])
else:
payload = json.loads(event['body'])
if operation == 'POST':
insert_result = collection.insert_one(payload)
response = {'_id': str(insert_result.inserted_id)}
elif '_id' not in payload:
return error_response(ValueError('_id is a required field in the body'))
elif operation == 'PUT':
document_id = ObjectId(payload['_id'])
del payload['_id']
response = collection.update_one({'_id': document_id}, {'$set': payload})
response = {'modified_count': response.modified_count}
elif operation == 'DELETE':
response = collection.delete_one({'_id': ObjectId(payload['_id'])})
response = {'deleted_count': response.deleted_count}
op = event['rawPath']
db, coll = payload['database'], payload['collection']
if op == "/findOne":
filter_op = payload['filter'] if 'filter' in payload else {}
projection = payload['projection'] if 'projection' in payload else {}
result = {"document": client[db][coll].find_one(filter_op, projection)}
if isinstance(result['document']['_id'], ObjectId):
result['document']['_id'] = str(result['document']['_id'])

elif op == "/find":
agg_query = []

if 'filter' in payload and payload['filter'] != {}:
agg_query.append({"$match": payload['filter']})

if "sort" in payload and payload['sort'] != {}:
agg_query.append({"$sort": payload['sort']})

if "skip" in payload:
agg_query.append({"$skip": payload['skip']})

if 'limit' in payload:
agg_query.append({"$limit": payload['limit']})

if "projection" in payload and payload['projection'] != {}:
agg_query.append({"$project": payload['projection']})

result = {"documents": list(client[db][coll].aggregate(agg_query))}
for obj in result['documents']:
if isinstance(obj['_id'], ObjectId):
obj['_id'] = str(obj['_id'])

elif op == "/insertOne":
if "document" not in payload or payload['document'] == {}:
return error_response("Send a document to insert")
insert_op = client[db][coll].insert_one(payload['document'])
result = {"insertedId": str(insert_op.inserted_id)}

elif op == "/insertMany":
if "documents" not in payload or payload['documents'] == {}:
return error_response("Send a document to insert")
insert_op = client[db][coll].insert_many(payload['documents'])
result = {"insertedIds": [str(_id) for _id in insert_op.inserted_ids]}

elif op in ["/updateOne", "/updateMany"]:
payload['upsert'] = payload['upsert'] if 'upsert' in payload else False
if "_id" in payload['filter']:
payload['filter']['_id'] = ObjectId(payload['filter']['_id'])
if op == "/updateOne":
update_op = client[db][coll].update_one(payload['filter'], payload['update'], upsert=payload['upsert'])
else:
update_op = client[db][coll].update_many(payload['filter'], payload['update'], upsert=payload['upsert'])
result = {"matchedCount": update_op.matched_count, "modifiedCount": update_op.modified_count}

elif op in ["/deleteOne", "/deleteMany"]:
payload['filter'] = payload['filter'] if 'filter' in payload else {}
if "_id" in payload['filter']:
payload['filter']['_id'] = ObjectId(payload['filter']['_id'])
if op == "/deleteOne":
result = {"deletedCount": client[db][coll].delete_one(payload['filter']).deleted_count}
else:
return error_response(ValueError('Unsupported method "{}"'.format(operation)))
result = {"deletedCount": client[db][coll].delete_many(payload['filter']).deleted_count}

elif op == "/aggregate":
if "pipeline" not in payload or payload['pipeline'] == []:
return error_response("Send a pipeline")
docs = list(client[db][coll].aggregate(payload['pipeline']))
for obj in docs:
if isinstance(obj['_id'], ObjectId):
obj['_id'] = str(obj['_id'])
result = {"documents": docs}

else:
return error_response("Not a valid operation")

return success_response(response)
return success_response(result)

except Exception as e:
return error_response(e)
Expand Down