Skip to content

Billing #140

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/copilot/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class Config:
populate_by_name = True

class ApiRequest(BaseModel):
projectId: str
messages: List[UserMessage | AssistantMessage]
workflow_schema: str
current_workflow_config: str
Expand Down
56 changes: 17 additions & 39 deletions apps/rowboat/app/actions/actions.ts
Original file line number Diff line number Diff line change
@@ -1,35 +1,18 @@
'use server';
import { AgenticAPIInitStreamResponse, convertFromAgenticAPIChatMessages } from "../lib/types/agents_api_types";
import { AgenticAPIInitStreamResponse } from "../lib/types/agents_api_types";
import { AgenticAPIChatRequest } from "../lib/types/agents_api_types";
import { WebpageCrawlResponse } from "../lib/types/tool_types";
import { webpagesCollection } from "../lib/mongodb";
import { z } from 'zod';
import FirecrawlApp, { ScrapeResponse } from '@mendable/firecrawl-js';
import { apiV1 } from "rowboat-shared";
import { Claims, getSession } from "@auth0/nextjs-auth0";
import { getAgenticApiResponse, getAgenticResponseStreamId } from "../lib/utils";
import { getAgenticResponseStreamId } from "../lib/utils";
import { check_query_limit } from "../lib/rate_limiting";
import { QueryLimitError } from "../lib/client_utils";
import { projectAuthCheck } from "./project_actions";
import { USE_AUTH } from "../lib/feature_flags";
import { authorizeUserAction } from "./billing_actions";

const crawler = new FirecrawlApp({ apiKey: process.env.FIRECRAWL_API_KEY || '' });

export async function authCheck(): Promise<Claims> {
if (!USE_AUTH) {
return {
email: 'guestuser@rowboatlabs.com',
email_verified: true,
sub: 'guest_user',
};
}
const { user } = await getSession() || {};
if (!user) {
throw new Error('User not authenticated');
}
return user;
}

