Skip to content

Commit

Permalink
feat(streams): parse server plugin for moralis streams
Browse files Browse the repository at this point in the history
  • Loading branch information
sogunshola committed Oct 28, 2022
1 parent c7ce69d commit 07c8ca6
Show file tree
Hide file tree
Showing 51 changed files with 1,937 additions and 19 deletions.
7 changes: 7 additions & 0 deletions .changeset/cuddly-schools-swim.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'demo-parse-server-migration': minor
'@moralisweb3/streams': minor
'@moralisweb3/parse-server': minor
---

Incuded parse server package and added document b
7 changes: 7 additions & 0 deletions .changeset/red-trees-hunt.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'demo-parse-server-migration': minor
'@moralisweb3/streams': minor
'@moralisweb3/parse-server': minor
---

Included parse server package and added document builder logic to streams package
3 changes: 2 additions & 1 deletion demos/parse-server-migration/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
"main": "dist/index.js",
"private": true,
"dependencies": {
"@moralisweb3/parse-server": "2.6.7",
"@codemirror/language": "^0.20.0",
"@moralisweb3/core": "^2.2.0",
"@moralisweb3/core": "^2.6.7",
"@types/node": "^18.7.15",
"dotenv": "^16.0.1",
"envalid": "7.3.1",
Expand Down
10 changes: 9 additions & 1 deletion demos/parse-server-migration/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { parseServer } from './parseServer';
// @ts-ignore
import ParseServer from 'parse-server';
import http from 'http';
import { streamsSync } from '@moralisweb3/parse-server';

export const app = express();

Expand All @@ -18,7 +19,14 @@ app.use(express.json());

app.use(cors());

app.use(`/server`, parseServer);
app.use(
streamsSync(parseServer, {
apiKey: config.MORALIS_API_KEY,
webhookUrl: '/streams',
}),
);

app.use(`/server`, parseServer.app);

const httpServer = http.createServer(app);
httpServer.listen(config.PORT, () => {
Expand Down
2 changes: 1 addition & 1 deletion demos/parse-server-migration/src/parseServer.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// @ts-ignore
import { ParseServer } from 'parse-server';
import ParseServer from 'parse-server';
import config from './config';
import MoralisEthAdapter from './auth/MoralisEthAdapter';

Expand Down
1 change: 1 addition & 0 deletions packages/common/core/src/version.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export const LIB_VERSION = '2.6.7';
5 changes: 5 additions & 0 deletions packages/parseServer/.eslintrc.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
module.exports = {
extends: ['@moralisweb3'],
plugins: ['jest'],
ignorePatterns: ['**/lib/**/*', '**/*.test.ts', '**/dist/**/*', '**/build/**/*', '**/generated/**/*'],
};
106 changes: 106 additions & 0 deletions packages/parseServer/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
# @moralisweb3/parse-server

# Parse Server Moralis Streams

This Plugin adapts parse-server to support [streams](https://github.com/MoralisWeb3/streams-beta)

# Usage

Since parse server is runs on express, this plugin is a middleware that can be added to the express app.

## Installations

First add parse-server to your express app:

```bash
yarn add parse-server
```

Then add moralis parse server plugin:

```bash
yarn add @moralisweb3/parse-server
```

## Setup parse server

Initialize parse server in your express app:

```javascript
import ParseServer from 'parse-server';
import config from './config';

export const parseServer = new ParseServer({
databaseURI: config.DATABASE_URI,
cloud: config.CLOUD_PATH,
appId: config.APPLICATION_ID,
masterKey: config.MASTER_KEY,
serverURL: config.SERVER_URL,
});
```

## Setup moralis parse server plugin

Then add the plugin to your express app:

```typescript
import { initializeStreams } from '@moralisweb3/parse-server';

```

the initializeStreams function takes the following options:
- the parse server instance
- Other options

```typescript
interface StreamOptions {
webhookUrl?: string;
apiKey: string;
}
```

- `apiKey`: Your Moralis API key
- `webhookUrl` - the url of choice to receive the stream data (optional). default path is `/streams`


## Putting all together

```typescript
import Moralis from 'moralis';
import express from 'express';
import config from './config';
import { streamsSync } from '@moralisweb3/parse-server';

const expressApp = express();

Moralis.start({
apiKey: config.MORALIS_API_KEY,
});

expressApp.use(express.urlencoded({ extended: true }));
expressApp.use(express.json());

expressApp.use(cors());

expressApp.use(
streamsSync(parseServer, {
apiKey: config.MORALIS_API_KEY,
webhookUrl: '/streams-webhook',
}),
);

expressApp.use(`/${config.SERVER_ENDPOINT}`, parseServer.app);
expressApp.use(errorHandler);

app.listen(config.PORT, () => {
console.log(`${config.APP_NAME} is running on port ${config.PORT}`);
});
```

The endpoint to receive webhooks is `YOUR_EXPRESSAPP_URL/SET_WEBHOOKURL`. This is the URL that you should use when setting up a stream.

# Done!

After you have configured the plugin and created a stream you can see the data in the dashboard. Note that the stream tag will be concatenated with `Txs` and `Logs` meaning if you have a tag called "MyStream" you will have two collections in DB called "MyStreamTxs" and "MyStreamLogs", which will contain the transactions and logs respectively.

Full example can be found [here](https://github.com/MoralisWeb3/Moralis-JS-SDK/tree/main/demos/parse-server-migration)
4 changes: 4 additions & 0 deletions packages/parseServer/jest.config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
/* eslint-disable global-require */
module.exports = {
...require('../../jest.config'),
};
32 changes: 32 additions & 0 deletions packages/parseServer/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
{
"name": "@moralisweb3/parse-server",
"author": "Moralis",
"version": "2.6.7",
"license": "MIT",
"private": false,
"main": "./lib/index.js",
"types": "./lib/index.d.ts",
"sideEffects": false,
"files": [
"lib/*"
],
"scripts": {
"lint": "eslint . --ext .js,.ts,.tsx,jsx",
"clean": "rm -rf lib && rm -rf tsconfig.tsbuildinfo && rm -rf ./node_modules/.cache/nx",
"build": "tsc",
"dev": "tsc --watch"
},
"devDependencies": {
"@types/parse": "^2.18.18",
"@types/express": "4.17.14",
"prettier": "^2.5.1",
"typescript": "^4.5.5"
},
"dependencies": {
"@moralisweb3/streams": "^2.6.7",
"moralis": "^2.6.7",
"express": "^4.18.1",
"parse": "3.4.4",
"body-parser": "^1.20.0"
}
}
1 change: 1 addition & 0 deletions packages/parseServer/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './streams';
1 change: 1 addition & 0 deletions packages/parseServer/src/streams/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './processor';
19 changes: 19 additions & 0 deletions packages/parseServer/src/streams/processor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import { webhookRouter } from './webbhook';
import { MoralisCore } from 'moralis/core';
import { MoralisStreams } from '@moralisweb3/streams';
import { MoralisApiUtils } from '@moralisweb3/api-utils';

interface StreamOptions {
webhookUrl?: string;
apiKey: string;
}

export const streamsSync = (parseInstance: any, options: StreamOptions) => {
const core = MoralisCore.create();
const streams = MoralisStreams.create(core);
const apiUtils = MoralisApiUtils.create(core);
core.registerModules([streams, apiUtils]);
core.start({ apiKey: options.apiKey });
return webhookRouter(parseInstance, options?.webhookUrl || '/streams-webhook', streams);
};
55 changes: 55 additions & 0 deletions packages/parseServer/src/streams/upsert.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import { Document, Update } from '@moralisweb3/streams';
import Parse from 'parse/node';

export class Upsert {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
constructor(private parseServer: any) {
Parse.initialize(this.parseServer.config.appId);
Parse.serverURL = this.parseServer.config.serverURL;
Parse.masterKey = this.parseServer.config.masterKey;
}

async execute(path: string, filter: Record<string, unknown>, update: Update) {
return this.upsert(update.collectionName + path, filter, update.document);
}

private async upsert(className: string, filter: Record<string, unknown>, update: Document) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const results = await this.lazyUpsert(className, filter, update as any);
await Parse.Object.saveAll(results, { useMasterKey: true });
}

private async lazyUpsert(className: string, filter: Record<string, unknown>, update: Record<string, unknown>) {
delete update.id;
const query = new Parse.Query(className);

for (const key in filter) {
if (Object.prototype.hasOwnProperty.call(filter, key)) {
query.equalTo(key, filter[key]);
}
}

const results = await query.find({ useMasterKey: true });

if (results.length > 0) {
for (let i = 0; i < results.length; i++) {
for (const updateKey in update) {
if (Object.prototype.hasOwnProperty.call(update, updateKey)) {
results[i].set(updateKey, update[updateKey]);
}
}
}

return results;
}

const objectClass = Parse.Object.extend(className);
const object = new objectClass();
// eslint-disable-next-line guard-for-in
for (const updateKey in update) {
object.set(updateKey, update[updateKey]);
}

return [object];
}
}
99 changes: 99 additions & 0 deletions packages/parseServer/src/streams/webbhook.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import { IWebhook } from '@moralisweb3/streams-typings';
import express, { Request } from 'express';
import MoralisStreams, {
LogsProcessor,
TxsProcessor,
CollectionNameBuilder,
InternalTxsProcessor,
Update,
} from '@moralisweb3/streams';
import bodyParser from 'body-parser';
import { Upsert } from './upsert';

export const tagsMap = new Map();

const collectionNameBuilder = new CollectionNameBuilder();
const logsProcessor = new LogsProcessor(collectionNameBuilder);
const txsProcessor = new TxsProcessor(collectionNameBuilder);
const internalTxProcessor = new InternalTxsProcessor(collectionNameBuilder);

const verifySignature = (req: Request, streams: MoralisStreams) => {
const providedSignature = req.headers['x-signature'];
if (!providedSignature) {
throw new Error('Signature not provided');
}
streams.verifySignature({
body: req.body,
signature: providedSignature as string,
});
};

export const webhookRouter = (parseObject: any, webhookUrl: string, streams: MoralisStreams) => {
return express.Router().post(webhookUrl, bodyParser.json({ limit: '50mb' }), async (req, res) => {
try {
verifySignature(req, streams);
} catch (e) {
return res.status(401).json({ message: e.message });
}
try {
const updates: Record<string, any> = {};
const batch = req.body as IWebhook;

const logUpdates = logsProcessor.process(batch);
const txUpdates = txsProcessor.process(batch);
const internalTxUpdates = internalTxProcessor.process(batch);

// Prepare updates
if (!updates['Logs']) {
updates['Logs'] = [];
}
updates['Logs'].push(prepareUpdate(logUpdates, ['logIndex', 'transactionHash']));

if (!updates['Txs']) {
updates['Txs'] = [];
}
updates['Txs'].push(prepareUpdate(txUpdates, ['transactionIndex']));

if (!updates['TxsInternal']) {
updates['TxsInternal'] = [];
}
updates['TxsInternal'].push(prepareUpdate(internalTxUpdates, ['hash']));

const results: unknown[] = [];
const upsert = new Upsert(parseObject);
// eslint-disable-next-line guard-for-in
for (const tableName in updates) {
for (let index = 0; index < updates[tableName].length; index++) {
const data = updates[tableName][index];
data.forEach(({ filter, update }: any) => {
results.push(upsert.execute(tableName, filter, update));
});
}
}
await Promise.all(results);
} catch (e) {
// eslint-disable-next-line no-console
console.log('error while inserting logs', e.message);
return res.status(500).json({ message: 'error while inserting logs' });
}

return res.status(200).json({ message: 'ok' });
});
};

const prepareUpdate = (updates: Update[], filters: string[]) => {
const results: unknown[] = [];
for (const update of updates) {
results.push({
filter: filters.reduce((acc: Record<string, any>, filter: string) => {
// @ts-ignore
acc[filter] = update.document[filter];
return acc;
}, {}),
update,
upsert: true,
});
}
return results;
};
Loading

0 comments on commit 07c8ca6

Please sign in to comment.