Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -287,14 +287,6 @@ The export worker (`workers/export.js`) processes bulk email export jobs via Bul
- **Transient errors** (network timeouts, 5xx responses): Retry with exponential backoff
- **Skippable errors** (message not found, 404): Skip message, increment counter
- **Account validation**: Checks every 60s if account still exists
- **Resume capability**: Failed exports with progress can be resumed from checkpoint

**Resumability:**
An export is marked resumable when:
- Export made progress (`lastProcessedScore > 0`)
- Messages remain to process (`messagesExported < messagesQueued`)
- Account was not deleted during export

**Retry configuration:**
- IMAP messages: 3 retries with 2s base delay (exponential backoff)
- API batch requests: 5 retries for rate limits (429) with 5s base delay
Expand All @@ -317,7 +309,6 @@ An export is marked resumable when:
- `POST /v1/account/{account}/export` - Create export job
- `GET /v1/account/{account}/export/{exportId}` - Get export status
- `GET /v1/account/{account}/export/{exportId}/download` - Download completed export
- `POST /v1/account/{account}/export/{exportId}/resume` - Resume failed export
- `DELETE /v1/account/{account}/export/{exportId}` - Cancel/delete export
- `GET /v1/account/{account}/exports` - List exports with pagination

Expand Down
15 changes: 12 additions & 3 deletions lib/api-routes/account-routes.js
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ async function init(args) {

logs: Joi.boolean().example(false).description('Store recent logs').default(false),

notifyFrom: accountSchemas.notifyFrom.default(null),
notifyFrom: accountSchemas.notifyFrom.default('now'),
syncFrom: accountSchemas.syncFrom.default(null),

proxy: settingsSchema.proxyUrl,
Expand Down Expand Up @@ -630,7 +630,7 @@ async function init(args) {

payload: Joi.object({
flush: Joi.boolean().truthy('Y', 'true', '1').falsy('N', 'false', 0).default(false).description('Only flush the account if true'),
notifyFrom: accountSchemas.notifyFrom.default(null),
notifyFrom: accountSchemas.notifyFrom.default('now'),
imapIndexer: accountSchemas.imapIndexer,
syncFrom: accountSchemas.syncFrom
}).label('RequestFlush')
Expand Down Expand Up @@ -825,6 +825,16 @@ async function init(args) {
}
}

if (result.outlookSubscription) {
try {
let parsed = typeof result.outlookSubscription === 'string' ? JSON.parse(result.outlookSubscription) : result.outlookSubscription;
delete parsed.clientState;
result.outlookSubscription = parsed;
} catch (err) {
result.outlookSubscription = {};
}
}

// default false
for (let key of ['logs']) {
result[key] = !!result[key];
Expand Down Expand Up @@ -1018,7 +1028,6 @@ async function init(args) {
outlookSubscription: Joi.object({
id: Joi.string().description('Microsoft Graph subscription ID'),
expirationDateTime: Joi.date().iso().description('When the subscription expires'),
clientState: Joi.string().description('Shared secret for validating webhook notifications'),
state: Joi.object({
state: Joi.string().valid('creating', 'created', 'error').description('Subscription state'),
time: Joi.number().description('Timestamp of last state change'),
Expand Down
64 changes: 7 additions & 57 deletions lib/api-routes/export-routes.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ async function init(args) {

response: {
schema: Joi.object({
exportId: Joi.string().example('exp_abc123def456').description('Export job identifier'),
exportId: Joi.string().example('exp_abc123def456abc123def456').description('Export job identifier'),
status: Joi.string().example('queued').description('Export status'),
created: Joi.date().iso().example('2024-01-15T10:30:00Z').description('When export was created')
}).label('CreateExportResponse'),
Expand Down Expand Up @@ -140,7 +140,8 @@ async function init(args) {
throw Boom.notFound('Export not found');
}

let stream = fs.createReadStream(fileInfo.filePath);
const fileReadStream = fs.createReadStream(fileInfo.filePath);
let stream = fileReadStream;

stream.on('error', err => {
request.logger.error({ msg: 'Export download stream error', exportId, err });
Expand All @@ -149,13 +150,15 @@ async function init(args) {
if (fileInfo.isEncrypted) {
const secret = await getSecret();
if (!secret) {
fileReadStream.destroy();
throw Boom.serverUnavailable('Encryption secret not available for decryption');
}
const decryptStream = createDecryptStream(secret);
const decryptStream = await createDecryptStream(secret);
decryptStream.on('error', err => {
request.logger.error({ msg: 'Export decryption error', exportId, err });
fileReadStream.destroy();
});
stream = stream.pipe(decryptStream);
stream = fileReadStream.pipe(decryptStream);
}

return h
Expand Down Expand Up @@ -251,59 +254,6 @@ async function init(args) {
}
});

server.route({
method: 'POST',
path: '/v1/account/{account}/export/{exportId}/resume',

async handler(request) {
try {
return await Export.resume(request.params.account, request.params.exportId);
} catch (err) {
handleError(request, err);
}
},

options: {
description: 'Resume failed export',
notes: 'Resumes a failed export from its last checkpoint. Only works for exports that are marked as resumable.',
tags: ['api', 'Export (Beta)'],

auth: {
strategy: 'api-token',
mode: 'required'
},
cors: CORS_CONFIG,

validate: {
options: {
stripUnknown: false,
abortEarly: false,
convert: true
},
failAction,

params: Joi.object({
account: accountIdSchema.required(),
exportId: exportIdSchema
}).label('ResumeExportParams')
},

response: {
schema: Joi.object({
exportId: Joi.string().example('exp_abc123def456').description('Export job identifier'),
status: Joi.string().example('queued').description('Export status'),
resumed: Joi.boolean().example(true).description('True if export was resumed'),
progress: Joi.object({
messagesExported: Joi.number().integer().example(500).description('Messages already exported'),
messagesQueued: Joi.number().integer().example(1500).description('Total messages queued'),
messagesSkipped: Joi.number().integer().example(5).description('Messages skipped')
}).label('ResumeExportProgress')
}).label('ResumeExportResponse'),
failAction: 'log'
}
}
});

server.route({
method: 'GET',
path: '/v1/account/{account}/exports',
Expand Down
2 changes: 1 addition & 1 deletion lib/email-client/base-client.js
Original file line number Diff line number Diff line change
Expand Up @@ -1984,7 +1984,7 @@ class BaseClient {
// Store message in Redis
await this.redis.hsetBuffer(`${REDIS_PREFIX}iaq:${this.account}`, queueId, msgEntry);

let queueKeep = (await settings.get('queueKeep')) || true;
let queueKeep = (await settings.get('queueKeep')) ?? true;

// Configure delivery retry settings
let defaultDeliveryAttempts = await settings.get('deliveryAttempts');
Expand Down
18 changes: 13 additions & 5 deletions lib/email-client/gmail-client.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ const settings = require('../settings');

const { GMAIL_API_BASE, LIST_BATCH_SIZE, request: gmailApiRequest } = require('./gmail/gmail-api');

const MAX_GMAIL_BATCH_SIZE = 50;

// Labels to exclude from folder listings
const SKIP_LABELS = ['UNREAD', 'STARRED', 'IMPORTANT', 'CHAT', 'CATEGORY_PERSONAL'];

Expand Down Expand Up @@ -611,6 +613,7 @@ class GmailClient extends BaseClient {
// Add virtual "All Mail" folder for Gmail API
// This allows exporting all messages without scanning individual labels
mailboxes.unshift({
id: 'virtual_all',
path: '\\All',
delimiter: '/',
parentPath: '',
Expand Down Expand Up @@ -1361,15 +1364,19 @@ class GmailClient extends BaseClient {
await this.prepare();

// Pre-fetch labels to resolve label IDs to names
const labelsResult = await this.getLabels();
const labelMap = new Map();
for (const label of labelsResult || []) {
labelMap.set(label.id, label.name);
try {
const labelsResult = await this.getLabels();
for (const label of labelsResult || []) {
labelMap.set(label.id, label.name);
}
} catch (err) {
this.logger.warn({ msg: 'Failed to fetch labels for export, using raw label IDs', account: this.account, err });
}

const results = [];
const settingsBatchSize = await settings.get('gmailExportBatchSize');
const batchSize = settingsBatchSize || DEFAULT_GMAIL_EXPORT_BATCH_SIZE;
const batchSize = Math.min(settingsBatchSize || DEFAULT_GMAIL_EXPORT_BATCH_SIZE, MAX_GMAIL_BATCH_SIZE);

for (let i = 0; i < emailIds.length; i += batchSize) {
const batch = emailIds.slice(i, i + batchSize);
Expand Down Expand Up @@ -1696,7 +1703,8 @@ class GmailClient extends BaseClient {
await this.redis
.multi()
.hset(data.feedbackKey, 'success', 'true')
.expire(1 * 60 * 60);
.expire(data.feedbackKey, 1 * 60 * 60)
.exec();
}

return {
Expand Down
2 changes: 1 addition & 1 deletion lib/email-client/notification-handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ class NotificationHandler {
}

// Get queue retention setting
const queueKeep = (await settings.get('queueKeep')) || true;
const queueKeep = (await settings.get('queueKeep')) ?? true;

// Process notification based on required destinations
if (!skipWebhook && addDocumentQueueJob) {
Expand Down
24 changes: 20 additions & 4 deletions lib/email-client/outlook-client.js
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@ class OutlookClient extends BaseClient {
// Add virtual "All Mail" folder for Outlook API
// This allows exporting all messages without scanning individual folders
mailboxes.unshift({
id: 'virtual_all',
path: '\\All',
delimiter: '/',
parentPath: '',
Expand Down Expand Up @@ -1684,7 +1685,7 @@ class OutlookClient extends BaseClient {
const request = {
id: reqId,
method: 'GET',
url: `/v1.0/${this.oauth2UserPath}/messages/${emailId}?$select=${selectFields}&$expand=${expandFields}`
url: `/${this.oauth2UserPath}/messages/${emailId}?$select=${selectFields}&$expand=${expandFields}`
};

// Add Prefer header for body content type if specified
Expand All @@ -1705,7 +1706,7 @@ class OutlookClient extends BaseClient {

for (let attempt = 0; attempt <= BATCH_MAX_RETRIES; attempt++) {
try {
responseData = await this.requestWithRetry('/$batch', 'post', { requests });
responseData = await this.request('/$batch', 'post', { requests });
batchSuccess = true;
break;
} catch (err) {
Expand Down Expand Up @@ -1741,13 +1742,16 @@ class OutlookClient extends BaseClient {
continue;
}

// Process batch responses - match by request ID
// Process batch responses - match by request ID and track which were received
const respondedIds = new Set();
for (const response of responseData?.responses || []) {
const emailId = messageMap.get(response.id);
if (!emailId) {
continue;
}

respondedIds.add(response.id);

if (response.status >= 200 && response.status < 300 && response.body) {
try {
const messageData = response.body;
Expand Down Expand Up @@ -1820,6 +1824,17 @@ class OutlookClient extends BaseClient {
});
}
}

// Reconcile: detect messages that got no response from the batch API
for (const [reqId, emailId] of messageMap) {
if (!respondedIds.has(reqId)) {
results.push({
messageId: emailId,
data: null,
error: { message: 'No response received from batch API', code: 'EMISSING_RESPONSE' }
});
}
}
}

return results;
Expand Down Expand Up @@ -2342,7 +2357,8 @@ class OutlookClient extends BaseClient {
await this.redis
.multi()
.hset(data.feedbackKey, 'success', 'true')
.expire(1 * 60 * 60);
.expire(data.feedbackKey, 1 * 60 * 60)
.exec();
}

return {
Expand Down
Loading
Loading