Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Re-factor outbound functions to use less parameters #187

Merged
merged 1 commit into from
Apr 25, 2012
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions TODO
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,12 @@
how outbound.js does that as it is reading from disk.
4. Somehow work into all that ability to re-write the email after
adding banners etc.

Outbound improvements
- Add UUID to outbound log output
- Fix --qlist and --qstat (currently broken)
- Add the ability to force a run on a specific queue file or destination domain
- Make retry times configurable (handle RFC requirements for 5 days and DSN queued warnings)
- Limit concurrency by domain
- Disable deliveries for a domain
- Pool connections by domain/MX
69 changes: 37 additions & 32 deletions outbound.js
Original file line number Diff line number Diff line change
Expand Up @@ -210,21 +210,20 @@ exports.send_trans_email = function (transaction, next) {
}

for (var dom in recips) {
var from = transaction.mail_from;
var data_lines = transaction.data_lines;
this.process_domain(dom, recips[dom], from, data_lines, hmails, transaction.notes, mynext);
var todo = new TODOItem(dom, recips[dom], transaction);
this.process_domain(todo, hmails, mynext);
}
}

exports.process_domain = function (dom, recips, from, data_lines, hmails, notes, cb) {
exports.process_domain = function (todo, hmails, cb) {
var plugin = this;
this.loginfo("Processing domain: " + dom);
this.loginfo("Processing domain: " + todo.domain);
var fname = _fname();
var tmp_path = path.join(queue_dir, '.' + fname);
var ws = fs.createWriteStream(tmp_path, { flags: WRITE_EXCL });
var data_pos = 0;
var write_more = function () {
if (data_pos === data_lines.length) {
if (data_pos === todo.data_lines.length) {
ws.on('close', function () {
var dest_path = path.join(queue_dir, fname);
fs.rename(tmp_path, dest_path, function (err) {
Expand All @@ -233,7 +232,7 @@ exports.process_domain = function (dom, recips, from, data_lines, hmails, notes,
cb(tmp_path, DENY, "Queue error");
}
else {
hmails.push(new HMailItem (fname, dest_path, notes));
hmails.push(new HMailItem (fname, dest_path, todo.notes));
cb(tmp_path, OK, "Queued!");
}
});
Expand All @@ -244,7 +243,7 @@ exports.process_domain = function (dom, recips, from, data_lines, hmails, notes,

// write, but fixup "." at the beginning of the line to be ".."
// and fixup \n to be \r\n
if (ws.write(data_lines[data_pos++].replace(/^\./m, '..').replace(/\r?\n/g, "\r\n"))) {
if (ws.write(todo.data_lines[data_pos++].replace(/^\./m, '..').replace(/\r?\n/g, "\r\n"))) {
write_more();
}
};
Expand All @@ -257,19 +256,21 @@ exports.process_domain = function (dom, recips, from, data_lines, hmails, notes,

ws.on('drain', write_more);

plugin.build_todo(dom, recips, from, notes, ws, write_more);
plugin.build_todo(todo, ws, write_more);
}

exports.build_todo = function (dom, recips, from, notes, ws, write_more) {
var todo_str = JSON.stringify(
{
domain: dom,
mail_from: from,
rcpt_to: recips,
notes: notes
exports.build_todo = function (todo, ws, write_more) {
// Replacer function to exclude items from the queue file header
function exclude_from_json(key, value) {
switch (key) {
case 'data_lines':
return undefined;
default:
return value;
}
);

}
var todo_str = JSON.stringify(todo, exclude_from_json);

// since JS has no pack() we have to manually write the bytes of a long
var todo_length = new Buffer(4);
var todo_l = todo_str.length;
Expand Down Expand Up @@ -333,7 +334,8 @@ exports.split_to_new_recipients = function (hmail, recipients) {

ws.on('drain', write_more);

plugin.build_todo(hmail.todo.domain, recipients, hmail.todo.mail_from, hmail.todo.notes, ws, write_more);
hmail.todo.rcpt_to = recipients;
plugin.build_todo(hmail.todo, ws, write_more);
}

exports._load_cur_queue = function (cb_name) {
Expand Down Expand Up @@ -408,7 +410,16 @@ exports.stats = function () {
return results;
}


// TODOItem - queue file header data
function TODOItem (domain, recipients, transaction) {
this.domain = domain;
this.rcpt_to = recipients;
this.mail_from = transaction.mail_from;
this.data_lines = transaction.data_lines;
this.notes = transaction.notes;
this.uuid = transaction.uuid;
return this;
}

/////////////////////////////////////////////////////////////////////////////
// HMailItem - encapsulates an individual outbound mail item
Expand Down Expand Up @@ -964,24 +975,18 @@ HMailItem.prototype.bounce_respond = function (retval, msg) {

var from = new Address ('<>');
var recip = new Address (this.todo.mail_from.user, this.todo.mail_from.host);
var dom = recip.host;
populate_bounce_message(from, recip, err, this, function (err, data_lines) {
if (err) {
return self.double_bounce("Error populating bounce message: " + err);
}

var hmails = [];

exports.process_domain(dom, [recip], from, data_lines, hmails, self.notes,
function (path, code, msg) {
fs.unlink(self.path);
if (code === DENY) {
// failed to even queue the mail
return self.double_bounce("Unable to queue the bounce message. Not sending bounce!");
}
setTimeout(function () {hmails[0].send()}, 0);
exports.send_email(from, recip, data_lines.join(''), function (code, msg) {
fs.unlink(self.path); // remove the bouncing message from the queue
if (code === DENY) {
// failed to even queue the mail
return self.double_bounce("Unable to queue the bounce message. Not sending bounce!");
}
);
});
});
}

Expand Down