diff --git a/outbound/client_pool.js b/outbound/client_pool.js new file mode 100644 index 000000000..7cd624224 --- /dev/null +++ b/outbound/client_pool.js @@ -0,0 +1,163 @@ +"use strict"; + +var generic_pool = require('generic-pool'); +var sock = require('../line_socket'); +var server = require('../server'); +var logger = require('../logger'); +var cfg = require('./config'); + +function _create_socket (port, host, local_addr, is_unix_socket, connect_timeout, pool_timeout, callback) { + var socket = is_unix_socket ? sock.connect({path: host}) : + sock.connect({port: port, host: host, localAddress: local_addr}); + socket.setTimeout(connect_timeout * 1000); + logger.logdebug('[outbound] host=' + + host + ' port=' + port + ' pool_timeout=' + pool_timeout + ' created'); + socket.once('connect', function () { + socket.removeAllListeners('error'); // these get added after callback + callback(null, socket); + }); + socket.once('error', function (err) { + socket.end(); + var name = 'outbound::' + port + ':' + host + ':' + local_addr + ':' + pool_timeout; + if (server.notes.pool[name]) { + delete server.notes.pool[name]; + } + callback("Outbound connection error: " + err, null); + }); + socket.once('timeout', function () { + socket.end(); + callback("Outbound connection timed out to " + host + ":" + port, null); + }); +} + +// Separate pools are kept for each set of server attributes. +function get_pool (port, host, local_addr, is_unix_socket, connect_timeout, pool_timeout, max) { + port = port || 25; + host = host || 'localhost'; + connect_timeout = (connect_timeout === undefined) ? 30 : connect_timeout; + var name = 'outbound::' + port + ':' + host + ':' + local_addr + ':' + pool_timeout; + if (!server.notes.pool) { + server.notes.pool = {}; + } + if (!server.notes.pool[name]) { + var pool = generic_pool.Pool({ + name: name, + create: function (done) { + _create_socket(port, host, local_addr, is_unix_socket, connect_timeout, pool_timeout, done); + }, + validate: function (socket) { + return socket.writable; + }, + destroy: function (socket) { + logger.logdebug('[outbound] destroying pool entry for ' + host + ':' + port); + // Remove pool object from server notes once empty + var size = pool.getPoolSize(); + if (size === 0) { + delete server.notes.pool[name]; + } + socket.removeAllListeners(); + socket.once('error', function (err) { + logger.logwarn("[outbound] Socket got an error while shutting down: " + err); + }); + if (!socket.writable) return; + logger.logprotocol("[outbound] C: QUIT"); + socket.write("QUIT\r\n"); + socket.end(); // half close + socket.once('line', function (line) { + // Just assume this is a valid response + logger.logprotocol("[outbound] S: " + line); + socket.destroy(); + }); + }, + max: max || 10, + idleTimeoutMillis: pool_timeout * 1000, + log: function (str, level) { + if (/this._availableObjects.length=/.test(str)) return; + level = (level === 'verbose') ? 'debug' : level; + logger['log' + level]('[outbound] [' + name + '] ' + str); + } + }); + server.notes.pool[name] = pool; + } + return server.notes.pool[name]; +} + +// Get a socket for the given attributes. +exports.get_client = function (port, host, local_addr, is_unix_socket, callback) { + if (cfg.pool_concurrency_max == 0) { + return _create_socket(port, host, local_addr, is_unix_socket, cfg.connect_timeout, cfg.pool_timeout, callback); + } + + var pool = get_pool(port, host, local_addr, is_unix_socket, cfg.connect_timeout, cfg.pool_timeout, cfg.pool_concurrency_max); + if (pool.waitingClientsCount() >= cfg.pool_concurrency_max) { + return callback("Too many waiting clients for pool", null); + } + pool.acquire(function (err, socket) { + if (err) return callback(err); + socket.__acquired = true; + callback(null, socket); + }); +} + +exports.release_client = function (socket, port, host, local_addr, error) { + logger.logdebug("[outbound] release_client: " + host + ":" + port + " to " + local_addr); + + if (cfg.pool_concurrency_max == 0) { + return sockend(); + } + + if (!socket.__acquired) { + logger.logerror("Release an un-acquired socket. Stack: " + (new Error()).stack); + return; + } + socket.__acquired = false; + + var pool_timeout = cfg.pool_timeout; + var name = 'outbound::' + port + ':' + host + ':' + local_addr + ':' + pool_timeout; + if (!(server.notes && server.notes.pool)) { + logger.logcrit("[outbound] Releasing a pool (" + name + ") that doesn't exist!"); + return; + } + var pool = server.notes.pool[name]; + if (!pool) { + logger.logcrit("[outbound] Releasing a pool (" + name + ") that doesn't exist!"); + return; + } + + if (error) { + return sockend(); + } + + if (cfg.pool_timeout == 0) { + logger.loginfo("[outbound] Pool_timeout is zero - shutting it down"); + return sockend(); + } + + socket.removeAllListeners('close'); + socket.removeAllListeners('error'); + socket.removeAllListeners('end'); + socket.removeAllListeners('timeout'); + socket.removeAllListeners('line'); + + socket.__fromPool = true; + + socket.once('error', function (err) { + logger.logwarn("[outbound] Socket [" + name + "] in pool got an error: " + err); + sockend(); + }); + + socket.once('end', function () { + logger.logwarn("[outbound] Socket [" + name + "] in pool got FIN"); + sockend(); + }); + + pool.release(socket); + + function sockend () { + if (server.notes.pool[name]) { + server.notes.pool[name].destroy(socket); + } + socket.removeAllListeners(); + socket.destroy(); + } +} diff --git a/outbound/config.js b/outbound/config.js new file mode 100644 index 000000000..e548bc893 --- /dev/null +++ b/outbound/config.js @@ -0,0 +1,49 @@ +"use strict"; + +var config = require('../config'); + +var cfg = module.exports; + +function load_config () { + cfg = config.get('outbound.ini', { + booleans: [ + '-disabled', + '-always_split', + '+enable_tls', + '-ipv6_enabled', + ], + }, function () { + load_config(); + }).main; + + // legacy config file support. Remove in Haraka 4.0 + if (!cfg.disabled && config.get('outbound.disabled')) { + cfg.disabled = true; + } + if (!cfg.enable_tls && config.get('outbound.enable_tls')) { + cfg.enable_tls = true; + } + if (!cfg.maxTempFailures) { + cfg.maxTempFailures = config.get('outbound.maxTempFailures') || 13; + } + if (!cfg.concurrency_max) { + cfg.concurrency_max = config.get('outbound.concurrency_max') || 10000; + } + if (!cfg.connect_timeout) { + cfg.connect_timeout = 30; + } + if (cfg.pool_timeout === undefined) { + cfg.pool_timeout = 50; + } + if (!cfg.pool_concurrency_max) { + cfg.pool_concurrency_max = 10; + } + if (!cfg.ipv6_enabled && config.get('outbound.ipv6_enabled')) { + cfg.ipv6_enabled = true; + } + if (!cfg.received_header) { + cfg.received_header = config.get('outbound.received_header') || 'Haraka outbound'; + } +} + +load_config(); diff --git a/outbound.js b/outbound/hmail.js similarity index 56% rename from outbound.js rename to outbound/hmail.js index 159932221..c3a6f3561 100644 --- a/outbound.js +++ b/outbound/hmail.js @@ -1,917 +1,72 @@ -'use strict'; +"use strict"; -var dns = require('dns'); -var events = require('events'); -var fs = require('fs'); -var path = require('path'); -var net = require('net'); +var util = require('util'); +var events = require('events'); +var fs = require('fs'); +var dns = require('dns'); +var path = require('path'); +var net = require('net'); -var async = require('async'); var Address = require('address-rfc2821').Address; + var constants = require('haraka-constants'); -var generic_pool = require('generic-pool'); var net_utils = require('haraka-net-utils'); var utils = require('haraka-utils'); -var ResultStore = require('haraka-results'); - -var sock = require('./line_socket'); -var logger = require('./logger'); -var config = require('./config'); -var trans = require('./transaction'); -var plugins = require('./plugins'); -var TimerQueue = require('./timer_queue'); -var Header = require('./mailheader').Header; -var DSN = require('./dsn'); -var FsyncWriteStream = require('./outbound/fsync_writestream'); -var server = require('./server'); - -var core_consts = require('constants'); -var WRITE_EXCL = core_consts.O_CREAT | core_consts.O_TRUNC | core_consts.O_WRONLY | core_consts.O_EXCL; - -var my_hostname = require('os').hostname().replace(/\\/, '\\057').replace(/:/, '\\072'); - -var queue_dir; -if (config.get('queue_dir')) { - queue_dir = path.resolve(config.get('queue_dir')); -} -else if (process.env.HARAKA) { - queue_dir = path.resolve(process.env.HARAKA, 'queue'); -} -else { - queue_dir = path.resolve('tests', 'test-queue'); -} - -var cfg; -var platformDOT = ((['win32','win64'].indexOf( process.platform ) !== -1) ? '' : '__tmp__') + '.'; -exports.load_config = function () { - cfg = config.get('outbound.ini', { - booleans: [ - '-disabled', - '-always_split', - '+enable_tls', - '-ipv6_enabled', - ], - }, function () { - exports.load_config(); - }).main; - - // legacy config file support. Remove in Haraka 4.0 - if (!cfg.disabled && config.get('outbound.disabled')) { - cfg.disabled = true; - } - if (!cfg.enable_tls && config.get('outbound.enable_tls')) { - cfg.enable_tls = true; - } - if (!cfg.maxTempFailures) { - cfg.maxTempFailures = config.get('outbound.maxTempFailures') || 13; - } - if (!cfg.concurrency_max) { - cfg.concurrency_max = config.get('outbound.concurrency_max') || 10000; - } - if (!cfg.connect_timeout) { - cfg.connect_timeout = 30; - } - if (cfg.pool_timeout === undefined) { - cfg.pool_timeout = 50; - } - if (!cfg.pool_concurrency_max) { - cfg.pool_concurrency_max = 10; - } - if (!cfg.ipv6_enabled && config.get('outbound.ipv6_enabled')) { - cfg.ipv6_enabled = true; - } - if (!cfg.received_header) { - cfg.received_header = config.get('outbound.received_header') || 'Haraka outbound'; - } -}; -exports.load_config(); -exports.net_utils = net_utils; -exports.config = config; - -var load_queue = async.queue(function (file, cb) { - var hmail = new HMailItem(file, path.join(queue_dir, file)); - exports._add_file(hmail); - hmail.once('ready', cb); -}, cfg.concurrency_max); - -var in_progress = 0; -var delivery_queue = async.queue(function (hmail, cb) { - in_progress++; - hmail.next_cb = function () { - in_progress--; - cb(); - }; - hmail.send(); -}, cfg.concurrency_max); - -var temp_fail_queue = new TimerQueue(); - -var queue_count = 0; - -exports.get_stats = function () { - return in_progress + '/' + delivery_queue.length() + '/' + temp_fail_queue.length(); -}; - -exports.list_queue = function (cb) { - this._load_cur_queue(null, "_list_file", cb); -}; - -exports.stat_queue = function (cb) { - var self = this; - this._load_cur_queue(null, "_stat_file", function (err) { - if (err) return cb(err); - return cb(null, self.stats()); - }); -}; - -exports.scan_queue_pids = function (cb) { - - // Under cluster, this is called first by the master so - // we create the queue directory if it doesn't exist. - this.ensure_queue_dir(); - - fs.readdir(queue_dir, function (err, files) { - if (err) { - logger.logerror("[outbound] Failed to load queue directory (" + queue_dir + "): " + err); - return cb(err); - } - - var pids = {}; - - files.forEach(function (file) { - if (/^\./.test(file)) { - // dot-file... - logger.logwarn("[outbound] Removing left over dot-file: " + file); - return fs.unlink(file, function () {}); - } - - var parts = _qfile.parts(file); - if (!parts) { - logger.logerror("[outbound] Unrecognized file in queue directory: " + queue_dir + '/' + file); - return; - } - - pids[parts.pid] = true; - }); - - return cb(null, Object.keys(pids)); - }); -}; - -process.on('message', function (msg) { - if (msg.event && msg.event === 'outbound.load_pid_queue') { - exports.load_pid_queue(msg.data); - return; - } - if (msg.event && msg.event === 'outbound.flush_queue') { - exports.flush_queue(msg.domain, process.pid); - return; - } - if (msg.event && msg.event == 'outbound.shutdown') { - logger.loginfo("[outbound] Shutting down temp fail queue"); - exports.drain_pools(); - temp_fail_queue.shutdown(); - return; - } - if (msg.event && msg.event === 'outbound.drain_pools') { - exports.drain_pools(); - return; - } - // ignores the message -}); - -exports.drain_pools = function () { - if (!server.notes.pool || Object.keys(server.notes.pool).length == 0) { - return logger.logdebug("[outbound] Drain pools: No pools available"); - } - for (var p in server.notes.pool) { - logger.logdebug("[outbound] Drain pools: Draining SMTP connection pool " + p); - server.notes.pool[p].drain(function () { - if (!server.notes.pool[p]) return; - server.notes.pool[p].destroyAllNow(); - }); - } - logger.logdebug("[outbound] Drain pools: Pools shut down"); -} - -exports.flush_queue = function (domain, pid) { - if (domain) { - exports.list_queue(function (err, qlist) { - if (err) return logger.logerror("Failed to load queue: " + err); - qlist.forEach(function (todo) { - if (todo.domain.toLowerCase() != domain.toLowerCase()) return; - if (pid && todo.pid != pid) return; - // console.log("requeue: ", todo); - delivery_queue.push(new HMailItem(todo.file, todo.full_path)); - }); - }) - } - else { - temp_fail_queue.drain(); - } -}; - -exports.load_pid_queue = function (pid) { - logger.loginfo("[outbound] Loading queue for pid: " + pid); - this.load_queue(pid); -}; - -exports.ensure_queue_dir = function () { - // No reason not to do this stuff syncronously - - // this code is only run at start-up. - if (fs.existsSync(queue_dir)) return; - - logger.logdebug("[outbound] Creating queue directory " + queue_dir); - try { - fs.mkdirSync(queue_dir, 493); // 493 == 0755 - } - catch (err) { - if (err.code !== 'EEXIST') { - logger.logerror("Error creating queue directory: " + err); - throw err; - } - } -}; - -exports.load_queue = function (pid) { - // Initialise and load queue - // This function is called first when not running under cluster, - // so we create the queue directory if it doesn't already exist. - this.ensure_queue_dir(); - this._load_cur_queue(pid, "_add_file"); -}; - -exports._load_cur_queue = function (pid, cb_name, cb) { - var self = this; - logger.loginfo("[outbound] Loading outbound queue from ", queue_dir); - fs.readdir(queue_dir, function (err, files) { - if (err) { - return logger.logerror("Failed to load queue directory (" + - queue_dir + "): " + err); - } - - self.cur_time = new Date(); // set once so we're not calling it a lot - - self.load_queue_files(pid, cb_name, files, cb); - }); -}; -exports.load_queue_files = function (pid, cb_name, files, callback) { - var self = this; - if (files.length === 0) return; +var logger = require('../logger'); +var config = require('../config'); +var plugins = require('../plugins'); +var Header = require('../mailheader').Header; +var DSN = require('../dsn'); - if (cfg.disabled && cb_name === '_add_file') { - // try again in 1 second if delivery is disabled - setTimeout(function () { - exports.load_queue_files(pid, cb_name, files, callback); - }, 1000); - return; - } - - if (pid) { - // Pre-scan to rename PID files to my PID: - logger.loginfo("[outbound] Grabbing queue files for pid: " + pid); - async.eachLimit(files, 200, function (file, cb) { - - var parts = _qfile.parts(file); - if (parts && parts.pid === parseInt(pid)) { - var next_process = parts.next_attempt; - // maintain some original details for the rename - var new_filename = _qfile.name({ - arrival : parts.arrival, - uid : parts.uid, - next_attempt : parts.next_attempt, - attempts : parts.attempts, - }); - // logger.loginfo("new_filename: ", new_filename); - fs.rename(path.join(queue_dir, file), path.join(queue_dir, new_filename), function (err) { - if (err) { - logger.logerror("Unable to rename queue file: " + file + - " to " + new_filename + " : " + err); - return cb(); - } - if (next_process <= self.cur_time) { - load_queue.push(new_filename); - } - else { - temp_fail_queue.add(next_process - self.cur_time, function () { - load_queue.push(new_filename); - }); - } - cb(); - }); - } - else if (/^\./.test(file)) { - // dot-file... - logger.logwarn("Removing left over dot-file: " + file); - return fs.unlink(path.join(queue_dir, file), function (err) { - if (err) { - logger.logerror("Error removing dot-file: " + file + ": " + err); - } - cb(); - }); - } - else { - // Do this because otherwise we blow the stack - async.setImmediate(cb); - } - }, function (err) { - if (err) { - // no error cases yet, but log anyway - logger.logerror("Error fixing up queue files: " + err); - } - logger.loginfo("Done fixing up old PID queue files"); - logger.loginfo(delivery_queue.length() + " files in my delivery queue"); - logger.loginfo(load_queue.length() + " files in my load queue"); - logger.loginfo(temp_fail_queue.length() + " files in my temp fail queue"); +var client_pool = require('./client_pool'); +var cfg = require('./config'); +var _qfile = require('./qfile'); +var queuelib = require('./queue'); +var mx_lookup = require('./mx_lookup'); +var outbound = require('./index'); +var obtls = require('./tls'); - if (callback) callback(); - }); - } - else { - logger.loginfo("Loading the queue..."); - var good_file = function (file) { - if (/^\./.test(file)) { - logger.logwarn("Removing left over dot-file: " + file); - fs.unlink(path.join(queue_dir, file), function (err) { - if (err) console.error(err); - }); - return false; - } +var queue_dir = queuelib.queue_dir; +var temp_fail_queue = queuelib.temp_fail_queue; +var delivery_queue = queuelib.delivery_queue; - if (!_qfile.parts(file)) { - logger.logerror("Unrecognized file in queue folder: " + file); - return false; - } - return true; - } - async.mapSeries(files.filter(good_file), function (file, cb) { - // logger.logdebug("Loading queue file: " + file); - if (cb_name === '_add_file') { - var parts = _qfile.parts(file); - var next_process = parts.next_attempt; - - if (next_process <= self.cur_time) { - logger.logdebug("File needs processing now"); - load_queue.push(file); - } - else { - logger.logdebug("File needs processing later: " + (next_process - self.cur_time) + "ms"); - temp_fail_queue.add(next_process - self.cur_time, function () { load_queue.push(file);}); - } - cb(); - } - else { - self[cb_name](file, cb); - } - }, callback); - } -}; +var mx = require('./mx_lookup'); +var _qfile = require('./qfile'); +var cfg = require('./config'); -exports._add_file = function (hmail) { - if (hmail.next_process < this.cur_time) { - delivery_queue.push(hmail); - } - else { - temp_fail_queue.add(hmail.next_process - this.cur_time, function () { - delivery_queue.push(hmail); - }); - } -}; - -exports._list_file = function (file, cb) { - var tl_reader = fs.createReadStream(path.join(queue_dir, file), {start: 0, end: 3}); - tl_reader.on('error', function (err) { - console.error("Error reading queue file: " + file + ":", err); - }); - tl_reader.once('data', function (buf) { - // I'm making the assumption here we won't ever read less than 4 bytes - // as no filesystem on the planet should be that dumb... - tl_reader.destroy(); - var todo_len = (buf[0] << 24) + (buf[1] << 16) + (buf[2] << 8) + buf[3]; - var td_reader = fs.createReadStream(path.join(queue_dir, file), {encoding: 'utf8', start: 4, end: todo_len + 3}); - var todo = ''; - td_reader.on('data', function (str) { - todo += str; - if (Buffer.byteLength(todo) === todo_len) { - // we read everything - var todo_struct = JSON.parse(todo); - todo_struct.rcpt_to = todo_struct.rcpt_to.map(function (a) { return new Address (a); }); - todo_struct.mail_from = new Address (todo_struct.mail_from); - todo_struct.file = file; - todo_struct.full_path = path.join(queue_dir, file); - var parts = _qfile.parts(file); - todo_struct.pid = (parts && parts.pid) || null; - cb(null, todo_struct); - } - }); - td_reader.on('end', function () { - if (Buffer.byteLength(todo) !== todo_len) { - console.error("Didn't find right amount of data in todo for file:", file); - return cb(); - } - }); - }); -}; - -exports._stat_file = function (file, cb) { - queue_count++; - cb(); -}; - -exports.stats = function () { - // TODO: output more data here - var results = { - queue_dir: queue_dir, - queue_count: queue_count, - }; - - return results; -}; - - -var QFILECOUNTER = 0; -var _qfile = exports.qfile = { - // File Name Format: $arrival_$nextattempt_$attempts_$pid_$uniquetag_$counter_$host - name : function (overrides) { - var o = overrides || {}; - var time = _qfile.time(); - return [ - o.arrival || time, - o.next_attempt || time, - o.attempts || 0, - o.pid || process.pid, - o.uid || _qfile.rnd_unique(), - _qfile.next_counter(), - o.host || my_hostname - ].join('_'); - }, - - time : function () { - return new Date().getTime(); - }, - - next_counter: function () { - QFILECOUNTER = (QFILECOUNTER < 10000)?QFILECOUNTER+1:0; - return QFILECOUNTER; - }, - - rnd_unique: function (len) { - len = len || 6; - var chars = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'; - var result = []; - for (var i = len; i > 0; --i){ - result.push(chars[Math.floor(Math.random() * chars.length)]); - } - return result.join(''); - }, - - parts : function (filename) { - if (!filename){ - throw new Error("No filename provided"); - } - - var PARTS_EXPECTED_OLD = 4; - var PARTS_EXPECTED_CURRENT = 7; - var p = filename.split('_'); - - // bail on unknown split lengths - if (p.length !== PARTS_EXPECTED_OLD - && p.length !== PARTS_EXPECTED_CURRENT){ - return null; - } - - var time = _qfile.time(); - if (p.length === PARTS_EXPECTED_OLD){ - // parse the previous string structure - // $nextattempt_$attempts_$pid_$uniq.$host - // 1484878079415_0_12345_8888.mta1.example.com - // var fn_re = /^(\d+)_(\d+)_(\d+)(_\d+\..*)$/ - // match[1] = $nextattempt - // match[2] = $attempts - // match[3] = $pid - // match[4] = $uniq.$my_hostname - var fn_re = /^(\d+)_(\d+)_(\d+)_(\d+)\.(.*)$/; - var match = filename.match(fn_re); - if (!match){ - return null; - } - p = match.slice(1); // grab the capture groups minus the pattern - p.splice(3,1,_qfile.rnd_unique(),_qfile.next_counter()); // add a fresh UID and counter - p.unshift(time); // prepend current timestamp -- potentially inaccurate, but non-critical and shortlived - } - - return { - arrival : parseInt(p[0]), - next_attempt : parseInt(p[1]), - attempts : parseInt(p[2]), - pid : parseInt(p[3]), - uid : p[4], - counter : parseInt(p[5]), - host : p[6], - age : time - parseInt(p[0]) - }; - } -}; - - -exports.send_email = function () { - - if (arguments.length === 2) { - logger.loginfo("[outbound] Sending email as a transaction"); - return this.send_trans_email(arguments[0], arguments[1]); - } - - var from = arguments[0]; - var to = arguments[1]; - var contents = arguments[2]; - var next = arguments[3]; - var options = arguments[4]; - - var dot_stuffed = ((options && options.dot_stuffed) ? options.dot_stuffed : false); - var notes = ((options && options.notes) ? options.notes : null); - - logger.loginfo("[outbound] Sending email via params"); - - var transaction = trans.createTransaction(); - - logger.loginfo("[outbound] Created transaction: " + transaction.uuid); - - //Adding notes passed as parameter - if (notes) { - transaction.notes = notes; - } - - // set MAIL FROM address, and parse if it's not an Address object - if (from instanceof Address) { - transaction.mail_from = from; - } - else { - try { - from = new Address(from); - } - catch (err) { - return next(constants.deny, "Malformed from: " + err); - } - transaction.mail_from = from; - } - - // Make sure to is an array - if (!(Array.isArray(to))) { - // turn into an array - to = [ to ]; - } - - if (to.length === 0) { - return next(constants.deny, "No recipients for email"); - } - - // Set RCPT TO's, and parse each if it's not an Address object. - for (var i=0,l=to.length; i < l; i++) { - if (!(to[i] instanceof Address)) { - try { - to[i] = new Address(to[i]); - } - catch (err) { - return next(constants.deny, - "Malformed to address (" + to[i] + "): " + err); - } - } - } - - transaction.rcpt_to = to; - - // Set data_lines to lines in contents - if (typeof contents == 'string') { - var match; - while ((match = utils.line_regexp.exec(contents))) { - var line = match[1]; - line = line.replace(/\r?\n?$/, '\r\n'); // make sure it ends in \r\n - if (dot_stuffed === false && line.length >= 3 && line.substr(0,1) === '.') { - line = "." + line; - } - transaction.add_data(new Buffer(line)); - contents = contents.substr(match[1].length); - if (contents.length === 0) { - break; - } - } - } - else { - // Assume a stream - return stream_line_reader(contents, transaction, function (err) { - if (err) { - return next(constants.denysoft, "Error from stream line reader: " + err); - } - exports.send_trans_email(transaction, next); - }); - } - - transaction.message_stream.add_line_end(); - this.send_trans_email(transaction, next); -}; - -function stream_line_reader (stream, transaction, cb) { - var current_data = ''; - function process_data (data) { - current_data += data.toString(); - var results; - while ((results = utils.line_regexp.exec(current_data))) { - var this_line = results[1]; - current_data = current_data.slice(this_line.length); - if (!(current_data.length || this_line.length)) { - return; - } - transaction.add_data(new Buffer(this_line)); - } - } - - function process_end () { - if (current_data.length) { - transaction.add_data(new Buffer(current_data)); - } - current_data = ''; - transaction.message_stream.add_line_end(); - cb(); - } - - stream.on('data', process_data); - stream.once('end', process_end); - stream.once('error', cb); -} - -exports.send_trans_email = function (transaction, next) { - var self = this; - - // add in potentially missing headers - if (!transaction.header.get_all('Message-Id').length) { - logger.loginfo("[outbound] Adding missing Message-Id header"); - transaction.add_header('Message-Id', '<' + transaction.uuid + '@' + config.get('me') + '>'); - } - if (!transaction.header.get_all('Date').length) { - logger.loginfo("[outbound] Adding missing Date header"); - transaction.add_header('Date', utils.date_to_str(new Date())); - } - - transaction.add_leading_header('Received', '('+cfg.received_header+'); ' + utils.date_to_str(new Date())); - - var connection = { - transaction: transaction, - }; - - logger.add_log_methods(connection); - transaction.results = transaction.results || new ResultStore(connection); - - connection.pre_send_trans_email_respond = function (retval) { - var deliveries = []; - var always_split = cfg.always_split; - if (always_split) { - this.logdebug({name: "outbound"}, "always split"); - transaction.rcpt_to.forEach(function (rcpt) { - deliveries.push({domain: rcpt.host, rcpts: [ rcpt ]}); - }); - } - else { - // First get each domain - var recips = {}; - transaction.rcpt_to.forEach(function (rcpt) { - var domain = rcpt.host; - if (!recips[domain]) { recips[domain] = []; } - recips[domain].push(rcpt); - }); - Object.keys(recips).forEach(function (domain) { - deliveries.push({'domain': domain, 'rcpts': recips[domain]}); - }); - } - - var hmails = []; - var ok_paths = []; - - var todo_index = 1; - - async.forEachSeries(deliveries, function (deliv, cb) { - var todo = new TODOItem(deliv.domain, deliv.rcpts, transaction); - todo.uuid = todo.uuid + '.' + todo_index; - todo_index++; - self.process_delivery(ok_paths, todo, hmails, cb); - }, - function (err) { - if (err) { - for (var i=0,l=ok_paths.length; i> 8) & 0xff; - todo_length[1] = (todo_l >> 16) & 0xff; - todo_length[0] = (todo_l >> 24) & 0xff; - - var buf = Buffer.concat([todo_length, todo_str], todo_str.length + 4); - - var continue_writing = ws.write(buf); - if (continue_writing) return write_more(); - ws.once('drain', write_more); -}; - - -exports.split_to_new_recipients = function (hmail, recipients, response, cb) { - var self = this; - if (recipients.length === hmail.todo.rcpt_to.length) { - // Split to new for no reason - increase refcount and return self - hmail.refcount++; - return cb(hmail); - } - var fname = _qfile.name(); - var tmp_path = path.join(queue_dir, platformDOT + fname); - var ws = new FsyncWriteStream(tmp_path, { flags: WRITE_EXCL }); - var err_handler = function (err, location) { - logger.logerror("[outbound] Error while splitting to new recipients (" + location + "): " + err); - hmail.todo.rcpt_to.forEach(function (rcpt) { - hmail.extend_rcpt_with_dsn(rcpt, DSN.sys_unspecified("Error splitting to new recipients: " + err)); - }); - hmail.bounce("Error splitting to new recipients: " + err); - }; - - ws.on('error', function (err) { err_handler(err, "tmp file writer");}); - - var writing = false; - - var write_more = function () { - if (writing) return; - writing = true; - var rs = hmail.data_stream(); - rs.pipe(ws, {end: false}); - rs.on('error', function (err) { - err_handler(err, "hmail.data_stream reader"); - }); - rs.on('end', function () { - ws.on('close', function () { - var dest_path = path.join(queue_dir, fname); - fs.rename(tmp_path, dest_path, function (err) { - if (err) { - err_handler(err, "tmp file rename"); - } - else { - var split_mail = new HMailItem (fname, dest_path); - split_mail.once('ready', function () { - cb(split_mail); - }); - } - }); - }); - ws.destroySoon(); - return; - }); - }; - - ws.on('error', function (err) { - logger.logerror("[outbound] Unable to write queue file (" + fname + "): " + err); - ws.destroy(); - hmail.todo.rcpt_to.forEach(function (rcpt) { - hmail.extend_rcpt_with_dsn(rcpt, DSN.sys_unspecified("Error re-queueing some recipients: " + err)); - }); - hmail.bounce("Error re-queueing some recipients: " + err); - }); - - var new_todo = JSON.parse(JSON.stringify(hmail.todo)); - new_todo.rcpt_to = recipients; - self.build_todo(new_todo, ws, write_more); -}; - -exports.get_tls_options = function (mx) { - - var tls_options = exports.net_utils.tls_ini_section_with_defaults('outbound'); - tls_options.servername = mx.exchange; - - if (tls_options.key) { - if (Array.isArray(tls_options.key)) { - tls_options.key = tls_options.key[0]; - } - tls_options.key = exports.config.get(tls_options.key, 'binary'); - } - - if (tls_options.dhparam) { - tls_options.dhparam = exports.config.get(tls_options.dhparam, 'binary'); - } - - if (tls_options.cert) { - if (Array.isArray(tls_options.cert)) { - tls_options.cert = tls_options.cert[0]; - } - tls_options.cert = exports.config.get(tls_options.cert, 'binary'); - } - - return tls_options; -}; - -// TODOItem - queue file header data -function TODOItem (domain, recipients, transaction) { - this.queue_time = Date.now(); - this.domain = domain; - this.rcpt_to = recipients; - this.mail_from = transaction.mail_from; - this.message_stream = transaction.message_stream; - this.notes = transaction.notes; - this.uuid = transaction.uuid; - return this; -} - -// exported for testability -exports.TODOItem = TODOItem; ///////////////////////////////////////////////////////////////////////////// // HMailItem - encapsulates an individual outbound mail item var dummy_func = function () {}; -class HMailItem extends events.EventEmitter { - constructor (filename, filePath, notes) { - super(); - var parts = _qfile.parts(filename); - if (!parts) { - throw new Error("Bad filename: " + filename); - } - this.path = filePath; - this.filename = filename; - this.next_process = parts.next_attempt; - this.num_failures = parts.attempts; - this.pid = parts.pid; - this.notes = notes || {}; - this.refcount = 1; - this.todo = null; - this.file_size = 0; - this.next_cb = dummy_func; - this.bounce_error = null; - this.hook = null; - this.size_file(); - } + +function HMailItem (filename, filePath, notes) { + events.EventEmitter.call(this); + var parts = _qfile.parts(filename); + if (!parts) { + throw new Error("Bad filename: " + filename); + } + this.path = filePath; + this.filename = filename; + this.next_process = parts.next_attempt; + this.num_failures = parts.attempts; + this.pid = parts.pid; + this.notes = notes || {}; + this.refcount = 1; + this.todo = null; + this.file_size = 0; + this.next_cb = dummy_func; + this.bounce_error = null; + this.hook = null; + this.size_file(); } -exports.HMailItem = HMailItem; +util.inherits(HMailItem, events.EventEmitter); + +module.exports = HMailItem; + logger.add_log_methods(HMailItem.prototype, "outbound"); @@ -1051,72 +206,11 @@ HMailItem.prototype.get_mx_respond = function (retval, mx) { } // if none of the above return codes, drop through to this... - exports.lookup_mx(this.todo.domain, function (err, mxs) { + mx_lookup.lookup_mx(this.todo.domain, function (err, mxs) { hmail.found_mx(err, mxs); }); }; -exports.lookup_mx = function lookup_mx (domain, cb) { - var mxs = []; - - // Possible DNS errors - // NODATA - // FORMERR - // BADRESP - // NOTFOUND - // BADNAME - // TIMEOUT - // CONNREFUSED - // NOMEM - // DESTRUCTION - // NOTIMP - // EREFUSED - // SERVFAIL - - // default wrap_mx just returns our object with "priority" and "exchange" keys - var wrap_mx = function (a) { return a; }; - var process_dns = function (err, addresses) { - if (err) { - if (err.code === 'ENODATA') { - // Most likely this is a hostname with no MX record - // Drop through and we'll get the A record instead. - return 0; - } - cb(err); - } - else if (addresses && addresses.length) { - for (var i=0,l=addresses.length; i < l; i++) { - var mx = wrap_mx(addresses[i]); - mxs.push(mx); - } - cb(null, mxs); - } - else { - // return zero if we need to keep trying next option - return 0; - } - return 1; - }; - - dns.resolveMx(domain, function (err, addresses) { - if (process_dns(err, addresses)) { - return; - } - - // if MX lookup failed, we lookup an A record. To do that we change - // wrap_mx() to return same thing as resolveMx() does. - wrap_mx = function (a) { return {priority:0,exchange:a}; }; - // IS: IPv6 compatible - dns.resolve(domain, function (err2, addresses2) { - if (process_dns(err2, addresses2)) { - return; - } - err2 = new Error("Found nowhere to deliver to"); - err2.code = 'NOMX'; - cb(err2); - }); - }); -}; HMailItem.prototype.found_mx = function (err, mxs) { var hmail = this; @@ -1244,164 +338,6 @@ var cram_md5_response = function (username, password, challenge) { return utils.base64(username + ' ' + digest); } -function _create_socket (port, host, local_addr, is_unix_socket, callback) { - var socket = is_unix_socket ? sock.connect({path: host}) : - sock.connect({port: port, host: host, localAddress: local_addr}); - socket.setTimeout(cfg.connect_timeout * 1000); - logger.logdebug('[outbound] host=' + - host + ' port=' + port + ' pool_timeout=' + cfg.pool_timeout + ' created'); - socket.once('connect', function () { - socket.removeAllListeners('error'); // these get added after callback - callback(null, socket); - }); - socket.once('error', function (err) { - socket.end(); - var name = 'outbound::' + port + ':' + host + ':' + local_addr + ':' + cfg.pool_timeout; - if (server.notes.pool[name]) { - delete server.notes.pool[name]; - } - callback("Outbound connection error: " + err, null); - }); - socket.once('timeout', function () { - socket.end(); - callback("Outbound connection timed out to " + host + ":" + port, null); - }); -} - -// Separate pools are kept for each set of server attributes. -function get_pool (port, host, local_addr, is_unix_socket) { - port = port || 25; - host = host || 'localhost'; - var pool_timeout = cfg.pool_timeout || 300; - var name = 'outbound::' + port + ':' + host + ':' + local_addr + ':' + pool_timeout; - if (!server.notes.pool) { - server.notes.pool = {}; - } - if (server.notes.pool[name]) { - return server.notes.pool[name]; - } - - var pool = generic_pool.Pool({ - name: name, - create: function (done) { - _create_socket(port, host, local_addr, is_unix_socket, done); - }, - validate: function (socket) { - return socket.writable; - }, - destroy: function (socket) { - logger.logdebug('[outbound] destroying pool entry for ' + host + ':' + port); - // Remove pool object from server notes once empty - var size = pool.getPoolSize(); - if (size === 0) { - delete server.notes.pool[name]; - } - socket.removeAllListeners(); - socket.once('error', function (err) { - logger.logwarn("[outbound] Socket got an error while shutting down: " + err); - }); - if (!socket.writable) return; - logger.logprotocol("[outbound] C: QUIT"); - socket.write("QUIT\r\n"); - socket.end(); // half close - socket.once('line', function (line) { - // Just assume this is a valid response - logger.logprotocol("[outbound] S: " + line); - socket.destroy(); - }); - }, - max: cfg.pool_concurrency_max || 10, - idleTimeoutMillis: pool_timeout * 1000, - log: function (str, level) { - if (/this._availableObjects.length=/.test(str)) return; - level = (level === 'verbose') ? 'debug' : level; - logger['log' + level]('[outbound] [' + name + '] ' + str); - } - }); - server.notes.pool[name] = pool; - return pool; -} - -// Get a socket for the given attributes. -function get_client (port, host, local_addr, is_unix_socket, callback) { - if (cfg.pool_concurrency_max == 0) { - return _create_socket(port, host, local_addr, is_unix_socket, callback); - } - - var pool = get_pool(port, host, local_addr, is_unix_socket); - if (pool.waitingClientsCount() >= cfg.pool_concurrency_max) { - return callback("Too many waiting clients for pool", null); - } - pool.acquire(function (err, socket) { - if (err) return callback(err); - socket.__acquired = true; - callback(null, socket); - }); -} - -function release_client (socket, port, host, local_addr, error) { - logger.logdebug("[outbound] release_client: " + host + ":" + port + " to " + local_addr); - - if (cfg.pool_concurrency_max == 0) { - return sockend(); - } - - if (!socket.__acquired) { - logger.logerror("Release an un-acquired socket. Stack: " + (new Error()).stack); - return; - } - socket.__acquired = false; - - var pool_timeout = cfg.pool_timeout; - var name = 'outbound::' + port + ':' + host + ':' + local_addr + ':' + pool_timeout; - if (!(server.notes && server.notes.pool)) { - logger.logcrit("[outbound] Releasing a pool (" + name + ") that doesn't exist!"); - return; - } - var pool = server.notes.pool[name]; - if (!pool) { - logger.logcrit("[outbound] Releasing a pool (" + name + ") that doesn't exist!"); - return; - } - - if (error) { - return sockend(); - } - - if (cfg.pool_timeout == 0) { - logger.loginfo("[outbound] Pool_timeout is zero - shutting it down"); - return sockend(); - } - - socket.removeAllListeners('close'); - socket.removeAllListeners('error'); - socket.removeAllListeners('end'); - socket.removeAllListeners('timeout'); - socket.removeAllListeners('line'); - - socket.__fromPool = true; - - socket.once('error', function (err) { - logger.logwarn("[outbound] Socket [" + name + "] in pool got an error: " + err); - sockend(); - }); - - socket.once('end', function () { - logger.logwarn("[outbound] Socket [" + name + "] in pool got FIN"); - sockend(); - }); - - pool.release(socket); - - function sockend () { - if (server.notes.pool[name]) { - server.notes.pool[name].destroy(socket); - } - socket.removeAllListeners(); - socket.destroy(); - } -} - HMailItem.prototype.try_deliver_host = function (mx) { var self = this; @@ -1435,7 +371,7 @@ HMailItem.prototype.try_deliver_host = function (mx) { (mx.using_lmtp ? " using LMTP" : "") + " (" + delivery_queue.length() + ") (" + temp_fail_queue.length() + ")"); - get_client(port, host, mx.bind, mx.path ? true : false, function (err, socket) { + client_pool.get_client(port, host, mx.bind, mx.path ? true : false, function (err, socket) { if (err) { logger.logerror('[outbound] Failed to get pool entry: ' + err); // try next host @@ -1457,7 +393,7 @@ HMailItem.prototype.try_deliver_host_on_socket = function (mx, host, port, socke if (processing_mail) { self.logerror("Ongoing connection failed to " + host + ":" + port + " : " + err); processing_mail = false; - release_client(socket, port, host, mx.bind, true); + client_pool.release_client(socket, port, host, mx.bind, true); // try the next MX return self.try_deliver_host(mx); } @@ -1467,7 +403,7 @@ HMailItem.prototype.try_deliver_host_on_socket = function (mx, host, port, socke if (processing_mail) { self.logerror("Remote end " + host + ":" + port + " closed connection while we were processing mail. Trying next MX."); processing_mail = false; - release_client(socket, port, host, mx.bind, true); + client_pool.release_client(socket, port, host, mx.bind, true); return self.try_deliver_host(mx); } }); @@ -1499,14 +435,14 @@ HMailItem.prototype.try_deliver_host_on_socket = function (mx, host, port, socke "auth": [], }; - var tls_cfg = net_utils.load_tls_ini(); + var tls_config = net_utils.load_tls_ini(); var send_command = socket.send_command = function (cmd, data) { if (!socket.writable) { self.logerror("Socket writability went away"); if (processing_mail) { processing_mail = false; - release_client(socket, port, host, mx.bind, true); + client_pool.release_client(socket, port, host, mx.bind, true); return self.try_deliver_host(mx); } return; @@ -1550,8 +486,8 @@ HMailItem.prototype.try_deliver_host_on_socket = function (mx, host, port, socke } // TLS - if (!net_utils.ip_in_list(tls_cfg.no_tls_hosts, self.todo.domain) && - !net_utils.ip_in_list(tls_cfg.no_tls_hosts, host) && + if (!net_utils.ip_in_list(tls_config.no_tls_hosts, self.todo.domain) && + !net_utils.ip_in_list(tls_config.no_tls_hosts, host) && smtp_properties.tls && cfg.enable_tls && !secured) { socket.on('secure', function () { @@ -1630,14 +566,14 @@ HMailItem.prototype.try_deliver_host_on_socket = function (mx, host, port, socke fp_called = true; if (fail_recips.length) { self.refcount++; - exports.split_to_new_recipients(self, fail_recips, "Some recipients temporarily failed", function (hmail) { + split_to_new_recipients(self, fail_recips, "Some recipients temporarily failed", function (hmail) { self.discard(); hmail.temp_fail("Some recipients temp failed: " + fail_recips.join(', '), { rcpt: fail_recips, mx: mx }); }); } if (bounce_recips.length) { self.refcount++; - exports.split_to_new_recipients(self, bounce_recips, "Some recipients rejected", function (hmail) { + split_to_new_recipients(self, bounce_recips, "Some recipients rejected", function (hmail) { self.discard(); hmail.bounce("Some recipients failed: " + bounce_recips.join(', '), { rcpt: bounce_recips, mx: mx }); }); @@ -1651,7 +587,7 @@ HMailItem.prototype.try_deliver_host_on_socket = function (mx, host, port, socke else { self.discard(); } - release_client(socket, port, host, mx.bind, fin_sent); + client_pool.release_client(socket, port, host, mx.bind, fin_sent); }; socket.on('line', function (line) { @@ -1707,7 +643,7 @@ HMailItem.prototype.try_deliver_host_on_socket = function (mx, host, port, socke }); send_command('RSET'); processing_mail = false; - release_client(socket, port, host, mx.bind); + client_pool.release_client(socket, port, host, mx.bind); return self.temp_fail("Upstream error: " + code + " " + ((extc) ? extc + ' ' : '') + reason); } else if (code.match(/^4/)) { @@ -1746,7 +682,7 @@ HMailItem.prototype.try_deliver_host_on_socket = function (mx, host, port, socke }); send_command('RSET'); processing_mail = false; - release_client(socket, port, host, mx.bind); + client_pool.release_client(socket, port, host, mx.bind); return self.temp_fail("Upstream error: " + code + " " + ((extc) ? extc + ' ' : '') + reason); } } @@ -1788,7 +724,7 @@ HMailItem.prototype.try_deliver_host_on_socket = function (mx, host, port, socke }); send_command('RSET'); processing_mail = false; - release_client(socket, port, host, mx.bind); + client_pool.release_client(socket, port, host, mx.bind); return self.bounce(reason, { mx: mx }); } } @@ -1804,7 +740,7 @@ HMailItem.prototype.try_deliver_host_on_socket = function (mx, host, port, socke process_ehlo_data(); break; case 'starttls': - var tls_options = exports.get_tls_options(mx); + var tls_options = obtls.get_tls_options(mx); smtp_properties = {}; socket.upgrade(tls_options, function (authorized, verifyError, cert, cipher) { @@ -1890,7 +826,7 @@ HMailItem.prototype.try_deliver_host_on_socket = function (mx, host, port, socke // Unrecognized response. self.logerror("Unrecognized response from upstream server: " + line); processing_mail = false; - release_client(socket, port, host, mx.bind); + client_pool.release_client(socket, port, host, mx.bind); self.todo.rcpt_to.forEach(function (rcpt) { self.extend_rcpt_with_dsn(rcpt, DSN.proto_invalid_command("Unrecognized response from upstream server: " + line)); }); @@ -2183,7 +1119,7 @@ HMailItem.prototype.bounce_respond = function (retval, msg) { return self.double_bounce("Error populating bounce message: " + err2); } - exports.send_email(from, recip, data_lines.join(''), function (code, msg2) { + outbound.send_email(from, recip, data_lines.join(''), function (code, msg2) { if (code === constants.deny) { // failed to even queue the mail return self.double_bounce("Unable to queue the bounce message. Not sending bounce!"); @@ -2301,3 +1237,68 @@ HMailItem.prototype.delivered_respond = function (retval, msg) { } this.discard(); }; + +function split_to_new_recipients (hmail, recipients, response, cb) { + var self = this; + if (recipients.length === hmail.todo.rcpt_to.length) { + // Split to new for no reason - increase refcount and return self + hmail.refcount++; + return cb(hmail); + } + var fname = _qfile.name(); + var tmp_path = path.join(queue_dir, platformDOT + fname); + var ws = new FsyncWriteStream(tmp_path, { flags: WRITE_EXCL }); + var err_handler = function (err, location) { + logger.logerror("[outbound] Error while splitting to new recipients (" + location + "): " + err); + hmail.todo.rcpt_to.forEach(function (rcpt) { + hmail.extend_rcpt_with_dsn(rcpt, DSN.sys_unspecified("Error splitting to new recipients: " + err)); + }); + hmail.bounce("Error splitting to new recipients: " + err); + }; + + ws.on('error', function (err) { err_handler(err, "tmp file writer");}); + + var writing = false; + + var write_more = function () { + if (writing) return; + writing = true; + var rs = hmail.data_stream(); + rs.pipe(ws, {end: false}); + rs.on('error', function (err) { + err_handler(err, "hmail.data_stream reader"); + }); + rs.on('end', function () { + ws.on('close', function () { + var dest_path = path.join(queue_dir, fname); + fs.rename(tmp_path, dest_path, function (err) { + if (err) { + err_handler(err, "tmp file rename"); + } + else { + var split_mail = new HMailItem (fname, dest_path); + split_mail.once('ready', function () { + cb(split_mail); + }); + } + }); + }); + ws.destroySoon(); + return; + }); + }; + + ws.on('error', function (err) { + logger.logerror("[outbound] Unable to write queue file (" + fname + "): " + err); + ws.destroy(); + hmail.todo.rcpt_to.forEach(function (rcpt) { + hmail.extend_rcpt_with_dsn(rcpt, DSN.sys_unspecified("Error re-queueing some recipients: " + err)); + }); + hmail.bounce("Error re-queueing some recipients: " + err); + }); + + var new_todo = JSON.parse(JSON.stringify(hmail.todo)); + new_todo.rcpt_to = recipients; + self.build_todo(new_todo, ws, write_more); +}; + diff --git a/outbound/index.js b/outbound/index.js new file mode 100644 index 000000000..28c81a884 --- /dev/null +++ b/outbound/index.js @@ -0,0 +1,341 @@ +'use strict'; + +var async = require('async'); +var fs = require('fs'); +var path = require('path'); + +var Address = require('address-rfc2821').Address; +var constants = require('haraka-constants'); +var net_utils = require('haraka-net-utils'); +var utils = require('haraka-utils'); +var ResultStore = require('haraka-results'); + +var logger = require('../logger'); +var config = require('../config'); +var trans = require('../transaction'); +var plugins = require('../plugins'); +var DSN = require('../dsn'); +var FsyncWriteStream = require('./fsync_writestream'); +var server = require('../server'); + +var HMailItem = require('./hmail'); +var TODOItem = require('./todo'); +var cfg = require('./config'); +var queuelib = require('./queue'); + +var queue_dir = queuelib.queue_dir; +var temp_fail_queue = queuelib.temp_fail_queue; +var delivery_queue = queuelib.delivery_queue; + +var core_consts = require('constants'); +var WRITE_EXCL = core_consts.O_CREAT | core_consts.O_TRUNC | core_consts.O_WRONLY | core_consts.O_EXCL; + +var platformDOT = ((['win32','win64'].indexOf( process.platform ) !== -1) ? '' : '__tmp__') + '.'; + +exports.net_utils = net_utils; +exports.config = config; + +exports.get_stats = queuelib.get_stats; +exports.list_queue = queuelib.list_queue; +exports.stat_queue = queuelib.stat_queue; +exports.scan_queue_pids = queuelib.scan_queue_pids; +exports.flush_queue = queuelib.flush_queue; +exports.load_pid_queue = queuelib.load_pid_queue; +exports.ensure_queue_dir = queuelib.ensure_queue_dir; +exports.load_queue = queuelib.load_queue; +exports._add_file = queuelib._add_file; +exports.stats = queuelib.stats; +exports.drain_pools = queuelib.drain_pools; + +var _qfile = exports.qfile = require('./qfile'); + +process.on('message', function (msg) { + if (msg.event && msg.event === 'outbound.load_pid_queue') { + exports.load_pid_queue(msg.data); + return; + } + if (msg.event && msg.event === 'outbound.flush_queue') { + exports.flush_queue(msg.domain, process.pid); + return; + } + if (msg.event && msg.event == 'outbound.shutdown') { + logger.loginfo("[outbound] Shutting down temp fail queue"); + exports.drain_pools(); + temp_fail_queue.shutdown(); + return; + } + if (msg.event && msg.event === 'outbound.drain_pools') { + exports.drain_pools(); + return; + } + // ignores the message +}); + +exports.send_email = function () { + + if (arguments.length === 2) { + logger.loginfo("[outbound] Sending email as a transaction"); + return this.send_trans_email(arguments[0], arguments[1]); + } + + var from = arguments[0]; + var to = arguments[1]; + var contents = arguments[2]; + var next = arguments[3]; + var options = arguments[4]; + + var dot_stuffed = ((options && options.dot_stuffed) ? options.dot_stuffed : false); + var notes = ((options && options.notes) ? options.notes : null); + + logger.loginfo("[outbound] Sending email via params"); + + var transaction = trans.createTransaction(); + + logger.loginfo("[outbound] Created transaction: " + transaction.uuid); + + //Adding notes passed as parameter + if (notes) { + transaction.notes = notes; + } + + // set MAIL FROM address, and parse if it's not an Address object + if (from instanceof Address) { + transaction.mail_from = from; + } + else { + try { + from = new Address(from); + } + catch (err) { + return next(constants.deny, "Malformed from: " + err); + } + transaction.mail_from = from; + } + + // Make sure to is an array + if (!(Array.isArray(to))) { + // turn into an array + to = [ to ]; + } + + if (to.length === 0) { + return next(constants.deny, "No recipients for email"); + } + + // Set RCPT TO's, and parse each if it's not an Address object. + for (var i=0,l=to.length; i < l; i++) { + if (!(to[i] instanceof Address)) { + try { + to[i] = new Address(to[i]); + } + catch (err) { + return next(constants.deny, + "Malformed to address (" + to[i] + "): " + err); + } + } + } + + transaction.rcpt_to = to; + + // Set data_lines to lines in contents + if (typeof contents == 'string') { + var match; + while ((match = utils.line_regexp.exec(contents))) { + var line = match[1]; + line = line.replace(/\r?\n?$/, '\r\n'); // make sure it ends in \r\n + if (dot_stuffed === false && line.length >= 3 && line.substr(0,1) === '.') { + line = "." + line; + } + transaction.add_data(new Buffer(line)); + contents = contents.substr(match[1].length); + if (contents.length === 0) { + break; + } + } + } + else { + // Assume a stream + return stream_line_reader(contents, transaction, function (err) { + if (err) { + return next(constants.denysoft, "Error from stream line reader: " + err); + } + exports.send_trans_email(transaction, next); + }); + } + + transaction.message_stream.add_line_end(); + this.send_trans_email(transaction, next); +}; + +function stream_line_reader (stream, transaction, cb) { + var current_data = ''; + function process_data (data) { + current_data += data.toString(); + var results; + while ((results = utils.line_regexp.exec(current_data))) { + var this_line = results[1]; + current_data = current_data.slice(this_line.length); + if (!(current_data.length || this_line.length)) { + return; + } + transaction.add_data(new Buffer(this_line)); + } + } + + function process_end () { + if (current_data.length) { + transaction.add_data(new Buffer(current_data)); + } + current_data = ''; + transaction.message_stream.add_line_end(); + cb(); + } + + stream.on('data', process_data); + stream.once('end', process_end); + stream.once('error', cb); +} + +exports.send_trans_email = function (transaction, next) { + var self = this; + + // add in potentially missing headers + if (!transaction.header.get_all('Message-Id').length) { + logger.loginfo("[outbound] Adding missing Message-Id header"); + transaction.add_header('Message-Id', '<' + transaction.uuid + '@' + config.get('me') + '>'); + } + if (!transaction.header.get_all('Date').length) { + logger.loginfo("[outbound] Adding missing Date header"); + transaction.add_header('Date', utils.date_to_str(new Date())); + } + + transaction.add_leading_header('Received', '('+cfg.received_header+'); ' + utils.date_to_str(new Date())); + + var connection = { + transaction: transaction, + }; + + logger.add_log_methods(connection); + transaction.results = transaction.results || new ResultStore(connection); + + connection.pre_send_trans_email_respond = function (retval) { + var deliveries = []; + var always_split = cfg.always_split; + if (always_split) { + this.logdebug({name: "outbound"}, "always split"); + transaction.rcpt_to.forEach(function (rcpt) { + deliveries.push({domain: rcpt.host, rcpts: [ rcpt ]}); + }); + } + else { + // First get each domain + var recips = {}; + transaction.rcpt_to.forEach(function (rcpt) { + var domain = rcpt.host; + if (!recips[domain]) { recips[domain] = []; } + recips[domain].push(rcpt); + }); + Object.keys(recips).forEach(function (domain) { + deliveries.push({'domain': domain, 'rcpts': recips[domain]}); + }); + } + + var hmails = []; + var ok_paths = []; + + var todo_index = 1; + + async.forEachSeries(deliveries, function (deliv, cb) { + var todo = new TODOItem(deliv.domain, deliv.rcpts, transaction); + todo.uuid = todo.uuid + '.' + todo_index; + todo_index++; + self.process_delivery(ok_paths, todo, hmails, cb); + }, + function (err) { + if (err) { + for (var i=0,l=ok_paths.length; i> 8) & 0xff; + todo_length[1] = (todo_l >> 16) & 0xff; + todo_length[0] = (todo_l >> 24) & 0xff; + + var buf = Buffer.concat([todo_length, todo_str], todo_str.length + 4); + + var continue_writing = ws.write(buf); + if (continue_writing) return write_more(); + ws.once('drain', write_more); +}; + +// exported for testability +exports.TODOItem = TODOItem; + +exports.HMailItem = HMailItem; + +exports.lookup_mx = require('./mx_lookup').lookup_mx; diff --git a/outbound/mx_lookup.js b/outbound/mx_lookup.js new file mode 100644 index 000000000..7752e5f6c --- /dev/null +++ b/outbound/mx_lookup.js @@ -0,0 +1,65 @@ +"use strict"; + +var dns = require('dns'); + +exports.lookup_mx = function lookup_mx (domain, cb) { + var mxs = []; + + // Possible DNS errors + // NODATA + // FORMERR + // BADRESP + // NOTFOUND + // BADNAME + // TIMEOUT + // CONNREFUSED + // NOMEM + // DESTRUCTION + // NOTIMP + // EREFUSED + // SERVFAIL + + // default wrap_mx just returns our object with "priority" and "exchange" keys + var wrap_mx = function (a) { return a; }; + var process_dns = function (err, addresses) { + if (err) { + if (err.code === 'ENODATA') { + // Most likely this is a hostname with no MX record + // Drop through and we'll get the A record instead. + return 0; + } + cb(err); + } + else if (addresses && addresses.length) { + for (var i=0,l=addresses.length; i < l; i++) { + var mx = wrap_mx(addresses[i]); + mxs.push(mx); + } + cb(null, mxs); + } + else { + // return zero if we need to keep trying next option + return 0; + } + return 1; + }; + + dns.resolveMx(domain, function (err, addresses) { + if (process_dns(err, addresses)) { + return; + } + + // if MX lookup failed, we lookup an A record. To do that we change + // wrap_mx() to return same thing as resolveMx() does. + wrap_mx = function (a) { return {priority:0,exchange:a}; }; + // IS: IPv6 compatible + dns.resolve(domain, function (err2, addresses2) { + if (process_dns(err2, addresses2)) { + return; + } + err2 = new Error("Found nowhere to deliver to"); + err2.code = 'NOMX'; + cb(err2); + }); + }); +}; diff --git a/outbound/qfile.js b/outbound/qfile.js new file mode 100644 index 000000000..aa3a17772 --- /dev/null +++ b/outbound/qfile.js @@ -0,0 +1,88 @@ +"use strict"; + +var my_hostname = require('os').hostname().replace(/\\/, '\\057').replace(/:/, '\\072'); + +var QFILECOUNTER = 0; + +var _qfile = module.exports = { + // File Name Format: $arrival_$nextattempt_$attempts_$pid_$uniquetag_$counter_$host + name : function (overrides) { + var o = overrides || {}; + var time = _qfile.time(); + return [ + o.arrival || time, + o.next_attempt || time, + o.attempts || 0, + o.pid || process.pid, + o.uid || _qfile.rnd_unique(), + _qfile.next_counter(), + o.host || my_hostname + ].join('_'); + }, + + time : function () { + return new Date().getTime(); + }, + + next_counter: function () { + QFILECOUNTER = (QFILECOUNTER < 10000)?QFILECOUNTER+1:0; + return QFILECOUNTER; + }, + + rnd_unique: function (len) { + len = len || 6; + var chars = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'; + var result = []; + for (var i = len; i > 0; --i){ + result.push(chars[Math.floor(Math.random() * chars.length)]); + } + return result.join(''); + }, + + parts : function (filename) { + if (!filename){ + throw new Error("No filename provided"); + } + + var PARTS_EXPECTED_OLD = 4; + var PARTS_EXPECTED_CURRENT = 7; + var p = filename.split('_'); + + // bail on unknown split lengths + if (p.length !== PARTS_EXPECTED_OLD + && p.length !== PARTS_EXPECTED_CURRENT){ + return null; + } + + var time = _qfile.time(); + if (p.length === PARTS_EXPECTED_OLD){ + // parse the previous string structure + // $nextattempt_$attempts_$pid_$uniq.$host + // 1484878079415_0_12345_8888.mta1.example.com + // var fn_re = /^(\d+)_(\d+)_(\d+)(_\d+\..*)$/ + // match[1] = $nextattempt + // match[2] = $attempts + // match[3] = $pid + // match[4] = $uniq.$my_hostname + var fn_re = /^(\d+)_(\d+)_(\d+)_(\d+)\.(.*)$/; + var match = filename.match(fn_re); + if (!match){ + return null; + } + p = match.slice(1); // grab the capture groups minus the pattern + p.splice(3,1,_qfile.rnd_unique(),_qfile.next_counter()); // add a fresh UID and counter + p.unshift(time); // prepend current timestamp -- potentially inaccurate, but non-critical and shortlived + } + + return { + arrival : parseInt(p[0]), + next_attempt : parseInt(p[1]), + attempts : parseInt(p[2]), + pid : parseInt(p[3]), + uid : p[4], + counter : parseInt(p[5]), + host : p[6], + age : time - parseInt(p[0]) + }; + } +}; diff --git a/outbound/queue.js b/outbound/queue.js new file mode 100644 index 000000000..8358b8c89 --- /dev/null +++ b/outbound/queue.js @@ -0,0 +1,346 @@ +"use strict"; + +var async = require('async'); +var path = require('path'); +var fs = require('fs'); +var Address = require('address-rfc2821').Address; + +var TimerQueue = require('../timer_queue'); +var config = require('../config'); +var logger = require('../logger'); +var server = require('../server'); + +var HMailItem = require('./hmail'); +var cfg = require('./config'); +var _qfile = require('./qfile'); + +var queue_dir; +if (config.get('queue_dir')) { + queue_dir = path.resolve(config.get('queue_dir')); +} +else if (process.env.HARAKA) { + queue_dir = path.resolve(process.env.HARAKA, 'queue'); +} +else { + queue_dir = path.resolve('tests', 'test-queue'); +} + +exports.queue_dir = queue_dir; + +var load_queue = async.queue(function (file, cb) { + var hmail = new HMailItem(file, path.join(queue_dir, file)); + exports._add_file(hmail); + hmail.once('ready', cb); +}, cfg.concurrency_max); + +var in_progress = 0; +var delivery_queue = exports.delivery_queue = async.queue(function (hmail, cb) { + in_progress++; + hmail.next_cb = function () { + in_progress--; + cb(); + }; + hmail.send(); +}, cfg.concurrency_max); + +var temp_fail_queue = exports.temp_fail_queue = new TimerQueue(); + +var queue_count = 0; + +exports.get_stats = function () { + return in_progress + '/' + exports.delivery_queue.length() + '/' + exports.temp_fail_queue.length(); +}; + +exports.list_queue = function (cb) { + this._load_cur_queue(null, "_list_file", cb); +}; + +exports._stat_file = function (file, cb) { + queue_count++; + cb(); +}; + +exports.stat_queue = function (cb) { + var self = this; + this._load_cur_queue(null, "_stat_file", function (err) { + if (err) return cb(err); + return cb(null, self.stats()); + }); +}; + +exports.load_queue = function (pid) { + // Initialise and load queue + // This function is called first when not running under cluster, + // so we create the queue directory if it doesn't already exist. + this.ensure_queue_dir(); + this._load_cur_queue(pid, "_add_file"); +}; + +exports._load_cur_queue = function (pid, cb_name, cb) { + var self = this; + logger.loginfo("[outbound] Loading outbound queue from ", queue_dir); + fs.readdir(queue_dir, function (err, files) { + if (err) { + return logger.logerror("Failed to load queue directory (" + + queue_dir + "): " + err); + } + + self.cur_time = new Date(); // set once so we're not calling it a lot + + self.load_queue_files(pid, cb_name, files, cb); + }); +}; + +exports.load_queue_files = function (pid, cb_name, files, callback) { + var self = this; + if (files.length === 0) return; + + if (cfg.disabled && cb_name === '_add_file') { + // try again in 1 second if delivery is disabled + setTimeout(function () { + exports.load_queue_files(pid, cb_name, files, callback); + }, 1000); + return; + } + + if (pid) { + // Pre-scan to rename PID files to my PID: + logger.loginfo("[outbound] Grabbing queue files for pid: " + pid); + async.eachLimit(files, 200, function (file, cb) { + + var parts = _qfile.parts(file); + if (parts && parts.pid === parseInt(pid)) { + var next_process = parts.next_attempt; + // maintain some original details for the rename + var new_filename = _qfile.name({ + arrival : parts.arrival, + uid : parts.uid, + next_attempt : parts.next_attempt, + attempts : parts.attempts, + }); + // logger.loginfo("new_filename: ", new_filename); + fs.rename(path.join(queue_dir, file), path.join(queue_dir, new_filename), function (err) { + if (err) { + logger.logerror("Unable to rename queue file: " + file + + " to " + new_filename + " : " + err); + return cb(); + } + if (next_process <= self.cur_time) { + load_queue.push(new_filename); + } + else { + temp_fail_queue.add(next_process - self.cur_time, function () { + load_queue.push(new_filename); + }); + } + cb(); + }); + } + else if (/^\./.test(file)) { + // dot-file... + logger.logwarn("Removing left over dot-file: " + file); + return fs.unlink(path.join(queue_dir, file), function (err) { + if (err) { + logger.logerror("Error removing dot-file: " + file + ": " + err); + } + cb(); + }); + } + else { + // Do this because otherwise we blow the stack + async.setImmediate(cb); + } + }, function (err) { + if (err) { + // no error cases yet, but log anyway + logger.logerror("Error fixing up queue files: " + err); + } + logger.loginfo("Done fixing up old PID queue files"); + logger.loginfo(delivery_queue.length() + " files in my delivery queue"); + logger.loginfo(load_queue.length() + " files in my load queue"); + logger.loginfo(temp_fail_queue.length() + " files in my temp fail queue"); + + if (callback) callback(); + }); + } + else { + logger.loginfo("Loading the queue..."); + var good_file = function (file) { + if (/^\./.test(file)) { + logger.logwarn("Removing left over dot-file: " + file); + fs.unlink(path.join(queue_dir, file), function (err) { + if (err) console.error(err); + }); + return false; + } + + if (!_qfile.parts(file)) { + logger.logerror("Unrecognized file in queue folder: " + file); + return false; + } + return true; + } + async.mapSeries(files.filter(good_file), function (file, cb) { + // logger.logdebug("Loading queue file: " + file); + if (cb_name === '_add_file') { + var parts = _qfile.parts(file); + var next_process = parts.next_attempt; + + if (next_process <= self.cur_time) { + logger.logdebug("File needs processing now"); + load_queue.push(file); + } + else { + logger.logdebug("File needs processing later: " + (next_process - self.cur_time) + "ms"); + temp_fail_queue.add(next_process - self.cur_time, function () { load_queue.push(file);}); + } + cb(); + } + else { + self[cb_name](file, cb); + } + }, callback); + } +}; + +exports.stats = function () { + // TODO: output more data here + var results = { + queue_dir: queue_dir, + queue_count: queue_count, + }; + + return results; +}; + +exports._list_file = function (file, cb) { + var tl_reader = fs.createReadStream(path.join(queue_dir, file), {start: 0, end: 3}); + tl_reader.on('error', function (err) { + console.error("Error reading queue file: " + file + ":", err); + }); + tl_reader.once('data', function (buf) { + // I'm making the assumption here we won't ever read less than 4 bytes + // as no filesystem on the planet should be that dumb... + tl_reader.destroy(); + var todo_len = (buf[0] << 24) + (buf[1] << 16) + (buf[2] << 8) + buf[3]; + var td_reader = fs.createReadStream(path.join(queue_dir, file), {encoding: 'utf8', start: 4, end: todo_len + 3}); + var todo = ''; + td_reader.on('data', function (str) { + todo += str; + if (Buffer.byteLength(todo) === todo_len) { + // we read everything + var todo_struct = JSON.parse(todo); + todo_struct.rcpt_to = todo_struct.rcpt_to.map(function (a) { return new Address (a); }); + todo_struct.mail_from = new Address (todo_struct.mail_from); + todo_struct.file = file; + todo_struct.full_path = path.join(queue_dir, file); + var parts = _qfile.parts(file); + todo_struct.pid = (parts && parts.pid) || null; + cb(null, todo_struct); + } + }); + td_reader.on('end', function () { + if (Buffer.byteLength(todo) !== todo_len) { + console.error("Didn't find right amount of data in todo for file:", file); + return cb(); + } + }); + }); +}; + +exports.flush_queue = function (domain, pid) { + if (domain) { + exports.list_queue(function (err, qlist) { + if (err) return logger.logerror("Failed to load queue: " + err); + qlist.forEach(function (todo) { + if (todo.domain.toLowerCase() != domain.toLowerCase()) return; + if (pid && todo.pid != pid) return; + // console.log("requeue: ", todo); + delivery_queue.push(new HMailItem(todo.file, todo.full_path)); + }); + }) + } + else { + temp_fail_queue.drain(); + } +}; + +exports.load_pid_queue = function (pid) { + logger.loginfo("[outbound] Loading queue for pid: " + pid); + this.load_queue(pid); +}; + +exports.ensure_queue_dir = function () { + // No reason not to do this stuff syncronously - + // this code is only run at start-up. + if (fs.existsSync(queue_dir)) return; + + logger.logdebug("[outbound] Creating queue directory " + queue_dir); + try { + fs.mkdirSync(queue_dir, 493); // 493 == 0755 + } + catch (err) { + if (err.code !== 'EEXIST') { + logger.logerror("Error creating queue directory: " + err); + throw err; + } + } +}; + +exports._add_file = function (hmail) { + if (hmail.next_process < this.cur_time) { + delivery_queue.push(hmail); + } + else { + temp_fail_queue.add(hmail.next_process - this.cur_time, function () { + delivery_queue.push(hmail); + }); + } +}; + +exports.scan_queue_pids = function (cb) { + // Under cluster, this is called first by the master so + // we create the queue directory if it doesn't exist. + this.ensure_queue_dir(); + + fs.readdir(queue_dir, function (err, files) { + if (err) { + logger.logerror("[outbound] Failed to load queue directory (" + queue_dir + "): " + err); + return cb(err); + } + + var pids = {}; + + files.forEach(function (file) { + if (/^\./.test(file)) { + // dot-file... + logger.logwarn("[outbound] Removing left over dot-file: " + file); + return fs.unlink(file, function () {}); + } + + var parts = _qfile.parts(file); + if (!parts) { + logger.logerror("[outbound] Unrecognized file in queue directory: " + queue_dir + '/' + file); + return; + } + + pids[parts.pid] = true; + }); + + return cb(null, Object.keys(pids)); + }); +}; + +exports.drain_pools = function () { + if (!server.notes.pool || Object.keys(server.notes.pool).length == 0) { + return logger.logdebug("[outbound] Drain pools: No pools available"); + } + for (var p in server.notes.pool) { + logger.logdebug("[outbound] Drain pools: Draining SMTP connection pool " + p); + server.notes.pool[p].drain(function () { + if (!server.notes.pool[p]) return; + server.notes.pool[p].destroyAllNow(); + }); + } + logger.logdebug("[outbound] Drain pools: Pools shut down"); +} diff --git a/outbound/tls.js b/outbound/tls.js new file mode 100644 index 000000000..4df52ef93 --- /dev/null +++ b/outbound/tls.js @@ -0,0 +1,50 @@ +"use strict"; + +var net_utils = require('haraka-net-utils'); + +exports.config = require('../config'); + +exports.get_tls_options = function (mx) { + + var tls_config = net_utils.load_tls_ini(); + var tls_options = { servername: mx.exchange }; + var config_options = [ + 'key', 'cert', 'ciphers', 'dhparam', + 'requestCert', 'honorCipherOrder', 'rejectUnauthorized' + ]; + + for (var i = 0; i < config_options.length; i++) { + var opt = config_options[i]; + if (tls_config.main[opt] === undefined) { continue; } + tls_options[opt] = tls_config.main[opt]; + } + + if (tls_config.outbound) { + for (var j = 0; j < config_options.length; j++) { + var opt2 = config_options[j]; + if (tls_config.outbound[opt2] === undefined) { continue; } + tls_options[opt2] = tls_config.outbound[opt2]; + } + } + + if (tls_options.key) { + if (Array.isArray(tls_options.key)) { + tls_options.key = tls_options.key[0]; + } + tls_options.key = exports.config.get(tls_options.key, 'binary'); + } + + if (tls_options.dhparam) { + tls_options.dhparam = exports.config.get(tls_options.dhparam, 'binary'); + } + + if (tls_options.cert) { + if (Array.isArray(tls_options.cert)) { + tls_options.cert = tls_options.cert[0]; + } + tls_options.cert = exports.config.get(tls_options.cert, 'binary'); + } + + return tls_options; +}; + diff --git a/outbound/todo.js b/outbound/todo.js new file mode 100644 index 000000000..db7dd7a3c --- /dev/null +++ b/outbound/todo.js @@ -0,0 +1,15 @@ +"use strict"; + +// TODOItem - queue file header data +function TODOItem (domain, recipients, transaction) { + this.queue_time = Date.now(); + this.domain = domain; + this.rcpt_to = recipients; + this.mail_from = transaction.mail_from; + this.message_stream = transaction.message_stream; + this.notes = transaction.notes; + this.uuid = transaction.uuid; + return this; +} + +module.exports = TODOItem; diff --git a/tests/fixtures/vm_harness.js b/tests/fixtures/vm_harness.js index 2aaa7a7a6..443c23c29 100644 --- a/tests/fixtures/vm_harness.js +++ b/tests/fixtures/vm_harness.js @@ -6,16 +6,26 @@ function dot_files (element) { } exports.sandbox_require = function (id) { - if (id[0] == '.') { + if (id[0] == '.' && id[1] != '.') { try { var override = __dirname + '/' + id + '.js'; fs.statSync(override); id = override; } catch (e) { - id = '../../' + id; + try { + override = __dirname + '/../../outbound/' + id.replace(/^[./]*/, '') + '.js'; + fs.statSync(override); + id = override; + } + catch (err) { + id = '../../' + id.replace(/^[./]*/, ''); + } } } + else if (id[0] == '.' && id[1] == '.') { + id = '../../' + id.replace(/^[./]*/, ''); + } return require(id); } diff --git a/tests/outbound.js b/tests/outbound.js index 759ebd99b..9d3667c70 100644 --- a/tests/outbound.js +++ b/tests/outbound.js @@ -133,6 +133,7 @@ exports.get_tls_options = { setUp : function (done) { process.env.HARAKA_TEST_DIR=path.resolve('tests'); this.outbound = require('../outbound'); + this.obtls = require('../outbound/tls'); done(); }, tearDown: function (done) { @@ -146,8 +147,9 @@ exports.get_tls_options = { var testDir = path.resolve('tests'); this.outbound.net_utils.config = this.outbound.net_utils.config.module_config(testDir); this.outbound.config = this.outbound.config.module_config(testDir); + this.obtls.config = this.outbound.config; - var tls_config = this.outbound.get_tls_options( + var tls_config = this.obtls.get_tls_options( { exchange: 'mail.example.com'} ); diff --git a/tests/outbound_bounce_net_errors.js b/tests/outbound_bounce_net_errors.js new file mode 100644 index 000000000..679ee0b57 --- /dev/null +++ b/tests/outbound_bounce_net_errors.js @@ -0,0 +1,175 @@ +'use strict'; + +// Testing bounce email contents related to errors occuring during SMTP dialog + +// About running the tests: +// - Making a folder for queuing files +// - Creating a HMailItem instance using fixtures/util_hmailitem +// - Talk some STMP in the playbook +// - Test the outcome by replacing trigger functions with our testing code (outbound.send_email, HMailItem.temp_fail, ...) + +require('../configfile').watch_files = false; +var fs = require('fs'); +var path = require('path'); +var util_hmailitem = require('./fixtures/util_hmailitem'); +var TODOItem = require('../outbound/todo'); +var HMailItem = require('../outbound/hmail'); +var outbound = require('../outbound'); +var dns = require('dns'); +var constants = require('haraka-constants'); + +var outbound_context = { + TODOItem: TODOItem, + exports: outbound +}; + +var queue_dir = path.resolve(__dirname, 'test-queue'); + +exports.bounce_3464 = { + setUp : function (done) { + fs.exists(queue_dir, function (exists) { + if (exists) { + done(); + } + else { + fs.mkdir(queue_dir, function (err) { + if (err) { + return done(err); + } + done(); + }); + } + }); + }, + tearDown: function (done) { + fs.exists(queue_dir, function (exists) { + if (exists) { + var files = fs.readdirSync(queue_dir); + files.forEach(function (file,index){ + var curPath = path.resolve(queue_dir, file); + if (fs.lstatSync(curPath).isDirectory()) { // recurse + return done(new Error('did not expect an sub folder here ("' + curPath + '")! cancel')); + } + }); + files.forEach(function (file,index){ + var curPath = path.resolve(queue_dir, file); + fs.unlinkSync(curPath); + }); + done(); + } + else { + done(); + } + }); + }, + 'test get-mx-deny triggers bounce(...)': function (test) { + test.expect(2); + + util_hmailitem.newMockHMailItem(outbound_context, test, {}, function (mock_hmail){ + var orig_bounce = HMailItem.prototype.bounce; + HMailItem.prototype.bounce = function (err, opts) { + test.ok(true, 'get_mx=DENY: bounce function called'); + /* dsn_code: 550, + dsn_status: '5.1.2', + dsn_action: 'failed' */ + test.equal('5.1.2', this.todo.rcpt_to[0].dsn_status, 'get_mx=DENY dsn status = 5.1.2'); + }; + mock_hmail.domain = mock_hmail.todo.domain; + HMailItem.prototype.get_mx_respond.apply(mock_hmail, [constants.deny, {}]); + HMailItem.prototype.bounce = orig_bounce; + test.done(); + }); + }, + 'test get-mx-denysoft triggers temp_fail(...)': function (test) { + test.expect(2); + + util_hmailitem.newMockHMailItem(outbound_context, test, {}, function (mock_hmail){ + var orig_temp_fail = HMailItem.prototype.temp_fail; + HMailItem.prototype.temp_fail = function (err, opts) { + test.ok(true, 'get_mx-DENYSOFT: temp_fail function called'); + /*dsn_code: 450, + dsn_status: '4.1.2', + dsn_action: 'delayed' */ + test.equal('4.1.2', this.todo.rcpt_to[0].dsn_status, 'get_mx=DENYSOFT dsn status = 4.1.2'); + }; + mock_hmail.domain = mock_hmail.todo.domain; + HMailItem.prototype.get_mx_respond.apply(mock_hmail, [constants.denysoft, {}]); + HMailItem.prototype.temp_fail = orig_temp_fail; + test.done(); + }); + }, + 'test found_mx({code:dns.NXDOMAIN}) triggers bounce(...)': function (test) { + test.expect(2); + + util_hmailitem.newMockHMailItem(outbound_context, test, {}, function (mock_hmail){ + var orig_bounce = HMailItem.prototype.bounce; + HMailItem.prototype.bounce = function (err, opts) { + test.ok(true, 'found_mx({code: dns.NXDOMAIN}): bounce function called'); + test.equal('5.1.2', this.todo.rcpt_to[0].dsn_status, 'found_mx({code: dns.NXDOMAIN}: dsn status = 5.1.2'); + }; + HMailItem.prototype.found_mx.apply(mock_hmail, [{code: dns.NXDOMAIN}, {}]); + HMailItem.prototype.bounce = orig_bounce; + test.done(); + }); + }, + 'test found_mx({code:\'NOMX\'}) triggers bounce(...)': function (test) { + test.expect(2); + + util_hmailitem.newMockHMailItem(outbound_context, test, {}, function (mock_hmail){ + var orig_bounce = HMailItem.prototype.bounce; + HMailItem.prototype.bounce = function (err, opts) { + test.ok(true, 'found_mx({code: "NOMX"}): bounce function called'); + test.equal('5.1.2', this.todo.rcpt_to[0].dsn_status, 'found_mx({code: "NOMX"}: dsn status = 5.1.2'); + }; + HMailItem.prototype.found_mx.apply(mock_hmail, [{code: 'NOMX'}, {}]); + HMailItem.prototype.bounce = orig_bounce; + test.done(); + }); + }, + 'test found_mx({code:\'SOME-OTHER-ERR\'}) triggers temp_fail(...)': function (test) { + test.expect(2); + + util_hmailitem.newMockHMailItem(outbound_context, test, {}, function (mock_hmail){ + var orig_temp_fail = HMailItem.prototype.temp_fail; + HMailItem.prototype.temp_fail = function (err, opts) { + test.ok(true, 'found_mx({code: "SOME-OTHER-ERR"}): temp_fail function called'); + test.equal('4.1.0', this.todo.rcpt_to[0].dsn_status, 'found_mx({code: "SOME-OTHER-ERR"}: dsn status = 4.1.0'); + }; + HMailItem.prototype.found_mx.apply(mock_hmail, [{code: 'SOME-OTHER-ERR'}, {}]); + HMailItem.prototype.temp_fail = orig_temp_fail; + test.done(); + }); + }, + 'test found_mx(null, [{priority:0,exchange:\'\'}]) triggers bounce(...)': function (test) { + test.expect(2); + + + util_hmailitem.newMockHMailItem(outbound_context, test, {}, function (mock_hmail){ + var orig_bounce = HMailItem.prototype.bounce; + HMailItem.prototype.bounce = function (err, opts) { + test.ok(true, 'found_mx(null, [{priority:0,exchange:""}]): bounce function called'); + test.equal('5.1.2', this.todo.rcpt_to[0].dsn_status, 'found_mx(null, [{priority:0,exchange:""}]): dsn status = 5.1.2'); + }; + HMailItem.prototype.found_mx.apply(mock_hmail, [null, [{priority:0,exchange:''}]]); + HMailItem.prototype.bounce = orig_bounce; + test.done(); + }); + }, + 'test try_deliver while hmail.mxlist=[] triggers bounce(...)': function (test) { + test.expect(2); + + util_hmailitem.newMockHMailItem(outbound_context, test, {}, function (mock_hmail){ + mock_hmail.mxlist = []; + var orig_temp_fail = HMailItem.prototype.temp_fail; + HMailItem.prototype.temp_fail = function (err, opts) { + test.ok(true, 'try_deliver while hmail.mxlist=[]: temp_fail function called'); + test.equal('5.1.2', this.todo.rcpt_to[0].dsn_status, 'try_deliver while hmail.mxlist=[]: dsn status = 5.1.2'); + }; + HMailItem.prototype.try_deliver.apply(mock_hmail, []); + HMailItem.prototype.temp_fail = orig_temp_fail; + test.done(); + }); + }, + +} + diff --git a/tests/outbound_bounce_rfc3464.js b/tests/outbound_bounce_rfc3464.js new file mode 100644 index 000000000..6f09af581 --- /dev/null +++ b/tests/outbound_bounce_rfc3464.js @@ -0,0 +1,305 @@ +'use strict'; + +// Testing bounce email contents related to errors occuring during STMP dialog + +// About running the tests: +// - Making a folder for queuing files +// - Creating a HMailItem instance using fixtures/util_hmailitem +// - Talk some STMP in the playbook +// - Test the outcome by replacing trigger functions with our testing code (outbound.send_email, HMailItem.temp_fail, ...) +// At one point, the mocked remote SMTP says "5XX" or "4XX" and we test that +// * outbound.send_email is called with a RFC3464 bounce message +// * or, in case of 4XX: that temp_fail is called and dsn vars are available) + +require('../configfile').watch_files = false; +var fs = require('fs'); +var path = require('path'); +var util_hmailitem = require('./fixtures/util_hmailitem'); +var TODOItem = require('../outbound/todo'); +var HMailItem = require('../outbound/hmail'); +var outbound = require('../outbound'); +var mock_sock = require('./fixtures/line_socket'); + +var outbound_context = { + TODOItem: TODOItem, + exports: outbound +}; + +var queue_dir = path.resolve(__dirname, 'test-queue'); + +exports.bounce_3464 = { + setUp : function (done) { + fs.exists(queue_dir, function (exists) { + if (exists) { + done(); + } + else { + fs.mkdir(queue_dir, function (err) { + if (err) { + return done(err); + } + done(); + }); + } + }); + }, + tearDown: function (done) { + fs.exists(queue_dir, function (exists) { + if (exists) { + var files = fs.readdirSync(queue_dir); + files.forEach(function (file,index){ + var curPath = path.resolve(queue_dir, file); + if (fs.lstatSync(curPath).isDirectory()) { // recurse + return done(new Error('did not expect an sub folder here ("' + curPath + '")! cancel')); + } + }); + files.forEach(function (file,index){ + var curPath = path.resolve(queue_dir, file); + fs.unlinkSync(curPath); + }); + done(); + } + else { + done(); + } + }); + }, + 'test MAIL FROM responded with 500 5.0.0 triggers send_email() containing bounce msg with codes and message': function (test) { + test.expect(9); + + util_hmailitem.newMockHMailItem(outbound_context, test, {}, function (mock_hmail){ + var mock_socket = mock_sock.connect('testhost', 'testport'); + mock_socket.writable = true; + + var orig_send_email = outbound_context.exports.send_email; + outbound_context.exports.send_email = function (from, to, contents, cb, opts) { + test.ok(true, 'outbound.send_email called'); + test.ok(contents.match(/^Content-type: message\/delivery-status/m), 'its a bounce report'); + test.ok(contents.match(/^Final-Recipient: rfc822;recipient@domain/m), 'bounce report contains final recipient'); + test.ok(contents.match(/^Action: failed/m), 'DATA-5XX: bounce report contains action field'); + test.ok(contents.match(/^Status: 5\.0\.0/m), 'bounce report contains status field with ext. smtp code'); + test.ok(contents.match(/Absolutely not acceptable\. Basic Test Only\./), 'original upstream message available'); + outbound_context.exports.send_email = orig_send_email; + + test.done(); + }; + + // The playbook + // from remote: This line is to be sent (from an mocked remote SMTP) to haraka outbound. This is done in this test. + // from haraka: Expected answer from haraka-outbound to the mocked remote SMTP. + // 'test' can hold a function(line) returning true for success, or a string tested for equality + var testPlaybook = [ + // Haraka connects, we say first + { 'from': 'remote', 'line': '220 testing-smtp' }, + + { 'from': 'haraka', 'test': function (line) { return line.match(/^EHLO /); }, 'description': 'Haraka should say EHLO' }, + { 'from': 'remote', 'line': '220-testing-smtp' }, + { 'from': 'remote', 'line': '220 8BITMIME' }, + + { 'from': 'haraka', 'test': 'MAIL FROM:' }, + { 'from': 'remote', 'line': '500 5.0.0 Absolutely not acceptable. Basic Test Only.' }, + + { 'from': 'haraka', 'test': 'RSET', end_test: true }, // this will trigger calling the callback + ]; + + util_hmailitem.playTestSmtpConversation(mock_hmail, mock_socket, test, testPlaybook, function () { + + }); + }); + }, + 'test that early response of 3XX triggers temp_fail': function (test) { + test.expect(7); + + util_hmailitem.newMockHMailItem(outbound_context, test, {}, function (mock_hmail){ + var mock_socket = mock_sock.connect('testhost', 'testport'); + mock_socket.writable = true; + + var orig_temp_fail = HMailItem.prototype.temp_fail; + HMailItem.prototype.temp_fail = function (err, opts) { + test.ok(true, 'early-3XX: outbound.temp_fail called'); + test.equal('3.0.0', this.todo.rcpt_to[0].dsn_status, 'early-3XX: dsn status = 3.0.0'); + test.equal('delayed', this.todo.rcpt_to[0].dsn_action, 'early-3XX: dsn action = delayed'); + test.ok(this.todo.rcpt_to[0].dsn_smtp_response.match(/No time for you right now/), 'early-3XX: original upstream message available'); + HMailItem.prototype.temp_fail = orig_temp_fail; + test.done(); + }; + var testPlaybook = [ + { 'from': 'remote', 'line': '220 testing-smtp' }, + + { 'from': 'haraka', 'test': function (line) { return line.match(/^EHLO /); }, 'description': 'Haraka should say EHLO' }, + { 'from': 'remote', 'line': '220-testing-smtp' }, + { 'from': 'remote', 'line': '220 8BITMIME' }, + + { 'from': 'haraka', 'test': 'MAIL FROM:' }, + { 'from': 'remote', 'line': '300 3.0.0 No time for you right now' }, + + { 'from': 'haraka', 'test': 'RSET', end_test: true }, // this will trigger calling the callback + ]; + + util_hmailitem.playTestSmtpConversation(mock_hmail, mock_socket, test, testPlaybook, function () { + + }); + }); + }, + 'test that response of 4XX for RCPT-TO triggers temp_fail': function (test) { + test.expect(8); + + util_hmailitem.newMockHMailItem(outbound_context, test, {}, function (mock_hmail){ + var mock_socket = mock_sock.connect('testhost', 'testport'); + mock_socket.writable = true; + + var orig_temp_fail = HMailItem.prototype.temp_fail; + HMailItem.prototype.temp_fail = function (err, opts) { + test.ok(true, 'RCPT-TO-4XX: outbound.temp_fail called'); + test.equal('4.0.0', this.todo.rcpt_to[0].dsn_status, 'RCPT-TO-4XX: dsn status = 4.0.0'); + test.equal('delayed', this.todo.rcpt_to[0].dsn_action, 'RCPT-TO-4XX: dsn action = delayed'); + test.ok(this.todo.rcpt_to[0].dsn_smtp_response.match(/Currently not available\. Try again later\./), 'RCPT-TO-4XX: original upstream message available'); + HMailItem.prototype.temp_fail = orig_temp_fail; + test.done(); + }; + var testPlaybook = [ + { 'from': 'remote', 'line': '220 testing-smtp' }, + + { 'from': 'haraka', 'test': function (line) { return line.match(/^EHLO /); }, 'description': 'Haraka should say EHLO' }, + { 'from': 'remote', 'line': '220-testing-smtp' }, + { 'from': 'remote', 'line': '220 8BITMIME' }, + + { 'from': 'haraka', 'test': 'MAIL FROM:' }, + { 'from': 'remote', 'line': '250 2.1.0 Ok' }, + + { 'from': 'haraka', 'test': 'RCPT TO:' }, + { 'from': 'remote', 'line': '400 4.0.0 Currently not available. Try again later.' }, + + { 'from': 'haraka', 'test': 'RSET', end_test: true }, // this will trigger calling the callback + ]; + + util_hmailitem.playTestSmtpConversation(mock_hmail, mock_socket, test, testPlaybook, function () { + + }); + }); + }, + 'test that response of 4XX for DATA triggers temp_fail': function (test) { + test.expect(9); + + util_hmailitem.newMockHMailItem(outbound_context, test, {}, function (mock_hmail){ + var mock_socket = mock_sock.connect('testhost', 'testport'); + mock_socket.writable = true; + + var orig_temp_fail = HMailItem.prototype.temp_fail; + HMailItem.prototype.temp_fail = function (err, opts) { + test.ok(true, 'DATA-4XX: outbound.temp_fail called'); + test.equal('4.6.0', this.todo.rcpt_to[0].dsn_status, 'DATA-4XX: dsn status = 4.6.0'); + test.equal('delayed', this.todo.rcpt_to[0].dsn_action, 'DATA-4XX: dsn action = delayed'); + test.ok(this.todo.rcpt_to[0].dsn_smtp_response.match(/Currently I do not like ascii art cats\./), 'DATA-4XX: original upstream message available'); + HMailItem.prototype.temp_fail = orig_temp_fail; + test.done(); + }; + var testPlaybook = [ + { 'from': 'remote', 'line': '220 testing-smtp' }, + + { 'from': 'haraka', 'test': function (line) { return line.match(/^EHLO /); }, 'description': 'Haraka should say EHLO' }, + { 'from': 'remote', 'line': '220-testing-smtp' }, + { 'from': 'remote', 'line': '220 8BITMIME' }, + + { 'from': 'haraka', 'test': 'MAIL FROM:' }, + { 'from': 'remote', 'line': '250 2.1.0 Ok' }, + + { 'from': 'haraka', 'test': 'RCPT TO:' }, + { 'from': 'remote', 'line': '250 2.1.5 Ok' }, + + { 'from': 'haraka', 'test': 'DATA' }, + // haraka will send us more lines + { 'from': 'remote', 'line': '450 4.6.0 Currently I do not like ascii art cats.' }, + + { 'from': 'haraka', 'test': 'RSET', end_test: true }, // this will trigger calling the callback + ]; + + util_hmailitem.playTestSmtpConversation(mock_hmail, mock_socket, test, testPlaybook, function () { + + }); + }); + }, + 'test that response of 5XX for RCPT-TO triggers send_email() containing bounce msg with codes and message': function (test) { + test.expect(10); + + util_hmailitem.newMockHMailItem(outbound_context, test, {}, function (mock_hmail){ + var mock_socket = mock_sock.connect('testhost', 'testport'); + mock_socket.writable = true; + + var orig_send_email = outbound_context.exports.send_email; + outbound_context.exports.send_email = function (from, to, contents, cb, opts) { + test.ok(true, 'RCPT-TO-5XX: outbound.send_email called'); + test.ok(contents.match(/^Content-type: message\/delivery-status/m), 'RCPT-TO-5XX: its a bounce report'); + test.ok(contents.match(/^Final-Recipient: rfc822;recipient@domain/m), 'RCPT-TO-5XX: bounce report contains final recipient'); + test.ok(contents.match(/^Action: failed/m), 'DATA-5XX: bounce report contains action field'); + test.ok(contents.match(/^Status: 5\.1\.1/m), 'RCPT-TO-5XX: bounce report contains status field with our ext. smtp code'); + test.ok(contents.match(/Not available and will not come back/), 'RCPT-TO-5XX: original upstream message available'); + outbound_context.exports.send_email = orig_send_email; + test.done(); + }; + var testPlaybook = [ + { 'from': 'remote', 'line': '220 testing-smtp' }, + + { 'from': 'haraka', 'test': function (line) { return line.match(/^EHLO /); }, 'description': 'Haraka should say EHLO' }, + { 'from': 'remote', 'line': '220-testing-smtp' }, + { 'from': 'remote', 'line': '220 8BITMIME' }, + + { 'from': 'haraka', 'test': 'MAIL FROM:' }, + { 'from': 'remote', 'line': '250 2.1.0 Ok' }, + + { 'from': 'haraka', 'test': 'RCPT TO:' }, + { 'from': 'remote', 'line': '550 5.1.1 Not available and will not come back' }, + + { 'from': 'haraka', 'test': 'RSET', end_test: true }, // this will trigger calling the callback + ]; + + util_hmailitem.playTestSmtpConversation(mock_hmail, mock_socket, test, testPlaybook, function () { + + }); + }); + }, + 'test that response of 5XX for DATA triggers send_email() containing bounce msg with codes and message': function (test) { + test.expect(11); + + util_hmailitem.newMockHMailItem(outbound_context, test, {}, function (mock_hmail){ + var mock_socket = mock_sock.connect('testhost', 'testport'); + mock_socket.writable = true; + + var orig_send_email = outbound_context.exports.send_email; + outbound_context.exports.send_email = function (from, to, contents, cb, opts) { + test.ok(true, 'DATA-5XX: outbound.send_email called'); + test.ok(contents.match(/^Content-type: message\/delivery-status/m), 'DATA-5XX: its a bounce report'); + test.ok(contents.match(/^Final-Recipient: rfc822;recipient@domain/m), 'DATA-5XX: bounce report contains final recipient'); + test.ok(contents.match(/^Action: failed/m), 'DATA-5XX: bounce report contains action field'); + test.ok(contents.match(/^Status: 5\.6\.0/m), 'DATA-5XX: bounce report contains status field with our ext. smtp code'); + test.ok(contents.match(/I never did and will like ascii art cats/), 'DATA-5XX: original upstream message available'); + outbound_context.exports.send_email = orig_send_email; + test.done(); + }; + var testPlaybook = [ + { 'from': 'remote', 'line': '220 testing-smtp' }, + + { 'from': 'haraka', 'test': function (line) { return line.match(/^EHLO /); }, 'description': 'Haraka should say EHLO' }, + { 'from': 'remote', 'line': '220-testing-smtp' }, + { 'from': 'remote', 'line': '220 8BITMIME' }, + + { 'from': 'haraka', 'test': 'MAIL FROM:' }, + { 'from': 'remote', 'line': '250 2.1.0 Ok' }, + + { 'from': 'haraka', 'test': 'RCPT TO:' }, + { 'from': 'remote', 'line': '250 2.1.5 Ok' }, + + { 'from': 'haraka', 'test': 'DATA' }, + // haraka will send us more lines + { 'from': 'remote', 'line': '550 5.6.0 I never did and will like ascii art cats.' }, + + { 'from': 'haraka', 'test': 'RSET', end_test: true }, // this will trigger calling the callback + ]; + + util_hmailitem.playTestSmtpConversation(mock_hmail, mock_socket, test, testPlaybook, function () { + + }); + }); + }, +} + diff --git a/tests/outbound_protocol.js b/tests/outbound_protocol.js deleted file mode 100644 index 52581c6cb..000000000 --- a/tests/outbound_protocol.js +++ /dev/null @@ -1,62 +0,0 @@ -'use strict'; - -var fs = require('fs'); -var path = require('path'); - -require('../configfile').watch_files = false; -var vm_harness = require('./fixtures/vm_harness'); - -var queue_dir = path.resolve(__dirname, 'test-queue'); - -var ensureTestQueueDirExists = function (done) { - fs.exists(queue_dir, function (exists) { - if (exists) { - done(); - } - else { - fs.mkdir(queue_dir, function (err) { - if (err) { - return done(err); - } - done(); - }); - } - }); -}; - -var removeTestQueueDir = function (done) { - fs.exists(queue_dir, function (exists) { - if (exists) { - var files = fs.readdirSync(queue_dir); - files.forEach(function (file,index){ - var curPath = path.resolve(queue_dir, file); - if (fs.lstatSync(curPath).isDirectory()) { // recurse - return done(new Error('did not expect an sub folder here ("' + curPath + '")! cancel')); - } - }); - files.forEach(function (file,index){ - var curPath = path.resolve(queue_dir, file); - fs.unlinkSync(curPath); - }); - done(); - } - else { - done(); - } - }); -}; - -exports.outbound_protocol_tests = { - setUp : ensureTestQueueDirExists, - tearDown : removeTestQueueDir, -}; - -vm_harness.add_tests( - path.join(__dirname, '..', 'outbound.js'), - path.join(__dirname, 'outbound_protocol/'), - exports.outbound_protocol_tests, - { - test_queue_dir: queue_dir, - process: process - } -); diff --git a/tests/outbound_protocol/basic_outbound_trial_test.js b/tests/outbound_protocol/basic_outbound_trial_test.js deleted file mode 100644 index 89e74a22f..000000000 --- a/tests/outbound_protocol/basic_outbound_trial_test.js +++ /dev/null @@ -1,85 +0,0 @@ -'use strict'; -/*eslint no-unused-vars: ["error", { "varsIgnorePattern": "queue_dir", "args": "none" }]*/ - -// This test file is executed by tests/outbound_protocol.js (see there) -// -// Important to understand the code: This is file is - for running the test - appended to outbound.js. - - -test.expect(4); - -// What is tested: -// A simple SMTP conversation is made -// At one point, the mocked remote SMTP says "5XX", and we test that the HMailItem.bounce function gets called - -// we copy over here "test_queue_dir" from vm-sandbox to the queue_dir back -// (queue_dir is outbound-private var introduced at the beginning of outbound.js) -var queue_dir = test_queue_dir; - -var util_hmailitem = require('./../fixtures/util_hmailitem'); -var mock_sock = require('./../fixtures/line_socket'); - -// create a dummy HMailItem for testing -util_hmailitem.createHMailItem( - { - TODOItem: TODOItem, - exports: exports, - }, // outbound context - { - - }, - function (err, hmail) { - if (err) { - test.ok(false, 'Could not create HMailItem: ' + err); - test.done(); - return; - } - runBasicSmtpConversation(hmail); - } -); - -HMailItem.prototype.bounce = function (err, opts) { - test.ok(true, 'HMail bounce called'); - test.done(); -} - -function runBasicSmtpConversation (hmail) { - if (!hmail.todo) { - hmail.once('ready', function () { - _runBasicSmtpConversation(hmail); - }); - } - else { - _runBasicSmtpConversation(hmail); - } -} -function _runBasicSmtpConversation (hmail) { - var mock_socket = mock_sock.connect('testhost', 'testport'); - mock_socket.writable = true; - mock_socket.__acquired = true; - - // The playbook - // from remote: This line is to be sent (from an mocked remote SMTP) to haraka outbound. This is done in this test. - // from haraka: Expected answer from haraka-outbound to the mocked remote SMTP. - // 'test' can hold a function(line) returning true for success, or a string tested for equality - var testPlaybook = [ - // Haraka connects, we say first - { 'from': 'remote', 'line': '220 testing-smtp' }, - - { 'from': 'haraka', 'test': function (line) { return line.match(/^EHLO /); }, 'description': 'Haraka should say EHLO' }, - { 'from': 'remote', 'line': '220-testing-smtp' }, - { 'from': 'remote', 'line': '220 8BITMIME' }, - - { 'from': 'haraka', 'test': 'MAIL FROM:' }, - { 'from': 'remote', 'line': '500 5.0.0 Absolutely not acceptable. Basic Test Only.' }, - - { 'from': 'haraka', 'test': 'RSET', end_test: true }, // this will trigger calling the callback - ]; - - util_hmailitem.playTestSmtpConversation(hmail, mock_socket, test, testPlaybook, function () { - // test done covered in stubbed HMailItem.bounce - }); - -} - - diff --git a/tests/outbound_protocol/outbound_bounce_net_errors.js b/tests/outbound_protocol/outbound_bounce_net_errors.js deleted file mode 100644 index f96af40e5..000000000 --- a/tests/outbound_protocol/outbound_bounce_net_errors.js +++ /dev/null @@ -1,133 +0,0 @@ -'use strict'; -/*eslint no-unused-vars: ["error", { "varsIgnorePattern": "queue_dir", "args": "none" }]*/ - -test.expect(14); - -// What is tested: -// - get_mx plugin with DENY/DENYSOFT is simulated -// - found_mx with various errors is simulated -// - try_deliver with empty hmail.mxlist is called -// and it is tested that bounce/temp_fail gets called with DSN-params set - -// we copy over here "test_queue_dir" from vm-sandbox to the queue_dir back -// (queue_dir is outbound-private var introduced at the beginning of outbound.js) -var queue_dir = test_queue_dir; - -var util_hmailitem = require('./../fixtures/util_hmailitem'); -var async = require('async'); -var dns = require('dns'); -var constants = require('haraka-constants'); - - -var outbound_context = { - TODOItem: exports.TODOItem, - exports: exports, -}; - -async.series( - [ - // test get-mx-deny triggers bounce(...) - function (callback) { - util_hmailitem.newMockHMailItem(outbound_context, test, {}, function (mock_hmail){ - var orig_bounce = HMailItem.prototype.bounce; - HMailItem.prototype.bounce = function (err, opts) { - test.ok(true, 'get_mx=DENY: bounce function called'); - /* dsn_code: 550, - dsn_status: '5.1.2', - dsn_action: 'failed' */ - test.equal('5.1.2', this.todo.rcpt_to[0].dsn_status, 'get_mx=DENY dsn status = 5.1.2'); - } - mock_hmail.domain = mock_hmail.todo.domain; - HMailItem.prototype.get_mx_respond.apply(mock_hmail, [constants.deny, {}]); - HMailItem.prototype.bounce = orig_bounce; - callback(null, 1); - }); - }, - // test get-mx-denysoft triggers temp_fail(...) - function (callback) { - util_hmailitem.newMockHMailItem(outbound_context, test, {}, function (mock_hmail){ - var orig_temp_fail = HMailItem.prototype.temp_fail; - HMailItem.prototype.temp_fail = function (err, opts) { - test.ok(true, 'get_mx-DENYSOFT: temp_fail function called'); - /*dsn_code: 450, - dsn_status: '4.1.2', - dsn_action: 'delayed' */ - test.equal('4.1.2', this.todo.rcpt_to[0].dsn_status, 'get_mx=DENYSOFT dsn status = 4.1.2'); - } - mock_hmail.domain = mock_hmail.todo.domain; - HMailItem.prototype.get_mx_respond.apply(mock_hmail, [constants.denysoft, {}]); - HMailItem.prototype.temp_fail = orig_temp_fail; - callback(null, 1); - }); - }, - // test found_mx({code:dns.NXDOMAIN}) triggers bounce(...) - function (callback) { - util_hmailitem.newMockHMailItem(outbound_context, test, {}, function (mock_hmail){ - var orig_bounce = HMailItem.prototype.bounce; - HMailItem.prototype.bounce = function (err, opts) { - test.ok(true, 'found_mx({code: dns.NXDOMAIN}): bounce function called'); - test.equal('5.1.2', this.todo.rcpt_to[0].dsn_status, 'found_mx({code: dns.NXDOMAIN}: dsn status = 5.1.2'); - } - HMailItem.prototype.found_mx.apply(mock_hmail, [{code: dns.NXDOMAIN}, {}]); - HMailItem.prototype.bounce = orig_bounce; - callback(null, 1); - }); - }, - // test found_mx({code:'NOMX'}) triggers bounce(...) - function (callback) { - util_hmailitem.newMockHMailItem(outbound_context, test, {}, function (mock_hmail){ - var orig_bounce = HMailItem.prototype.bounce; - HMailItem.prototype.bounce = function (err, opts) { - test.ok(true, 'found_mx({code: "NOMX"}): bounce function called'); - test.equal('5.1.2', this.todo.rcpt_to[0].dsn_status, 'found_mx({code: "NOMX"}: dsn status = 5.1.2'); - } - HMailItem.prototype.found_mx.apply(mock_hmail, [{code: 'NOMX'}, {}]); - HMailItem.prototype.bounce = orig_bounce; - callback(null, 1); - }); - }, - // test found_mx({code:'SOME-OTHER-ERR'}) triggers temp_fail(...) - function (callback) { - util_hmailitem.newMockHMailItem(outbound_context, test, {}, function (mock_hmail){ - var orig_temp_fail = HMailItem.prototype.temp_fail; - HMailItem.prototype.temp_fail = function (err, opts) { - test.ok(true, 'found_mx({code: "SOME-OTHER-ERR"}): temp_fail function called'); - test.equal('4.1.0', this.todo.rcpt_to[0].dsn_status, 'found_mx({code: "SOME-OTHER-ERR"}: dsn status = 4.1.0'); - } - HMailItem.prototype.found_mx.apply(mock_hmail, [{code: 'SOME-OTHER-ERR'}, {}]); - HMailItem.prototype.temp_fail = orig_temp_fail; - callback(null, 1); - }); - }, - // test found_mx(null, [{priority:0,exchange:''}]) triggers bounce(...) - function (callback) { - util_hmailitem.newMockHMailItem(outbound_context, test, {}, function (mock_hmail){ - var orig_bounce = HMailItem.prototype.bounce; - HMailItem.prototype.bounce = function (err, opts) { - test.ok(true, 'found_mx(null, [{priority:0,exchange:""}]): bounce function called'); - test.equal('5.1.2', this.todo.rcpt_to[0].dsn_status, 'found_mx(null, [{priority:0,exchange:""}]): dsn status = 5.1.2'); - } - HMailItem.prototype.found_mx.apply(mock_hmail, [null, [{priority:0,exchange:''}]]); - HMailItem.prototype.bounce = orig_bounce; - callback(null, 1); - }); - }, - // test try_deliver while hmail.mxlist=[] triggers bounce(...) - function (callback) { - util_hmailitem.newMockHMailItem(outbound_context, test, {}, function (mock_hmail){ - mock_hmail.mxlist = []; - var orig_temp_fail = HMailItem.prototype.temp_fail; - HMailItem.prototype.temp_fail = function (err, opts) { - test.ok(true, 'try_deliver while hmail.mxlist=[]: temp_fail function called'); - test.equal('5.1.2', this.todo.rcpt_to[0].dsn_status, 'try_deliver while hmail.mxlist=[]: dsn status = 5.1.2'); - } - HMailItem.prototype.try_deliver.apply(mock_hmail, []); - HMailItem.prototype.temp_fail = orig_temp_fail; - callback(null, 1); - }); - }, - ], - function (err, results) { - test.done(); - } -); diff --git a/tests/outbound_protocol/outbound_bounce_rfc3464.js b/tests/outbound_protocol/outbound_bounce_rfc3464.js deleted file mode 100644 index 680fd5bb0..000000000 --- a/tests/outbound_protocol/outbound_bounce_rfc3464.js +++ /dev/null @@ -1,265 +0,0 @@ -'use strict'; -/*eslint no-unused-vars: ["error", { "varsIgnorePattern": "queue_dir", "args": "none" }]*/ - -test.expect(54); - -// What is tested: -// A simple SMTP conversation is made -// At one point, the mocked remote SMTP says "5XX" or "4XX" -// and we test that outbound.send_email is called with a RFC3464 bounce message -// (or, in case of 4XX: that temp_fail is called and dsn vars are available) - -// we copy over here "test_queue_dir" from vm-sandbox to the queue_dir back -// (queue_dir is outbound-private var introduced at the beginning of outbound.js) -var queue_dir = test_queue_dir; - -var util_hmailitem = require('./../fixtures/util_hmailitem'); -var mock_sock = require('./../fixtures/line_socket'); -var async = require('async'); - -var outbound_context = { - TODOItem: exports.TODOItem, - exports: exports, -}; - -async.series( - [ - // test that MAIL FROM responded with 500 5.0.0 triggers - // send_email() containing bounce msg with our codes and message - function (callback) { - util_hmailitem.newMockHMailItem(outbound_context, test, {}, function (mock_hmail){ - var mock_socket = mock_sock.connect('testhost', 'testport'); - mock_socket.writable = true; - - var orig_send_email = exports.send_email; - exports.send_email = function (from, to, contents, cb, opts) { - test.ok(true, 'outbound.send_email called'); - test.ok(contents.match(/^Content-type: message\/delivery-status/m), 'its a bounce report'); - test.ok(contents.match(/^Final-Recipient: rfc822;recipient@domain/m), 'bounce report contains final recipient'); - test.ok(contents.match(/^Action: failed/m), 'DATA-5XX: bounce report contains action field'); - test.ok(contents.match(/^Status: 5\.0\.0/m), 'bounce report contains status field with our ext. smtp code'); - test.ok(contents.match(/Absolutely not acceptable\. Basic Test Only\./), 'original upstream message available'); - exports.send_email = orig_send_email; - callback(null, 1); - }; - - // The playbook - // from remote: This line is to be sent (from an mocked remote SMTP) to haraka outbound. This is done in this test. - // from haraka: Expected answer from haraka-outbound to the mocked remote SMTP. - // 'test' can hold a function(line) returning true for success, or a string tested for equality - var testPlaybook = [ - // Haraka connects, we say first - { 'from': 'remote', 'line': '220 testing-smtp' }, - - { 'from': 'haraka', 'test': function (line) { return line.match(/^EHLO /); }, 'description': 'Haraka should say EHLO' }, - { 'from': 'remote', 'line': '220-testing-smtp' }, - { 'from': 'remote', 'line': '220 8BITMIME' }, - - { 'from': 'haraka', 'test': 'MAIL FROM:' }, - { 'from': 'remote', 'line': '500 5.0.0 Absolutely not acceptable. Basic Test Only.' }, - - { 'from': 'haraka', 'test': 'RSET', end_test: true }, // this will trigger calling the callback - ]; - - util_hmailitem.playTestSmtpConversation(mock_hmail, mock_socket, test, testPlaybook, function () { - - }); - }); - }, - // test that early response of 3XX triggers temp_fail - function (callback) { - util_hmailitem.newMockHMailItem(outbound_context, test, {}, function (mock_hmail){ - var mock_socket = mock_sock.connect('testhost', 'testport'); - mock_socket.writable = true; - - var orig_temp_fail = HMailItem.prototype.temp_fail; - HMailItem.prototype.temp_fail = function (err, opts) { - test.ok(true, 'early-3XX: outbound.temp_fail called'); - test.equal('3.0.0', this.todo.rcpt_to[0].dsn_status, 'early-3XX: dsn status = 3.0.0'); - test.equal('delayed', this.todo.rcpt_to[0].dsn_action, 'early-3XX: dsn action = delayed'); - test.ok(this.todo.rcpt_to[0].dsn_smtp_response.match(/No time for you right now/), 'early-3XX: original upstream message available'); - HMailItem.prototype.temp_fail = orig_temp_fail; - callback(null, 1); - }; - var testPlaybook = [ - { 'from': 'remote', 'line': '220 testing-smtp' }, - - { 'from': 'haraka', 'test': function (line) { return line.match(/^EHLO /); }, 'description': 'Haraka should say EHLO' }, - { 'from': 'remote', 'line': '220-testing-smtp' }, - { 'from': 'remote', 'line': '220 8BITMIME' }, - - { 'from': 'haraka', 'test': 'MAIL FROM:' }, - { 'from': 'remote', 'line': '300 3.0.0 No time for you right now' }, - - { 'from': 'haraka', 'test': 'RSET', end_test: true }, // this will trigger calling the callback - ]; - - util_hmailitem.playTestSmtpConversation(mock_hmail, mock_socket, test, testPlaybook, function () { - - }); - }); - }, - // test that response of 4XX for RCPT-TO triggers temp_fail - function (callback) { - util_hmailitem.newMockHMailItem(outbound_context, test, {}, function (mock_hmail){ - var mock_socket = mock_sock.connect('testhost', 'testport'); - mock_socket.writable = true; - - var orig_temp_fail = HMailItem.prototype.temp_fail; - HMailItem.prototype.temp_fail = function (err, opts) { - test.ok(true, 'RCPT-TO-4XX: outbound.temp_fail called'); - test.equal('4.0.0', this.todo.rcpt_to[0].dsn_status, 'RCPT-TO-4XX: dsn status = 4.0.0'); - test.equal('delayed', this.todo.rcpt_to[0].dsn_action, 'RCPT-TO-4XX: dsn action = delayed'); - test.ok(this.todo.rcpt_to[0].dsn_smtp_response.match(/Currently not available\. Try again later\./), 'RCPT-TO-4XX: original upstream message available'); - HMailItem.prototype.temp_fail = orig_temp_fail; - callback(null, 1); - }; - var testPlaybook = [ - { 'from': 'remote', 'line': '220 testing-smtp' }, - - { 'from': 'haraka', 'test': function (line) { return line.match(/^EHLO /); }, 'description': 'Haraka should say EHLO' }, - { 'from': 'remote', 'line': '220-testing-smtp' }, - { 'from': 'remote', 'line': '220 8BITMIME' }, - - { 'from': 'haraka', 'test': 'MAIL FROM:' }, - { 'from': 'remote', 'line': '250 2.1.0 Ok' }, - - { 'from': 'haraka', 'test': 'RCPT TO:' }, - { 'from': 'remote', 'line': '400 4.0.0 Currently not available. Try again later.' }, - - { 'from': 'haraka', 'test': 'RSET', end_test: true }, // this will trigger calling the callback - ]; - - util_hmailitem.playTestSmtpConversation(mock_hmail, mock_socket, test, testPlaybook, function () { - - }); - }); - }, - - // test that response of 4XX for DATA triggers temp_fail - function (callback) { - util_hmailitem.newMockHMailItem(outbound_context, test, {}, function (mock_hmail){ - var mock_socket = mock_sock.connect('testhost', 'testport'); - mock_socket.writable = true; - - var orig_temp_fail = HMailItem.prototype.temp_fail; - HMailItem.prototype.temp_fail = function (err, opts) { - test.ok(true, 'DATA-4XX: outbound.temp_fail called'); - test.equal('4.6.0', this.todo.rcpt_to[0].dsn_status, 'DATA-4XX: dsn status = 4.6.0'); - test.equal('delayed', this.todo.rcpt_to[0].dsn_action, 'DATA-4XX: dsn action = delayed'); - test.ok(this.todo.rcpt_to[0].dsn_smtp_response.match(/Currently I do not like ascii art cats\./), 'DATA-4XX: original upstream message available'); - HMailItem.prototype.temp_fail = orig_temp_fail; - callback(null, 1); - }; - var testPlaybook = [ - { 'from': 'remote', 'line': '220 testing-smtp' }, - - { 'from': 'haraka', 'test': function (line) { return line.match(/^EHLO /); }, 'description': 'Haraka should say EHLO' }, - { 'from': 'remote', 'line': '220-testing-smtp' }, - { 'from': 'remote', 'line': '220 8BITMIME' }, - - { 'from': 'haraka', 'test': 'MAIL FROM:' }, - { 'from': 'remote', 'line': '250 2.1.0 Ok' }, - - { 'from': 'haraka', 'test': 'RCPT TO:' }, - { 'from': 'remote', 'line': '250 2.1.5 Ok' }, - - { 'from': 'haraka', 'test': 'DATA' }, - // haraka will send us more lines - { 'from': 'remote', 'line': '450 4.6.0 Currently I do not like ascii art cats.' }, - - { 'from': 'haraka', 'test': 'RSET', end_test: true }, // this will trigger calling the callback - ]; - - util_hmailitem.playTestSmtpConversation(mock_hmail, mock_socket, test, testPlaybook, function () { - - }); - }); - }, - // test that response of 5XX for RCPT-TO triggers - // send_email() containing bounce msg with our codes and message - function (callback) { - util_hmailitem.newMockHMailItem(outbound_context, test, {}, function (mock_hmail){ - var mock_socket = mock_sock.connect('testhost', 'testport'); - mock_socket.writable = true; - - var orig_send_email = exports.send_email; - exports.send_email = function (from, to, contents, cb, opts) { - test.ok(true, 'RCPT-TO-5XX: outbound.send_email called'); - test.ok(contents.match(/^Content-type: message\/delivery-status/m), 'RCPT-TO-5XX: its a bounce report'); - test.ok(contents.match(/^Final-Recipient: rfc822;recipient@domain/m), 'RCPT-TO-5XX: bounce report contains final recipient'); - test.ok(contents.match(/^Action: failed/m), 'DATA-5XX: bounce report contains action field'); - test.ok(contents.match(/^Status: 5\.1\.1/m), 'RCPT-TO-5XX: bounce report contains status field with our ext. smtp code'); - test.ok(contents.match(/Not available and will not come back/), 'RCPT-TO-5XX: original upstream message available'); - exports.send_email = orig_send_email; - callback(null, 1); - }; - var testPlaybook = [ - { 'from': 'remote', 'line': '220 testing-smtp' }, - - { 'from': 'haraka', 'test': function (line) { return line.match(/^EHLO /); }, 'description': 'Haraka should say EHLO' }, - { 'from': 'remote', 'line': '220-testing-smtp' }, - { 'from': 'remote', 'line': '220 8BITMIME' }, - - { 'from': 'haraka', 'test': 'MAIL FROM:' }, - { 'from': 'remote', 'line': '250 2.1.0 Ok' }, - - { 'from': 'haraka', 'test': 'RCPT TO:' }, - { 'from': 'remote', 'line': '550 5.1.1 Not available and will not come back' }, - - { 'from': 'haraka', 'test': 'RSET', end_test: true }, // this will trigger calling the callback - ]; - - util_hmailitem.playTestSmtpConversation(mock_hmail, mock_socket, test, testPlaybook, function () { - - }); - }); - }, - // test that response of 5XX for DATA triggers - // send_email() containing bounce msg with our codes and message - function (callback) { - util_hmailitem.newMockHMailItem(outbound_context, test, {}, function (mock_hmail){ - var mock_socket = mock_sock.connect('testhost', 'testport'); - mock_socket.writable = true; - - var orig_send_email = exports.send_email; - exports.send_email = function (from, to, contents, cb, opts) { - test.ok(true, 'DATA-5XX: outbound.send_email called'); - test.ok(contents.match(/^Content-type: message\/delivery-status/m), 'DATA-5XX: its a bounce report'); - test.ok(contents.match(/^Final-Recipient: rfc822;recipient@domain/m), 'DATA-5XX: bounce report contains final recipient'); - test.ok(contents.match(/^Action: failed/m), 'DATA-5XX: bounce report contains action field'); - test.ok(contents.match(/^Status: 5\.6\.0/m), 'DATA-5XX: bounce report contains status field with our ext. smtp code'); - test.ok(contents.match(/I never did and will like ascii art cats/), 'DATA-5XX: original upstream message available'); - exports.send_email = orig_send_email; - callback(null, 1); - }; - var testPlaybook = [ - { 'from': 'remote', 'line': '220 testing-smtp' }, - - { 'from': 'haraka', 'test': function (line) { return line.match(/^EHLO /); }, 'description': 'Haraka should say EHLO' }, - { 'from': 'remote', 'line': '220-testing-smtp' }, - { 'from': 'remote', 'line': '220 8BITMIME' }, - - { 'from': 'haraka', 'test': 'MAIL FROM:' }, - { 'from': 'remote', 'line': '250 2.1.0 Ok' }, - - { 'from': 'haraka', 'test': 'RCPT TO:' }, - { 'from': 'remote', 'line': '250 2.1.5 Ok' }, - - { 'from': 'haraka', 'test': 'DATA' }, - // haraka will send us more lines - { 'from': 'remote', 'line': '550 5.6.0 I never did and will like ascii art cats.' }, - - { 'from': 'haraka', 'test': 'RSET', end_test: true }, // this will trigger calling the callback - ]; - - util_hmailitem.playTestSmtpConversation(mock_hmail, mock_socket, test, testPlaybook, function () { - - }); - }); - }, - ], - function (err, results) { - test.done(); - } -);