Skip to content

Commit

Permalink
I change in background_task from asyncio to pymongo driver. also I cr…
Browse files Browse the repository at this point in the history
…eated many classes to manage Catalog Store, and Cart. So instead of testing with client_app, I change to test the classes directly to isolate a better way from MongoDB Database
  • Loading branch information
gnm3000 committed May 13, 2022
1 parent f864466 commit d9fd346
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 97 deletions.
56 changes: 23 additions & 33 deletions sales/background_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def send(cls,queue,message):

class SalesBackgroundTask:

def processCart(self,cart):
def processCart(self,cart_id:str):
# send to rabbit cart_queue
credentials = pika.PlainCredentials('guest', 'guest')
parameters = pika.ConnectionParameters('192.168.49.2',
Expand All @@ -46,7 +46,7 @@ def processCart(self,cart):

channel.queue_declare(queue='sales_cart_queue', durable=True)

message = {"cart_id":str(cart['_id'])}
message = {"cart_id":cart_id}
channel.basic_publish(
exchange='',
routing_key='sales_cart_queue',
Expand All @@ -62,53 +62,51 @@ def processCart(self,cart):


import time
import motor.motor_asyncio
from bson.objectid import ObjectId
MONGODB_URL = "mongodb://adminuser:password123@mongo-nodeport-svc.default.svc.cluster.local/?retryWrites=true&w=majority" # prod
MONGODB_URL = "mongodb://adminuser:password123@192.168.49.2:32258/?retryWrites=true&w=majority" # local
from background_tasks import SalesBackgroundTask
client = motor.motor_asyncio.AsyncIOMotorClient(MONGODB_URL)
db = client.sales
import asyncio
import pymongo
client = pymongo.MongoClient(MONGODB_URL)
db = client["sales"]
import pprint
async def set_order_as(condition,status):
print("SET ORDER CONDITION=",condition,"AS STATUS=",status)
r=await db["carts"].update_one(condition,{'$set':{"status":status}})
print("modified_count",r.modified_count)

class MsgProcessor:

def process(self,msg):
msg = json.loads(msg)
cart_id = msg["cart_id"]
# request customer API
# connect to mongo and read cart info
req = requests.get("http://localhost:5678/customers")
last_customer=(req.json()[-1])
wallet_usd = last_customer["wallet_usd"]
print("PREVIA WALLET",wallet_usd)

req = requests.get("http://localhost:8000/sales/products")
catalog_products=req.json()
print("catalog_products",catalog_products)
#asyncio.get_event_loop().run_until_complete(run())


req = requests.get("http://localhost:8000/sales/checkout-status",params={"cart_id":cart_id})

cart=req.json()
products = cart["cart"]["products"]
customer_id = cart["cart"]["customer_id"]
req = requests.get("http://localhost:5678/customers/%s" % customer_id)
customer=req.json()
wallet_usd = customer["wallet_usd"]
print("WALLET USD",wallet_usd)

purchase=0
order_failed=False
# --- if purchase > stock => NoStockError
for product in products:
product_info = requests.get("http://localhost:8000/sales/product",{"id":product["product_id"]}).json()
stock=product_info["quantity"]
if(product["quantity"] > stock):
condition = {"customer_id": last_customer['_id'], "status": "pending"}
condition = {"customer_id": customer['_id'], "status": "pending"}
order_failed=True
print("Error no stock")
loop = asyncio.get_event_loop()
coroutine = set_order_as({"_id":ObjectId(cart_id)},"failed_by_stock")
loop.run_until_complete(coroutine)
loop.close()
db["carts"].update_one({"_id":ObjectId(cart_id)},{'$set':{"status":"failed_by_stock"}})

break

purchase = purchase + product["price"]*product["quantity"]
Expand All @@ -119,26 +117,18 @@ def process(self,msg):
# process rabbit cart_queue
# --- if purchase > wallet => NoMoneyError
if(purchase>wallet_usd):
condition = {"customer_id": last_customer['_id'], "status": "pending"}
loop = asyncio.get_event_loop()
coroutine = set_order_as({"_id":ObjectId(cart_id)},"failed_by_insufficient_funds")
loop.run_until_complete(coroutine)
loop.close()
db["carts"].update_one({"_id":ObjectId(cart_id)},{'$set':{"status":"failed_by_insufficient_funds"}})
order_failed=True
MessageSender.send("failed_orders",{"cart_id":cart_id})
return
# user_wallet = user_wallet - purchase
wallet_new_value = last_customer["wallet_usd"]- purchase
#wallet_new_value = last_customer["wallet_usd"]- purchase
# actualizar la wallet del cliente!!
req = requests.post("http://localhost:5678/customer/update-wallet",params={"wallet_usd":wallet_new_value,"customer_id":last_customer['_id']})
req = requests.post("http://localhost:5678/customers/update-wallet",params={"cart_id":cart_id,
"purchase_usd":purchase,
"customer_id":customer['_id']})
_=req.json()
condition = {"customer_id": last_customer['_id'], "status": "pending"}


loop = asyncio.get_event_loop()
coroutine = set_order_as({"_id":ObjectId(cart_id)},"success")
loop.run_until_complete(coroutine)
loop.close()
db["carts"].update_one({"_id":ObjectId(cart_id)},{'$set':{"status":"success"}})
MessageSender.send("success_orders",{"cart_id":cart_id})
# if sucess=> create order and send to shipping microservice (using rabbitMQ)
# if error => create fail order and send to customers microservice (using rabbitMQ)
Expand Down
151 changes: 114 additions & 37 deletions sales/main.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from unittest.mock import Mock
from bson import ObjectId
from pydantic import BaseModel, Field
from typing import Optional, List
Expand Down Expand Up @@ -45,6 +46,36 @@ class ProductModel(BaseModel):
async def hello():
return {"hello"}

from abc import ABC,abstractmethod
class AbstractCatalogStore(ABC):
def __init__(self) -> None:
super().__init__()

def add(self,product_id: str, name:str, price:float,quantity: int):
raise NotImplementedError


class CatalogStore:
def __init__(self,db):
self.db=db
async def _insert_one(self,dict_params):
product= await db["products"].insert_one(dict_params)
return product
async def insert_one(self,data):
return await self._insert_one(data)

class FakeCatalogStore:
def __init__(self,db):
self.db=db
def _insert_one(self,dict_params):
p = Mock()
p.inserted_id=str(ObjectId())
return p
def insert_one(self,data):
return self._insert_one(data)




@app.get("/sales/products", response_model=List[ProductModel])
async def list_products_catalog():
Expand All @@ -56,63 +87,109 @@ async def get_a_product(id:str):
""" Get one product from catalog store """
return await db["products"].find_one({"_id": ObjectId(id) })

async def insert_one(dict_params):
product= await db["products"].insert_one(dict_params)
return product

@app.post("/sales/products")
async def new_product_catalog(name: str, price: float, quantity: int):
""" Scenario: Add new product to the catalog store """

product = insert_one({"name": name, "price": price, "quantity": quantity})
catalog = CatalogStore(db=db)
product= catalog.insert_one({"name": name, "price": price, "quantity": quantity})
return {"message": "product inserted", "id": str(product.inserted_id)}

class AbstractCart(ABC):
def __init__(self) -> None:
super().__init__()

def add(self,product_id: str, name:str, price:float,quantity: int):
raise NotImplementedError

class MyShoppingCart(AbstractCart):
def __init__(self,db,customer_id: str):
self.customer_id=customer_id
self.db = db
async def add(self,product_id: str, name:str, price:float,quantity: int):
condition = {"customer_id": self.customer_id, "status": "open"}
product_line = {
"product_id": product_id,
"name": name,
"price": price,
"quantity": quantity
}
cart = await self.db["carts"].find_one(condition)
if(cart):
await self.db["carts"].update_one(condition,{'$push': {'products': product_line}})
return {"message": "the product was inserted to your cart",
"cart_id": str(cart['_id']),
"cart_products": cart["products"]
}
else:
cart = await self.db["carts"].insert_one({"customer_id": self.customer_id, "status": "open","products": [product_line]})
return {"message": "the product was inserted to your new cart",
"cart_id": str(cart.inserted_id),
"cart_products": [product_line]
}
class FakeShoppingCart(AbstractCart):
def __init__(self,db,customer_id: str):
self.customer_id=customer_id
self.db = db
self.empty=True
def add(self,product_id: str, name:str, price:float,quantity: int):
condition = {"customer_id": self.customer_id, "status": "open"}
product_line = {
"product_id": product_id,
"name": name,
"price": price,
"quantity": quantity
}
if(not self.empty):
return {"message": "the product was inserted to your cart",
"cart_id": str(ObjectId()),
"cart_products": [product_line]
}
else:
self.empty=False
return {"message": "the product was inserted to your new cart",
"cart_id": str(ObjectId()),
"cart_products": [product_line]
}


@app.post("/sales/cart")
async def add_product_to_cart(product_id: str, customer_id: str, name: str, price: float, quantity: int):
""" Scenario: Add a product to my cart:
add a product to my shopping cart and return the actual cart state"""
# if no shopping cart open => create new
# if shopping cart => append product

condition = {"customer_id": customer_id, "status": "open"}
product_line = {
"product_id": product_id,
"name": name,
"price": price,
"quantity": quantity}
cart = await db["carts"].find_one(condition)
if(cart):
db["carts"].update_one(condition,{'$push': {'products': product_line}})
return {"message": "the product was inserted to your cart",
"cart_id": str(cart['_id']),
"cart_products": cart["products"]
}
else:
cart = await db["carts"].insert_one({"customer_id": customer_id, "status": "open","products": [product_line]})
return {"message": "the product was inserted to your new cart",
"cart_id": str(cart.inserted_id),
"cart_products": [product_line]
}
my_shopping_Cart = MyShoppingCart(customer_id=customer_id)
message_return = await my_shopping_Cart.add(product_id=product_id,name=name,price=price,quantity=quantity)
return message_return



class CheckoutCartProcessor:
def __init__(self,db):
self.db = db
async def getOpenCartByCustomerId(self,customer_id:str):
condition = {"customer_id": customer_id, "status": "open"}
return await self.db["carts"].find_one(condition)
async def setCartPending(self,customer_id:str):
condition = {"customer_id": customer_id, "status": "open"}
return await self.db["carts"].update_one(condition,{'$set':{"status":"pending"}})

def process_cart(self,cart_id,processor:SalesBackgroundTask):
processor.processCart(cart_id=cart_id)

@app.post("/sales/checkout")
async def checkout_cart(customer_id: str):
""" Scenario: The customer want to checkout and pay the order from the cart """
# get the cart -> product list
condition = {"customer_id": customer_id, "status": "open"}
cart = await db["carts"].find_one(condition)
ch=CheckoutCartProcessor(db)
cart = await ch.getOpenCartByCustomerId(customer_id=customer_id)
cart_id = cart['_id']
if(cart==None):
return {"message": "No cart was found"}
cart["status"] = "pending"
# save the cart as processing=pending
await db["carts"].update_one(condition,{'$set':{"status":"pending"}})
await ch.setCartPending(customer_id=customer_id)
ch.process_cart(cart_id=str(cart_id),processor=SalesBackgroundTask())
return {"message":"Your cart is in processing state","cart_id":str(cart_id),"status":"pending"}

processor = SalesBackgroundTask()
processor.processCart(cart)

return {"message":"Your cart is in processing state","cart_id":str(cart['_id']),"status":"pending"}
raise NotImplementedError


@app.get("/sales/checkout-status")
async def checkout_cart(cart_id: str):
Expand Down
2 changes: 2 additions & 0 deletions sales/run_dev_sales.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
./env/bin/python -m uvicorn main:app --port 8000 --reload

24 changes: 10 additions & 14 deletions sales/tests/step_defs/test_add_a_product_to_my_cart.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@

from bson import ObjectId
from pytest_bdd import scenarios, given, when,then, parsers,scenario

from main import app
from main import AbstractCart, app,FakeShoppingCart
from fastapi.testclient import TestClient
import pytest

#scenarios('../features/main.feature')
@pytest.fixture
def context():
Expand All @@ -14,23 +14,19 @@ def context():
def test_cart():
pass

@given("I'm a customer and I want to buy", target_fixture='client_app')
def client_app():
client = TestClient(app)
return client
@given("I'm a customer and I want to buy", target_fixture='cart')
def cart():
cart = FakeShoppingCart(db=None,customer_id=str(ObjectId()))
return cart


@when("I add a product to my cart")
def ddg_response_contents(client_app,context):
response = client_app.post("/sales/cart",params={"product_id": "62796a564f406dec0c2dca6d",
"customer_id": "6277135f64af6aad4682bed9",
"name": "t-shirt", "price":31,"quantity": 1})
def ddg_response_contents(cart: AbstractCart,context):
response= cart.add(product_id=str(ObjectId()),name="t-shirt",price=31,quantity=1)

context["response"] = response

#return response

@then("I should get the products list updated that already are in my cart")
def response_add_cart(context):
#print(ddg_response_contents)
assert context["response"].status_code==200
pass
assert len(context["response"]["cart_products"])>0
24 changes: 11 additions & 13 deletions sales/tests/step_defs/test_add_new_product_to_the_catalog_store.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import asyncio
from pytest_bdd import scenarios, given, when,then, parsers,scenario

from main import app
from main import FakeCatalogStore

from fastapi.testclient import TestClient
import pytest
from unittest import mock
Expand All @@ -13,21 +15,17 @@ def context():
def test_catalog_store():
pass

@given("I'm a manager and I want to setup the store", target_fixture='client_app')
def client_app():
client_app = TestClient(app)
return client_app
@given("I'm a manager and I want to setup the store", target_fixture='catalog')
def catalog():
catalog=FakeCatalogStore(db=None)
return catalog

@when("add a product to the catalog")
def ddg_response_contents(client_app,context):
product = mock.Mock()
product.inserted_id="xxx"
with mock.patch("main.insert_one",return_value=product):
response= client_app.post("/sales/products",params={"name": "Jeans","price": 40,"quantity": 100})
context["response"] = response
def ddg_response_contents(catalog:FakeCatalogStore,context):
r=catalog.insert_one({"name": "Jeans","price": 40,"quantity": 100})
context["result"] = r

@then("I should get the confirmation message of added product")
def response_add_cart(context):
assert context["response"].json()["message"]=="product inserted"
assert context["response"].status_code==200
assert hasattr(context["result"],"inserted_id")

0 comments on commit d9fd346

Please sign in to comment.