Feature/knowledge connector s3 #831
Status: Open. tmccaughey wants to merge 20 commits into master from feature/knowledge-connector-s3.
Commits (20):
- 4361527 added first version (tmccaughey)
- 11c958e set limit to 500 and fixed sync (tmccaughey)
- ea67374 Fixed chunk size (tmccaughey)
- 0dae689 Update extensions/aws/src/module.ts (tmccaughey)
- f9ddef4 Update extensions/aws/src/knowledge-connectors/helpers/text_extractor.ts (tmccaughey)
- 7711ecd Update extensions/aws/src/knowledge-connectors/helpers/text_chunker.ts (tmccaughey)
- 99d76b5 Update extensions/aws/src/knowledge-connectors/helpers/text_chunker.ts (tmccaughey)
- 84940b1 Update extensions/aws/src/knowledge-connectors/helpers/list_files.ts (tmccaughey)
- cad3a5f deleted creds.env as not needed (tmccaughey)
- 75b65f7 removed logs, adjusted processing to match manual upload functionalit… (tmccaughey)
- 4da3e66 Update extensions/aws/src/knowledge-connectors/helpers/list_files.ts (tmccaughey)
- 1609e03 Update extensions/aws/src/knowledge-connectors/s3Connector.ts (tmccaughey)
- e966b24 Update extensions/aws/src/knowledge-connectors/helpers/utils/config.ts (tmccaughey)
- 4c59dd8 implemented feedack: - added helpers into main file - renamed file , … (tmccaughey)
- 184bc62 Merge branch 'feature/knowledge-connector-s3' of github.com:Cognigy/E… (tmccaughey)
- e23fe89 removed default parameter as never used (tmccaughey)
- 2a8a18b Update extensions/aws/src/knowledge-connectors/helpers/text_chunker.ts (tmccaughey)
- e11eaef Removed logs (tmccaughey)
- 58c8307 removed loggs (tmccaughey)
- 56828f3 cleaned up files (tmccaughey)
Files changed:
1 change: 1 addition & 0 deletions

```diff
@@ -0,0 +1 @@
+legacy-peer-deps=true
```

(An npm .npmrc setting: legacy-peer-deps=true restores pre-npm-7 peer dependency handling, so installs do not fail on peer dependency conflicts.)
Large diffs are not rendered by default.
extensions/aws/src/knowledge-connectors/helpers/chunk_extractor.ts (90 additions & 0 deletions)

```ts
import type { IKnowledge } from "@cognigy/extension-tools";
import { S3Client, GetObjectCommand } from "@aws-sdk/client-s3";
import { lsExtractor } from "./text_extractor";
import * as fs from 'fs';
import * as path from 'path';
import * as os from 'os';

export type ChunkContent = Pick<
    IKnowledge.CreateKnowledgeChunkParams,
    "text" | "data"
>;

export type S3Connection = {
    accessKeyId: string;
    secretAccessKey: string;
    region: string;
};

const MAX_CHUNKS_PER_FILE = 500; // Limit chunks per file to avoid timeout issues

// Download file from S3 and extract chunks
export const getS3FileChunks = async (
    connection: S3Connection,
    bucketName: string,
    fileKey: string
): Promise<ChunkContent[]> => {
    const s3Client = new S3Client({
        region: connection.region,
        credentials: {
            accessKeyId: connection.accessKeyId,
            secretAccessKey: connection.secretAccessKey,
        },
    });

    // Download file from S3
    const command = new GetObjectCommand({
        Bucket: bucketName,
        Key: fileKey,
    });

    const response = await s3Client.send(command);
    const bodyContents = await streamToBuffer(response.Body as any);

    // Save to temp file (text_extractor needs file path)
    const tempDir = os.tmpdir();
    const tempFileName = `${Date.now()}_${path.basename(fileKey)}`;
    const tempFilePath = path.join(tempDir, tempFileName);

    fs.writeFileSync(tempFilePath, bodyContents);

    try {
        // Extract text using lsExtractor
        const fileExtension = path.extname(fileKey).slice(1); // Remove the dot

        const extractedText = await lsExtractor(fileExtension, tempFilePath);

        // The lsExtractor returns text that's already been chunked and joined with \n\n
        // Split by \n\n to get the individual chunks back
        const chunks: ChunkContent[] = extractedText
            .split('\n\n')
            .filter(chunk => chunk.trim().length > 0)
            .slice(0, MAX_CHUNKS_PER_FILE)
            .map((chunk, index) => ({
                text: chunk.trim(),
                data: {
                    title: `${fileKey} - Part ${index + 1}`,
                    source: fileKey,
                    fileType: fileExtension,
                },
            }));

        return chunks;
    } finally {
        // Clean up temp file
        if (fs.existsSync(tempFilePath)) {
            fs.unlinkSync(tempFilePath);
        }
    }
};

// Helper to convert stream to buffer
async function streamToBuffer(stream: any): Promise<Buffer> {
    const chunks: Uint8Array[] = [];
    for await (const chunk of stream) {
        chunks.push(chunk);
    }
    return Buffer.concat(chunks);
}
```
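For orientation, a minimal calling sketch for getS3FileChunks. The bucket name, object key, region, and environment variable names are hypothetical, not taken from this PR:

```ts
import { getS3FileChunks, S3Connection } from "./chunk_extractor";

// Illustrative values only; not part of this PR.
const connection: S3Connection = {
    accessKeyId: process.env.AWS_ACCESS_KEY_ID ?? "",
    secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY ?? "",
    region: "eu-central-1",
};

async function main(): Promise<void> {
    const chunks = await getS3FileChunks(connection, "my-knowledge-bucket", "docs/handbook.pdf");
    console.log(`Extracted ${chunks.length} chunks from S3`);
    console.log(chunks[0]?.text.slice(0, 200)); // preview of the first chunk
}

main().catch(console.error);
```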
extensions/aws/src/knowledge-connectors/helpers/list_files.ts (60 additions & 0 deletions)

```ts
import { S3Client, ListObjectsV2Command } from "@aws-sdk/client-s3";

interface S3Object {
    Key: string;
    Size: number;
    LastModified: Date;
}

interface S3Connection {
    accessKeyId: string;
    secretAccessKey: string;
    region: string;
}

export async function getS3Object(
    connection: S3Connection,
    bucketName: string,
    prefix?: string,
): Promise<S3Object[]> {
    const s3Client = new S3Client({
        region: connection.region,
        credentials: {
            accessKeyId: connection.accessKeyId,
            secretAccessKey: connection.secretAccessKey,
        },
    });

    try {
        const command = new ListObjectsV2Command({
            Bucket: bucketName,
            MaxKeys: 1000,
            Prefix: prefix,
        });

        const response = await s3Client.send(command);

        if (!response.Contents) {
            return [];
        }

        // Filter out folder markers and empty files (Size of 0), then narrow the types
        const s3Objects: S3Object[] = response.Contents
            .filter(obj => obj.Key && obj.Size && obj.Size > 0)
            .map(obj => ({
                Key: obj.Key!,
                Size: obj.Size!,
                LastModified: obj.LastModified!
            }));

        return s3Objects;
    } catch (error) {
        console.error("Error listing objects from S3:", error);
        throw error;
    }
}
```
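One limitation worth noting: a single ListObjectsV2 call returns at most 1,000 keys, so larger buckets would be silently truncated here. A rough sketch of how pagination could be layered on with ContinuationToken, offered as a possible extension rather than part of this PR:

```ts
import { S3Client, ListObjectsV2Command } from "@aws-sdk/client-s3";

// Sketch only: follows NextContinuationToken until the listing is exhausted.
export async function listAllKeys(
    s3Client: S3Client,
    bucketName: string,
    prefix?: string,
): Promise<string[]> {
    const keys: string[] = [];
    let continuationToken: string | undefined;
    do {
        const response = await s3Client.send(new ListObjectsV2Command({
            Bucket: bucketName,
            Prefix: prefix,
            ContinuationToken: continuationToken,
        }));
        for (const obj of response.Contents ?? []) {
            // Skip folder markers and empty files, as the PR's filter does
            if (obj.Key && obj.Size) {
                keys.push(obj.Key);
            }
        }
        continuationToken = response.IsTruncated ? response.NextContinuationToken : undefined;
    } while (continuationToken);
    return keys;
}
```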
extensions/aws/src/knowledge-connectors/helpers/text_chunker.ts (31 additions & 0 deletions)

```ts
import * as splitters from "@langchain/textsplitters";

import { getMaxChunkSize, langchainDefaultChunkSizeInChars } from "./utils/config";

export async function splitDocs(documents: any): Promise<any[]> {
    const splitter = getRecursiveCharacterTextSplitter();
    const splitParagraphs = await splitter.splitDocuments(documents);
    return splitParagraphs;
}

const getChunkSizeInChars = () => {
    // Langchain has issues and creates chunks larger than the limit set.
    // Therefore a margin is subtracted from the chunk size.
    const margin = 400;
    const chunkMaxSize = Math.min(langchainDefaultChunkSizeInChars(), getMaxChunkSize()) - margin;
    const chunkSize = chunkMaxSize > 0 ? chunkMaxSize : 1800;
    return chunkSize;
};

const getRecursiveCharacterTextSplitter = () => {
    const chunkSize = getChunkSizeInChars();
    const chunkOverlap = 0;
    const splitter = new splitters.RecursiveCharacterTextSplitter({
        chunkSize,
        chunkOverlap,
        keepSeparator: false
    });
    return splitter;
};
```
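To make the sizing concrete, a worked example under the default config values (2000 characters for both limits, per utils/config.ts below):

```ts
// getChunkSizeInChars() with the defaults:
const margin = 400;
const chunkMaxSize = Math.min(2000, 2000) - margin;      // 1600
const chunkSize = chunkMaxSize > 0 ? chunkMaxSize : 1800; // 1600 chars per chunk
// The 1800 fallback only applies when the smaller configured limit is
// 400 or less, i.e. when subtracting the margin makes the result non-positive.
```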
extensions/aws/src/knowledge-connectors/helpers/text_extractor.ts (130 additions & 0 deletions)

```ts
import { TextLoader } from 'langchain/document_loaders/fs/text';
import { PDFLoader } from '@langchain/community/document_loaders/fs/pdf';
import { DocxLoader } from '@langchain/community/document_loaders/fs/docx';
import { CSVLoader } from '@langchain/community/document_loaders/fs/csv';
import { JSONLoader, JSONLinesLoader } from 'langchain/document_loaders/fs/json';
import { Document } from '@langchain/core/documents';

import { splitDocs } from './text_chunker';
import { BufferLoader } from 'langchain/document_loaders/fs/buffer';
import { parseOfficeAsync } from 'officeparser';

export const logger = {
    log: (level: string, context: any, message: string) => {
        const timestamp = new Date().toISOString();
        console.log(`[${timestamp}] [${level.toUpperCase()}] ${message}`);
        if (context && Object.keys(context).length > 0) {
            console.log('Context:', JSON.stringify(context, null, 2));
        }
    }
};

export const removeUnnecessaryChars = (text: string): string => {
    if (!text) return "";

    return text
        // Remove multiple spaces but preserve newlines
        .replace(/[ \t]+/g, ' ')
        // Remove multiple newlines (keep max 2)
        .replace(/\n\s*\n\s*\n/g, '\n\n')
        // Remove zero-width characters
        .replace(/[\u200B-\u200D\uFEFF]/g, '')
        // Trim whitespace
        .trim();
};

export const lsExtractor = async (type: string, inputFile: string): Promise<string> => {
    let documentLoader;
    switch (type) {
        case "txt":
            documentLoader = new TextLoader(inputFile);
            break;

        case "pdf":
            // possible config: { splitPages: true }
            // https://js.langchain.com/docs/modules/indexes/document_loaders/examples/file_loaders/pdf
            documentLoader = new PDFLoader(inputFile, { splitPages: false });
            break;

        case "docx":
            // https://js.langchain.com/docs/modules/indexes/document_loaders/examples/file_loaders/docx
            documentLoader = new DocxLoader(inputFile);
            break;

        case "csv":
            // possible config: columnName
            // https://js.langchain.com/docs/modules/indexes/document_loaders/examples/file_loaders/csv#usage-extracting-a-single-column
            documentLoader = new CSVLoader(inputFile);
            break;

        case "json":
            // possible config: pointer
            // https://js.langchain.com/docs/modules/indexes/document_loaders/examples/file_loaders/json#using-json-pointer-example
            documentLoader = new JSONLoader(inputFile);
            break;

        case "jsonl":
            // possible config: pointer
            // https://js.langchain.com/docs/modules/indexes/document_loaders/examples/file_loaders/jsonlines
            documentLoader = new JSONLinesLoader(inputFile, "");
            break;

        case 'md':
            documentLoader = new TextLoader(inputFile);
            break;

        case 'pptx':
            // https://js.langchain.com/docs/integrations/document_loaders/file_loaders/pptx/
            documentLoader = new PPTXLoader(inputFile);
            break;

        default:
            documentLoader = new TextLoader(inputFile);
    }

    // load and extract document
    const docs = await documentLoader.load();

    // Clean up text for all file types
    docs.forEach((doc) => {
        doc.pageContent = removeUnnecessaryChars(doc?.pageContent);
    });

    // split document into paragraphs according to specified or default splitter
    const splitDocuments = (
        await splitDocs(docs)
    ).map((doc) => doc.pageContent);

    // join the paragraphs into the format we want
    const textParagraphs = splitDocuments.join('\n\n');

    logger.log("info", null, "Successfully used langchain to extract content");

    return textParagraphs;
};

/**
 * Custom PPTXLoader class to handle pptx files. Implementation adapted
 * from langchain's PPTXLoader, but it uses a newer version of the officeparser
 * package to handle pptx entirely in memory, instead of writing to a temp file
 * in the current directory.
 */
class PPTXLoader extends BufferLoader {
    constructor(filePathOrBlob: string | Blob) {
        super(filePathOrBlob);
    }

    async parse(raw: Buffer, metadata: Record<string, any>): Promise<Document[]> {
        const pptx = await parseOfficeAsync(raw, {
            outputErrorToConsole: true,
        });
        if (!pptx)
            return [];
        return [
            new Document({
                pageContent: pptx,
                metadata,
            }),
        ];
    }
}
```
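A minimal direct-call sketch of lsExtractor and removeUnnecessaryChars; the file path is hypothetical:

```ts
import { lsExtractor, removeUnnecessaryChars } from "./text_extractor";

async function demo(): Promise<void> {
    // Illustrative path; any locally available PDF would do.
    const text = await lsExtractor("pdf", "/tmp/example.pdf");
    // lsExtractor joins the split chunks with blank lines, so splitting
    // on "\n\n" recovers them (as chunk_extractor.ts does upstream).
    const chunks = text.split("\n\n");
    console.log(`Got ${chunks.length} chunks`);
}

demo().catch(console.error);

// removeUnnecessaryChars collapses runs of spaces/tabs and excess blank lines:
console.log(removeUnnecessaryChars("a  b\t c\n\n\n\nd")); // "a b c\n\nd"
```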
extensions/aws/src/knowledge-connectors/helpers/utils/config.ts (9 additions & 0 deletions)

```ts
export const getMaxChunkSize = (defaultLimit: number = 2000): number => {
    const parsed = parseInt(process.env.MAX_CHUNK_SIZE ?? "", 10);
    return Number.isNaN(parsed) ? defaultLimit : parsed;
};

export const langchainDefaultChunkSizeInChars = (defaultLimit: number = 2000): number => {
    const parsed = parseInt(process.env.LANGCHAIN_DEFAULT_CHUNK_SIZE_IN_CHARS ?? "", 10);
    return Number.isNaN(parsed) ? defaultLimit : parsed;
};
```
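Both helpers read their environment variable on every call, so the value can change at runtime. A small usage sketch with an illustrative value:

```ts
import { getMaxChunkSize } from "./utils/config";

process.env.MAX_CHUNK_SIZE = "1000";
console.log(getMaxChunkSize());     // 1000 (parsed from the env var)
console.log(getMaxChunkSize(2500)); // 1000 (env var wins over the argument)

delete process.env.MAX_CHUNK_SIZE;
console.log(getMaxChunkSize(2500)); // 2500 (falls back to the provided default)
```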