metric server: improve flush on big data updates

Signed-off-by: Thomas Lamprecht <t.lamprecht@proxmox.com>
This commit is contained in:
Thomas Lamprecht 2020-05-08 17:15:44 +02:00
parent 7788262afb
commit 5c77a34f08
4 changed files with 65 additions and 49 deletions

View File

@ -28,17 +28,10 @@ sub update_all($$@) {
my $method = "update_${subsystem}_status";
my (undef, $fn, $line, $subr) = caller(1);
for my $txn (@$transactions) {
my $plugin = PVE::Status::Plugin->lookup($txn->{cfg}->{type});
$plugin->$method($txn, @params);
if (length($txn->{data}) > 48000) {
# UDP stack cannot handle messages > 65k, if we've alot of data we
# do smaller batch sends then, but keep the connection alive
transaction_flush($txn, 1);
}
}
}
@ -69,36 +62,20 @@ sub transactions_start {
return $transactions;
}
# Send the data queued in $txn->{data} through the transaction's plugin.
#
# Parameters:
#   $txn           - transaction hash; this sub reads {connection}, {data},
#                    {cfg}->{type} and {id}
#   $keepconnected - when true, the connection is left open after sending
#
# Returns nothing useful. Dies when data is pending but no connection exists,
# or when the plugin's send() failed. Unless $keepconnected is set, the
# connection is closed and reset before the send error is re-thrown.
sub transaction_flush {
    my ($txn, $keepconnected) = @_;

    if (!$txn->{connection}) {
        return if !$txn->{data}; # OK, if data was already sent/flushed
        die "cannot flush metric data, no connection available!\n";
    }
    return if !defined($txn->{data}) || $txn->{data} eq '';

    my $plugin = PVE::Status::Plugin->lookup($txn->{cfg}->{type});

    # remove the data from the queue first, so the queue is empty afterwards
    # even when send() fails
    my $data = delete $txn->{data};
    eval { $plugin->send($txn->{connection}, $data) };
    my $senderr = $@;

    if (!$keepconnected) {
        # The disconnect needs its own eval: without it $@ would still hold
        # the send error here (so the warning below could never fire), and a
        # dying disconnect would skip the connection reset and hide the send
        # error from the caller.
        eval { $plugin->_disconnect($txn->{connection}) };
        my $disconnect_err = $@;
        $txn->{connection} = undef;
        # avoid log spam, already got a send error; disconnect would fail too
        warn "disconnect failed: $disconnect_err" if $disconnect_err && !$senderr;
    }

    die "metrics send error '$txn->{id}': $senderr" if $senderr;
}
# Finish every transaction in the given array reference: flush pending metric
# data, close the plugin connection and reset $txn->{connection}.
# Flush errors are reported with warn and do not stop the loop.
#
# NOTE(review): both a transaction_flush() call and a plugin flush_data() call
# are visible in this listing; presumably the former is the pre-change line of
# the diff shown next to its replacement - confirm which one is meant to stay.
sub transactions_finish {
my ($transactions) = @_;
for my $txn (@$transactions) {
eval { transaction_flush($txn) };
warn "$@" if $@;
# the plugin class is looked up by the configured type of the transaction
my $plugin = PVE::Status::Plugin->lookup($txn->{cfg}->{type});
eval { $plugin->flush_data($txn) };
# save the error right away, $@ is a global and easily clobbered
my $flush_err = $@;
warn "$flush_err" if $flush_err;
# NOTE(review): _disconnect is not wrapped in eval, so a die in it propagates
# to the caller, and $@ in the warn below is still whatever the flush eval
# left behind (unless _disconnect alters $@ itself) - confirm this is intended
$plugin->_disconnect($txn->{connection});
$txn->{connection} = undef;
# avoid log spam, already got a send error; disconnect would fail too
warn "disconnect failed: $@" if $@ && !$flush_err;
}
}

View File

