Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sequelize Transactions - not easy-peasy #193

Open
MaciekLeks opened this issue May 7, 2020 · 2 comments
Open

Sequelize Transactions - not easy-peasy #193

MaciekLeks opened this issue May 7, 2020 · 2 comments

Comments

@MaciekLeks
Copy link

I'm thinking over adding transaction support to my service leveraging Sequelize Db Adapter. After going through the source code of moleculer-db and moleculer-db-adapter-sequelize I came to conclusion that the only option is to add a patch to moleculer-db (for transation argument to every method). Have enyone ever wonder of that?

@sunnygaikwad65
Copy link

sunnygaikwad65 commented Jun 21, 2021

@MaciekLeks How you have added patch. Please share i want to add transaction in my moleculer services. Please provice patch code

@MaciekLeks
Copy link
Author

MaciekLeks commented Jun 21, 2021

It's really hard for me right now because I've moved from moleculerjs to totally different area.
So let me put my code here which worked for me at that time:

src/extends/TransactionalSequelizeDbAdapter.js

const SequelizeDbAdapter = require("moleculer-db-adapter-sequelize");

class TransactionalSequelizeDbAdapter extends SequelizeDbAdapter {
	constructor(...opts) {
		super(...opts);
	}

	insert(entity) {
		console.log("MLK TransactionalSequelizeDbAdapter");
		return super.insert(entity);
	}
}

module.exports = TransactionalSequelizeDbAdapter;

src/mixins/db.js

"use strict";

const DbService = require("moleculer-db");
//const SqlAdapter = require("moleculer-db-adapter-sequelize");
const TransactionalSequelizeDbAdapter = require("../extends/TransactionalSequelizeDbAdapter");
const auth = require("../../auth");
const isDocker = require("is-docker");
const { log } = require("../utils/helpers");

module.exports = {
	mixins: [DbService],
	/*adapter: new SqlAdapter(
		`postgres://${auth.pgUser}:${auth.pgPassword}@${
			isDocker() ? "postgres" : "localhost:5458"
		}/${auth.pgDB}`
	), */
	adapter: new TransactionalSequelizeDbAdapter(
		`${auth.pgDB}`,
		`${auth.pgUser}`,
		`${auth.pgPassword}`,
		{
			host: isDocker() ? "postgres" : "localhost",
			port: isDocker() ? undefined : 5458,
			dialect: "postgres",
			pool: {
				max: 5,
				min: 0,
				idle: 10000,
			},
		}
	),
	actions: {
		begin(ctx) {
			//TODO
			this.logger.info("begin - TODO - no implementation!!!");
		},

		commit(ctx) {
			//TODO
			this.logger.info("commit - TODO - no implementation!!!");
		},

		rollback(ctx) {
			//TODO
			this.logger.info("rollback - TODO - no implementation!!!");
		},

		execute: {
			params: {
				_exec: "object",
			},
			async handler(ctx) {
				const exec = ctx.params._exec;
				let res;
				try {
					const callType = exec._type;
					switch (callType) {
						case "create": {
							res = await this._create(ctx, exec._data);
							this.logger.info("db res:" + log(res));
							return res;
						}
						case "find": {
							const query = {
								limit: 1,
								query: {
									...exec._data,
								},
							};
							//this.logger.info("MY QUERY:" + log(query));
							res = await this._find(ctx, query);
							this.logger.info("db res:" + log(res));
							if (res) return res[0];
						}
					}
					// this.logger.info(
					// 	"!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!DB RESULT:" + log(res)
					// );
				} catch (e) {
					this.logger.info("e: " + log(e));
				}

				return null;
			},
		},
	},
};

src/mixins/oracle.js

"use strict";

const oracledb = require("oracledb");
const { log } = require("../../src/utils/helpers");

/**
 * Service mixin to use oracledb
 *
 * TODO: stable version move out to the module moleculer-db-adapter-oracle
 *
 * @param opt.auth.oraUser
 * @param opt.auth.oraPassword
 * @param opt.auth.oraConnectionString
 * @param opt.pool use pool or singleton connection
 * @param opt.pool.min pool min
 * @param opt.pool.max pool max
 *
 * @name moleculer-db-adapter-oracle
 * @module Service
 */
