- Batch (offline): you run your model regularly (hourly, daily, monthly). Pull the data from the DB and then apply the model.
- Online: the model is running all the time and is always available. You can serve it as a web service (a 1:1 client-server relationship) or via streaming (a 1:N relationship, one event producer and many consumers).
Create the environment with pipenv:
pipenv install scikit-learn==1.0.2 flask --python=3.9
pipenv shell
Or, alternatively, with conda:
conda create -n web-service python=3.9
conda activate web-service
pip install -r requirements.txt
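For reference, a minimal sketch of what predict.py could look like; the model file name lin_reg.bin and the pickled (DictVectorizer, model) pair are assumptions based on this setup:

```python
import pickle

from flask import Flask, request, jsonify

# Load the DictVectorizer and the trained model (file name is an assumption)
with open('lin_reg.bin', 'rb') as f_in:
    (dv, model) = pickle.load(f_in)


def prepare_features(ride):
    features = {}
    features['PU_DO'] = '%s_%s' % (ride['PULocationID'], ride['DOLocationID'])
    features['trip_distance'] = ride['trip_distance']
    return features


def predict(features):
    X = dv.transform(features)
    preds = model.predict(X)
    return float(preds[0])


app = Flask('duration-prediction')


@app.route('/predict', methods=['POST'])
def predict_endpoint():
    ride = request.get_json()

    features = prepare_features(ride)
    pred = predict(features)

    return jsonify({'duration': pred})


if __name__ == "__main__":
    app.run(debug=True, host='0.0.0.0', port=9696)
```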
After you have finished predict.py and test.py, run predict.py in your virtual environment and run test.py from whichever environment you use to test the app.
(venv) python predict.py
(base) python test.py
- To avoid the Flask development-server warning, we can install gunicorn and serve the app with it:
pip install gunicorn
gunicorn --bind=0.0.0.0:9696 predict:app
test.py uses the requests library, so install it in the environment you test from:
pip install requests
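A minimal sketch of test.py, sending one ride to the service; the feature values are arbitrary, and the /predict route matches the Flask sketch above:

```python
import requests

ride = {
    "PULocationID": 10,
    "DOLocationID": 50,
    "trip_distance": 40
}

# Port 9696 matches the gunicorn bind above
url = 'http://localhost:9696/predict'
response = requests.post(url, json=ride)
print(response.json())
```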
- Create the Dockerfile (a sketch is shown below)
- Make sure about your Python version so the base image matches (you can use python -V to check)
- Search for the Python image on Docker Hub; I'll select the slim version (slim reduces the size of the image)
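A sketch of what the Dockerfile could look like under the pipenv setup above; the copied file names (predict.py, lin_reg.bin) are assumptions:

```dockerfile
FROM python:3.9-slim

RUN pip install -U pip pipenv

WORKDIR /app

# Install dependencies into the image's system Python
COPY [ "Pipfile", "Pipfile.lock", "./" ]
RUN pipenv install --system --deploy

COPY [ "predict.py", "lin_reg.bin", "./" ]

EXPOSE 9696

ENTRYPOINT [ "gunicorn", "--bind=0.0.0.0:9696", "predict:app" ]
```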
- Build the image with:
docker build -t ride-duration-prediction-service:v1 .
(add winpty at the beginning of the command if you are using Windows)
where:
- ride-duration-prediction-service is the name of the image and v1 is its tag (the -t flag sets name:tag)
- . is the path to the directory where the Dockerfile is
- Test it with:
docker run -it --rm -p 9696:9696 ride-duration-prediction-service:v1
and then run test.py, where:
- -it: interactive mode
- --rm: remove the container after it is done
- -p: port mapping (host:container)
I've just created a bucket called mlflow-models-esteban.
mlflow server --backend-store-uri=sqlite:///mlflow.db --default-artifact-root=s3://mlflow-models-esteban/
export MLFLOW_TRACKING_URI="http://127.0.0.1:5000"
export RUN_ID="6dd459b11b4e48dc862f4e1019d166f6"
And add it to the predict.py file:
import os
RUN_ID = os.getenv('RUN_ID')
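Building on that, a sketch of loading the model in predict.py straight from the S3 bucket; the experiment id 1 and the artifact path model are assumptions about how the model was logged:

```python
import os

import mlflow

RUN_ID = os.getenv('RUN_ID')

# Load the model directly from S3, without going through the tracking server
# (experiment id and artifact path are assumptions)
logged_model = f's3://mlflow-models-esteban/1/{RUN_ID}/artifacts/model'
model = mlflow.pyfunc.load_model(logged_model)
```

Alternatively, you can download the model artifacts locally with the CLI: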
mlflow artifacts download \
--run-id ${RUN_ID} \
--artifact-path model \
--dst-path .
Follow the tutorial that I've provided in Tutorial: Using AWS Lambda with Amazon Kinesis.
- Create your IAM role and policy. Name it lambda-kinesis-role
- Then go to Lambda and create a new function
- Name it ride-duration-prediction-test
- Set Runtime to Python 3.9
- Use an existing role: lambda-kinesis-role
- Replace the default function code with:
import json

def lambda_handler(event, context):
    print(json.dumps(event))

    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda!')
    }
- Click Deploy
- Click Test
- Set the name of the test to test
- Save it and then click Test
- You should see the test result in the console
import json

def prepare_features(ride):
    features = {}
    features['PU_DO'] = '%s_%s' % (ride['PULocationID'], ride['DOLocationID'])
    features['trip_distance'] = ride['trip_distance']
    return features

def predict(features):
    # Placeholder: return a constant until the real model is plugged in
    return 10.0

def lambda_handler(event, context):
    # print(json.dumps(event))

    ride = event['ride']
    ride_id = event['ride_id']

    features = prepare_features(ride)
    prediction = predict(features)

    return {
        'ride_duration': prediction,
        'ride_id': ride_id
    }
Update the test event to:
{
"ride": {
"PULocationID": 130,
"DOLocationID": 205,
"trip_distance": 3.66
},
"ride_id": 123
}
Now create a Kinesis stream:
- Name it ride_events
- In capacity mode, select Provisioned
- Use only one shard
Then, back in the Lambda function, add a trigger:
- Select Kinesis
- Select the ride_events stream
Send a test record to the stream:
KINESIS_STREAM_INPUT=ride_events
aws kinesis put-record \
--stream-name ${KINESIS_STREAM_INPUT} \
--partition-key 1 \
--data "Hello, this is a test."
Copy random-forest.ipynb from the web-service-mlflow folder into the batch folder, renaming it score.ipynb:
cp ../web-service-mlflow/random-forest.ipynb score.ipynb
Start the MLflow server to look up the run id of the model:
mlflow server --backend-store-uri=sqlite:///mlflow.db --default-artifact-root=s3://mlflow-models-esteban/
Convert the notebook to a script:
jupyter nbconvert --to script score.ipynb
Then run it:
python score.py green 2021 3 133f7f2c17384132b4d4f76682ab6139
where the arguments are the taxi type, year, month, and the MLflow run id of the model.
- Make sure that you have already installed pandas, scikit-learn, mlflow, etc.
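score.py presumably parses these arguments in its entry point; a minimal sketch under that assumption, calling the ride_duration_prediction flow shown below:

```python
import sys
from datetime import datetime

def run():
    taxi_type = sys.argv[1]   # e.g. 'green'
    year = int(sys.argv[2])   # e.g. 2021
    month = int(sys.argv[3])  # e.g. 3
    run_id = sys.argv[4]      # MLflow run id of the trained model

    ride_duration_prediction(
        taxi_type=taxi_type,
        run_id=run_id,
        run_date=datetime(year=year, month=month, day=1))

if __name__ == '__main__':
    run()
```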
The flow in score.py:
from datetime import datetime

from dateutil.relativedelta import relativedelta
from prefect import flow, get_run_context

@flow
def ride_duration_prediction(
        taxi_type: str,
        run_id: str,
        run_date: datetime = None):
    # On a scheduled run, take the date from the flow run context
    if run_date is None:
        ctx = get_run_context()
        run_date = ctx.flow_run.expected_start_time

    # Score the previous month's data
    prev_month = run_date - relativedelta(months=1)
    year = prev_month.year
    month = prev_month.month

    input_file = f'https://s3.amazonaws.com/nyc-tlc/trip+data/{taxi_type}_tripdata_{year:04d}-{month:02d}.parquet'
    output_file = f'output/{taxi_type}/{year:04d}-{month:02d}.parquet'

    apply_model(
        input_file=input_file,
        run_id=run_id,
        output_file=output_file)
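The flow calls an apply_model task defined in the converted notebook. A rough, self-contained sketch of what it might do; the feature preparation, the bucket path, and the experiment id are assumptions:

```python
import mlflow
import pandas as pd
from prefect import task

def prepare_dictionaries(df: pd.DataFrame):
    # Same feature preparation as in the web-service example
    df['PU_DO'] = df['PULocationID'].astype(str) + '_' + df['DOLocationID'].astype(str)
    return df[['PU_DO', 'trip_distance']].to_dict(orient='records')

@task
def apply_model(input_file, run_id, output_file):
    df = pd.read_parquet(input_file)
    dicts = prepare_dictionaries(df)

    # Load the model from the artifact bucket (experiment id is an assumption)
    model = mlflow.pyfunc.load_model(
        f's3://mlflow-models-esteban/1/{run_id}/artifacts/model')
    y_pred = model.predict(dicts)

    # Save only the predictions; the real script may also keep ride ids
    df_result = pd.DataFrame()
    df_result['predicted_duration'] = y_pred
    df_result.to_parquet(output_file, index=False)
```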
Start Prefect Orion:
prefect orion start
To write the output to S3 instead of a local folder, install s3fs and point the output folder at the bucket:
pip install s3fs
OUTPUT_FOLDER=s3://mlflow-models-esteban/
- Create the score_deploy.py file
- Add the following code:
from prefect.deployments import DeploymentSpec
from prefect.orion.schemas.schedules import CronSchedule
from prefect.flow_runners import SubprocessFlowRunner

DeploymentSpec(
    flow_location="score.py",
    name="ride_duration_prediction",
    parameters={
        "taxi_type": "green",
        "run_id": "e1efc53e9bd149078b0c12aeaa6365df",
    },
    flow_storage="fb2163c7-c9ac-448b-b225-0fe9a9ae197a",  # your storage id from prefect storage ls
    schedule=CronSchedule(cron="0 3 2 * *"),  # 03:00 on the 2nd of every month
    flow_runner=SubprocessFlowRunner(),
    tags=["ml"]
)
We already have storage for the model, so we don't need to create it again, but we do have to locate it:
prefect storage ls
You should see the storage; copy its id and paste it into score_deploy.py.
prefect deployment create score_deploy.py
prefect deployment inspect 'ride-duration-prediction/ride_duration_prediction'
prefect agent start 096c894e-a77a-4cb3-917b-ffb5b72ebb11
(the argument is the id of the work queue the agent should pull runs from)
- Remember: for the scheduled run to execute, you need to start the agent, and the agent will run the deployment.
We can also backfill the model for past months. Create a new script, score_backfill.py (a sketch follows):
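A sketch of what score_backfill.py could look like, looping over past months and calling the flow directly; the date range is a placeholder, and the run id is the one used in score_deploy.py:

```python
from datetime import datetime

from dateutil.relativedelta import relativedelta
from prefect import flow

import score  # the score.py module with the ride_duration_prediction flow

@flow
def ride_duration_prediction_backfill():
    # Placeholder date range: adjust to the months you want to score
    start_date = datetime(year=2021, month=3, day=1)
    end_date = datetime(year=2022, month=4, day=1)

    d = start_date
    while d <= end_date:
        score.ride_duration_prediction(
            taxi_type='green',
            run_id='e1efc53e9bd149078b0c12aeaa6365df',
            run_date=d)
        d = d + relativedelta(months=1)

if __name__ == '__main__':
    ride_duration_prediction_backfill()
```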
python score_backfill.py
COMING SOON
Did you take notes? Add them here:
- Send a PR, add your notes above this line