Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: measure amount of running instances #2366

Merged
merged 15 commits into from
Nov 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ Keep composed of three main components:
3. [Keep CLI](https://github.com/keephq/keep/blob/main/keep/cli/cli.py) - A command-line interface for controlling and managing Keep.

>**Disclaimer**: we use [PostHog](https://posthog.com/faq) to collect anonymous telemetry so that we can better understand how users use Keep (masked screen recordings for CLI commands).
To turn PostHog off, set the `DISABLE_POSTHOG=true` environment variable and remove the `NEXT_PUBLIC_POSTHOG_KEY` environment variable.
To turn PostHog off, set the `POSTHOG_DISABLED=true` environment variable and remove the `NEXT_PUBLIC_POSTHOG_KEY` environment variable.

### Quickstart
#### Spinning up Keep with docker-compose
Expand Down
3 changes: 1 addition & 2 deletions docs/deployment/configuration.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,7 @@ Posthog configuration controls Keep's integration with the Posthog analytics pla
| Env var | Purpose | Required | Default Value | Valid options |
|:-------------------:|:-------:|:----------:|:-------------:|:-------------:|
| **POSTHOG_API_KEY** | API key for PostHog analytics | No | "phc_muk9qE3TfZsX3SZ9XxX52kCGJBclrjhkP9JxAQcm1PZ" | Valid PostHog API key |
| **ENABLE_POSTHOG_API** | Enables or disables PostHog API | No | "false" | "true" or "false" |
| **DISABLE_POSTHOG** | Disables PostHog integration | No | "false" | "true" or "false" |
| **POSTHOG_DISABLED** | Disables PostHog integration | No | "false" | "true" or "false" |

### Ngrok
<Info>
Expand Down
2 changes: 1 addition & 1 deletion keep-ui/.env.local.example
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ PUSHER_HOST=localhost
PUSHER_PORT=6001
PUSHER_APP_KEY=keepappkey
# Logging
LOG_FORMAT=dev_terminal
LOG_FORMAT=dev_terminal
138 changes: 25 additions & 113 deletions keep/api/api.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
import os
import asyncio
import logging
import os
import time
import threading
from importlib import metadata

import jwt
import requests
import uvicorn
from dotenv import find_dotenv, load_dotenv
from fastapi import FastAPI, Request, Response
from fastapi import FastAPI, Request
from fastapi.middleware.gzip import GZipMiddleware
from fastapi.responses import JSONResponse
from opentelemetry import trace
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.middleware.cors import CORSMiddleware
from starlette_context import plugins
from starlette_context.middleware import RawContextMiddleware
Expand All @@ -29,7 +26,6 @@
KEEP_ARQ_TASK_POOL_BASIC_PROCESSING,
KEEP_ARQ_TASK_POOL_NONE,
)
from keep.api.core.db import get_api_key
from keep.api.core.dependencies import SINGLE_TENANT_UUID
from keep.api.logging import CONFIG as logging_config
from keep.api.routes import (
Expand Down Expand Up @@ -63,14 +59,16 @@
IdentityManagerFactory,
IdentityManagerTypes,
)
from keep.posthog.posthog import get_posthog_client
from keep.posthog.posthog import POSTHOG_DISABLED, is_posthog_reachable, report_uptime_to_posthog_blocking

# load all providers into cache
from keep.providers.providers_factory import ProvidersFactory
from keep.providers.providers_service import ProvidersService
from keep.workflowmanager.workflowmanager import WorkflowManager
from keep.workflowmanager.workflowstore import WorkflowStore

from keep.api.middlewares import LoggingMiddleware, PostHogEventCaptureMiddleware

load_dotenv(find_dotenv())
keep.api.logging.setup_logging()
logger = logging.getLogger(__name__)
Expand All @@ -86,8 +84,6 @@
KEEP_VERSION = metadata.version("keep")
except Exception:
KEEP_VERSION = os.environ.get("KEEP_VERSION", "unknown")
POSTHOG_API_ENABLED = os.environ.get("ENABLE_POSTHOG_API", "false") == "true"


# Monkey patch requests to disable redirects
original_request = requests.Session.request
Expand All @@ -101,85 +97,6 @@ def no_redirect_request(self, method, url, **kwargs):
requests.Session.request = no_redirect_request


def _extract_identity(request: Request, attribute="email") -> str:
try:
token = request.headers.get("Authorization").split(" ")[1]
decoded_token = jwt.decode(token, options={"verify_signature": False})
return decoded_token.get(attribute)
# case api key
except AttributeError:
# try api key
api_key = request.headers.get("x-api-key")
if not api_key:
return "anonymous"

api_key = get_api_key(api_key)
if api_key:
return api_key.tenant_id
return "anonymous"
except Exception:
return "anonymous"


class EventCaptureMiddleware(BaseHTTPMiddleware):
    """Report every non-OPTIONS HTTP request to PostHog.

    When ``POSTHOG_API_ENABLED`` is true, a ``request-started`` event is
    captured before the request is handled and a ``request-finished`` event
    (with the response status code) afterwards; the client is then flushed.
    When the flag is false every step is a no-op.
    """

    def __init__(self, app: FastAPI):
        super().__init__(app)
        self.posthog_client = get_posthog_client()
        self.tracer = trace.get_tracer(__name__)

    async def capture_request(self, request: Request, identity=None) -> None:
        """Capture a ``request-started`` event for ``request``.

        Args:
            request: The incoming request.
            identity: Already-resolved caller identity. When omitted it is
                resolved from the request headers.
        """
        if not POSTHOG_API_ENABLED:
            return
        if identity is None:
            identity = _extract_identity(request)
        with self.tracer.start_as_current_span("capture_request"):
            self.posthog_client.capture(
                identity,
                "request-started",
                {
                    "path": request.url.path,
                    "method": request.method,
                    "keep_version": KEEP_VERSION,
                },
            )

    async def capture_response(
        self, request: Request, response: Response, identity=None
    ) -> None:
        """Capture a ``request-finished`` event for ``request``.

        Args:
            request: The request that was handled.
            response: The response produced for it.
            identity: Already-resolved caller identity. When omitted it is
                resolved from the request headers.
        """
        if not POSTHOG_API_ENABLED:
            return
        if identity is None:
            identity = _extract_identity(request)
        with self.tracer.start_as_current_span("capture_response"):
            self.posthog_client.capture(
                identity,
                "request-finished",
                {
                    "path": request.url.path,
                    "method": request.method,
                    "status_code": response.status_code,
                    "keep_version": KEEP_VERSION,
                },
            )

    async def flush(self):
        """Flush the PostHog client without stalling the event loop."""
        if not POSTHOG_API_ENABLED:
            return
        with self.tracer.start_as_current_span("flush_posthog_events"):
            logger.info("Flushing Posthog events")
            # ``flush`` is a synchronous call; presumably it waits for queued
            # events to be delivered (confirm against the PostHog client), so
            # it runs in the default executor instead of on the event loop.
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, self.posthog_client.flush)
            logger.info("Posthog events flushed")

    async def dispatch(self, request: Request, call_next):
        """Wrap the request with start/finish telemetry events."""
        # Skip OPTIONS requests
        if request.method == "OPTIONS":
            return await call_next(request)

        # Resolve the caller once per request; the identity is derived from
        # the request headers only, so it is the same before and after the
        # handler runs. Skipped entirely when capturing is disabled.
        identity = _extract_identity(request) if POSTHOG_API_ENABLED else None

        # Capture event before request
        await self.capture_request(request, identity)

        response = await call_next(request)

        # Capture event after request
        await self.capture_response(request, response, identity)

        # Flush events after the request is handled
        await self.flush()
        return response


