-
-
Notifications
You must be signed in to change notification settings - Fork 7
Integrate LangGraph for Mapbox GeoJSON control #503
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Integrate LangGraph for Mapbox GeoJSON control #503
Conversation
- Replaced the procedural MapControlOrchestrator with a LangGraph StateGraph implementation in lib/agents/map-langgraph.tsx. - Implemented nodes for classification, MCP tool execution (geocoding), GeoJSON parsing, and map command generation. - Added a MapStatus component to stream intermediate orchestration steps to the UI. - Integrated the new orchestrator into app/actions.tsx using runMapGraph. - Added required dependencies. Co-authored-by: ngoiyaeric <115367894+ngoiyaeric@users.noreply.github.com>
|
👋 Jules, reporting for duty! I'm here to lend a hand with this pull request. When you start a review, I'll add a 👀 emoji to each comment to let you know I've read it. I'll focus on feedback directed at me and will do my best to stay out of conversations between you and other bots or reviewers to keep the noise down. I'll push a commit with your requested changes shortly after. Please note there might be a delay between these steps, but rest assured I'm on the job! For more direct control, you can switch me to Reactive Mode. When this mode is on, I will only act on comments where you specifically mention me with New to Jules? Learn more at jules.google/docs. For security, I will only act on instructions from the user who triggered this task. |
|
The latest updates on your projects. Learn more about Vercel for GitHub.
|
|
|
|
Important Review skippedAuto reviews are disabled on base/target branches other than the default branch. Please check the settings in the CodeRabbit UI or the You can disable this status message by setting the
✨ Finishing touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The new LangGraph orchestrator introduces a few reliability and maintainability issues: the MapStatus UI stream is not finalized on error, several any/casts undercut the “type-safe state transitions” goal, and parserNode mutates GeoJSON in-place. Additionally, the streaming status updates are currently emitted after phases complete (reducing UX value), and the graph’s retry loop re-runs classification rather than the steps that can change outputs. Addressing these will improve determinism, debuggability, and latency/cost.
Summary of changes
Summary
- Replaced the legacy map orchestrator call in
app/actions.tsxwith a LangGraph-drivenrunMapGraph(...)pipeline. - Added a streaming UI status indicator via the new client component
components/map/map-status.tsxand wired it into the orchestration flow. - Introduced
lib/agents/map-langgraph.tsximplementing aStateGraphwith nodes for classification, MCP tool execution, GeoJSON parsing, command generation, and validation. - Re-exported the new orchestrator entrypoint from
lib/agents/index.tsx. - Added LangChain/LangGraph dependencies (
@ai-sdk/langchain,@langchain/*) topackage.json.
| console.log("🚀 Starting LangGraph Map Orchestrator..."); | ||
| const statusStream = createStreamableUI(<MapStatus status="Initializing..." />); | ||
| uiStream.append(statusStream.value); | ||
|
|
||
| locationResponse = await runMapGraph(answer, { | ||
| mcpClient, | ||
| connectionId, | ||
| enableFeedbackLoop: false // Disable for now to reduce latency | ||
| onStatusUpdate: (status) => { | ||
| statusStream.update(<MapStatus status={status} />); | ||
| } | ||
| }); | ||
| console.log('✨ Orchestrator finished:', locationResponse); | ||
| statusStream.done(); | ||
| } catch (e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
statusStream.done() is only called on the success path. If runMapGraph(...) throws, the UI stream created by createStreamableUI(<MapStatus ... />) may never be finalized, which can leave the client in a perpetual “loading”/spinner state and potentially leak resources.
This is particularly likely given MCP/network calls in the graph. Finalizing the stream in a finally block (or calling .error(...)/.done() in the catch) will make the UI deterministic.
Suggestion
Move statusStream.done() into a finally to guarantee termination, and optionally update status on failure.
const statusStream = createStreamableUI(<MapStatus status="Initializing..." />)
uiStream.append(statusStream.value)
try {
locationResponse = await runMapGraph(answer, {
mcpClient,
connectionId,
onStatusUpdate: (status) => statusStream.update(<MapStatus status={status} />),
})
} catch (e) {
statusStream.update(<MapStatus status="Map orchestration failed; falling back..." />)
throw e
} finally {
statusStream.done()
}Reply with "@CharlieHelps yes please" if you'd like me to add a commit with this change.
| const MapGraphState = Annotation.Root({ | ||
| researcherResponse: Annotation<string>, | ||
| classification: Annotation<QueryClassification | undefined>, | ||
| mcpData: Annotation<MCPResult[]>({ | ||
| reducer: (x, y) => x.concat(y), | ||
| default: () => [], | ||
| }), | ||
| geocodedFeatures: Annotation<GeoJSONFeature[]>({ | ||
| reducer: (x, y) => x.concat(y), | ||
| default: () => [], | ||
| }), | ||
| geojson: Annotation<GeoJSONFeatureCollection | null>({ | ||
| reducer: (x, y) => y ?? x, | ||
| default: () => null, | ||
| }), | ||
| commands: Annotation<MapCommand[]>({ | ||
| reducer: (x, y) => y ?? x, | ||
| default: () => [], | ||
| }), | ||
| iteration: Annotation<number>({ | ||
| reducer: (x, y) => x + y, | ||
| default: () => 0, | ||
| }), | ||
| feedback: Annotation<MapStateFeedback | undefined>, | ||
| error: Annotation<string | undefined>, | ||
| mcpClient: Annotation<any>, | ||
| connectionId: Annotation<string | undefined>, | ||
| }); | ||
|
|
||
| // Nodes | ||
| const classifierNode = async (state: typeof MapGraphState.State) => { | ||
| console.log("--- NODE: CLASSIFIER ---"); | ||
| const model = await getModel(); | ||
| const ORCHESTRATOR_CLASSIFICATION_PROMPT = `Analyze the following response from a researcher agent and determine if it requires Mapbox MCP tool intervention. | ||
|
|
||
| Classification Criteria: | ||
| - simple_location: Researcher mentions one or two specific places. | ||
| - route_distance: Researcher mentions travel between points or distances. | ||
| - multi_location: Researcher lists many places (e.g., "top 10 museums"). | ||
| - nearby_search: Researcher mentions things "near" a location. | ||
| - complex_operation: Requires multiple steps or logic. | ||
|
|
||
| Response to analyze: | ||
| ${state.researcherResponse}`; | ||
|
|
||
| const { object } = await generateObject({ | ||
| model, | ||
| schema: QueryClassificationSchema, | ||
| prompt: ORCHESTRATOR_CLASSIFICATION_PROMPT, | ||
| }); | ||
|
|
||
| return { classification: object }; | ||
| }; | ||
|
|
||
| const mcpExecutorNode = async (state: typeof MapGraphState.State) => { | ||
| console.log("--- NODE: MCP_EXECUTOR ---"); | ||
| if (!state.classification || !state.mcpClient || !state.connectionId) { | ||
| return { mcpData: [], geocodedFeatures: [] }; | ||
| } | ||
|
|
||
| const operations = state.classification.mcpOperations || []; | ||
| const mcpData: MCPResult[] = []; | ||
| const geocodedFeatures: GeoJSONFeature[] = []; | ||
|
|
||
| for (const operation of operations) { | ||
| try { | ||
| let result; | ||
| switch (operation) { | ||
| case 'geocode': | ||
| result = await state.mcpClient.tools.execute('mapbox_geocode_location', { | ||
| arguments: { query: state.researcherResponse, includeMapPreview: true }, | ||
| connectedAccountId: state.connectionId | ||
| }); | ||
| break; | ||
| case 'search_nearby': | ||
| result = await state.mcpClient.tools.execute('mapbox_search_box_text_search', { | ||
| arguments: { q: state.researcherResponse }, | ||
| connectedAccountId: state.connectionId | ||
| }); | ||
| break; | ||
| } | ||
|
|
||
| if (result) { | ||
| const data = (result as any).data || result; | ||
| mcpData.push({ tool: operation, result: data, timestamp: Date.now() }); | ||
|
|
||
| if (data.features) { | ||
| geocodedFeatures.push(...data.features); | ||
| } | ||
| } | ||
| } catch (e) { | ||
| console.error(`MCP error (${operation}):`, e); | ||
| } | ||
| } | ||
|
|
||
| return { mcpData, geocodedFeatures }; | ||
| }; | ||
|
|
||
| const parserNode = async (state: typeof MapGraphState.State) => { | ||
| console.log("--- NODE: PARSER ---"); | ||
| const parserOutput = await geojsonParser(state.researcherResponse); | ||
|
|
||
| let finalGeojson = parserOutput.geojson; | ||
|
|
||
| if (state.geocodedFeatures.length > 0) { | ||
| if (!finalGeojson) { | ||
| finalGeojson = { type: 'FeatureCollection', features: state.geocodedFeatures }; | ||
| } else { | ||
| finalGeojson.features = [...finalGeojson.features, ...state.geocodedFeatures]; | ||
| } | ||
| } | ||
|
|
||
| return { geojson: finalGeojson }; | ||
| }; | ||
|
|
||
| const commandGeneratorNode = async (state: typeof MapGraphState.State) => { | ||
| console.log("--- NODE: COMMAND_GENERATOR ---"); | ||
| if (!state.geojson) return { commands: [] }; | ||
|
|
||
| const { commands } = await mapCommandGenerator({ | ||
| text: state.researcherResponse, | ||
| geojson: state.geojson | ||
| }); | ||
|
|
||
| return { commands }; | ||
| }; | ||
|
|
||
| const validatorNode = async (state: typeof MapGraphState.State) => { | ||
| console.log("--- NODE: VALIDATOR ---"); | ||
| return { iteration: 1 }; | ||
| }; | ||
|
|
||
| // Router for conditional edge | ||
| const shouldContinue = (state: typeof MapGraphState.State) => { | ||
| const isSpatial = state.classification?.type !== undefined; | ||
| const hasGeoJSON = state.geojson !== null && state.geojson.features.length > 0; | ||
|
|
||
| if (isSpatial && !hasGeoJSON && (state.iteration || 0) < 2) { | ||
| return "classifier"; | ||
| } | ||
| return END; | ||
| }; | ||
|
|
||
| // Define the graph | ||
| const workflow = new StateGraph(MapGraphState) | ||
| .addNode("classifier", classifierNode) | ||
| .addNode("mcp_executor", mcpExecutorNode) | ||
| .addNode("parser", parserNode) | ||
| .addNode("command_generator", commandGeneratorNode) | ||
| .addNode("validator", validatorNode) | ||
| .addEdge(START, "classifier") | ||
| .addEdge("classifier", "mcp_executor") | ||
| .addEdge("mcp_executor", "parser") | ||
| .addEdge("parser", "command_generator") | ||
| .addEdge("command_generator", "validator") | ||
| .addConditionalEdges("validator", shouldContinue as any, { | ||
| classifier: "classifier", | ||
| [END]: END | ||
| }); | ||
|
|
||
| export const mapGraph = workflow.compile(); | ||
|
|
||
| /** | ||
| * Bridge to run the graph and return results | ||
| */ | ||
| export async function runMapGraph( | ||
| researcherResponse: string, | ||
| options: { mcpClient?: any, connectionId?: string, onStatusUpdate?: (status: string) => void } | ||
| ): Promise<LocationResponse> { | ||
| const initialState = { | ||
| researcherResponse, | ||
| iteration: 0, | ||
| mcpData: [] as MCPResult[], | ||
| geocodedFeatures: [] as GeoJSONFeature[], | ||
| commands: [] as MapCommand[], | ||
| geojson: null as GeoJSONFeatureCollection | null, | ||
| mcpClient: options.mcpClient, | ||
| connectionId: options.connectionId, | ||
| }; | ||
|
|
||
| const stream = await mapGraph.stream(initialState, { streamMode: "values" }); | ||
|
|
||
| let finalResult: any = initialState; | ||
|
|
||
| for await (const update of stream) { | ||
| finalResult = update; | ||
|
|
||
| if (options.onStatusUpdate) { | ||
| if (update.classification && (!update.mcpData || update.mcpData.length === 0)) options.onStatusUpdate("Classifying query..."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MapGraphState uses Annotation<any> for mcpClient, and later there are multiple (result as any) / let finalResult: any / shouldContinue as any. This undermines the stated goal of “Improved Type Safety” and makes it easier for runtime shape changes to silently break status routing and result extraction.
Even if the external client is hard to type, you can still strongly type the graph state you control (e.g., the streamed state updates, the tool execute return shape you rely on, and the conditional-edge router signature) and avoid broad any at the state level.
Suggestion
Replace any with minimal local interfaces/types and remove the as any casts where possible.
Example direction:
type McpToolExecutor = {
tools: {
execute: (
tool: string,
params: { arguments: Record<string, unknown>; connectedAccountId: string }
) => Promise<{ data?: unknown } | unknown>
}
}
// In state
mcpClient: Annotation<McpToolExecutor | undefined>,
// Stream result typing
let finalResult: typeof MapGraphState.State = initialState as any // ideally avoid this tooAlso type the router properly (LangGraph exposes router typing helpers), and eliminate shouldContinue as any.
Reply with "@CharlieHelps yes please" if you'd like me to add a commit that tightens these types without changing runtime behavior.
| const parserNode = async (state: typeof MapGraphState.State) => { | ||
| console.log("--- NODE: PARSER ---"); | ||
| const parserOutput = await geojsonParser(state.researcherResponse); | ||
|
|
||
| let finalGeojson = parserOutput.geojson; | ||
|
|
||
| if (state.geocodedFeatures.length > 0) { | ||
| if (!finalGeojson) { | ||
| finalGeojson = { type: 'FeatureCollection', features: state.geocodedFeatures }; | ||
| } else { | ||
| finalGeojson.features = [...finalGeojson.features, ...state.geocodedFeatures]; | ||
| } | ||
| } | ||
|
|
||
| return { geojson: finalGeojson }; | ||
| }; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
parserNode mutates finalGeojson.features in-place when finalGeojson is already set:
finalGeojson.features = [...finalGeojson.features, ...state.geocodedFeatures]In graph/state-machine setups, in-place mutation can lead to subtle bugs (e.g., previous state snapshots unexpectedly changing, reducer semantics becoming confusing). Returning a new object is safer and makes the node purely functional.
Suggestion
Avoid mutating finalGeojson and return a new FeatureCollection instead.
const parserOutput = await geojsonParser(state.researcherResponse)
const parsed = parserOutput.geojson
const geojson = state.geocodedFeatures.length
? {
type: 'FeatureCollection' as const,
features: [
...(parsed?.features ?? []),
...state.geocodedFeatures,
],
}
: parsed
return { geojson }Reply with "@CharlieHelps yes please" if you'd like me to add a commit with this refactor.
| const stream = await mapGraph.stream(initialState, { streamMode: "values" }); | ||
|
|
||
| let finalResult: any = initialState; | ||
|
|
||
| for await (const update of stream) { | ||
| finalResult = update; | ||
|
|
||
| if (options.onStatusUpdate) { | ||
| if (update.classification && (!update.mcpData || update.mcpData.length === 0)) options.onStatusUpdate("Classifying query..."); | ||
| if (update.mcpData?.length > 0 && !update.geojson) options.onStatusUpdate("Geocoding locations..."); | ||
| if (update.geojson && (!update.commands || update.commands.length === 0)) options.onStatusUpdate("Generating map data..."); | ||
| if (update.commands?.length > 0) options.onStatusUpdate("Finalizing map..."); | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The status update logic is inverted/late in a few places:
"Classifying query..."is only emitted whenupdate.classificationalready exists, which is after classification is finished."Geocoding locations..."is gated onmcpData.length > 0, which is after MCP calls return.
As a result, the UI likely jumps from "Initializing..." straight to later statuses, and may miss the most useful “in progress” updates. Emit status when entering nodes (or infer transitions from previous->current state) rather than from the presence of completed outputs.
Suggestion
Track the last emitted phase and update based on transitions, or (preferably) emit status from inside nodes via a callback carried in state/options.
Minimal transition-based fix:
let phase: 'init'|'classifying'|'mcp'|'parsing'|'commands'|'done' = 'init'
for await (const update of stream) {
finalResult = update
if (options.onStatusUpdate) {
if (phase === 'init') {
phase = 'classifying'
options.onStatusUpdate('Classifying query...')
} else if (phase === 'classifying' && update.classification) {
phase = 'mcp'
options.onStatusUpdate('Geocoding locations...')
} else if (phase === 'mcp' && (update.geojson || update.geocodedFeatures?.length)) {
phase = 'commands'
options.onStatusUpdate('Generating map data...')
} else if (phase === 'commands' && update.commands?.length) {
phase = 'done'
options.onStatusUpdate('Finalizing map...')
}
}
}Reply with "@CharlieHelps yes please" if you'd like me to add a commit improving the status lifecycle.
| // Router for conditional edge | ||
| const shouldContinue = (state: typeof MapGraphState.State) => { | ||
| const isSpatial = state.classification?.type !== undefined; | ||
| const hasGeoJSON = state.geojson !== null && state.geojson.features.length > 0; | ||
|
|
||
| if (isSpatial && !hasGeoJSON && (state.iteration || 0) < 2) { | ||
| return "classifier"; | ||
| } | ||
| return END; | ||
| }; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldContinue treats any non-undefined classification.type as “spatial” and loops back to classifier when GeoJSON is missing. That loop re-runs the same classification prompt against the same researcherResponse, which is unlikely to change outcomes and can add latency/cost without improving results.
If you need a retry, it should probably retry the parser (or MCP execution with different arguments) rather than classification. Otherwise, end early when classification indicates no MCP operations/geojson expected.
Suggestion
Change the retry edge to target the node that can actually change output (e.g., parser or mcp_executor), or remove the loop entirely.
For example, retry parsing once:
if (isSpatial && !hasGeoJSON && state.iteration < 1) return 'parser'
return END…and update the conditional edges map accordingly.
Reply with "@CharlieHelps yes please" if you'd like me to add a commit adjusting the retry behavior.
| const mcpExecutorNode = async (state: typeof MapGraphState.State) => { | ||
| console.log("--- NODE: MCP_EXECUTOR ---"); | ||
| if (!state.classification || !state.mcpClient || !state.connectionId) { | ||
| return { mcpData: [], geocodedFeatures: [] }; | ||
| } | ||
|
|
||
| const operations = state.classification.mcpOperations || []; | ||
| const mcpData: MCPResult[] = []; | ||
| const geocodedFeatures: GeoJSONFeature[] = []; | ||
|
|
||
| for (const operation of operations) { | ||
| try { | ||
| let result; | ||
| switch (operation) { | ||
| case 'geocode': | ||
| result = await state.mcpClient.tools.execute('mapbox_geocode_location', { | ||
| arguments: { query: state.researcherResponse, includeMapPreview: true }, | ||
| connectedAccountId: state.connectionId | ||
| }); | ||
| break; | ||
| case 'search_nearby': | ||
| result = await state.mcpClient.tools.execute('mapbox_search_box_text_search', { | ||
| arguments: { q: state.researcherResponse }, | ||
| connectedAccountId: state.connectionId | ||
| }); | ||
| break; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mcpExecutorNode always sends the entire researcherResponse as the geocode query and nearby search query. This will degrade geocoding quality (and may increase cost) if the response contains narrative text, multiple locations, or irrelevant context.
Because you already have a classification step, you should pass extracted locations/queries from that output (or a dedicated extraction node) rather than the full response.
Suggestion
Extend QueryClassificationSchema to include a normalized queries payload (e.g., locations: string[], nearbyQuery?: { term: string; near: string }) and use that for MCP calls.
case 'geocode': {
for (const loc of state.classification.locations ?? []) {
const result = await state.mcpClient.tools.execute('mapbox_geocode_location', {
arguments: { query: loc, includeMapPreview: true },
connectedAccountId: state.connectionId,
})
// collect features...
}
break
}Reply with "@CharlieHelps yes please" if you'd like me to add a commit with this suggestion.
d7ed542
into
feat/geojson-enrichment-agent
This PR replaces the legacy MapControlOrchestrator with a modern LangGraph-based implementation.
Key changes:
lib/agents/map-langgraph.tsxfile defines aStateGraphthat manages the handoff from the researcher's text response to the map control logic. It includes nodes for classification, MCP tool execution, GeoJSON parsing, and command generation.MapStatuscomponent provides visual feedback during the orchestration process, streaming status updates like "Classifying query..." and "Geocoding locations..." directly to the chat interface.@ai-sdk/langchain,@langchain/core,@langchain/langgraph,@langchain/openai, and@langchain/google-genai.The implementation follows the pattern suggested in the AI SDK documentation for LangChain/LangGraph integration.
PR created automatically by Jules for task 17076216558174372852 started by @ngoiyaeric