@ -61,25 +61,26 @@ sub options {
# Node status hook: hands $data and $ctime to assemble() with the object
# name "nodes.$node".
# NOTE(review): two assemble() calls with different argument lists are visible
# here; this looks like the old and the new line of a diff shown together -
# confirm which one is meant to stay.
sub update_node_status {
my ($class, $txn, $node, $data, $ctime) = @_;
assemble($txn, $data, $ctime, "nodes.$node");
return assemble($class, $txn, $data, $ctime, "nodes.$node");
}
# QEMU guest status hook: hands $data and $ctime to assemble() with the
# object name "qemu.$vmid". $nodename is accepted but not used here.
# NOTE(review): two assemble() calls with different argument lists are visible
# here; this looks like the old and the new line of a diff shown together -
# confirm which one is meant to stay.
sub update_qemu_status {
my ($class, $txn, $vmid, $data, $ctime, $nodename) = @_;
assemble($txn, $data, $ctime, "qemu.$vmid");
return assemble($class, $txn, $data, $ctime, "qemu.$vmid");
}
# LXC guest status hook: hands $data and $ctime to assemble() with the
# object name "lxc.$vmid". $nodename is accepted but not used here.
# NOTE(review): two assemble() calls with different argument lists are visible
# here; this looks like the old and the new line of a diff shown together -
# confirm which one is meant to stay.
sub update_lxc_status {
my ($class, $txn, $vmid, $data, $ctime, $nodename) = @_;
assemble($txn, $data, $ctime, "lxc.$vmid");
return assemble($class, $txn, $data, $ctime, "lxc.$vmid");
}
# Storage status hook: hands $data and $ctime to assemble() with the object
# name "storages.$nodename.$storeid".
# NOTE(review): two assemble() calls with different argument lists are visible
# here; this looks like the old and the new line of a diff shown together -
# confirm which one is meant to stay.
sub update_storage_status {
my ($class, $txn, $nodename, $storeid, $data, $ctime) = @_;
assemble($txn, $data, $ctime, "storages.$nodename.$storeid");
return assemble($class, $txn, $data, $ctime, "storages.$nodename.$storeid");
}
sub _connect {
@ -108,7 +109,7 @@ sub _connect {
}
sub assemble {
my ($txn, $data, $ctime, $object) = @_;
my ($class, $txn, $data, $ctime, $object) = @_;
my $path = $txn->{cfg}->{path} // 'proxmox';
$path .= ".$object";
@ -121,7 +122,6 @@ sub assemble {
'serial' => 1,
};
$txn->{data} //= '';
my $assemble_graphite_data;
$assemble_graphite_data = sub {
my ($metric, $path) = @_;
@ -136,7 +136,7 @@ sub assemble {
if (ref($value) eq 'HASH') {
$assemble_graphite_data->($value, $metricpath);
} elsif ($value =~ m/^[+-]?[0-9]*\.?[0-9]+$/ && !$key_blacklist->{$key}) {
$txn->{data} .= "$metricpath $value $ctime\n";
$class->add_metric_data($txn, "$metricpath $value $ctime\n");
}
}
};

View File

@ -5,6 +5,7 @@ use warnings;
use POSIX qw(isnan isinf);
use Scalar::Util 'looks_like_number';
use IO::Socket::IP;
use PVE::SafeSyslog;
@ -37,7 +38,7 @@ sub update_node_status {
$ctime *= 1000000000;
build_influxdb_payload(\$txn->{data}, $data, $ctime, "object=nodes,host=$node");
build_influxdb_payload($class, $txn, $data, $ctime, "object=nodes,host=$node");
}
sub update_qemu_status {
@ -51,7 +52,7 @@ sub update_qemu_status {
}
$object =~ s/\s/\\ /g;
build_influxdb_payload(\$txn->{data}, $data, $ctime, $object);
build_influxdb_payload($class, $txn, $data, $ctime, $object);
}
sub update_lxc_status {
@ -65,7 +66,7 @@ sub update_lxc_status {
}
$object =~ s/\s/\\ /g;
build_influxdb_payload(\$txn->{data}, $data, $ctime, $object);
build_influxdb_payload($class, $txn, $data, $ctime, $object);
}
sub update_storage_status {
@ -79,7 +80,7 @@ sub update_storage_status {
}
$object =~ s/\s/\\ /g;
build_influxdb_payload(\$txn->{data}, $data, $ctime, $object);
build_influxdb_payload($class, $txn, $data, $ctime, $object);
}
sub _connect {
@ -94,11 +95,13 @@ sub _connect {
Proto => 'udp',
) || die "couldn't create influxdb socket [$host]:$port - $@\n";
$socket->blocking(0);
return $socket;
}
sub build_influxdb_payload {
my ($payload, $data, $ctime, $tags, $measurement, $instance) = @_;
my ($class, $txn, $data, $ctime, $tags, $measurement, $instance) = @_;
my @values = ();
@ -116,9 +119,9 @@ sub build_influxdb_payload {
# value is a hash
if (!defined($measurement)) {
build_influxdb_payload($payload, $value, $ctime, $tags, $key);
build_influxdb_payload($class, $txn, $value, $ctime, $tags, $key);
} elsif(!defined($instance)) {
build_influxdb_payload($payload, $value, $ctime, $tags, $measurement, $key);
build_influxdb_payload($class, $txn, $value, $ctime, $tags, $measurement, $key);
} else {
push @values, get_recursive_values($value);
}
@ -129,8 +132,8 @@ sub build_influxdb_payload {
my $mm = $measurement // 'system';
my $tagstring = $tags;
$tagstring .= ",instance=$instance" if defined($instance);
my $valuestr = join(',', @values);
$$payload .= "$mm,$tagstring $valuestr $ctime\n";
my $valuestr = join(',', @values);
$class->add_metric_data($txn, "$mm,$tagstring $valuestr $ctime\n");
}
}

View File

@ -66,6 +66,42 @@ sub _disconnect {
$connection->close(); # overwrite if not a simple socket
}
# Upper bound, in bytes, for the data handed to a single send.
# A UDP datagram cannot carry more than 64k, so the default stays well below
# that. Override in a plugin whose protocol has a different limit; $cfg is
# accepted for such overrides and not used by this default.
sub _send_batch_size {
    my ($class, $cfg) = @_;

    my $default_limit = 48000;

    return $default_limit;
}
# Append $data to the send queue in $txn->{data}, flushing the queue first
# when it is already more than half a batch full and appending would push it
# over the batch size.
#
# Call with the smallest $data chunks possible, as a chunk is never split.
#
# Parameters:
#   $class - plugin class; provides _send_batch_size() and flush_data()
#   $txn   - transaction hash; {data} is the queue, {cfg} the plugin config
#   $data  - metric data to queue; undef is ignored
#
# Dies with whatever flush_data() dies with.
sub add_metric_data {
    my ($class, $txn, $data) = @_;
    return if !defined($data);

    # hand over the config, so that plugins overriding _send_batch_size can
    # derive their limit from it
    my $batch_size = $class->_send_batch_size($txn->{cfg});
    my $data_length = length($data);
    # length() returns undef for an undefined queue
    my $dataq_len = length($txn->{data}) // 0;

    if ($dataq_len > ($batch_size / 2) && ($dataq_len + $data_length) > $batch_size) {
        $class->flush_data($txn);
    }

    $txn->{data} //= '';
    $txn->{data} .= "$data";
}
# Hand the queued data of a transaction to the plugin's send() method.
#
# Parameters:
#   $class - plugin class providing send()
#   $txn   - transaction hash; reads {connection}, {data} and {id}
#
# Does nothing when the queue is empty. Dies when data is queued but there is
# no connection, or when send() fails. The queue is emptied before sending.
sub flush_data {
    my ($class, $txn) = @_;

    my $pending = $txn->{data};

    if (!$txn->{connection}) {
        # without pending data a missing connection is fine, it was already
        # sent/flushed
        die "cannot flush metric data, no connection available!\n" if $pending;
        return;
    }

    return unless defined($pending) && $pending ne '';

    delete $txn->{data};

    eval { $class->send($txn->{connection}, $pending) };
    die "metrics send error '$txn->{id}': $@" if $@;
}
sub send {
my ($class, $connection, $data) = @_;