def get_app(
auth_type: IdentityManagerTypes = IdentityManagerTypes.NOAUTH.value,
) -> FastAPI:
Expand Down Expand Up @@ -208,9 +125,12 @@ async def root():
allow_methods=["*"],
allow_headers=["*"],
)
if not os.getenv("DISABLE_POSTHOG", "false") == "true":
app.add_middleware(EventCaptureMiddleware)
# app.add_middleware(GZipMiddleware)
if not POSTHOG_DISABLED:
if is_posthog_reachable():
app.add_middleware(PostHogEventCaptureMiddleware)
logger.info("Posthog API is reachable, middleware plugged.")
else:
logger.info("Posthog API is not reachable, not using the middleware.")

app.include_router(providers.router, prefix="/providers", tags=["providers"])
app.include_router(actions.router, prefix="/actions", tags=["actions"])
Expand Down Expand Up @@ -258,6 +178,18 @@ async def root():
# if any endpoints needed, add them on_start
identity_manager.on_start(app)

@app.on_event("startup")
async def report_posthog():
    """Launch background uptime reporting to PostHog on application startup.

    Nothing is started when PostHog is disabled or when the reachability
    check fails; each outcome is logged.
    """
    if POSTHOG_DISABLED:
        logger.info("Posthog reporting is disabled so no uptime reporting.")
        return

    if not is_posthog_reachable():
        logger.info("Reporting to Posthog not launched because it's not reachable.")
        return

    # NOTE(review): the reporter's name suggests it blocks (likely a periodic
    # loop) -- confirm. It is run as a daemon thread so that telemetry can
    # never keep the process alive once the server itself has shut down.
    thread = threading.Thread(target=report_uptime_to_posthog_blocking, daemon=True)
    thread.start()
    logger.info("Uptime Reporting to Posthog launched.")

