#!/usr/bin/perl -w use strict; use Getopt::Long qw(GetOptions :config no_ignore_case); use Time::HiRes qw(gettimeofday tv_interval); my %opt = ( host => 'localhost', port => 30783, conc => 2, max => 0, ); GetOptions(\%opt, qw(host|h=s port|p=i conc|concurrency|c=i max|m=i)); die "usage:\n\t$0 list of mboxes\n" unless @ARGV; my (@mboxes, $curr_mbox, $mbox_fh) = @ARGV; #my $all_all = 0; #for my $f (@ARGV) { # my $mbox = Mail::MboxParser->new($f) or die; # $mbox->make_index; # push @mboxes, $mbox; # print 'mbox ' . $mbox->nmsgs() . "\t$f\n"; # $all_all += $mbox->nmsgs; #} use IO::Socket::INET6; use IO::Multiplex; my @sockets; my %conn = ( PeerAddr => $opt{host}, PeerPort => $opt{port}, ); my $mux = IO::Multiplex->new; $mux->set_callback_object(__PACKAGE__); my $msgs = 0; my $tempfoo; my $start = [gettimeofday]; while ($mux->handles < $opt{conc} && new_conn()) { ##warn ~~ $mux->handles(); die if $mux->handles > $opt{conc}; } $mux->loop; my $howlong = tv_interval($start); my $hour = int($howlong / 3600); my $min = int(($howlong % 3600) / 60); my $sec = $howlong % 60; printf "parsed %d messages in %02d:%02d:%02d (%s s), %.4f msgs/s (%.0f msgs/min, %.0f msgs/h)\n", $msgs, $hour, $min, $sec, $howlong, $msgs / $howlong, $msgs * 60 / $howlong, $msgs * 60 * 60 / $howlong; #sleep 1; sub new_conn { my $message = next_message() or return; die 'handles: ' . $mux->handles if $mux->handles > $opt{conc}; return if $opt{max} && $msgs >= $opt{max}; ++$msgs; # return 1 unless ++$tempfoo >= 6800; #die "'$$message'"; my $s = IO::Socket::INET6->new(%conn) or die; $mux->add($s) or die; my $spamc = Spamc->new(id => $msgs, s => $s, start => [gettimeofday],); $mux->set_callback_object($spamc, $s); $mux->set_timeout($s, 20); $mux->write($s, "SYMBOLS SPAMC/1.9\r\n" . 'Content-length: ' . length($$message) . "\r\n\r\n" . $$message) or die; 1; } sub next_message { local $/ = "\nFrom "; if ($curr_mbox && !eof $mbox_fh) { my $msg = tell $mbox_fh ? <$mbox_fh> : 'From ' . <$mbox_fh>; $msg =~ s/\r?\n(?:From )?$//; # (?:...) is for last message return \$msg; } else { # end of mbox or first one return unless @mboxes; # end $curr_mbox = shift @mboxes; close $mbox_fh if $mbox_fh; open $mbox_fh, '<', $curr_mbox or die "open $curr_mbox: $!"; return next_message(); # ;-> } } package Spamc; sub mux_input { my ($self, $mux, $fh, $in) = @_; my $ret = $self->parse($in); if (defined $ret) { main::new_conn(); if ($ret) { # ok (my $body = $self->{body}) =~ y/\r\n/ /s; $self->{headers}->{spam} =~ /^([TF])\S+\s*;\s*(-?[\d.]+)\s*\/\s*([\d.]+)\b/ or die "bad Spam header: '$self->{headers}->{spam}'"; printf "%-8s %5s %1s %4s/%3s %s\n", Time::HiRes::tv_interval($self->{start}), ($self->{id} ? $self->{id} : '(wtf)'), $1, $2, $3, $body; } else { warn 'fail for ', ($self->{id} ? $self->{id} : '(wtf)'), ": $self->{rcode} $self->{rmsg}\n"; $mux->kill_output($fh); } $fh->close; # are both needed? $mux->close($fh); } # undef $$in; # $mux->close($fh); } sub mux_timeout { my $self = shift; my $mux = shift; warn "timeout for $self->{id}\n"; $mux->close($self->{s}); } sub new { my $class = shift; bless {@_}, $class; } sub parse { my $self = shift; my $in = ref $_[0] ? $_[0] : \$_[0]; my $ret; while ($$in =~ /\n/ or defined $self->{body} && (length $$in || $self->{headers}->{content_length} == 0)) { $ret = !defined $self->{banner} ? $self->banner($in) : !defined $self->{body} ? $self->headers($in) : $self->body($in); return $ret if defined $ret; } undef; } sub banner { my $self = shift; my $in = shift; if ($$in =~ s/^SPAMD\/(\d\.\d)\s+(\d+)\s+([^\r\n]+)\r?\n//) { (@{$self}{qw(sver rcode rmsg)}) = ($1, $2, $3); $self->{banner}++; } else { warn "unparseable input from spamd: '$$in'"; return 0; } if ($self->{rcode} != 0) { # warn "fail: $self->{rcode} $self->{rmsg}\n"; return 0; } undef; } sub headers { my $self = shift; my $in = shift; die "blah" unless $self->{banner}; while ($$in =~ s/^([a-z\d_-]+):\s+([^\r\n]+)\r?\n//i) { my ($h, $v) = ($1, $2); $h =~ y/A-Z-/a-z_/; $self->{headers}->{$h} = $v; } die "content-length not numeric" if defined $self->{headers}->{content_length} && $self->{headers}->{content_length} !~ /^\d+$/; if ($$in =~ s/^\r?\n//) { $self->{body} = ''; unless ($self->{headers}->{spam}) { warn "no Spam header", keys %{ $self->{headers} }; return 0; } unless (defined $self->{headers}->{content_length}) { warn "Content-length is required"; return 0; } } elsif ($$in =~ /\n/) { warn "bad header '$$in'"; return 0; } undef; } sub body { my $self = shift; my $in = shift; die "fubar" unless $self->{banner} && $self->{headers} && defined $self->{body}; $self->{body} .= $$in; $$in = ''; if (defined(my $l = $self->{headers}->{content_length})) { if (length $self->{body} == $l) { return 1; } elsif (length $self->{body} > $l) { warn "body too long"; return 0; } } else { return 1 if $self->{body} =~ /\n/; # only good for one line output } undef; } #sub DESTROY { my $self = shift; warn "DESTROY $self->{id}"; } 1;