10 changes: 10 additions & 0 deletions bot_async_transcription_elasticsearch_rag/.env.sample
@@ -0,0 +1,10 @@
RECALL_API_KEY=RECALL_API_KEY
RECALL_REGION=RECALL_REGION # e.g. us-west-2, us-east-1, eu-central-1, ap-northeast-1

# Optional if using the run.sh script to launch a bot
MEETING_URL=MEETING_URL # e.g. any Zoom, Google Meet, Microsoft Teams, Webex, or GoToMeeting URL
NGROK_DOMAIN=NGROK_DOMAIN # Omit the protocol e.g. if your ngrok URL is https://1a8d23b7ab2d.ngrok-free.app, drop the protocol and set to 1a8d23b7ab2d.ngrok-free.app

# Optional
ELASTIC_SEARCH_URL= # defaults to the Docker ElasticSearch instance
EMBEDDING_MODEL= # defaults to "embeddinggemma"
154 changes: 154 additions & 0 deletions bot_async_transcription_elasticsearch_rag/README.md
@@ -0,0 +1,154 @@
# Async Transcription with RAG

This example builds on the hybrid diarization example by adding retrieval augmented generation (RAG), so previous transcripts can be searched and retrieved.

## What is Retrieval Augmented Generation (RAG)?

Retrieval Augmented Generation is a technique that enables large language models (LLMs) to retrieve relevant external information and incorporate it into their responses.

In this example, we provide a simple RAG server that searches an [ElasticSearch](https://www.elastic.co/elasticsearch/vector-database) vector database for content.
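
For a sense of what such a search looks like, here is a minimal sketch using the same `ollama` and `@elastic/elasticsearch` packages this example depends on. The index name, vector field name, and kNN parameters below are illustrative assumptions, not necessarily the values this example's code uses:

```ts
import { Client } from "@elastic/elasticsearch";
import ollama from "ollama";

// Hypothetical names; the example's real index and field names live in its
// ElasticSearch helpers.
const INDEX = "transcripts";
const VECTOR_FIELD = "embedding";

const client = new Client({ node: process.env.ELASTIC_SEARCH_URL || "http://localhost:9200" });

export async function searchTranscripts(question: string) {
  // Embed the question with the same model used when indexing transcripts.
  const { embeddings } = await ollama.embed({
    model: process.env.EMBEDDING_MODEL || "embeddinggemma",
    input: question,
  });

  // Approximate kNN search over the stored paragraph vectors.
  const result = await client.search({
    index: INDEX,
    knn: {
      field: VECTOR_FIELD,
      query_vector: embeddings[0],
      k: 5,
      num_candidates: 50,
    },
  });

  return result.hits.hits;
}
```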

## How It Works

The server listens for webhook events from Recall.ai:

1. When `recording.done` is received, it triggers async transcript creation via Recall's API.
2. When `transcript.done` is received, it downloads both the transcript and the speaker timeline data, merges them using the hybrid diarization algorithm, then embeds the result and stores it in an ElasticSearch database for querying (sketched below).
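
A simplified sketch of how a server might route these two events to the handlers that appear later in this PR (`create_async_transcript` and `bot_async_transcription`). The HTTP wiring, import path, and webhook payload access are illustrative rather than a copy of this example's actual server:

```ts
import { createServer } from "node:http";
// create_async_transcript and bot_async_transcription are the handlers shown
// later in this PR; this import path is illustrative.
import { bot_async_transcription, create_async_transcript } from "./src/api/bot_async_transcription";

const server = createServer((req, res) => {
  let body = "";
  req.on("data", (chunk) => (body += chunk));
  req.on("end", async () => {
    try {
      const msg = JSON.parse(body);
      if (msg.event === "recording.done") {
        // Kick off async transcript creation for the finished recording.
        await create_async_transcript({ recording_id: msg.data.recording.id });
      } else if (msg.event === "transcript.done") {
        // Download, merge, embed, and index the finished transcript.
        await bot_async_transcription({ msg, recording_id: msg.data.recording.id });
      }
      res.writeHead(200).end("ok");
    } catch (error) {
      console.error(error);
      res.writeHead(500).end();
    }
  });
});

server.listen(4000);
```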

## Prerequisites

- [Docker](https://www.docker.com/products/docker-desktop/)
- [Ollama](https://ollama.com/download)
- [ngrok](https://ngrok.com/)
- [Node.js](https://nodejs.org/en/download)
- [npm](https://docs.npmjs.com/downloading-and-installing-node-js-and-npm)

## Quickstart

> **Note:** Make sure you don't have any apps running on port 4000 before starting.

### 1. Start Docker

Set up an ElasticSearch database and, optionally, the Kibana dashboard to visualize the data.

```bash
docker compose up -d
```
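
Once the containers are up, you can confirm ElasticSearch is reachable from Node with a quick ping; this sketch assumes the default local URL exposed by `docker-compose.yml`:

```ts
import { Client } from "@elastic/elasticsearch";

async function pingElasticSearch() {
  const client = new Client({ node: process.env.ELASTIC_SEARCH_URL || "http://localhost:9200" });
  // ping() resolves to true when the cluster answers.
  console.log(await client.ping());
}

pingElasticSearch().catch(console.error);
```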

### 2. Start ngrok

In a terminal window, run:

```bash
ngrok http 4000
```

After it's running, copy the ngrok URL (e.g. `somehash.ngrok-free.app`).

### 3. Set up environment variables

Copy the `.env.sample` file and rename it to `.env`:

```bash
cp .env.sample .env
```

Fill out the variables in the `.env` file, including the ngrok domain from step 2 (omit the `https://` protocol).

### 4. Add the ElasticSearch indices

Set up the ElasticSearch indices that will store the transcript data.

```bash
npm run dev:setup
```

The created indices can be found in [Kibana's index management page](http://localhost:5601/app/management/data/index_management/indices).
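
For reference, here is a minimal sketch of the kind of mapping this step creates (the real definition lives in `src/script/`). The index name, field names, and 768-dimension vector size (Embedding Gemma's default output size) are assumptions for illustration:

```ts
import { Client } from "@elastic/elasticsearch";

const client = new Client({ node: process.env.ELASTIC_SEARCH_URL || "http://localhost:9200" });

async function setupIndices() {
  // Create an index with a dense_vector field for the paragraph embeddings.
  await client.indices.create({
    index: "transcripts",
    mappings: {
      properties: {
        recording_id: { type: "keyword" },
        paragraph: { type: "text" },
        created_at: { type: "date" },
        embedding: { type: "dense_vector", dims: 768, index: true, similarity: "cosine" },
      },
    },
  });
}

setupIndices().catch(console.error);
```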

### 5. Set up Ollama

Download the embedding model. The default is Embedding Gemma; to use a different model, set the `EMBEDDING_MODEL` environment variable.

```bash
ollama pull embeddinggemma
```
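
To verify the model is available, here is a quick check that mirrors the `ollama.embed` call the server makes when indexing transcripts (shown later in this PR):

```ts
import ollama from "ollama";

// Sanity check: embed a test sentence and print the vector size.
async function checkEmbeddingModel() {
  const { embeddings } = await ollama.embed({
    model: process.env.EMBEDDING_MODEL || "embeddinggemma",
    input: ["hello from the transcript pipeline"],
  });
  console.log(`Embedding dimensions: ${embeddings[0]?.length}`);
}

checkEmbeddingModel().catch(console.error);
```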

### 6. Add your webhook URL to the Recall dashboard

Go to the Recall.ai webhooks dashboard for your region and add your ngrok URL:

- [us-east-1 webhooks dashboard](https://us-east-1.recall.ai/dashboard/webhooks)
- [us-west-2 webhooks dashboard](https://us-west-2.recall.ai/dashboard/webhooks)
- [eu-central-1 webhooks dashboard](https://eu-central-1.recall.ai/dashboard/webhooks)
- [ap-northeast-1 webhooks dashboard](https://ap-northeast-1.recall.ai/dashboard/webhooks)

Subscribe to the following events:

- `recording.done`
- `transcript.done`

### 7. Set up Deepgram transcription

Go to the Recall.ai transcription dashboard and set up the Deepgram provider. You may choose a different provider by updating [create_async_transcript](./src/api/).
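
For reference, this is the provider configuration the example sends when creating the async transcript (taken from `create_async_transcript`, shown later in this PR); switching providers means replacing the `provider` object with your chosen provider's configuration from Recall's transcription docs:

```ts
// Request body sent to Recall's create_transcript endpoint in this example.
export const createTranscriptBody = {
  provider: { deepgram_async: { diarize: true } },
};
```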

### 8. Start the server

Open this directory in a new terminal and run:

```bash
npm install
npm run dev
```

This will start a server on port 4000.

### 9. Create a bot

You can create a bot using the `run.sh` script or manually with `curl`.

**Option A: Using `run.sh` (recommended)**

```bash
chmod +x run.sh
./run.sh
```

**Option B: Using `curl`**

```bash
curl --request POST \
  --url https://RECALL_REGION.recall.ai/api/v1/bot/ \
  --header 'Authorization: RECALL_API_KEY' \
  --header 'accept: application/json' \
  --header 'content-type: application/json' \
  --data '{
    "meeting_url": "YOUR_MEETING_URL"
  }'
```

Replace `RECALL_REGION`, `RECALL_API_KEY`, and `YOUR_MEETING_URL` with your own values.
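
If you would rather create the bot from Node/TypeScript than from the shell, here is a minimal equivalent of the `curl` request above, reading the same variables from the environment:

```ts
// Same request as the curl above; requires Node 18+ for the global fetch.
// RECALL_REGION, RECALL_API_KEY, and MEETING_URL come from your .env file.
async function createBot() {
  const response = await fetch(`https://${process.env.RECALL_REGION}.recall.ai/api/v1/bot/`, {
    method: "POST",
    headers: {
      Authorization: `${process.env.RECALL_API_KEY}`,
      accept: "application/json",
      "content-type": "application/json",
    },
    body: JSON.stringify({ meeting_url: process.env.MEETING_URL }),
  });
  if (!response.ok) throw new Error(await response.text());
  console.log(await response.json());
}

createBot().catch(console.error);
```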

### 10. Use the MCP search tool

Once a transcript completes, its content is embedded and stored in the ElasticSearch database for future querying.

This example provides an MCP server (without authentication) at `/api/mcp`. To use it, add the server endpoint to your client: `http://localhost:4000/api/mcp` when connecting from a local tool (such as Claude Code), or `${NGROK_DOMAIN}/api/mcp` when connecting from a hosted tool (such as ChatGPT).

To continue with our example, we will use Cursor. Add the following MCP configuration in Cursor's settings:
```json
{
  "mcpServers": {
    "Recall Ai Local": {
      "url": "http://localhost:4000/api/mcp",
      "headers": {}
    }
  }
}
```
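
You can also poke at the MCP endpoint programmatically with the MCP TypeScript SDK already in this example's dependencies. This small sketch assumes the server is running locally and speaks the streamable HTTP transport; it only lists the available tools, since the tool names themselves are defined by the server code:

```ts
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js";

async function listRecallTools() {
  const client = new Client({ name: "recall-mcp-check", version: "1.0.0" });
  const transport = new StreamableHTTPClientTransport(new URL("http://localhost:4000/api/mcp"));
  await client.connect(transport);

  // List the tools exposed by the example's MCP server.
  const { tools } = await client.listTools();
  console.log(tools.map((tool) => tool.name));

  await client.close();
}

listRecallTools().catch(console.error);
```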

In the chat window, you can now ask some of the following questions:
- Find me all the transcripts from yesterday
- Find me the transcript where I talk about _(search term)_
- I remember _(person name)_ talking about _(search term)_, can you recall it for me?
- I remember _(person name)_ talking about _(search term)_, what was it again?
- What conversations was _(person name)_ in yesterday?
31 changes: 31 additions & 0 deletions bot_async_transcription_elasticsearch_rag/docker-compose.yml
@@ -0,0 +1,31 @@
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.19.0
    container_name: elasticsearch
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
      - xpack.security.http.ssl.enabled=false
      - ES_JAVA_OPTS=-Xms1g -Xmx1g
    ports:
      - "9200:9200"
      - "9300:9300"
    volumes:
      - es_data:/usr/share/elasticsearch/data
    ulimits:
      memlock:
        soft: -1
        hard: -1

  kibana:
    image: docker.elastic.co/kibana/kibana:8.11.3
    container_name: kibana
    depends_on:
      - elasticsearch
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    ports:
      - "5601:5601"

volumes:
  es_data:
29 changes: 29 additions & 0 deletions bot_async_transcription_elasticsearch_rag/package.json
@@ -0,0 +1,29 @@
{
  "name": "bot_async_transcription_with_rag",
  "version": "1.0.0",
  "description": "Process transcription data into a RAG server",
  "main": "api/index.ts",
  "scripts": {
    "dev": "ts-node src/api/index.ts",
    "dev:setup": "ts-node src/script/index.ts",
    "test": "vitest run",
    "test:watch": "vitest"
  },
  "author": "Dylan Vanmali",
  "license": "MIT",
  "devDependencies": {
    "@types/node": "^24.10.1",
    "ts-node": "^10.9.2",
    "typescript": "^5.9.3",
    "vitest": "^2.1.8"
  },
  "dependencies": {
    "@elastic/elasticsearch": "^8.19.1",
    "@modelcontextprotocol/sdk": "^1.25.3",
    "commander": "^14.0.2",
    "dotenv": "^17.2.3",
    "http": "^0.0.1-security",
    "ollama": "^0.6.3",
    "zod": "^4.1.13"
  }
}
23 changes: 23 additions & 0 deletions bot_async_transcription_elasticsearch_rag/run.sh
@@ -0,0 +1,23 @@
#!/usr/bin/env bash
set -euo pipefail

DOTENV_FILE="${DOTENV_FILE:-.env}"
if [ -f "$DOTENV_FILE" ]; then
  # shellcheck source=/dev/null
  source "$DOTENV_FILE"
fi

: "${RECALL_REGION:?RECALL_REGION is required (us-west-2, us-east-1, eu-central-1, ap-northeast-1)}"
: "${RECALL_API_KEY:?RECALL_API_KEY is required}"
: "${MEETING_URL:?MEETING_URL is required (Zoom/Meet URL)}"

curl --request POST \
  --url https://${RECALL_REGION}.recall.ai/api/v1/bot/ \
  --header "Authorization: ${RECALL_API_KEY}" \
  --header "accept: application/json" \
  --header "content-type: application/json" \
  --data @- <<EOF
{
  "meeting_url": "${MEETING_URL}"
}
EOF
@@ -0,0 +1,144 @@
import ollama from "ollama";
import { z } from "zod";
import { env } from "../config/env";
import { RecordingArtifactSchema } from "../schemas/RecordingArtifactSchema";
import { SpeakerTimelinePartSchema } from "../schemas/SpeakerTimelinePartSchema";
import { TranscriptArtifactEventSchema, type TranscriptArtifactEventType } from "../schemas/TranscriptArtifactEventSchema";
import { TranscriptArtifactSchema } from "../schemas/TranscriptArtifactSchema";
import { TranscriptPartSchema } from "../schemas/TranscriptPartSchema";
import { convert_to_hybrid_diarized_transcript_parts } from "./convert_to_hybrid_diarized_transcript_parts";
import { convert_to_readable_transcript } from "./convert_to_readable_transcript";
import { create_index_paragraph } from "./elasticsearch";

/**
* Create an async transcript job for a recording.
* A `transcript.done` or `transcript.failed` webhook event will be sent when the job has completed and the transcript is ready.
*/
export async function create_async_transcript(args: { recording_id: string }) {
  const { recording_id } = z.object({ recording_id: z.string() }).parse(args);
  const response = await fetch(`https://${env.RECALL_REGION}.recall.ai/api/v1/recording/${recording_id}/create_transcript/`, {
    method: "POST",
    headers: {
      "Authorization": `${env.RECALL_API_KEY}`,
      "Content-Type": "application/json",
    },
    body: JSON.stringify({
      provider: { deepgram_async: { diarize: true } },
    }),
  });
  if (!response.ok) throw new Error(await response.text());
  return TranscriptArtifactSchema.parse(await response.json());
}

/**
 * Retrieve the transcript, merge it with the speaker timeline,
 * embed the result, and store it in ElasticSearch.
 */
export async function bot_async_transcription(args: { msg: TranscriptArtifactEventType, recording_id: string }) {
  const { msg, recording_id } = z.object({ msg: TranscriptArtifactEventSchema, recording_id: z.string() }).parse(args);

  const recording = await retrieve_recording_artifact({ recording_id: msg.data.recording.id });
  console.log(`Retrieved recording: ${recording.id}`);

  if (!recording.media_shortcuts?.transcript?.data?.download_url) {
    throw new Error("Transcript download URL is null");
  }
  if (!recording.media_shortcuts.participant_events?.data?.speaker_timeline_download_url) {
    throw new Error("Speaker timeline download URL is null");
  }

  // Retrieve and format transcript data.
  const transcript_parts = await retrieve_transcript_parts({
    download_url: recording.media_shortcuts.transcript.data.download_url,
  });
  console.log(`Retrieved ${transcript_parts.length} transcript parts`);
  const speaker_timeline_data = await retrieve_speaker_timeline_parts({
    download_url: recording.media_shortcuts.participant_events.data.speaker_timeline_download_url,
  });
  console.log(`Retrieved ${speaker_timeline_data.length} speaker timeline parts`);
  const hybrid_transcript_parts = convert_to_hybrid_diarized_transcript_parts({
    transcript_parts,
    speaker_timeline_data,
  });
  // console.log("transcript words:", hybrid_transcript_parts.at(0)?.words);
  console.log(`Formatted ${hybrid_transcript_parts.length} hybrid transcript parts`);
  const readable_hybrid_transcript_parts = convert_to_readable_transcript({ transcript_parts: hybrid_transcript_parts, recording_id });
  console.log(`Formatted ${readable_hybrid_transcript_parts.length} readable hybrid transcript parts`);
  console.log("transcript:", readable_hybrid_transcript_parts);

  // Batch elastic embeddings
  const paragraphs = readable_hybrid_transcript_parts.map((p) => p.paragraph);
  console.log(`Creating ${paragraphs.length} embeddings with ollama`);
  const embeddings = await ollama.embed({
    model: env.EMBEDDING_MODEL || "embeddinggemma",
    input: paragraphs,
  });
  console.log(`Created ${embeddings.embeddings.length} embeddings with ollama`);

  // Add the formatted transcript to ElasticSearch
  console.log("Adding data to ElasticSearch");
  for (let i = 0; i < embeddings.embeddings.length; i++) {
    try {
      const part = readable_hybrid_transcript_parts[i];
      if (!part) continue;
      const embedding = embeddings.embeddings[i];
      if (!embedding) continue;
      await create_index_paragraph(
        `${recording_id}_${i}`,
        {
          ...part,
          recording_id,
        },
        new Date(Date.parse(recording.created_at)),
        embedding,
      );
    } catch (error) {
      console.error(error);
    }
  }
  console.log("Added data to ElasticSearch");

  // Return the transcript parts and readable transcript.
  return {
    transcript_parts: hybrid_transcript_parts,
    readable_transcript_parts: readable_hybrid_transcript_parts,
  };
}

/**
* Retrieve the recording artifact.
*/
async function retrieve_recording_artifact(args: { recording_id: string }) {
  const { recording_id } = z.object({ recording_id: z.string() }).parse(args);
  const response = await fetch(`https://${env.RECALL_REGION}.recall.ai/api/v1/recording/${recording_id}/`, {
    method: "GET",
    headers: {
      "Authorization": `${env.RECALL_API_KEY}`,
      "Content-Type": "application/json",
    },
  });
  if (!response.ok) throw new Error(await response.text());
  return RecordingArtifactSchema.parse(await response.json());
}

/**
* Retrieve the transcript parts from the transcript artifact's `download_url`.
*/
async function retrieve_transcript_parts(args: { download_url: string }) {
  const { download_url } = z.object({ download_url: z.string() }).parse(args);

  const response = await fetch(download_url);
  if (!response.ok) throw new Error(await response.text());

  return TranscriptPartSchema.array().parse(await response.json());
}

/**
* Retrieve the speaker timeline data from the participant events artifact's `download_url`.
*/
async function retrieve_speaker_timeline_parts(args: { download_url: string }) {
  const { download_url } = z.object({ download_url: z.string() }).parse(args);

  const response = await fetch(download_url);
  if (!response.ok) throw new Error(await response.text());

  return SpeakerTimelinePartSchema.array().parse(await response.json());
}