@app.on_event("startup")
async def on_startup():
logger.info("Loading providers into cache")
Expand Down Expand Up @@ -347,27 +279,7 @@ async def catch_exception(request: Request, exc: Exception):
},
)

@app.middleware("http")
async def log_middleware(request: Request, call_next):
    """Log the start and end of every HTTP request.

    The caller's ``keep_tenant_id`` is resolved from the request headers,
    attached to both log records and stored on ``request.state.tenant_id``.

    Args:
        request: The incoming request.
        call_next: Callable that runs the rest of the middleware chain.

    Returns:
        The response produced by ``call_next``.
    """
    identity = _extract_identity(request, attribute="keep_tenant_id")
    logger.info(
        f"Request started: {request.method} {request.url.path}",
        extra={"tenant_id": identity},
    )

    # For debugging purposes only: this writes every request header to the
    # log, including any Authorization / x-api-key values.
    if os.environ.get("LOG_AUTH_PAYLOAD", "false") == "true":
        logger.info(f"Request headers: {request.headers}")

    request.state.tenant_id = identity

    # perf_counter is monotonic, so the measured duration cannot be skewed by
    # wall-clock adjustments while the request is in flight.
    started_at = time.perf_counter()
    response = await call_next(request)
    elapsed = time.perf_counter() - started_at

    logger.info(
        f"Request finished: {request.method} {request.url.path} {response.status_code} in {elapsed:.2f}s",
        extra={"tenant_id": identity},
    )
    return response
app.add_middleware(LoggingMiddleware)

keep.api.observability.setup(app)

Expand Down
117 changes: 117 additions & 0 deletions keep/api/middlewares.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import os
import jwt
import time
import logging
from importlib import metadata

from fastapi import FastAPI, Request, Response
from opentelemetry import trace
from starlette.middleware.base import BaseHTTPMiddleware

from keep.api.core.db import get_api_key
from keep.posthog.posthog import posthog_client

# Module-level logger used by the middlewares defined in this file.
logger = logging.getLogger(__name__)


def _detect_keep_version() -> str:
    """Return the installed ``keep`` version, or a fallback when unavailable.

    The fallback is the ``KEEP_VERSION`` environment variable and, if that is
    not set either, the literal ``"unknown"``.
    """
    try:
        return metadata.version("keep")
    except Exception:
        return os.environ.get("KEEP_VERSION", "unknown")


KEEP_VERSION = _detect_keep_version()


def _extract_identity(request: Request, attribute="email") -> str:
try:
token = request.headers.get("Authorization").split(" ")[1]
decoded_token = jwt.decode(token, options={"verify_signature": False})
return decoded_token.get(attribute)
# case api key
except AttributeError:
# try api key
api_key = request.headers.get("x-api-key")
if not api_key:
return "anonymous"

api_key = get_api_key(api_key)
if api_key:
return api_key.tenant_id
return "anonymous"
except Exception:
return "anonymous"

