diff --git a/README b/README index 2fb37e0..1050b3c 100644 --- a/README +++ b/README @@ -20,3 +20,21 @@ The typical flow of a mail (after-queue): 7. smtpd ('disconnected from') Both 6 and 7 are required before the mail can be printed completely. +6 and 7 can be logged in arbitrary order. + +The typical flow of a mail (before-queue): + +1. postscreen (NOQUEUE -> mail finished) +2. smtpd 1 (pid matching) +3. pmg-smtp-filter (rule system, accept/block, long (Q)ID matching) + - on accept match the QID +4. smtpd 2 (pid matching) +5. (optional, only on 'accept') cleanup -> qmgr (mail in queue, QID matching) +6. (optional, only on 'accept') smtp (QID matching) +7. (optional, only on 'accept') qmgr ('removed') +8. smtpd 2 ('disconnect from') +9. smtpd 1 (proxy-accept/proxy-reject, filter (Q)ID matching) +10. smtpd 1 ('disconnect from') + +7, 8 and 10 are required before the mail can be printed completely. +7, 8 and 10 can be logged in arbitrary order. diff --git a/src/main.rs b/src/main.rs index b864c3d..9cb96f0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -200,6 +200,17 @@ fn handle_pmg_smtp_filter_message(msg: &[u8], parser: &mut Parser, complete_line // add a ToEntry with the DStatus 'Accept' to the FEntry fe.borrow_mut() .add_accept(to, qid, parser.current_record_state.timestamp); + + // if there's a QEntry with the qid and it's not yet filtered + // set it to before-queue filtered + if let Some(qe) = parser.qentries.get(qid) { + if !qe.borrow().filtered { + qe.borrow_mut().bq_filtered = true; + qe.borrow_mut().filter = Some(Rc::clone(&fe)); + fe.borrow_mut().qentry = Some(Rc::downgrade(qe)); + } + } + return; } @@ -315,7 +326,8 @@ fn handle_postscreen_message(msg: &[u8], parser: &mut Parser, complete_line: &[u } // handle log entries for 'qmgr' -// these only appear in the 'after-queue filter' case +// these only appear in the 'after-queue filter' case or when the mail is +// 'accepted' in the 'before-queue filter' case fn handle_qmgr_message(msg: &[u8], parser: &mut Parser, complete_line: &[u8]) { let (qid, data) = match parse_qid(msg, 15) { Some(t) => t, @@ -446,8 +458,25 @@ fn handle_lmtp_message(msg: &[u8], parser: &mut Parser, complete_line: &[u8]) { // add a reference to the filter qe.borrow_mut().filtered = true; + + // if there's a FEntry with the filter QID, check to see if its + // qentry matches this one if let Some(fe) = parser.fentries.get(qid) { - qe.borrow_mut().filter = Some(Rc::downgrade(fe)); + qe.borrow_mut().filter = Some(Rc::clone(fe)); + let q = fe.borrow().qentry.clone(); + if let Some(q) = q { + if let Some(q) = q.upgrade() { + if !Rc::ptr_eq(&q, &qe) { + // QEntries don't match, set all flags to false and + // remove the referenced FEntry + q.borrow_mut().filtered = false; + q.borrow_mut().bq_filtered = false; + q.borrow_mut().filter = None; + // update FEntry's QEntry reference to the new one + fe.borrow_mut().qentry = Some(Rc::downgrade(&qe)); + } + } + } } } } @@ -483,6 +512,12 @@ fn handle_smtpd_message(msg: &[u8], parser: &mut Parser, complete_line: &[u8]) { if se.borrow_mut().remove_unneeded_refs(parser) == 0 { // no QEntries referenced in SEntry so just print the SEntry se.borrow_mut().print(parser); + // free the referenced FEntry (only happens with before-queue) + if let Some(f) = &se.borrow().filter { + if let Some(f) = f.upgrade() { + parser.free_fentry(&f.borrow().logid); + } + } parser.free_sentry(se.borrow().pid); } else { se.borrow_mut().finalize_refs(parser); @@ -541,6 +576,94 @@ fn handle_smtpd_message(msg: &[u8], parser: &mut Parser, complete_line: &[u8]) { return; } + // only happens with before-queue + // here we can match the pmg-smtp-filter + // 'proxy-accept' happens if it is accepted for AT LEAST ONE receiver + if msg.starts_with(b"proxy-accept: ") { + let data = &msg[14..]; + if !data.starts_with(b"END-OF-MESSAGE: ") { + return; + } + let data = &data[16..]; + if !data.starts_with(b"250 2.5.0 OK (") { + return; + } + let data = &data[14..]; + if let Some((qid, data)) = parse_qid(data, 25) { + let fe = get_or_create_fentry(&mut parser.fentries, qid); + // set the FEntry to before-queue filtered + fe.borrow_mut().is_bq = true; + // if there's no 'accept mail to' entry because of quarantine + // we have to match the pmg-smtp-filter here + // for 'accepted' mails it is matched in the 'accept mail to' + // log entry + if !fe.borrow().is_accepted { + // set the SEntry filter reference as we don't have a QEntry + // in this case + se.borrow_mut().filter = Some(Rc::downgrade(&fe)); + + if let Some(from_index) = find(data, b"from=<") { + let data = &data[from_index + 6..]; + let from_count = data.iter().take_while(|b| (**b as char) != '>').count(); + let from = &data[..from_count]; + // keep the from for later printing + // required for the correct 'TO:{}:{}...' syntax required + // by PMG/API2/MailTracker.pm + se.borrow_mut().bq_from = from.into(); + } + } else if let Some(qe) = &fe.borrow().qentry { + // mail is 'accepted', add a reference to the QEntry to the + // SEntry so we can wait for all to be finished before printing + if let Some(qe) = qe.upgrade() { + qe.borrow_mut().bq_sentry = Some(Rc::clone(&se)); + SEntry::add_ref(&se, &qe, true); + } + } + // specify that before queue filtering is used and the mail was + // accepted, but not necessarily by an 'accept' rule + // (e.g. quarantine) + se.borrow_mut().is_bq_accepted = true; + } + + return; + } + + // before queue filtering and rejected, here we can match the + // pmg-smtp-filter same as in the 'proxy-accept' case + // only happens if the mail was rejected for ALL receivers, otherwise + // a 'proxy-accept' happens + if msg.starts_with(b"proxy-reject: ") { + let data = &msg[14..]; + if !data.starts_with(b"END-OF-MESSAGE: ") { + return; + } + let data = &data[16..]; + if let Some(qid_index) = find(data, b"(") { + let data = &data[qid_index + 1..]; + if let Some((qid, data)) = parse_qid(data, 25) { + let fe = get_or_create_fentry(&mut parser.fentries, qid); + // set the FEntry to before-queue filtered + fe.borrow_mut().is_bq = true; + // we never have a QEntry in this case, so just set the SEntry + // filter reference + se.borrow_mut().filter = Some(Rc::downgrade(&fe)); + // specify that before queue filtering is used and the mail + // was rejected for all receivers + se.borrow_mut().is_bq_rejected = true; + + if let Some(from_index) = find(data, b"from=<") { + let data = &data[from_index + 6..]; + let from_count = data.iter().take_while(|b| (**b as char) != '>').count(); + let from = &data[..from_count]; + // same as for 'proxy-accept' above + se.borrow_mut().bq_from = from.into(); + } + } + } + + return; + } + // with none of the other messages matching, we try for a QID to match the // corresponding QEntry to the SEntry let (qid, data) = match parse_qid(msg, 15) { @@ -555,7 +678,7 @@ fn handle_smtpd_message(msg: &[u8], parser: &mut Parser, complete_line: &[u8]) { qe.borrow_mut().string_match = parser.string_match; } - SEntry::add_ref(&se, &qe); + SEntry::add_ref(&se, &qe, false); if !data.starts_with(b"client=") { return; @@ -571,7 +694,8 @@ fn handle_smtpd_message(msg: &[u8], parser: &mut Parser, complete_line: &[u8]) { } // handle log entries for 'cleanup' -// happens before the mail is passed to qmgr (after-queue only) +// happens before the mail is passed to qmgr (after-queue or before-queue +// accepted only) fn handle_cleanup_message(msg: &[u8], parser: &mut Parser, complete_line: &[u8]) { let (qid, data) = match parse_qid(msg, 15) { Some(t) => t, @@ -633,7 +757,7 @@ impl Default for ToEntry { } } -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Copy, Clone)] enum DStatus { Invalid, Accept, @@ -665,16 +789,6 @@ impl std::fmt::Display for DStatus { } } -impl DStatus { - fn is_dsn(&self, value: Option) -> bool { - match (self, value) { - (DStatus::Dsn(v), Some(val)) => *v == val, - (DStatus::Dsn(_), None) => true, - _ => false, - } - } -} - #[derive(Debug, Default)] struct SEntry { log: Vec<(Box<[u8]>, u64)>, @@ -686,9 +800,18 @@ struct SEntry { refs: Vec>>, nq_entries: Vec, disconnected: bool, + // only set in case of before queue filtering + // used as a fallback in case no QEntry is referenced + filter: Option>>, string_match: bool, timestamp: u64, rel_line_nr: u64, + // before queue filtering with the mail accepted for at least one receiver + is_bq_accepted: bool, + // before queue filtering with the mail rejected for all receivers + is_bq_rejected: bool, + // from address saved for compatibility with after queue filtering + bq_from: Box<[u8]>, } impl SEntry { @@ -758,22 +881,13 @@ impl SEntry { { let mut found = false; for nq in self.nq_entries.iter_mut().rev() { - if !parser.options.from.is_empty() - && find_lowercase(&nq.from, parser.options.from.as_bytes()).is_none() - { - nq.dstatus = DStatus::Invalid; - } - - if parser.options.exclude_greylist && nq.dstatus == DStatus::Greylist { - nq.dstatus = DStatus::Invalid; - } - if parser.options.exclude_ndr && nq.from.is_empty() { - nq.dstatus = DStatus::Invalid; - } - - if !parser.options.to.is_empty() - && !nq.to.is_empty() - && find_lowercase(&nq.to, parser.options.to.as_bytes()).is_none() + if (!parser.options.from.is_empty() + && find_lowercase(&nq.from, parser.options.from.as_bytes()).is_none()) + || (parser.options.exclude_greylist && nq.dstatus == DStatus::Greylist) + || (parser.options.exclude_ndr && nq.from.is_empty()) + || (!parser.options.to.is_empty() + && !nq.to.is_empty() + && find_lowercase(&nq.to, parser.options.to.as_bytes()).is_none()) { nq.dstatus = DStatus::Invalid; } @@ -826,14 +940,66 @@ impl SEntry { } } - // if '-vv' is passed to the log tracker, print all the logs - if parser.options.verbose > 1 { - parser.write_all_ok(b"LOGS:\n"); - for (log, line) in self.log.iter() { + let print_filter_to_entries_fn = + |fe: &Rc>, + parser: &mut Parser, + se: &SEntry, + dstatus: Option| { + let mut dstatus = match dstatus { + Some(d) => d, + None => DStatus::Invalid, + }; + for to in fe.borrow().to_entries.iter().rev() { + if dstatus == DStatus::Invalid { + dstatus = to.dstatus; + } + parser.write_all_ok(format!( + "TO:{:X}:T{:08X}L{:08X}:{}: from <", + to.timestamp as i32, se.timestamp as i32, se.rel_line_nr, dstatus, + )); + parser.write_all_ok(&se.bq_from); + parser.write_all_ok(b"> to <"); + parser.write_all_ok(&to.to); + parser.write_all_ok(b">\n"); + parser.count += 1; + } + }; + + // only true in before queue filtering case + if let Some(fe) = &self.filter { + if let Some(fe) = fe.upgrade() { + // limited to !fe.is_accepted because otherwise we would have + // a QEntry with all required information instead + if fe.borrow().is_bq && !fe.borrow().is_accepted && self.is_bq_accepted { + print_filter_to_entries_fn(&fe, parser, self, None); + } else if fe.borrow().is_bq && !fe.borrow().is_accepted && self.is_bq_rejected { + print_filter_to_entries_fn(&fe, parser, self, Some(DStatus::Noqueue)); + } + } + } + + let print_log = |parser: &mut Parser, logs: &Vec<(Box<[u8]>, u64)>| { + for (log, line) in logs.iter() { parser.write_all_ok(format!("L{:08X} ", *line as u32)); parser.write_all_ok(log); parser.write_all_ok(b"\n"); } + }; + + // if '-vv' is passed to the log tracker, print all the logs + if parser.options.verbose > 1 { + parser.write_all_ok(b"LOGS:\n"); + let mut logs = self.log.clone(); + if let Some(f) = &self.filter { + if let Some(f) = f.upgrade() { + logs.append(&mut f.borrow().log.clone()); + // as the logs come from 1 SEntry and 1 FEntry, + // interleave them via sort based on line number + logs.sort_by(|a, b| a.1.cmp(&b.1)); + } + } + + print_log(parser, &logs); } parser.write_all_ok(b"\n"); } @@ -873,7 +1039,6 @@ impl SEntry { }); for q in to_delete.iter().rev() { - q.borrow_mut().smtpd = None; parser.free_qentry(&q.borrow().qid, Some(self)); } count @@ -895,8 +1060,20 @@ impl SEntry { let fe = &q.borrow().filter; if let Some(f) = fe { - if let Some(f) = f.upgrade() { - if !f.borrow().finished { + if !q.borrow().bq_filtered && !f.borrow().finished { + continue; + } + } + + if !self.is_bq_accepted && q.borrow().bq_sentry.is_some() { + if let Some(se) = &q.borrow().bq_sentry { + // we're already disconnected, but the SEntry referenced + // by the QEntry might not yet be done + if !se.borrow().disconnected { + // add a reference to the SEntry referenced by the + // QEntry so it gets deleted when both the SEntry + // and the QEntry is done + Self::add_ref(&se, &q, true); continue; } } @@ -907,24 +1084,23 @@ impl SEntry { for q in qentries.iter().rev() { q.borrow_mut().print(parser, Some(self)); - q.borrow_mut().smtpd = None; parser.free_qentry(&q.borrow().qid, Some(self)); if let Some(f) = &q.borrow().filter { - if let Some(f) = f.upgrade() { - parser.free_fentry(&f.borrow().logid); - } + parser.free_fentry(&f.borrow().logid); } } } - fn add_ref(sentry: &Rc>, qentry: &Rc>) { + fn add_ref(sentry: &Rc>, qentry: &Rc>, bq: bool) { let smtpd = qentry.borrow().smtpd.clone(); - if let Some(s) = smtpd { - if !Rc::ptr_eq(sentry, &s) { - eprintln!("Error: qentry ref already set"); + if !bq { + if let Some(s) = smtpd { + if !Rc::ptr_eq(sentry, &s) { + eprintln!("Error: qentry ref already set"); + } + return; } - return; } for q in sentry.borrow().refs.iter() { @@ -938,7 +1114,9 @@ impl SEntry { } sentry.borrow_mut().refs.push(Rc::downgrade(qentry)); - qentry.borrow_mut().smtpd = Some(Rc::clone(sentry)); + if !bq { + qentry.borrow_mut().smtpd = Some(Rc::clone(sentry)); + } } } @@ -946,7 +1124,7 @@ impl SEntry { struct QEntry { log: Vec<(Box<[u8]>, u64)>, smtpd: Option>>, - filter: Option>>, + filter: Option>>, qid: Box<[u8]>, from: Box<[u8]>, client: Box<[u8]>, @@ -957,6 +1135,9 @@ struct QEntry { removed: bool, filtered: bool, string_match: bool, + bq_filtered: bool, + // will differ from smtpd + bq_sentry: Option>>, } impl QEntry { @@ -980,13 +1161,17 @@ impl QEntry { return; } } + if let Some(s) = &self.bq_sentry { + if self.bq_filtered && !s.borrow().disconnected { + return; + } + } if let Some(fe) = self.filter.clone() { - if let Some(fe) = fe.upgrade() { - // verify that the attached FEntry is finished - if !fe.borrow().finished { - return; - } + // verify that the attached FEntry is finished if it is not + // before queue filtered + if !self.bq_filtered && !fe.borrow().finished { + return; } // if there's an SEntry, print with the SEntry @@ -1002,7 +1187,7 @@ impl QEntry { parser.free_qentry(&self.qid, None); } - if let Some(fe) = fe.upgrade() { + if !self.bq_filtered { parser.free_fentry(&fe.borrow().logid); } } else if let Some(s) = self.smtpd.clone() { @@ -1037,11 +1222,9 @@ impl QEntry { match m { Match::Qid(q) => { if let Some(f) = fe { - if let Some(f) = f.upgrade() { - if &f.borrow().logid == q { - found = true; - break; - } + if &f.borrow().logid == q { + found = true; + break; } } if &self.qid == q { @@ -1129,10 +1312,8 @@ impl QEntry { } } if let Some(f) = fe { - if let Some(f) = f.upgrade() { - if f.borrow().string_match { - string_match = true; - } + if f.borrow().string_match { + string_match = true; } } if !string_match { @@ -1142,55 +1323,78 @@ impl QEntry { true } - fn print(&mut self, parser: &mut Parser, se: Option<&SEntry>) { - let fe = self.filter.clone(); + // is_se_bq_sentry is true if the QEntry::bq_sentry is the same as passed + // into the print() function via reference + fn print_qentry_boilerplate( + &mut self, + parser: &mut Parser, + is_se_bq_sentry: bool, + se: Option<&SEntry>, + ) { + parser.write_all_ok(b"QENTRY: "); + parser.write_all_ok(&self.qid); + parser.write_all_ok(b"\n"); + parser.write_all_ok(format!("CTIME: {:8X}\n", parser.ctime)); + parser.write_all_ok(format!("SIZE: {}\n", self.size)); - if !self.msgid_matches(parser) { - return; - } - - if !self.match_list_matches(parser, se) { - return; - } - - if !self.host_matches(parser, se) { - return; - } - - if !self.from_to_matches(parser) { - return; - } - - if !self.string_matches(parser, se) { - return; - } - - if parser.options.verbose > 0 { - parser.write_all_ok(b"QENTRY: "); - parser.write_all_ok(&self.qid); + if !self.client.is_empty() { + parser.write_all_ok(b"CLIENT: "); + parser.write_all_ok(&self.client); parser.write_all_ok(b"\n"); - parser.write_all_ok(format!("CTIME: {:8X}\n", parser.ctime)); - parser.write_all_ok(format!("SIZE: {}\n", self.size)); - - if !self.client.is_empty() { - parser.write_all_ok(b"CLIENT: "); - parser.write_all_ok(&self.client); - parser.write_all_ok(b"\n"); - } else if let Some(s) = se { + } else if !is_se_bq_sentry { + if let Some(s) = se { if !s.connect.is_empty() { parser.write_all_ok(b"CLIENT: "); parser.write_all_ok(&s.connect); parser.write_all_ok(b"\n"); } } - - if !self.msgid.is_empty() { - parser.write_all_ok(b"MSGID: "); - parser.write_all_ok(&self.msgid); + } else if let Some(s) = &self.smtpd { + if !s.borrow().connect.is_empty() { + parser.write_all_ok(b"CLIENT: "); + parser.write_all_ok(&s.borrow().connect); parser.write_all_ok(b"\n"); } } + if !self.msgid.is_empty() { + parser.write_all_ok(b"MSGID: "); + parser.write_all_ok(&self.msgid); + parser.write_all_ok(b"\n"); + } + } + + fn print(&mut self, parser: &mut Parser, se: Option<&SEntry>) { + let fe = self.filter.clone(); + + if !self.msgid_matches(parser) + || !self.match_list_matches(parser, se) + || !self.host_matches(parser, se) + || !self.from_to_matches(parser) + || !self.string_matches(parser, se) + { + return; + } + + // necessary so we do not attempt to mutable borrow it a second time + // which will panic + let is_se_bq_sentry = match (&self.bq_sentry, se) { + (Some(s), Some(se)) => std::ptr::eq(s.as_ptr(), se), + _ => false, + }; + + if is_se_bq_sentry { + if let Some(s) = &se { + if !s.disconnected { + return; + } + } + } + + if parser.options.verbose > 0 { + self.print_qentry_boilerplate(parser, is_se_bq_sentry, se); + } + // rev() to match the C code iteration direction (linked list vs Vec) for to in self.to_entries.iter().rev() { if !to.to.is_empty() { @@ -1200,9 +1404,9 @@ impl QEntry { // if status == success and there's a filter attached that has // a matching 'to' in one of the ToEntries, set the ToEntry to // the one in the filter - if to.dstatus.is_dsn(Some(2)) { + if to.dstatus == DStatus::Dsn(2) { if let Some(f) = &fe { - if let Some(f) = f.upgrade() { + if !self.bq_filtered || (f.borrow().finished && f.borrow().is_bq) { final_rc = f; final_borrow = final_rc.borrow(); for to2 in final_borrow.to_entries.iter().rev() { @@ -1222,7 +1426,14 @@ impl QEntry { parser.write_all_ok(b"> to <"); parser.write_all_ok(&final_to.to); parser.write_all_ok(b"> ("); - parser.write_all_ok(&final_to.relay); + // if we use the relay from the filter ToEntry, it will be + // marked 'is_relay' in PMG/API2/MailTracker.pm and not shown + // in the GUI in the case of before queue filtering + if !self.bq_filtered { + parser.write_all_ok(&final_to.relay); + } else { + parser.write_all_ok(&to.relay); + } parser.write_all_ok(b")\n"); parser.count += 1; } @@ -1237,22 +1448,42 @@ impl QEntry { parser.write_all_ok(b"\n"); } }; - - if let Some(s) = se { - if !s.log.is_empty() { + if !is_se_bq_sentry { + if let Some(s) = se { + let mut logs = s.log.clone(); + if let Some(bq_se) = &self.bq_sentry { + logs.append(&mut bq_se.borrow().log.clone()); + // as the logs come from 2 different SEntries, + // interleave them via sort based on line number + logs.sort_by(|a, b| a.1.cmp(&b.1)); + } + if !logs.is_empty() { + parser.write_all_ok(b"SMTP:\n"); + print_log(parser, &logs); + } + } + } else if let Some(s) = &self.smtpd { + let mut logs = s.borrow().log.clone(); + if let Some(se) = se { + logs.append(&mut se.log.clone()); + // as the logs come from 2 different SEntries, + // interleave them via sort based on line number + logs.sort_by(|a, b| a.1.cmp(&b.1)); + } + if !logs.is_empty() { parser.write_all_ok(b"SMTP:\n"); - print_log(parser, &s.log); + print_log(parser, &logs); } } if let Some(f) = fe { - if let Some(f) = f.upgrade() { - if !f.borrow().log.is_empty() { - parser.write_all_ok(format!("FILTER: {}\n", unsafe { - std::str::from_utf8_unchecked(&f.borrow().logid) - })); - print_log(parser, &f.borrow().log); - } + if (!self.bq_filtered || (f.borrow().finished && f.borrow().is_bq)) + && !f.borrow().log.is_empty() + { + parser.write_all_ok(format!("FILTER: {}\n", unsafe { + std::str::from_utf8_unchecked(&f.borrow().logid) + })); + print_log(parser, &f.borrow().log); } } @@ -1279,6 +1510,9 @@ struct FEntry { processing_time: Box<[u8]>, string_match: bool, finished: bool, + is_accepted: bool, + qentry: Option>>, + is_bq: bool, } impl FEntry { @@ -1290,6 +1524,7 @@ impl FEntry { timestamp, }; self.to_entries.push(te); + self.is_accepted = true; } fn add_quarantine(&mut self, to: &[u8], qid: &[u8], timestamp: u64) {