export async function scrapeWebpage(url: string): Promise<z.infer<typeof WebpageCrawlResponse>> {
const page = await webpagesCollection.findOne({
"_id": url,
Expand Down Expand Up @@ -74,30 +57,25 @@ export async function scrapeWebpage(url: string): Promise<z.infer<typeof Webpage
};
}

export async function getAssistantResponse(request: z.infer<typeof AgenticAPIChatRequest>): Promise<{
messages: z.infer<typeof apiV1.ChatMessage>[],
state: unknown,
rawRequest: unknown,
rawResponse: unknown,
}> {
export async function getAssistantResponseStreamId(request: z.infer<typeof AgenticAPIChatRequest>): Promise<z.infer<typeof AgenticAPIInitStreamResponse> | { billingError: string }> {
await projectAuthCheck(request.projectId);
if (!await check_query_limit(request.projectId)) {
throw new QueryLimitError();
}

const response = await getAgenticApiResponse(request);
return {
messages: convertFromAgenticAPIChatMessages(response.messages),
state: response.state,
rawRequest: request,
rawResponse: response.rawAPIResponse,
};
}

export async function getAssistantResponseStreamId(request: z.infer<typeof AgenticAPIChatRequest>): Promise<z.infer<typeof AgenticAPIInitStreamResponse>> {
await projectAuthCheck(request.projectId);
if (!await check_query_limit(request.projectId)) {
throw new QueryLimitError();
// Check billing authorization
const agentModels = request.agents.reduce((acc, agent) => {
acc.push(agent.model);
return acc;
}, [] as string[]);
const { success, error } = await authorizeUserAction({
type: 'agent_response',
data: {
agentModels,
},
});
if (!success) {
return { billingError: error || 'Billing error' };
}

const response = await getAgenticResponseStreamId(request);
Expand Down
53 changes: 53 additions & 0 deletions apps/rowboat/app/actions/auth_actions.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
"use server";
import { getSession } from "@auth0/nextjs-auth0";
import { USE_AUTH } from "../lib/feature_flags";
import { WithStringId, User } from "../lib/types/types";
import { getUserFromSessionId, GUEST_DB_USER } from "../lib/auth";
import { z } from "zod";
import { ObjectId } from "mongodb";
import { usersCollection } from "../lib/mongodb";

export async function authCheck(): Promise<WithStringId<z.infer<typeof User>>> {
if (!USE_AUTH) {
return GUEST_DB_USER;
}

const { user } = await getSession() || {};
if (!user) {
throw new Error('User not authenticated');
}

const dbUser = await getUserFromSessionId(user.sub);
if (!dbUser) {
throw new Error('User record not found');
}
return dbUser;
}

const EmailOnly = z.object({
email: z.string().email(),
});

export async function updateUserEmail(email: string) {
if (!USE_AUTH) {
return;
}
const user = await authCheck();

if (!email.trim()) {
throw new Error('Email is required');
}
if (!EmailOnly.safeParse({ email }).success) {
throw new Error('Invalid email');
}

// update customer email in db
await usersCollection.updateOne({
_id: new ObjectId(user._id),
}, {
$set: {
email,
updatedAt: new Date().toISOString(),
}
});
}
95 changes: 95 additions & 0 deletions apps/rowboat/app/actions/billing_actions.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
"use server";
import {
authorize,
logUsage as libLogUsage,
getBillingCustomer,
createCustomerPortalSession,
getPrices as libGetPrices,
updateSubscriptionPlan as libUpdateSubscriptionPlan,
getEligibleModels as libGetEligibleModels
} from "../lib/billing";
import { authCheck } from "./auth_actions";
import { USE_BILLING } from "../lib/feature_flags";
import {
AuthorizeRequest,
AuthorizeResponse,
LogUsageRequest,
Customer,
PricesResponse,
SubscriptionPlan,
UpdateSubscriptionPlanRequest,
ModelsResponse
} from "../lib/types/billing_types";
import { z } from "zod";
import { WithStringId } from "../lib/types/types";

export async function getCustomer(): Promise<WithStringId<z.infer<typeof Customer>>> {
const user = await authCheck();
if (!user.billingCustomerId) {
throw new Error("Customer not found");
}
const customer = await getBillingCustomer(user.billingCustomerId);
if (!customer) {
throw new Error("Customer not found");
}
return customer;
}

export async function authorizeUserAction(request: z.infer<typeof AuthorizeRequest>): Promise<z.infer<typeof AuthorizeResponse>> {
if (!USE_BILLING) {
return { success: true };
}

const customer = await getCustomer();
const response = await authorize(customer._id, request);
return response;
}

export async function logUsage(request: z.infer<typeof LogUsageRequest>) {
if (!USE_BILLING) {
return;
}

const customer = await getCustomer();
await libLogUsage(customer._id, request);
return;
}

export async function getCustomerPortalUrl(returnUrl: string): Promise<string> {
if (!USE_BILLING) {
throw new Error("Billing is not enabled")
}

const customer = await getCustomer();
return await createCustomerPortalSession(customer._id, returnUrl);
}

export async function getPrices(): Promise<z.infer<typeof PricesResponse>> {
if (!USE_BILLING) {
throw new Error("Billing is not enabled");
}

const response = await libGetPrices();
return response;
}

export async function updateSubscriptionPlan(plan: z.infer<typeof SubscriptionPlan>, returnUrl: string): Promise<string> {
if (!USE_BILLING) {
throw new Error("Billing is not enabled");
}

const customer = await getCustomer();
const request: z.infer<typeof UpdateSubscriptionPlanRequest> = { plan, returnUrl };
const url = await libUpdateSubscriptionPlan(customer._id, request);
return url;
}

export async function getEligibleModels(): Promise<z.infer<typeof ModelsResponse> | "*"> {
if (!USE_BILLING) {
return "*";
}

const customer = await getCustomer();
const response = await libGetEligibleModels(customer._id);
return response;
}
50 changes: 47 additions & 3 deletions apps/rowboat/app/actions/copilot_actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import { projectAuthCheck } from "./project_actions";
import { redisClient } from "../lib/redis";
import { fetchProjectMcpTools } from "../lib/project_tools";
import { mergeProjectTools } from "../lib/types/project_types";
import { authorizeUserAction, logUsage } from "./billing_actions";
import { USE_BILLING } from "../lib/feature_flags";

export async function getCopilotResponse(
projectId: string,
Expand All @@ -28,12 +30,21 @@ export async function getCopilotResponse(
message: z.infer<typeof CopilotAssistantMessage>;
rawRequest: unknown;
rawResponse: unknown;
}> {
} | { billingError: string }> {
await projectAuthCheck(projectId);
if (!await check_query_limit(projectId)) {
throw new QueryLimitError();
}

// Check billing authorization
const authResponse = await authorizeUserAction({
type: 'copilot_request',
data: {},
});
if (!authResponse.success) {
return { billingError: authResponse.error || 'Billing error' };
}

// Get MCP tools from project and merge with workflow tools
const mcpTools = await fetchProjectMcpTools(projectId);

Expand All @@ -45,6 +56,7 @@ export async function getCopilotResponse(

// prepare request
const request: z.infer<typeof CopilotAPIRequest> = {
projectId: projectId,
messages: messages.map(convertToCopilotApiMessage),
workflow_schema: JSON.stringify(zodToJsonSchema(CopilotWorkflow)),
current_workflow_config: JSON.stringify(copilotWorkflow),
Expand Down Expand Up @@ -132,12 +144,25 @@ export async function getCopilotResponseStream(
dataSources?: z.infer<typeof DataSource>[]
): Promise<{
streamId: string;
}> {
} | { billingError: string }> {
await projectAuthCheck(projectId);
if (!await check_query_limit(projectId)) {
throw new QueryLimitError();
}

// Check billing authorization
const authResponse = await authorizeUserAction({
type: 'copilot_request',
data: {},
});
if (!authResponse.success) {
return { billingError: authResponse.error || 'Billing error' };
}

if (!await check_query_limit(projectId)) {
throw new QueryLimitError();
}

// Get MCP tools from project and merge with workflow tools
const mcpTools = await fetchProjectMcpTools(projectId);

Expand All @@ -149,6 +174,7 @@ export async function getCopilotResponseStream(

// prepare request
const request: z.infer<typeof CopilotAPIRequest> = {
projectId: projectId,
messages: messages.map(convertToCopilotApiMessage),
workflow_schema: JSON.stringify(zodToJsonSchema(CopilotWorkflow)),
current_workflow_config: JSON.stringify(copilotWorkflow),
Expand Down Expand Up @@ -177,12 +203,21 @@ export async function getCopilotAgentInstructions(
messages: z.infer<typeof CopilotMessage>[],
current_workflow_config: z.infer<typeof Workflow>,
agentName: string,
): Promise<string> {
): Promise<string | { billingError: string }> {
await projectAuthCheck(projectId);
if (!await check_query_limit(projectId)) {
throw new QueryLimitError();
}

// Check billing authorization
const authResponse = await authorizeUserAction({
type: 'copilot_request',
data: {},
});
if (!authResponse.success) {
return { billingError: authResponse.error || 'Billing error' };
}

// Get MCP tools from project and merge with workflow tools
const mcpTools = await fetchProjectMcpTools(projectId);

Expand All @@ -194,6 +229,7 @@ export async function getCopilotAgentInstructions(

// prepare request
const request: z.infer<typeof CopilotAPIRequest> = {
projectId: projectId,
messages: messages.map(convertToCopilotApiMessage),
workflow_schema: JSON.stringify(zodToJsonSchema(CopilotWorkflow)),
current_workflow_config: JSON.stringify(copilotWorkflow),
Expand Down Expand Up @@ -237,6 +273,14 @@ export async function getCopilotAgentInstructions(
throw new Error(`Failed to call copilot api: ${copilotResponse.error}`);
}

// log the billing usage
if (USE_BILLING) {
await logUsage({
type: 'copilot_requests',
amount: 1,
});
}

// return response
return agent_instructions;
}
4 changes: 4 additions & 0 deletions apps/rowboat/app/actions/datasource_actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ export async function recrawlWebDataSource(projectId: string, sourceId: string)
}, {
$set: {
status: 'pending',
billingError: undefined,
lastUpdatedAt: (new Date()).toISOString(),
attempts: 0,
},
Expand All @@ -124,6 +125,7 @@ export async function deleteDataSource(projectId: string, sourceId: string) {
}, {
$set: {
status: 'deleted',
billingError: undefined,
lastUpdatedAt: (new Date()).toISOString(),
attempts: 0,
},
Expand Down Expand Up @@ -189,6 +191,7 @@ export async function addDocsToDataSource({
{
$set: {
status: 'pending',
billingError: undefined,
attempts: 0,
lastUpdatedAt: new Date().toISOString(),
},
Expand Down Expand Up @@ -275,6 +278,7 @@ export async function deleteDocsFromDataSource({
}, {
$set: {
status: 'pending',
billingError: undefined,
attempts: 0,
lastUpdatedAt: new Date().toISOString(),
},
Expand Down
Loading