class PostHogEventCaptureMiddleware(BaseHTTPMiddleware):
    """Report every non-OPTIONS HTTP request to PostHog.

    A ``request-started`` event is captured before the request is handled
    and a ``request-finished`` event (with the response status code)
    afterwards; the client is then flushed.
    """

    def __init__(self, app: FastAPI):
        super().__init__(app)
        self.posthog_client = posthog_client
        self.tracer = trace.get_tracer(__name__)

    async def capture_request(self, request: Request, identity=None) -> None:
        """Capture a ``request-started`` event for ``request``.

        Args:
            request: The incoming request.
            identity: Already-resolved caller identity. When omitted it is
                resolved from the request headers.
        """
        if identity is None:
            identity = _extract_identity(request)
        with self.tracer.start_as_current_span("capture_request"):
            self.posthog_client.capture(
                identity,
                "request-started",
                {
                    "path": request.url.path,
                    "method": request.method,
                    "keep_version": KEEP_VERSION,
                },
            )

    async def capture_response(
        self, request: Request, response: Response, identity=None
    ) -> None:
        """Capture a ``request-finished`` event for ``request``.

        Args:
            request: The request that was handled.
            response: The response produced for it.
            identity: Already-resolved caller identity. When omitted it is
                resolved from the request headers.
        """
        if identity is None:
            identity = _extract_identity(request)
        with self.tracer.start_as_current_span("capture_response"):
            self.posthog_client.capture(
                identity,
                "request-finished",
                {
                    "path": request.url.path,
                    "method": request.method,
                    "status_code": response.status_code,
                    "keep_version": KEEP_VERSION,
                },
            )

    async def flush(self):
        """Flush the PostHog client without stalling the event loop."""
        # Imported locally because this module does not import asyncio at
        # the top of the file.
        import asyncio

        with self.tracer.start_as_current_span("flush_posthog_events"):
            logger.debug("Flushing Posthog events")
            # ``flush`` is a synchronous call; presumably it waits for queued
            # events to be delivered (confirm against the PostHog client), so
            # it runs in the default executor instead of on the event loop.
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, self.posthog_client.flush)
            logger.debug("Posthog events flushed")

    async def dispatch(self, request: Request, call_next):
        """Wrap the request with start/finish telemetry events."""
        # Skip OPTIONS requests
        if request.method == "OPTIONS":
            return await call_next(request)

        # Resolve the caller once per request; the identity is derived from
        # the request headers only, so it is the same before and after the
        # handler runs. This avoids a second token decode / API-key lookup.
        identity = _extract_identity(request)

        # Capture event before request
        await self.capture_request(request, identity)

        response = await call_next(request)

        # Capture event after request
        await self.capture_response(request, response, identity)

        # Flush events after the request is handled
        await self.flush()
        return response


class LoggingMiddleware(BaseHTTPMiddleware):
    """Log the start and end of every HTTP request.

    The caller's ``keep_tenant_id`` is resolved from the request headers,
    attached to both log records and stored on ``request.state.tenant_id``.
    """

    async def dispatch(self, request: Request, call_next):
        """Log the request, run the rest of the chain, and log the outcome.

        Args:
            request: The incoming request.
            call_next: Callable that runs the rest of the middleware chain.

        Returns:
            The response produced by ``call_next``.
        """
        identity = _extract_identity(request, attribute="keep_tenant_id")
        logger.info(
            f"Request started: {request.method} {request.url.path}",
            extra={"tenant_id": identity},
        )

        # For debugging purposes only: this writes every request header to
        # the log, including any Authorization / x-api-key values.
        if os.environ.get("LOG_AUTH_PAYLOAD", "false") == "true":
            logger.info(f"Request headers: {request.headers}")

        request.state.tenant_id = identity

        # perf_counter is monotonic, so the measured duration cannot be
        # skewed by wall-clock adjustments while the request is in flight.
        started_at = time.perf_counter()
        response = await call_next(request)
        elapsed = time.perf_counter() - started_at

        logger.info(
            f"Request finished: {request.method} {request.url.path} {response.status_code} in {elapsed:.2f}s",
            extra={"tenant_id": identity},
        )
        return response
3 changes: 1 addition & 2 deletions keep/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@
from keep.api.core.db_on_start import try_create_single_tenant
from keep.api.core.dependencies import SINGLE_TENANT_UUID
from keep.cli.click_extensions import NotRequiredIf
from keep.posthog.posthog import get_posthog_client
from keep.posthog.posthog import posthog_client
from keep.providers.providers_factory import ProvidersFactory
from keep.workflowmanager.workflowmanager import WorkflowManager
from keep.workflowmanager.workflowstore import WorkflowStore

load_dotenv(find_dotenv())

posthog_client = get_posthog_client()
try:
KEEP_VERSION = metadata.version("keep")
except metadata.PackageNotFoundError:
Expand Down
Loading
Loading