Skip to content

Commit

Permalink
[RFC] Flyte Agent (#3558)
Browse files Browse the repository at this point in the history
* [RFC] External Plugin Service

Signed-off-by: Kevin Su <pingsutw@apache.org>

* update doc

Signed-off-by: Kevin Su <pingsutw@apache.org>

---------

Signed-off-by: Kevin Su <pingsutw@apache.org>
  • Loading branch information
pingsutw authored Aug 25, 2023
1 parent 516dd39 commit 7094b11
Showing 1 changed file with 270 additions and 0 deletions.
270 changes: 270 additions & 0 deletions rfc/system/0000-exteranl-plugin-service.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,270 @@
# Out of Core Plugin [RFC]

## Motivation
Currently, it is hard to implement backend plugins, especially for data-scientists & MLE’s who do not have working knowledge of Golang. Also, performance requirements, maintenance and development is cumbersome.

The document here proposes a path to make it possible to write plugins rapidly, while decoupling them from the core flytepropeller engine.

## Goals
* Plugins should be easy to author - no need of code generation, using tools that MLEs and Data Scientists are not accustomed to using.
* Most important plugins for Flyte today are plugins that communicate with external services.
* It should be possible to test these plugins independently and also deploy them privately.
* It should be possible for users to use backend plugins for local development, especially in flytekit and unionML
* It should be possible to author plugins in any language.
* Plugins should be scalable
* Plugins should have a very simple API
* Plugins should show up in the UI (extra details)

## Overview
**Flytekit Backend Plugin** is a plugin registry written by FastAPI. Both users and Propeller can send the request to it to run the jobs (Bigquery, Databricks). Moreover, this registry should be stateless, so we can easily to scale up to multiple machines.

![](https://i.imgur.com/3FQcW99.png)

![](https://i.imgur.com/9xBeEri.jpg)



**FlytePropeller Backend System Plugin**: New web api plugin used to send the request to flytekit backend service to create a job on an external platform.

### Create Job
```sequence
Propeller->Grpc API Server: REST (Task spec)
Grpc API Server->BigQuery: REST (BQ job Spec)
BigQuery->Grpc API Server: Job Id
Grpc API Server->Propeller: Job Id
```
### Poll Job
```sequence
Propeller->Async Cache: Create Cache
Async Cache->Grpc API Server: REST (Job Id)
Grpc API Server->BigQuery: REST (Job Id)
BigQuery->Grpc API Server: Job State, Message
Grpc API Server->Async Cache: Job State, Message
Propeller->Async Cache: Fetch Cache
```

### Register a backend plugin
Users should be able to write the plugin using a rest-like interface. In this proposal we recommend using a simplified form of WebAPI plugins. the goal of the plugins is to enable the following functions

It should be possible to implement this using a Web Service framework like `FastAPI`. Raw implementation for a `fastAPI` like server can be as follows.

**Note**: It should be possible to implement multiple **resource** plugins per python library. Thus each resource should be delimited.

```python
from fastapi import FastAPI
from flytekit.backend.plugins import TaskInfo, IO, PollResultSuccess,
PollResultFailure, OperationInProgress, PollResult

app = FastAPI()

@app.put("/plugins/v1/{task_type}/{version}")
def create(task_info: TaskInfo) -> ResourceInfo:
"""
Creates resource.
return job_id, error code, message
200 Job was created.
401 The request was unauthorized.
500 The request was not handled correctly due to a server error.
"""
return {"job_id": job_id, "error_code": error_code, "message": message}

@app.get("/plugins/v1/{task_type}/{version}")
async def poll(resource_id: int, task_info: TaskInfo, io: IO, resource_info: ResourceInfo) -> PollResult:
"""
Retrieves resource and returns the current observable status
return job_spec, job_state (succeed, failed, running), error core, message.
200 Job was retrieved successfully.
400 The request was malformed. See JSON response for error details.
401 The request was unauthorized.
500 The request was not handled correctly due to a server error.
"""
if resp.state == "success":
state = succeed
elif resp.state == "failure":
state = failed
elif resp.state == "running":
state = running
return {"job_spec": job_spec, "job_state": state, "error_code": error_code, "message": message}

@app.delete("/plugins/v1/{task_type}/{version}")
async def delete_res(resource_id: str):
"""
Delete the resource
return job_id, error core, message.
200 Job was deleted successfully.
400 The request was malformed. See JSON response for error details.
401 The request was unauthorized.
500 The request was not handled correctly due to a server error.
"""
return {"job_id": job_id, "error_code": error_code, "message": message}

```
**Ideally**, we should provide a simplified interface to this in flytekit. This would mean that the flytekit plugin can simply use this interface to create a local plugin and a backendplugin.

```python
class BackendPluginBase:

def __init__(self, task_type: str, version: str = "v1"):
self._task_type = task_type
self._version = version

@property
def task_type(self) -> str:
return self._task_type

@property
def version(self) -> str:
return self._version

@abstractmethod
async def create(self):
pass

@abstractmethod
async def poll(self):
pass

@abstractmethod
async def delete(self):
pass

BackendPluginRegistry.register(BQPlugin())
```

### (Alternative) GRPC Backend Server
```python
class BackendPluginServer(BackendPluginServiceServicer):
def CreateTask(self, request: plugin_system_pb2.TaskCreateRequest, context) -> plugin_system_pb2.TaskCreateResponse:
req = TaskCreateRequest.from_flyte_idl(request)
plugin = BackendPluginRegistry.get_plugin(req.template.type)
return plugin.create(req.inputs, req.output_prefix, req.template)

def GetTask(self, request: plugin_system_pb2.TaskGetRequest, context):
plugin = BackendPluginRegistry.get_plugin(request.task_type)
return plugin.get(job_id=request.job_id, output_prefix=request.output_prefix, prev_state=request.prev_state)

def DeleteTask(self, request: plugin_system_pb2.TaskDeleteRequest, context):
plugin = BackendPluginRegistry.get_plugin(request.task_type)
return plugin.delete(request.job_id)
```
# Register a New plugin


### Run a Flytekit Backend Plugin
The workflow code running Snowflake, Databricks should not be changed. We can add a new config `enable_backend_flytekit_plugin_system`. If it's true, the job will be handled by plugin system. If it can't find the plugin in the registry, it falls back to use the default web api plugin in the propeller.

### Secrets Management
Secrets management should not be imposed using Flyte’s convention, though we should provide a simplified secrets reader using flytekit secret system?

## Backend Plugin System Deployment
There are two options to deploy backend plugin system.

### 1. All the plugin running in one deployment
Pros:
* Easy to maintain.
* One image, and one deployment yaml file.

Cons:
* Dependency conflict between two plugins.
* Need to restart the deployment and rebuild the image when we add a new backend plugin to flytekit.

### 2. One plugin per deployment

Pros:
* We can scale up the specific plugin deployments when the requests increase in some plugins.
* Only need to deploy the plugins that people will use.

Cons:
* Hard to maintain. (tons of images, and deployment yaml files)

### Deployment yaml
Backend plugin system can be run in the deployment independently. Because it's stateless, we can just scale up the replica if request increases.

This yaml file can be added into the Flyte helm chart (`flyte-core`).

```ymal
apiVersion: apps/v1
kind: Deployment
metadata:
name: backend-plugin-system
labels:
app: backend-plugin-system
spec:
replicas: 1
selector:
matchLabels:
app: backend-plugin-system
template:
metadata:
labels:
app: backend-plugin-system
spec:
containers:
- name: backend-plugin-system
image: pingsutw/backend-plugin-system:v1
ports:
- containerPort: 8000
---
apiVersion: v1
kind: Service
metadata:
name: backend-plugin-system
spec:
selector:
app: backend-plugin-system
ports:
- protocol: TCP
port: 8000
targetPort: 8000
```
### Docker Image
```dockerfile
FROM python:3.9-slim-buster

MAINTAINER Flyte Team <users@flyte.org>
LABEL org.opencontainers.image.source=https://github.com/flyteorg/flytekit

WORKDIR /root
ENV PYTHONPATH /root

RUN apt-get update && apt-get install -y git
RUN pip install "git+https://github.com/flyteorg/flytekit@backend-plugin-system"
RUN pip install fastapi uvicorn[standard]
RUN pip install numpy==1.23.1

CMD uvicorn flytekit.extend.backend.fastapi:app --host 0.0.0.0 --port 8000
```

## Authentication
- [Authentication with FastAPI](https://www.freecodecamp.org/news/how-to-add-jwt-authentication-in-fastapi/)
- [Network Policies](https://kubernetes.io/docs/concepts/services-networking/network-policies/)

## Phase 1 (POC)
- **Flytekit**: Use Fast api to create a backend plugin service that can submit the job to Bigquery or Databricks.
- **FlytePropeller**: Add a web api plugin that can talk to flytekit backend plugin service.

## Phase 2
- **Authentication** - only propeller and users having access token can submit the job to backend plugin system.
- **Deployment**: Add backend plugin system deployment to the helm chart.
- **Benchmark**: Measure the overhead, and improve the performance.

## Open Question:

1. Should we provide a way to deploy these plugins using Flyte? IMO this should be optional and not required for MVP?
2. We could implement something similar for golang


## Benchmark
Compare the memory and CPU consumption of the FastAPI server with that of the current Propeller server.
![](https://i.imgur.com/EXXmdyt.png)

The round latency between grpc and FastAPI server.
![](https://i.imgur.com/hp8wss4.png =50%x)

The round latency between grpc and current propeller.
![](https://i.imgur.com/42mdktu.png =50%x)

0 comments on commit 7094b11

Please sign in to comment.