Skip to content
6 changes: 6 additions & 0 deletions lib/email-client/base-client.js
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,10 @@ class BaseClient {
return `${REDIS_PREFIX}iar:s:${this.account}`;
}

getThreadRootsKey() {
return `${REDIS_PREFIX}iar:tr:${this.account}`;
}

getLogKey() {
// this format ensures that the key is deleted when user is removed
return `${REDIS_PREFIX}iam:${this.account}:g`;
Expand Down Expand Up @@ -3215,6 +3219,8 @@ class BaseClient {
networkRouting
});

await this.redis.sadd(this.getThreadRootsKey(), info.messageId);

// clean up possible cached SMTP error
try {
await this.redis.hset(
Expand Down
3 changes: 3 additions & 0 deletions lib/email-client/gmail-client.js
Original file line number Diff line number Diff line change
Expand Up @@ -1515,6 +1515,9 @@ class GmailClient extends BaseClient {
envelope
});

// Track thread root for filtering incoming replies
await this.redis.sadd(this.getThreadRootsKey(), gmailMessageId);

// Update feedback key if provided
if (data.feedbackKey) {
await this.redis
Expand Down
230 changes: 201 additions & 29 deletions lib/email-client/imap/mailbox.js
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,134 @@ class Mailbox {
this.syncing = false; // Whether currently syncing
}

/**
* Extracts the thread root (first message ID) from References or In-Reply-To headers
* @param {Object} headers - Message headers object
* @returns {String|null} Thread root message ID or null if not found
*/
extractThreadRoot(headers) {
if (!headers) return null;

// Try References header first (contains full thread chain)
let referencesHeader = headers.references;
let referencesString = null;

if (referencesHeader) {
if (Array.isArray(referencesHeader) && referencesHeader.length > 0) {
// References header format: ["<msg1> <msg2> <msg3>"]
// The first element contains the entire references string, need to parse it
referencesString = referencesHeader[0];
} else if (typeof referencesHeader === 'string') {
// References header is already a string from our Buffer parsing
referencesString = referencesHeader;
}

if (referencesString && typeof referencesString === 'string') {
// Split on spaces to get individual message IDs
let messageIds = referencesString.trim().split(/\s+/);
if (messageIds.length > 0) {
// Take the first (oldest) message ID as thread root
let firstRef = messageIds[0].trim();
if (firstRef) {
// Clean up the message ID and preserve original case per RFC 5322
if (!firstRef.startsWith('<')) firstRef = '<' + firstRef;
if (!firstRef.endsWith('>')) firstRef = firstRef + '>';
return firstRef;
}
}
}
}

// Fall back to In-Reply-To header (direct parent)
let inReplyTo = headers['in-reply-to'];
let inReplyToValue = null;

if (inReplyTo) {
if (Array.isArray(inReplyTo) && inReplyTo.length > 0) {
inReplyToValue = inReplyTo[0];
} else if (typeof inReplyTo === 'string') {
inReplyToValue = inReplyTo;
}

if (inReplyToValue && typeof inReplyToValue === 'string') {
// Clean up the message ID and preserve original case per RFC 5322
let cleanRef = inReplyToValue.trim();
if (!cleanRef.startsWith('<')) cleanRef = '<' + cleanRef;
if (!cleanRef.endsWith('>')) cleanRef = cleanRef + '>';
return cleanRef;
}
}

return null;
}

/**
* Checks if a message should be processed based on fetched headers (optimized for early filtering)
* Only processes messages that are part of threads initiated by this platform
* @param {Object} headers - Fetched headers object from IMAP
* @param {String} emailId - Message emailId if available
* @returns {Promise<Boolean>} True if message should be processed
*/
async shouldProcessMessageWithFetchedHeaders(headers, emailId) {
if (!headers) {
return false; // No headers, skip
}

// Parse headers if they are returned as Buffer (when using array of header names)
let parsedHeaders = headers;
if (Buffer.isBuffer(headers)) {
const headerText = headers.toString('utf8');

parsedHeaders = {};
const lines = headerText.split(/\r?\n/);
let currentHeader = null;

for (let line of lines) {
line = line.trim();
if (!line) continue; // Skip empty lines

// Check if this is a new header line
const headerMatch = line.match(/^([^:]+):\s*(.*)$/);
if (headerMatch) {
// New header line
currentHeader = headerMatch[1].toLowerCase().trim();
parsedHeaders[currentHeader] = headerMatch[2].trim();
} else if (currentHeader && line.match(/^\s/)) {
// Continuation of previous header (folded header)
parsedHeaders[currentHeader] += ' ' + line.trim();
}
}
}

// Extract message-id from headers
let messageId = null;
if (parsedHeaders['message-id']) {
messageId = Array.isArray(parsedHeaders['message-id']) ? parsedHeaders['message-id'][0] : parsedHeaders['message-id'];
if (messageId) {
// Keep the <> brackets since they're stored with brackets in Redis
// Preserve original case per RFC 5322
if (!messageId.startsWith('<')) messageId = '<' + messageId;
if (!messageId.endsWith('>')) messageId = messageId + '>';
}
}
// Extract thread root from message headers using the same logic as extractThreadRoot
let threadRoot = this.extractThreadRoot(parsedHeaders);

const threadRootsKey = this.connection.getThreadRootsKey();

if (threadRoot) {
// This is a reply - check if thread root is tracked
const isTrackedThread = await this.connection.redis.sismember(threadRootsKey, threadRoot);
return isTrackedThread;
} else if (messageId) {
// No references found - this might be an originating message
// Check if this message's ID is in our tracked roots
const isOurMessage = await this.connection.redis.sismember(threadRootsKey, messageId);
return isOurMessage;
}
return false; // No thread root and no messageId match, skip
}

/**
* Gets current mailbox status from IMAP connection
* @param {Object} connectionClient - IMAP client to use (defaults to main connection)
Expand Down Expand Up @@ -730,6 +858,10 @@ class Mailbox {
this.logger.debug({ msg: 'Missing message', status: 'found', uid: messageData.uid, missingRetries, missingDelay });
}


// Note: Message filtering is now handled earlier in runFastSync to avoid expensive full message fetches
// for messages that would be filtered out anyway

// Filter headers based on what was requested
if (options.headers && Array.isArray(requestedHeaders)) {
let filteredHeaders = {};
Expand Down Expand Up @@ -763,6 +895,7 @@ class Mailbox {
return;
}


let bounceNotifyInfo;
let complaintNotifyInfo;
let content;
Expand Down Expand Up @@ -1824,7 +1957,15 @@ class Mailbox {

if (knownUidNext && mailboxStatus.messages) {
// detected new emails
let fields = { uid: true, flags: true, modseq: true, emailId: true, labels: true, internalDate: true };
let fields = {
uid: true,
flags: true,
modseq: true,
emailId: true,
labels: true,
internalDate: true,
headers: ['message-id', 'references', 'in-reply-to'] // Fetch specific headers for filtering
};

let imapClient = this.connection.imapClient;

Expand Down Expand Up @@ -1876,22 +2017,34 @@ class Mailbox {
for (let messageData of messages) {
// Update uidNext if this is a new message
let updated = await this.connection.redis.hUpdateBigger(this.getMailboxKey(), 'uidNext', messageData.uid + 1, messageData.uid + 1);

if (updated) {
// new email! Queue for processing
await this.connection.redis.zadd(
this.getNotificationsKey(),
messageData.uid,
JSON.stringify({
// Check if this message should be processed based on thread roots
// Only process messages that are part of threads initiated by this platform
let shouldProcessMessage = await this.shouldProcessMessageWithFetchedHeaders(messageData.headers, messageData.emailId);

if (shouldProcessMessage) {
// new email that should be processed! Queue for processing
await this.connection.redis.zadd(
this.getNotificationsKey(),
messageData.uid,
JSON.stringify({
uid: messageData.uid,
flags: messageData.flags,
internalDate:
(messageData.internalDate &&
typeof messageData.internalDate.toISOString === 'function' &&
messageData.internalDate.toISOString()) ||
null
})
);
} else {
// Skip this message - not part of our tracked threads
this.logger.debug({
msg: 'Skipping message in runFastSync - not part of tracked threads',
uid: messageData.uid,
flags: messageData.flags,
internalDate:
(messageData.internalDate &&
typeof messageData.internalDate.toISOString === 'function' &&
messageData.internalDate.toISOString()) ||
null
})
);
account: this.connection.account
});
}
}
}

Expand Down Expand Up @@ -2094,7 +2247,15 @@ class Mailbox {
* Most thorough but slowest sync method
*/
async runFullSync() {
let fields = { uid: true, flags: true, modseq: true, emailId: true, labels: true, internalDate: true };
let fields = {
uid: true,
flags: true,
modseq: true,
emailId: true,
labels: true,
internalDate: true,
headers: ['message-id', 'references', 'in-reply-to'] // Fetch specific headers for filtering
};
let opts = {};

let lock = await this.getMailboxLock(null, { description: 'Full sync' });
Expand Down Expand Up @@ -2174,22 +2335,33 @@ class Mailbox {

let storedMessage = await this.entryListGet(messageData.uid, { uid: true });
if (!storedMessage) {
// New message
// New message - check if it should be processed based on thread roots
let shouldProcessMessage = await this.shouldProcessMessageWithFetchedHeaders(messageData.headers, messageData.emailId);

let seq = await this.entryListSet(messageData);
if (seq) {
await this.connection.redis.zadd(
this.getNotificationsKey(),
messageData.uid,
JSON.stringify({
if (shouldProcessMessage) {
await this.connection.redis.zadd(
this.getNotificationsKey(),
messageData.uid,
JSON.stringify({
uid: messageData.uid,
flags: messageData.flags,
internalDate:
(messageData.internalDate &&
typeof messageData.internalDate.toISOString === 'function' &&
messageData.internalDate.toISOString()) ||
null
})
);
} else {
// Skip this message - not part of our tracked threads
this.logger.debug({
msg: 'Skipping message in runFullSync - not part of tracked threads',
uid: messageData.uid,
flags: messageData.flags,
internalDate:
(messageData.internalDate &&
typeof messageData.internalDate.toISOString === 'function' &&
messageData.internalDate.toISOString()) ||
null
})
);
account: this.connection.account
});
}
}
} else {
// Check for deleted messages between stored and current sequence
Expand Down
3 changes: 3 additions & 0 deletions lib/email-client/outlook-client.js
Original file line number Diff line number Diff line change
Expand Up @@ -1801,6 +1801,9 @@ class OutlookClient extends BaseClient {
envelope
});

// Track thread root for filtering incoming replies
await this.redis.sadd(this.getThreadRootsKey(), messageId);

// Mark as successful in Redis for webhook feedback
if (data.feedbackKey) {
await this.redis
Expand Down
Loading
Loading