Skip to content

Commit

Permalink
Merge pull request #187 from smfreegard/master
Browse files Browse the repository at this point in the history
Re-factor outbound functions to use less parameters
  • Loading branch information
baudehlo committed Apr 25, 2012
2 parents 12b19c7 + 59ea70f commit 9348e0a
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 32 deletions.
9 changes: 9 additions & 0 deletions TODO
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,12 @@
how outbound.js does that as it is reading from disk.
4. Somehow work into all that ability to re-write the email after
adding banners etc.

Outbound improvements
- Add UUID to outbound log output
- Fix --qlist and --qstat (currently broken)
- Add the ability to force a run on a specific queue file or destination domain
- Make retry times configurable (handle RFC requirements for 5 days and DSN queued warnings)
- Limit concurrency by domain
- Disable deliveries for a domain
- Pool connections by domain/MX
69 changes: 37 additions & 32 deletions outbound.js
Original file line number Diff line number Diff line change
Expand Up @@ -210,21 +210,20 @@ exports.send_trans_email = function (transaction, next) {
}

for (var dom in recips) {
var from = transaction.mail_from;
var data_lines = transaction.data_lines;
this.process_domain(dom, recips[dom], from, data_lines, hmails, transaction.notes, mynext);
var todo = new TODOItem(dom, recips[dom], transaction);
this.process_domain(todo, hmails, mynext);
}
}

exports.process_domain = function (dom, recips, from, data_lines, hmails, notes, cb) {
exports.process_domain = function (todo, hmails, cb) {
var plugin = this;
this.loginfo("Processing domain: " + dom);
this.loginfo("Processing domain: " + todo.domain);
var fname = _fname();
var tmp_path = path.join(queue_dir, '.' + fname);
var ws = fs.createWriteStream(tmp_path, { flags: WRITE_EXCL });
var data_pos = 0;
var write_more = function () {
if (data_pos === data_lines.length) {
if (data_pos === todo.data_lines.length) {
ws.on('close', function () {
var dest_path = path.join(queue_dir, fname);
fs.rename(tmp_path, dest_path, function (err) {
Expand All @@ -233,7 +232,7 @@ exports.process_domain = function (dom, recips, from, data_lines, hmails, notes,
cb(tmp_path, DENY, "Queue error");
}
else {
hmails.push(new HMailItem (fname, dest_path, notes));
hmails.push(new HMailItem (fname, dest_path, todo.notes));
cb(tmp_path, OK, "Queued!");
}
});
Expand All @@ -244,7 +243,7 @@ exports.process_domain = function (dom, recips, from, data_lines, hmails, notes,

// write, but fixup "." at the beginning of the line to be ".."
// and fixup \n to be \r\n
if (ws.write(data_lines[data_pos++].replace(/^\./m, '..').replace(/\r?\n/g, "\r\n"))) {
if (ws.write(todo.data_lines[data_pos++].replace(/^\./m, '..').replace(/\r?\n/g, "\r\n"))) {
write_more();
}
};
Expand All @@ -257,19 +256,21 @@ exports.process_domain = function (dom, recips, from, data_lines, hmails, notes,

ws.on('drain', write_more);

plugin.build_todo(dom, recips, from, notes, ws, write_more);
plugin.build_todo(todo, ws, write_more);
}

exports.build_todo = function (dom, recips, from, notes, ws, write_more) {
var todo_str = JSON.stringify(
{
domain: dom,
mail_from: from,
rcpt_to: recips,
notes: notes
exports.build_todo = function (todo, ws, write_more) {
// Replacer function to exclude items from the queue file header
function exclude_from_json(key, value) {
switch (key) {
case 'data_lines':
return undefined;
default:
return value;
}
);

}
var todo_str = JSON.stringify(todo, exclude_from_json);

// since JS has no pack() we have to manually write the bytes of a long
var todo_length = new Buffer(4);
var todo_l = todo_str.length;
Expand Down Expand Up @@ -333,7 +334,8 @@ exports.split_to_new_recipients = function (hmail, recipients) {

ws.on('drain', write_more);

plugin.build_todo(hmail.todo.domain, recipients, hmail.todo.mail_from, hmail.todo.notes, ws, write_more);
hmail.todo.rcpt_to = recipients;
plugin.build_todo(hmail.todo, ws, write_more);
}

exports._load_cur_queue = function (cb_name) {
Expand Down Expand Up @@ -408,7 +410,16 @@ exports.stats = function () {
return results;
}


// TODOItem - queue file header data
function TODOItem (domain, recipients, transaction) {
this.domain = domain;
this.rcpt_to = recipients;
this.mail_from = transaction.mail_from;
this.data_lines = transaction.data_lines;
this.notes = transaction.notes;
this.uuid = transaction.uuid;
return this;
}

/////////////////////////////////////////////////////////////////////////////
// HMailItem - encapsulates an individual outbound mail item
Expand Down Expand Up @@ -964,24 +975,18 @@ HMailItem.prototype.bounce_respond = function (retval, msg) {

var from = new Address ('<>');
var recip = new Address (this.todo.mail_from.user, this.todo.mail_from.host);
var dom = recip.host;
populate_bounce_message(from, recip, err, this, function (err, data_lines) {
if (err) {
return self.double_bounce("Error populating bounce message: " + err);
}

var hmails = [];

exports.process_domain(dom, [recip], from, data_lines, hmails, self.notes,
function (path, code, msg) {
fs.unlink(self.path);
if (code === DENY) {
// failed to even queue the mail
return self.double_bounce("Unable to queue the bounce message. Not sending bounce!");
}
setTimeout(function () {hmails[0].send()}, 0);
exports.send_email(from, recip, data_lines.join(''), function (code, msg) {
fs.unlink(self.path); // remove the bouncing message from the queue
if (code === DENY) {
// failed to even queue the mail
return self.double_bounce("Unable to queue the bounce message. Not sending bounce!");
}
);
});
});
}

Expand Down

0 comments on commit 9348e0a

Please sign in to comment.