module.exports = (opt) => ({
	name: "db-adapter-oracle",

	//mixins: [UUID],

	/* version: "v1", */
	meta: {
		scalable: true,
	},
	//dependencies: ["auth", "users"],

	methods: {
		async _execute(ctx) {
			const { _exec: exec, _uuid: uuid } = ctx.params;

			this.logger.info("PL/SQL call _spec:" + exec._spec);
			this.logger.info("\n\n\nUUIDs=" + this._uuids);

			const conn = opt.pool ? this._uuids[uuid] : this._conn; //always run begin() first

			//this.logger.info("CONN=" + log(conn) + " on:" + log(this._uuids));

			try {
				const result = await conn.execute(
					eval("`" + exec._spec + "`"), //TODO check if it's needed
					exec._data
				);

				console.log(result.outBinds);

				if (result.outBinds && result.outBinds[exec._out._error] == 0)
					return result.outBinds;
			} catch (e) {
				this.logger.error("e: " + log(e));
				//this._closePoolAndExit();
			}

			return null;
		},
	},

	/**
	 * Service started lifecycle event handler
	 */
	async started() {
		try {
			oracledb.autoCommit = false; //imperative way of commiting

			const attrs = {
				user: opt.auth.oraUser,
				password: opt.auth.oraPassword,
				connectString: opt.auth.oraConnectionString,
				...(opt.pool && { poolMin: opt.pool.min }),
				...(opt.pool && { poolMax: opt.pool.max }),
			};

			if (opt.pool) await oracledb.createPool(attrs);
			else this._conn = await oracledb.getConnection(attrs);
		} catch (e) {
			return Promise.reject(e);
		}
	},

	/**
	 * Service stopped lifecycle event handler
	 */
	async stopped() {
		if (opt.pool) {
			this.logger.info("this._uuids:" + log(this._uuids));
			if (this._uuids) {
				for (const [uuid, conn] of Object.keys(this._uuids)) {
					try {
						await conn.close();
					} catch (err) {
						this.logger.error(`Error while closing the conn ${uuid}`);
					}
				}
			}
			try {
				this.logger.info("Pool closing...");
				await oracledb.getPool().close(10);
				this.logger.info("Pool closed.");
			} catch (err) {
				this.logger.error(err.message);
				Promise.reject(err);
			}
		} else {
			try {
				this.logger.info("Connection closing...");
				if (this._conn) await this._conn.close();
				this.logger.info("Connection closed.");
			} catch (err) {
				this.logger.error("Error while closing this._conn: ${err}");
				Promise.reject(err);
			}
		}
	},

	actions: {
		execute(ctx) {
			return this._execute(ctx);
		},
		/**
			Begin transaction using external uuid (given by the orchestrator srv).
		 */
		async begin(ctx) {
			console.log("begin started...");
			if (opt.pool) {
				const { _uuid: uuid } = ctx.params;
				const uuids = this._uuids || (this._uuids = {}); //TODO Redis!!!
				if (!uuids[uuid]) uuids[uuid] = await oracledb.getConnection(); //start a new one only if not exists

				//console.log("begin:" + log(this._uuids));
			}
		},
		rollback(ctx) {
			this.logger.error("ROLLBAK");

			if (opt.pool) return this._uuids[ctx.params._uuid].rollback();
			else return this._conn.rollback();
		},
		commit(ctx) {
			this.logger.info("COMMIT!!!");
			if (opt.pool) return this._uuids[ctx.params._uuid].commit();
			else return this._conn.commit();
		},
	},

	events: {
		"$node.connected"({ node }) {
			this.logger.info(`Node '${node.id}' is connected!`);
		},
		"remote.order.create"(msg) {
			this.logger.info("remote.order.create msg:" + log(msg));
		},
	},
});

I don't remember anything but you probably can find here something usefull.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants