Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion apps/dev-playground/server/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
import "reflect-metadata";
import { analytics, createApp, server } from "@databricks/appkit";
import {
agent,
analytics,
chatUI,
createApp,
server,
} from "@databricks/appkit";
import { WorkspaceClient } from "@databricks/sdk-experimental";
import { lakebaseExamples } from "./lakebase-examples-plugin";
import { reconnect } from "./reconnect-plugin";
Expand All @@ -15,13 +21,26 @@ function createMockClient() {
return client;
}

// Opt-in example wiring for the agent + chat UI plugins.
// Activate by setting ENABLE_AGENT_EXAMPLE=true in the environment.
const agentExampleEnabled = process.env.ENABLE_AGENT_EXAMPLE === "true";

const agentPlugins = agentExampleEnabled
  ? [
      agent({
        // model: 'databricks-claude-sonnet-4-5', // or set DATABRICKS_AGENT_SERVING_ENDPOINT_NAME
        systemPrompt: "You are a helpful Databricks data assistant.",
      }),
      chatUI({ enablePersistence: false }),
    ]
  : [];

createApp({
plugins: [
server({ autoStart: false }),
reconnect(),
telemetryExamples(),
analytics({}),
lakebaseExamples(),
...agentPlugins,
],
...(process.env.APPKIT_E2E_TEST && { client: createMockClient() }),
}).then((appkit) => {
Expand Down
4 changes: 4 additions & 0 deletions packages/appkit/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,12 @@
"typecheck": "tsc --noEmit"
},
"dependencies": {
"@databricks/langchainjs": "^0.1.0",
"@databricks/lakebase": "workspace:*",
"@databricks/sdk-experimental": "^0.16.0",
"@langchain/core": "^1.1.8",
"@langchain/langgraph": "^1.1.2",
"@langchain/mcp-adapters": "^1.1.1",
"@opentelemetry/api": "^1.9.0",
"@opentelemetry/api-logs": "^0.208.0",
"@opentelemetry/auto-instrumentations-node": "^0.67.0",
Expand Down
4 changes: 3 additions & 1 deletion packages/appkit/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ export {
} from "./errors";
// Plugin authoring
export { Plugin, toPlugin } from "./plugin";
export { analytics, server } from "./plugins";
export { agent, analytics, chatUI, server } from "./plugins";
export type { AgentTraceDestination, IAgentConfig } from "./plugins/agent";
export type { IChatUIConfig } from "./plugins/chat-ui";
// Registry types and utilities for plugin manifests
export type {
ConfigSchema,
Expand Down
9 changes: 9 additions & 0 deletions packages/appkit/src/plugin/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,15 @@ export abstract class Plugin<
return;
}

/**
 * Optional hook: inject routes directly onto the root Express application
 * (e.g. /invocations). Called by ServerPlugin after all plugin routers are
 * mounted at /api/{name}.
 *
 * Use this for endpoints that must live at the app root rather than under /api/,
 * such as the Databricks model serving `/invocations` convention.
 * The default implementation is a no-op; plugins override it as needed.
 */
injectAppRoutes?(_app: express.Application): void {}

async setup() {}

getEndpoints(): PluginEndpointMap {
Expand Down
291 changes: 291 additions & 0 deletions packages/appkit/src/plugins/agent/agent.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,291 @@
/**
* AgentPlugin — first-class AppKit plugin for LangChain/LangGraph agents.
*
* Ported from ~/app-templates/agent-langchain-ts/src/framework/plugins/agent/AgentPlugin.ts
* and ~/app-templates/agent-langchain-ts/src/agent.ts
*/

import type { DatabricksMCPServer } from "@databricks/langchainjs";
import { buildMCPServerConfig, ChatDatabricks } from "@databricks/langchainjs";
import {
  AIMessage,
  HumanMessage,
  SystemMessage,
} from "@langchain/core/messages";
import { createReactAgent } from "@langchain/langgraph/prebuilt";
import type { MultiServerMCPClient } from "@langchain/mcp-adapters";
import type express from "express";
import { createLogger } from "../../logging/logger";
import { Plugin, toPlugin } from "../../plugin";
import { createInvokeHandler, type InvokableAgent } from "./invoke-handler";
import { agentManifest } from "./manifest";
import type { AgentTraceDestination, IAgentConfig } from "./types";

// Namespaced logger for this plugin's lifecycle and diagnostic output.
const logger = createLogger("agent");

// Fallback system prompt used when config.systemPrompt is not provided.
const DEFAULT_SYSTEM_PROMPT =
  "You are a helpful AI assistant with access to various tools.";

export class AgentPlugin extends Plugin<IAgentConfig> {
public name = "agent" as const;

/** Plugin manifest declaring metadata and resource requirements */
static manifest = agentManifest;

protected declare config: IAgentConfig;

private langGraphAgent: InvokableAgent | null = null;
private systemPrompt = DEFAULT_SYSTEM_PROMPT;
private mcpClient: { close(): Promise<void> } | null = null;

/**
* Provides config-dependent resource requirements:
* when traceDestination.type === 'mlflow' and no experimentId in config,
* MLFLOW_EXPERIMENT_ID env var becomes required.
*/
static getResourceRequirements(config: IAgentConfig) {
const resources = [];
if (
config.traceDestination?.type === "mlflow" &&
!config.traceDestination.experimentId
) {
resources.push({
type: "experiment" as const,
alias: "MLflow Experiment",
resourceKey: "agent-mlflow-experiment",
description:
"MLflow experiment for tracing agent invocations (required when traceDestination.type is 'mlflow' and no experimentId is provided in config)",
permission: "CAN_READ" as const,
fields: {
id: { env: "MLFLOW_EXPERIMENT_ID" },
},
required: true,
});
}
return resources;
}

async setup() {
this.systemPrompt = this.config.systemPrompt ?? DEFAULT_SYSTEM_PROMPT;

// Initialize tracing if requested
if (this.config.traceDestination?.type === "mlflow") {
await this._setupMLflowTracing(this.config.traceDestination);
}

// Resolve model name from config or environment
const modelName =
this.config.model ?? process.env.DATABRICKS_AGENT_SERVING_ENDPOINT_NAME;

if (!modelName) {
throw new Error(
"AgentPlugin: model name is required. Set config.model or DATABRICKS_AGENT_SERVING_ENDPOINT_NAME env var.",
);
}

// Create ChatDatabricks model
const model = new ChatDatabricks({
model: modelName,
temperature: this.config.temperature ?? 0.1,
maxTokens: this.config.maxTokens ?? 2000,
maxRetries: 3,
});

// Load MCP tools if configured
const tools: any[] = [];

if (this.config.mcpServers?.length) {
try {
// Build MCP server configurations (handles Databricks auth)
const mcpServerConfigs = await buildMCPServerConfig(
this.config.mcpServers,
);

// Dynamically import MultiServerMCPClient to avoid hard dep at module level
const { MultiServerMCPClient } = await import(
"@langchain/mcp-adapters"
);
this.mcpClient = new MultiServerMCPClient({
mcpServers: mcpServerConfigs,
throwOnLoadError: false,
prefixToolNameWithServerName: true,
});

const mcpTools = await (this.mcpClient as any).getTools();
tools.push(...mcpTools);
logger.info(
"Loaded %d MCP tools from %d server(s)",
tools.length,
this.config.mcpServers.length,
);
} catch (err) {
logger.warn(
"Failed to load MCP tools: %O — continuing without them",
err,
);
}
}

// Add any statically configured tools
if (this.config.tools?.length) {
tools.push(...this.config.tools);
}

// Create the LangGraph ReAct agent
this.langGraphAgent = createReactAgent({
llm: model,
tools,
}) as InvokableAgent;

logger.info(
"AgentPlugin initialized: model=%s tools=%d systemPrompt=%s",
modelName,
tools.length,
this.systemPrompt.slice(0, 60),
);
}

injectRoutes(router: express.Router) {
// POST /api/agent — standard appkit invoke endpoint (streaming Responses API format)
router.post(
"/",
createInvokeHandler(
() => this.langGraphAgent!,
() => this.systemPrompt,
),
);
this.registerEndpoint("invoke", `/api/${this.name}`);
}

/**
* Inject /invocations at root level — the Databricks model serving convention.
* Called by ServerPlugin after plugin routes are mounted.
*/
injectAppRoutes(app: express.Application) {
app.post(
"/invocations",
createInvokeHandler(
() => this.langGraphAgent!,
() => this.systemPrompt,
),
);
}

async shutdown() {
if (this.mcpClient) {
try {
await this.mcpClient.close();
} catch (err) {
logger.warn("Error closing MCP client: %O", err);
}
}
}

exports() {
return {
/**
* Invoke the agent and return the full text response.
*/
invoke: async (
messages: { role: string; content: string }[],
): Promise<string> => {
if (!this.langGraphAgent)
throw new Error("AgentPlugin not initialized");
const builtMessages = [
new SystemMessage(this.systemPrompt),
...messages.map((m) =>
m.role === "user"
? new HumanMessage(m.content)
: new SystemMessage(m.content),
),
];
const result = await this.langGraphAgent.invoke({
messages: builtMessages,
});
const finalMessages = result.messages ?? [];
const last = finalMessages[finalMessages.length - 1];
return typeof last?.content === "string" ? last.content : "";
},

/**
* Stream agent response as text chunks.
*/
stream: async function* (
this: AgentPlugin,
messages: { role: string; content: string }[],
) {
if (!this.langGraphAgent)
throw new Error("AgentPlugin not initialized");
const builtMessages = [
new SystemMessage(this.systemPrompt),
...messages.map((m) =>
m.role === "user"
? new HumanMessage(m.content)
: new SystemMessage(m.content),
),
];
const stream = this.langGraphAgent.streamEvents(
{ messages: builtMessages },
{ version: "v2" },
);
for await (const event of stream) {
if (event.event === "on_chat_model_stream") {
const content = event.data?.chunk?.content;
if (content && typeof content === "string") {
yield content;
}
}
}
}.bind(this),
};
}

/**
* Set up MLflow/OTel tracing.
* Uses the OTLP exporter already available in appkit pointed at the
* Databricks OTel collector endpoint, enriched with MLflow headers.
*/
private async _setupMLflowTracing(
dest: Extract<AgentTraceDestination, { type: "mlflow" }>,
) {
const experimentId = dest.experimentId ?? process.env.MLFLOW_EXPERIMENT_ID;

if (!experimentId) {
logger.warn(
"AgentPlugin: traceDestination.type is 'mlflow' but no experimentId found. " +
"Set traceDestination.experimentId or MLFLOW_EXPERIMENT_ID to enable tracing.",
);
return;
}

const databricksHost = process.env.DATABRICKS_HOST;
if (!databricksHost) {
logger.warn(
"AgentPlugin: DATABRICKS_HOST not set, skipping MLflow tracing setup.",
);
return;
}

// Set up environment variables consumed by the existing TelemetryManager
// so that OTel spans are exported to the Databricks MLflow endpoint.
const host = databricksHost.replace(/\/$/, "");
if (!process.env.OTEL_EXPORTER_OTLP_ENDPOINT) {
process.env.OTEL_EXPORTER_OTLP_ENDPOINT = `${host}/api/2.0/otel/v1/traces`;
}

// Pass experiment ID as an OTel resource attribute recognised by MLflow
const existing = process.env.OTEL_RESOURCE_ATTRIBUTES ?? "";
const expAttr = `mlflow.experimentId=${experimentId}`;
if (!existing.includes(expAttr)) {
process.env.OTEL_RESOURCE_ATTRIBUTES = existing
? `${existing},${expAttr}`
: expAttr;
}

logger.info(
"MLflow tracing configured: experimentId=%s endpoint=%s",
experimentId,
process.env.OTEL_EXPORTER_OTLP_ENDPOINT,
);
}
}

/**
 * Plugin factory for registering the agent with createApp():
 * `createApp({ plugins: [agent({ ...config })] })`.
 */
export const agent = toPlugin<typeof AgentPlugin, IAgentConfig, "agent">(
  AgentPlugin,
  "agent",
);
3 changes: 3 additions & 0 deletions packages/appkit/src/plugins/agent/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export * from "./agent";
export * from "./manifest";
export * from "./types";
Loading