Skip to content

Commit

Permalink
fix(db/indexSync.js): update import paths for Conversation and Messag…
Browse files Browse the repository at this point in the history
…e models

feat(db/indexSync.js): add synchronization logic between MongoDB collection and MeiliSearch index
fix(models/plugins/mongoMeili.js): update createMeiliMongooseModel function to remove unused parameters and add documentation for syncWithMeili method
  • Loading branch information
danny-avila committed Jul 28, 2023
1 parent ae51e61 commit f88a068
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 27 deletions.
9 changes: 4 additions & 5 deletions api/lib/db/indexSync.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
const mongoose = require('mongoose');
const Conversation = mongoose.models.Conversation;
const Message = mongoose.models.Message;
const Conversation = require('../../models/schema/convoSchema');
const Message = require('../../models/schema/messageSchema');
const { MeiliSearch } = require('meilisearch');
let currentTimeout = null;

Expand Down Expand Up @@ -37,12 +36,12 @@ async function indexSync(req, res, next) {

if (messageCount !== messagesIndexed) {
console.log('Messages out of sync, indexing');
await Message.syncWithMeili();
Message.syncWithMeili();
}

if (convoCount !== convosIndexed) {
console.log('Convos out of sync, indexing');
await Conversation.syncWithMeili();
Conversation.syncWithMeili();
}
} catch (err) {
// console.log('in index sync');
Expand Down
124 changes: 102 additions & 22 deletions api/models/plugins/mongoMeili.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,33 +14,113 @@ const validateOptions = function (options) {
});
};

const createMeiliMongooseModel = function ({ index, indexName, client, attributesToIndex }) {
// console.log('attributesToIndex', attributesToIndex);
// const createMeiliMongooseModel = function ({ index, indexName, client, attributesToIndex }) {
const createMeiliMongooseModel = function ({ index, attributesToIndex }) {
const primaryKey = attributesToIndex[0];
// MeiliMongooseModel is of type Mongoose.Model
class MeiliMongooseModel {
// Clear Meili index
static async clearMeiliIndex() {
await index.delete();
// await index.deleteAllDocuments();
await this.collection.updateMany({ _meiliIndex: true }, { $set: { _meiliIndex: false } });
}

static async resetIndex() {
await this.clearMeiliIndex();
await client.createIndex(indexName, { primaryKey });
}
// Clear Meili index
// Push a mongoDB collection to Meili index
/**
* `syncWithMeili`: synchronizes the data between a MongoDB collection and a MeiliSearch index,
* only triggered if there's ever a discrepancy determined by `api\lib\db\indexSync.js`.
*
* 1. Fetches all documents from the MongoDB collection and the MeiliSearch index.
* 2. Compares the documents from both sources.
* 3. If a document exists in MeiliSearch but not in MongoDB, it's deleted from MeiliSearch.
* 4. If a document exists in MongoDB but not in MeiliSearch, it's added to MeiliSearch.
* 5. If a document exists in both but has different `text` or `title` fields (depending on the `primaryKey`), it's updated in MeiliSearch.
* 6. After all operations, it updates the `_meiliIndex` field in MongoDB to indicate whether the document is indexed in MeiliSearch.
*
* Note: This strategy does not use batch operations for Meilisearch as the `index.addDocuments` will discard
* the entire batch if there's an error with one document, and will not throw an error if there's an issue.
* Also, `index.getDocuments` needs an exact limit on the amount of documents to return, so we build the map in batches.
*
* @returns {Promise} A promise that resolves when the synchronization is complete.
*
* @throws {Error} Throws an error if there's an issue with adding a document to MeiliSearch.
*/
static async syncWithMeili() {
await this.resetIndex();
const docs = await this.find({ _meiliIndex: { $in: [null, false] } });
console.log('docs', docs.length);
const objs = docs.map((doc) => doc.preprocessObjectForIndex());
try {
await index.addDocuments(objs);
const ids = docs.map((doc) => doc._id);
await this.collection.updateMany({ _id: { $in: ids } }, { $set: { _meiliIndex: true } });
let moreDocuments = true;
const mongoDocuments = await this.find().lean();
const format = (doc) => _.pick(doc, attributesToIndex);

// Prepare for comparison
const mongoMap = new Map(mongoDocuments.map((doc) => [doc[primaryKey], format(doc)]));
const indexMap = new Map();
let offset = 0;
const batchSize = 1000;

while (moreDocuments) {
const batch = await index.getDocuments({ limit: batchSize, offset });

if (batch.results.length === 0) {
moreDocuments = false;
}

for (const doc of batch.results) {
indexMap.set(doc[primaryKey], format(doc));
}

offset += batchSize;
}

console.log('indexMap', indexMap.size);
console.log('mongoMap', mongoMap.size);

const updateOps = [];

// Iterate over Meili index documents
for (const [id, doc] of indexMap) {
const update = {};
update[primaryKey] = id;
if (mongoMap.has(id)) {
// Case: Update
// If document also exists in MongoDB, would be update case
if (
(doc.text && doc.text !== mongoMap.get(id).text) ||
(doc.title && doc.title !== mongoMap.get(id).title)
) {
console.log(`${id} had document discrepancy in ${doc.text ? 'text' : 'title'} field`);
updateOps.push({
updateOne: { filter: update, update: { $set: { _meiliIndex: true } } },
});
await index.addDocuments([doc]);
}
} else {
// Case: Delete
// If document does not exist in MongoDB, its a delete case from meili index
await index.deleteDocument(id);
updateOps.push({
updateOne: { filter: update, update: { $set: { _meiliIndex: false } } },
});
}
}

// Iterate over MongoDB documents
for (const [id, doc] of mongoMap) {
const update = {};
update[primaryKey] = id;
// Case: Insert
// If document does not exist in Meili Index, Its an insert case
if (!indexMap.has(id)) {
console.log(`${id} is not indexed`);
await index.addDocuments([doc]);
updateOps.push({
updateOne: { filter: update, update: { $set: { _meiliIndex: true } } },
});
} else if (doc._meiliIndex === false) {
updateOps.push({
updateOne: { filter: update, update: { $set: { _meiliIndex: true } } },
});
}
}

if (updateOps.length > 0) {
await this.collection.bulkWrite(updateOps);
console.log(
`Finished indexing ${primaryKey === 'messageId' ? 'messages' : 'conversations'}`,
);
}
} catch (error) {
console.log('Error adding document to Meili');
console.error(error);
Expand Down

0 comments on commit f88a068

Please sign in to comment.