From 87be2c19e3c99e0465441123f0d796aed2f1f674 Mon Sep 17 00:00:00 2001 From: Thomas Lamprecht Date: Mon, 18 Nov 2019 12:18:59 +0100 Subject: [PATCH] ext. metric: move to a transaction model Signed-off-by: Thomas Lamprecht --- PVE/ExtMetric.pm | 81 ++++++++++++++++++++++++++++++++++++++--- PVE/Service/pvestatd.pm | 20 +++++++--- PVE/Status/Graphite.pm | 51 +++++++++++--------------- PVE/Status/InfluxDB.pm | 34 +++++------------ PVE/Status/Plugin.pm | 26 ++++++++----- 5 files changed, 139 insertions(+), 73 deletions(-) diff --git a/PVE/ExtMetric.pm b/PVE/ExtMetric.pm index 342dc281..c5559073 100644 --- a/PVE/ExtMetric.pm +++ b/PVE/ExtMetric.pm @@ -14,22 +14,93 @@ PVE::Status::Plugin->init(); sub foreach_plug($&) { my ($status_cfg, $code) = @_; - for my $plugin_config (values %{$status_cfg->{ids}}) { + for my $id (sort keys %{$status_cfg->{ids}}) { + my $plugin_config = $status_cfg->{ids}->{$id}; next if $plugin_config->{disable}; + my $plugin = PVE::Status::Plugin->lookup($plugin_config->{type}); - $code->($plugin, $plugin_config); + $code->($plugin, $id, $plugin_config); } } sub update_all($$@) { - my ($cfg, $subsystem, @params) = @_; + my ($transactions, $subsystem, @params) = @_; my $method = "update_${subsystem}_status"; + my (undef, $fn, $line, $subr) = caller(1); + for my $txn (@$transactions) { + my $plugin = PVE::Status::Plugin->lookup($txn->{cfg}->{type}); + + $plugin->$method($txn, @params); + + if (length($txn->{data}) > 48000) { + # UDP stack cannot handle messages > 65k, if we've a lot of data we + # do smaller batch sends then, but keep the connection alive + transaction_flush($txn, 1); + } + } +} + +# must return a transaction hash with the format: +# { +# cfg => $plugin_config, +# connection => ..., # the connected socket +# data => '', # payload, will be sent at the transaction flush +# } +my $transactions; +sub transactions_start { + my ($cfg) = @_; + + @$transactions = (); + foreach_plug($cfg, sub { - my ($plugin, $plugin_config) = @_; - 
$plugin->$method($plugin_config, @params); + my ($plugin, $id, $plugin_config) = @_; + + my $connection = $plugin->_connect($plugin_config); + + push @$transactions, { + connection => $connection, + cfg => $plugin_config, + id => $id, + data => '', + }; }); + + return $transactions; +} + +sub transaction_flush { + my ($txn, $keepconnected) = @_; + + if (!$txn->{connection}) { + return if !$txn->{data}; # OK, if data was already sent/flushed + die "cannot flush metric data, no connection available!\n"; + } + return if !defined($txn->{data}) || $txn->{data} eq ''; + + my $plugin = PVE::Status::Plugin->lookup($txn->{cfg}->{type}); + + my $data = delete $txn->{data}; + eval { $plugin->send($txn->{connection}, $data) }; + my $senderr = $@; + + if (!$keepconnected) { + $plugin->_disconnect($txn->{connection}); + $txn->{connection} = undef; + # avoid log spam, already got a send error; disconnect would fail too + warn "disconnect failed: $@" if $@ && !$senderr; + } + die "metrics send error '$txn->{id}': $senderr" if $senderr; +}; + +sub transactions_finish { + my ($transactions) = @_; + + for my $txn (@$transactions) { + eval { transaction_flush($txn) }; + warn "$@" if $@; + } } 1; diff --git a/PVE/Service/pvestatd.pm b/PVE/Service/pvestatd.pm index 012d05f7..a50f3499 100755 --- a/PVE/Service/pvestatd.pm +++ b/PVE/Service/pvestatd.pm @@ -126,7 +126,9 @@ sub update_node_status { $node_metric->{cpustat}->@{qw(avg1 avg5 avg15)} = ($avg1, $avg5, $avg15); $node_metric->{cpustat}->{cpus} = $maxcpu; - PVE::ExtMetric::update_all($status_cfg, 'node', $nodename, $node_metric, $ctime); + my $transactions = PVE::ExtMetric::transactions_start($status_cfg); + PVE::ExtMetric::update_all($transactions, 'node', $nodename, $node_metric, $ctime); + PVE::ExtMetric::transactions_finish($transactions); } sub auto_balloning { @@ -161,12 +163,12 @@ sub update_qemu_status { my ($status_cfg) = @_; my $ctime = time(); - my $vmstatus = PVE::QemuServer::vmstatus(undef, 1); eval { 
auto_balloning($vmstatus); }; syslog('err', "auto ballooning error: $@") if $@; + my $transactions = PVE::ExtMetric::transactions_start($status_cfg); foreach my $vmid (keys %$vmstatus) { my $d = $vmstatus->{$vmid}; my $data; @@ -184,8 +186,10 @@ sub update_qemu_status { } PVE::Cluster::broadcast_rrd("pve2.3-vm/$vmid", $data); - PVE::ExtMetric::update_all($status_cfg, 'qemu', $vmid, $d, $ctime, $nodename); + PVE::ExtMetric::update_all($transactions, 'qemu', $vmid, $d, $ctime, $nodename); } + + PVE::ExtMetric::transactions_finish($transactions); } sub remove_stale_lxc_consoles { @@ -359,6 +363,8 @@ sub update_lxc_status { my $ctime = time(); my $vmstatus = PVE::LXC::vmstatus(); + my $transactions = PVE::ExtMetric::transactions_start($status_cfg); + foreach my $vmid (keys %$vmstatus) { my $d = $vmstatus->{$vmid}; my $template = $d->{template} ? $d->{template} : "0"; @@ -378,8 +384,9 @@ sub update_lxc_status { } PVE::Cluster::broadcast_rrd("pve2.3-vm/$vmid", $data); - PVE::ExtMetric::update_all($status_cfg, 'lxc', $vmid, $d, $ctime, $nodename); + PVE::ExtMetric::update_all($transactions, 'lxc', $vmid, $d, $ctime, $nodename); } + PVE::ExtMetric::transactions_finish($transactions); } sub update_storage_status { @@ -389,6 +396,8 @@ sub update_storage_status { my $ctime = time(); my $info = PVE::Storage::storage_info($cfg); + my $transactions = PVE::ExtMetric::transactions_start($status_cfg); + foreach my $storeid (keys %$info) { my $d = $info->{$storeid}; next if !$d->{active}; @@ -398,8 +407,9 @@ sub update_storage_status { my $key = "pve2-storage/${nodename}/$storeid"; PVE::Cluster::broadcast_rrd($key, $data); - PVE::ExtMetric::update_all($status_cfg, 'storage', $nodename, $storeid, $d, $ctime); + PVE::ExtMetric::update_all($transactions, 'storage', $nodename, $storeid, $d, $ctime); } + PVE::ExtMetric::transactions_finish($transactions); } sub rotate_authkeys { diff --git a/PVE/Status/Graphite.pm b/PVE/Status/Graphite.pm index a0cec1c7..04542cad 100644 --- 
a/PVE/Status/Graphite.pm +++ b/PVE/Status/Graphite.pm @@ -32,13 +32,15 @@ sub properties { }, timeout => { type => 'integer', - description => "graphite tcp socket timeout (default=1)", + description => "graphite TCP socket timeout (default=1)", + minimum => 0, + default => 1, optional => 1 }, proto => { type => 'string', enum => ['udp', 'tcp'], - description => "send graphite data using tcp or udp (default)", + description => "Protocol to send graphite data. TCP or UDP (default)", optional => 1, }, }; @@ -57,27 +59,27 @@ sub options { # Plugin implementation sub update_node_status { - my ($class, $plugin_config, $node, $data, $ctime) = @_; + my ($class, $txn, $node, $data, $ctime) = @_; - write_graphite_hash($plugin_config, $data, $ctime, "nodes.$node"); + assemble($txn, $data, $ctime, "nodes.$node"); } sub update_qemu_status { - my ($class, $plugin_config, $vmid, $data, $ctime, $nodename) = @_; - write_graphite_hash($plugin_config, $data, $ctime, "qemu.$vmid"); + my ($class, $txn, $vmid, $data, $ctime, $nodename) = @_; + assemble($txn, $data, $ctime, "qemu.$vmid"); } sub update_lxc_status { - my ($class, $plugin_config, $vmid, $data, $ctime, $nodename) = @_; + my ($class, $txn, $vmid, $data, $ctime, $nodename) = @_; - write_graphite_hash($plugin_config, $data, $ctime, "lxc.$vmid"); + assemble($txn, $data, $ctime, "lxc.$vmid"); } sub update_storage_status { - my ($class, $plugin_config, $nodename, $storeid, $data, $ctime) = @_; + my ($class, $txn, $nodename, $storeid, $data, $ctime) = @_; - write_graphite_hash($plugin_config, $data, $ctime, "storages.$nodename.$storeid"); + assemble($txn, $data, $ctime, "storages.$nodename.$storeid"); } sub _connect { @@ -105,21 +107,11 @@ sub _connect { return $carbon_socket; } -sub write_graphite_hash { - my ($plugin_config, $d, $ctime, $object) = @_; +sub assemble { + my ($txn, $data, $ctime, $object) = @_; - my $path = $plugin_config->{path} // 'proxmox'; - - my $carbon_socket = __PACKAGE__->_connect($plugin_config); - - 
write_graphite($carbon_socket, $d, $ctime, $path.".$object"); - - $carbon_socket->close() if $carbon_socket; - -} - -sub write_graphite { - my ($carbon_socket, $d, $ctime, $path) = @_; + my $path = $txn->{cfg}->{path} // 'proxmox'; + $path .= ".$object"; # we do not want boolean/state information to export to graphite my $key_blacklist = { @@ -129,13 +121,14 @@ sub write_graphite { 'serial' => 1, }; - my $graphite_data = ''; + $txn->{data} //= ''; my $assemble_graphite_data; $assemble_graphite_data = sub { my ($metric, $path) = @_; for my $key (sort keys %$metric) { - my $value = $d->{$key} // next; + my $value = $data->{$key}; + next if !defined($value); $key =~ s/\./-/g; my $metricpath = $path . ".$key"; @@ -143,13 +136,11 @@ sub write_graphite { if (ref($value) eq 'HASH') { $assemble_graphite_data->($value, $metricpath); } elsif ($value =~ m/^[+-]?[0-9]*\.?[0-9]+$/ && !$key_blacklist->{$key}) { - $graphite_data .= "$metricpath $value $ctime\n"; + $txn->{data} .= "$metricpath $value $ctime\n"; } } }; - $assemble_graphite_data->($d, $path); - - $carbon_socket->send($graphite_data) if $graphite_data ne ''; + $assemble_graphite_data->($data, $path); } PVE::JSONSchema::register_format('graphite-path', \&pve_verify_graphite_path); diff --git a/PVE/Status/InfluxDB.pm b/PVE/Status/InfluxDB.pm index f02c8854..21949400 100644 --- a/PVE/Status/InfluxDB.pm +++ b/PVE/Status/InfluxDB.pm @@ -33,16 +33,15 @@ sub options { # Plugin implementation sub update_node_status { - my ($class, $plugin_config, $node, $data, $ctime) = @_; + my ($class, $txn, $node, $data, $ctime) = @_; $ctime *= 1000000000; - write_influxdb_hash($plugin_config, $data, $ctime, "object=nodes,host=$node"); - + build_influxdb_payload(\$txn->{data}, $data, $ctime, "object=nodes,host=$node"); } sub update_qemu_status { - my ($class, $plugin_config, $vmid, $data, $ctime, $nodename) = @_; + my ($class, $txn, $vmid, $data, $ctime, $nodename) = @_; $ctime *= 1000000000; @@ -51,11 +50,12 @@ sub update_qemu_status { 
$object .= ",host=$data->{name}"; } $object =~ s/\s/\\ /g; - write_influxdb_hash($plugin_config, $data, $ctime, $object); + + build_influxdb_payload(\$txn->{data}, $data, $ctime, $object); } sub update_lxc_status { - my ($class, $plugin_config, $vmid, $data, $ctime, $nodename) = @_; + my ($class, $txn, $vmid, $data, $ctime, $nodename) = @_; $ctime *= 1000000000; @@ -65,11 +65,11 @@ sub update_lxc_status { } $object =~ s/\s/\\ /g; - write_influxdb_hash($plugin_config, $data, $ctime, $object); + build_influxdb_payload(\$txn->{data}, $data, $ctime, $object); } sub update_storage_status { - my ($class, $plugin_config, $nodename, $storeid, $data, $ctime) = @_; + my ($class, $txn, $nodename, $storeid, $data, $ctime) = @_; $ctime *= 1000000000; @@ -79,7 +79,7 @@ sub update_storage_status { } $object =~ s/\s/\\ /g; - write_influxdb_hash($plugin_config, $data, $ctime, $object); + build_influxdb_payload(\$txn->{data}, $data, $ctime, $object); } sub _connect { @@ -97,20 +97,6 @@ sub _connect { return $socket; } -sub write_influxdb_hash { - my ($plugin_config, $d, $ctime, $tags) = @_; - - my $payload = {}; - - build_influxdb_payload($payload, $d, $ctime, $tags); - - my $socket = __PACKAGE__->_connect($plugin_config); - - $socket->send($payload->{string}); - - $socket->close() if $socket; -} - sub build_influxdb_payload { my ($payload, $data, $ctime, $tags, $measurement, $instance) = @_; @@ -144,7 +130,7 @@ sub build_influxdb_payload { my $tagstring = $tags; $tagstring .= ",instance=$instance" if defined($instance); my $valuestr = join(',', @values); - $payload->{string} .= "$mm,$tagstring $valuestr $ctime\n"; + $$payload .= "$mm,$tagstring $valuestr $ctime\n"; } } diff --git a/PVE/Status/Plugin.pm b/PVE/Status/Plugin.pm index 59be30d6..402c5b4a 100644 --- a/PVE/Status/Plugin.pm +++ b/PVE/Status/Plugin.pm @@ -57,31 +57,39 @@ sub parse_section_header { sub _connect { my ($class, $cfg) = @_; - die "please implement inside plugin"; } -sub update_node_status { - my ($class, 
$plugin_config, $node, $data, $ctime) = @_; +sub _disconnect { + my ($class, $connection) = @_; + $connection->close(); # overwrite if not a simple socket +} + +sub send { + my ($class, $connection, $data) = @_; + + defined($connection->send($data)) + or die "failed to send metrics: $!\n"; +} + +sub update_node_status { + my ($class, $txn, $node, $data, $ctime) = @_; die "please implement inside plugin"; } sub update_qemu_status { - my ($class, $plugin_config, $vmid, $data, $ctime, $nodename) = @_; - + my ($class, $txn, $vmid, $data, $ctime, $nodename) = @_; die "please implement inside plugin"; } sub update_lxc_status { - my ($class, $plugin_config, $vmid, $data, $ctime, $nodename) = @_; - + my ($class, $txn, $vmid, $data, $ctime, $nodename) = @_; die "please implement inside plugin"; } sub update_storage_status { - my ($class, $plugin_config, $nodename, $storeid, $data, $ctime) = @_; - + my ($class, $txn, $nodename, $storeid, $data, $ctime) = @_; die "please implement inside plugin"; }