ext. metric: move to a transaction model

Signed-off-by: Thomas Lamprecht <t.lamprecht@proxmox.com>
This commit is contained in:
Thomas Lamprecht 2019-11-18 12:18:59 +01:00
parent 83cab72f62
commit 87be2c19e3
5 changed files with 139 additions and 73 deletions

View File

@ -14,22 +14,93 @@ PVE::Status::Plugin->init();
sub foreach_plug($&) { sub foreach_plug($&) {
my ($status_cfg, $code) = @_; my ($status_cfg, $code) = @_;
for my $plugin_config (values %{$status_cfg->{ids}}) { for my $id (sort keys %{$status_cfg->{ids}}) {
my $plugin_config = $status_cfg->{ids}->{$id};
next if $plugin_config->{disable}; next if $plugin_config->{disable};
my $plugin = PVE::Status::Plugin->lookup($plugin_config->{type}); my $plugin = PVE::Status::Plugin->lookup($plugin_config->{type});
$code->($plugin, $plugin_config); $code->($plugin, $id, $plugin_config);
} }
} }
sub update_all($$@) { sub update_all($$@) {
my ($cfg, $subsystem, @params) = @_; my ($transactions, $subsystem, @params) = @_;
my $method = "update_${subsystem}_status"; my $method = "update_${subsystem}_status";
my (undef, $fn, $line, $subr) = caller(1);
for my $txn (@$transactions) {
my $plugin = PVE::Status::Plugin->lookup($txn->{cfg}->{type});
$plugin->$method($txn, @params);
if (length($txn->{data}) > 48000) {
# UDP stack cannot handle messages > 65k; if we have a lot of data we
# send it in smaller batches, but keep the connection alive
transaction_flush($txn, 1);
}
}
}
# must return a reference to a list of transaction hashes, each with the format:
# {
# cfg => $plugin_config,
# connection => ..., # the connected socket
# id => $id, # the plugin instance ID from the status config
# data => '', # payload, will be sent at the transaction flush
# }
my $transactions;
sub transactions_start {
my ($cfg) = @_;
@$transactions = ();
foreach_plug($cfg, sub { foreach_plug($cfg, sub {
my ($plugin, $plugin_config) = @_; my ($plugin, $id, $plugin_config) = @_;
$plugin->$method($plugin_config, @params);
my $connection = $plugin->_connect($plugin_config);
push @$transactions, {
connection => $connection,
cfg => $plugin_config,
id => $id,
data => '',
};
}); });
return $transactions;
}
# Send the payload buffered in a transaction to its metric server.
#
# Arguments:
#   $txn           - transaction hash with the keys cfg, connection, id and
#                    data, as assembled by transactions_start()
#   $keepconnected - if true, the connection stays open so that further
#                    batches can be sent; otherwise the connection is closed
#                    and $txn->{connection} is set to undef
#
# Dies if data is pending but no connection is available, or if sending the
# payload fails. A failing disconnect is only reported with a warning.
sub transaction_flush {
    my ($txn, $keepconnected) = @_;

    my $connection = $txn->{connection};
    if (!$connection) {
        return if !$txn->{data}; # OK, if data was already sent/flushed
        die "cannot flush metric data, no connection available!\n";
    }

    my $has_data = defined($txn->{data}) && $txn->{data} ne '';

    # nothing to send and the connection shall stay open: nothing to do
    return if !$has_data && $keepconnected;

    my $plugin = PVE::Status::Plugin->lookup($txn->{cfg}->{type});

    my $senderr = '';
    if ($has_data) {
        # Take the payload out of the transaction before sending, so it can
        # never be sent twice. Reset the buffer to '' rather than deleting it,
        # so that later length() checks and appends never see undef.
        my $data = $txn->{data};
        $txn->{data} = '';

        eval { $plugin->send($connection, $data) };
        $senderr = $@;
    }

    if (!$keepconnected) {
        # Also close when there was nothing to send, else the connection would
        # linger. Use eval so a failing disconnect cannot hide a send error and
        # the connection is always cleared.
        eval { $plugin->_disconnect($connection) };
        my $disconnecterr = $@;
        $txn->{connection} = undef;

        # avoid log spam, already got a send error; disconnect would fail too
        warn "disconnect failed: $disconnecterr" if $disconnecterr && !$senderr;
    }

    die "metrics send error '$txn->{id}': $senderr" if $senderr;

    return;
}
sub transactions_finish {
my ($transactions) = @_;
for my $txn (@$transactions) {
eval { transaction_flush($txn) };
warn "$@" if $@;
}
} }
1; 1;

View File

@ -126,7 +126,9 @@ sub update_node_status {
$node_metric->{cpustat}->@{qw(avg1 avg5 avg15)} = ($avg1, $avg5, $avg15); $node_metric->{cpustat}->@{qw(avg1 avg5 avg15)} = ($avg1, $avg5, $avg15);
$node_metric->{cpustat}->{cpus} = $maxcpu; $node_metric->{cpustat}->{cpus} = $maxcpu;
PVE::ExtMetric::update_all($status_cfg, 'node', $nodename, $node_metric, $ctime); my $transactions = PVE::ExtMetric::transactions_start($status_cfg);
PVE::ExtMetric::update_all($transactions, 'node', $nodename, $node_metric, $ctime);
PVE::ExtMetric::transactions_finish($transactions);
} }
sub auto_balloning { sub auto_balloning {
@ -161,12 +163,12 @@ sub update_qemu_status {
my ($status_cfg) = @_; my ($status_cfg) = @_;
my $ctime = time(); my $ctime = time();
my $vmstatus = PVE::QemuServer::vmstatus(undef, 1); my $vmstatus = PVE::QemuServer::vmstatus(undef, 1);
eval { auto_balloning($vmstatus); }; eval { auto_balloning($vmstatus); };
syslog('err', "auto ballooning error: $@") if $@; syslog('err', "auto ballooning error: $@") if $@;
my $transactions = PVE::ExtMetric::transactions_start($status_cfg);
foreach my $vmid (keys %$vmstatus) { foreach my $vmid (keys %$vmstatus) {
my $d = $vmstatus->{$vmid}; my $d = $vmstatus->{$vmid};
my $data; my $data;
@ -184,8 +186,10 @@ sub update_qemu_status {
} }
PVE::Cluster::broadcast_rrd("pve2.3-vm/$vmid", $data); PVE::Cluster::broadcast_rrd("pve2.3-vm/$vmid", $data);
PVE::ExtMetric::update_all($status_cfg, 'qemu', $vmid, $d, $ctime, $nodename); PVE::ExtMetric::update_all($transactions, 'qemu', $vmid, $d, $ctime, $nodename);
} }
PVE::ExtMetric::transactions_finish($transactions);
} }
sub remove_stale_lxc_consoles { sub remove_stale_lxc_consoles {
@ -359,6 +363,8 @@ sub update_lxc_status {
my $ctime = time(); my $ctime = time();
my $vmstatus = PVE::LXC::vmstatus(); my $vmstatus = PVE::LXC::vmstatus();
my $transactions = PVE::ExtMetric::transactions_start($status_cfg);
foreach my $vmid (keys %$vmstatus) { foreach my $vmid (keys %$vmstatus) {
my $d = $vmstatus->{$vmid}; my $d = $vmstatus->{$vmid};
my $template = $d->{template} ? $d->{template} : "0"; my $template = $d->{template} ? $d->{template} : "0";
@ -378,8 +384,9 @@ sub update_lxc_status {
} }
PVE::Cluster::broadcast_rrd("pve2.3-vm/$vmid", $data); PVE::Cluster::broadcast_rrd("pve2.3-vm/$vmid", $data);
PVE::ExtMetric::update_all($status_cfg, 'lxc', $vmid, $d, $ctime, $nodename); PVE::ExtMetric::update_all($transactions, 'lxc', $vmid, $d, $ctime, $nodename);
} }
PVE::ExtMetric::transactions_finish($transactions);
} }
sub update_storage_status { sub update_storage_status {
@ -389,6 +396,8 @@ sub update_storage_status {
my $ctime = time(); my $ctime = time();
my $info = PVE::Storage::storage_info($cfg); my $info = PVE::Storage::storage_info($cfg);
my $transactions = PVE::ExtMetric::transactions_start($status_cfg);
foreach my $storeid (keys %$info) { foreach my $storeid (keys %$info) {
my $d = $info->{$storeid}; my $d = $info->{$storeid};
next if !$d->{active}; next if !$d->{active};
@ -398,8 +407,9 @@ sub update_storage_status {
my $key = "pve2-storage/${nodename}/$storeid"; my $key = "pve2-storage/${nodename}/$storeid";
PVE::Cluster::broadcast_rrd($key, $data); PVE::Cluster::broadcast_rrd($key, $data);
PVE::ExtMetric::update_all($status_cfg, 'storage', $nodename, $storeid, $d, $ctime); PVE::ExtMetric::update_all($transactions, 'storage', $nodename, $storeid, $d, $ctime);
} }
PVE::ExtMetric::transactions_finish($transactions);
} }
sub rotate_authkeys { sub rotate_authkeys {

View File

@ -32,13 +32,15 @@ sub properties {
}, },
timeout => { timeout => {
type => 'integer', type => 'integer',
description => "graphite tcp socket timeout (default=1)", description => "graphite TCP socket timeout (default=1)",
minimum => 0,
default => 1,
optional => 1 optional => 1
}, },
proto => { proto => {
type => 'string', type => 'string',
enum => ['udp', 'tcp'], enum => ['udp', 'tcp'],
description => "send graphite data using tcp or udp (default)", description => "Protocol to send graphite data. TCP or UDP (default)",
optional => 1, optional => 1,
}, },
}; };
@ -57,27 +59,27 @@ sub options {
# Plugin implementation # Plugin implementation
sub update_node_status { sub update_node_status {
my ($class, $plugin_config, $node, $data, $ctime) = @_; my ($class, $txn, $node, $data, $ctime) = @_;
write_graphite_hash($plugin_config, $data, $ctime, "nodes.$node"); assemble($txn, $data, $ctime, "nodes.$node");
} }
sub update_qemu_status { sub update_qemu_status {
my ($class, $plugin_config, $vmid, $data, $ctime, $nodename) = @_; my ($class, $txn, $vmid, $data, $ctime, $nodename) = @_;
write_graphite_hash($plugin_config, $data, $ctime, "qemu.$vmid"); assemble($txn, $data, $ctime, "qemu.$vmid");
} }
sub update_lxc_status { sub update_lxc_status {
my ($class, $plugin_config, $vmid, $data, $ctime, $nodename) = @_; my ($class, $txn, $vmid, $data, $ctime, $nodename) = @_;
write_graphite_hash($plugin_config, $data, $ctime, "lxc.$vmid"); assemble($txn, $data, $ctime, "lxc.$vmid");
} }
sub update_storage_status { sub update_storage_status {
my ($class, $plugin_config, $nodename, $storeid, $data, $ctime) = @_; my ($class, $txn, $nodename, $storeid, $data, $ctime) = @_;
write_graphite_hash($plugin_config, $data, $ctime, "storages.$nodename.$storeid"); assemble($txn, $data, $ctime, "storages.$nodename.$storeid");
} }
sub _connect { sub _connect {
@ -105,21 +107,11 @@ sub _connect {
return $carbon_socket; return $carbon_socket;
} }
sub write_graphite_hash { sub assemble {
my ($plugin_config, $d, $ctime, $object) = @_; my ($txn, $data, $ctime, $object) = @_;
my $path = $plugin_config->{path} // 'proxmox'; my $path = $txn->{cfg}->{path} // 'proxmox';
$path .= ".$object";
my $carbon_socket = __PACKAGE__->_connect($plugin_config);
write_graphite($carbon_socket, $d, $ctime, $path.".$object");
$carbon_socket->close() if $carbon_socket;
}
sub write_graphite {
my ($carbon_socket, $d, $ctime, $path) = @_;
# we do not want boolean/state information to export to graphite # we do not want boolean/state information to export to graphite
my $key_blacklist = { my $key_blacklist = {
@ -129,13 +121,14 @@ sub write_graphite {
'serial' => 1, 'serial' => 1,
}; };
my $graphite_data = ''; $txn->{data} //= '';
my $assemble_graphite_data; my $assemble_graphite_data;
$assemble_graphite_data = sub { $assemble_graphite_data = sub {
my ($metric, $path) = @_; my ($metric, $path) = @_;
for my $key (sort keys %$metric) { for my $key (sort keys %$metric) {
my $value = $d->{$key} // next; my $value = $data->{$key};
next if !defined($value);
$key =~ s/\./-/g; $key =~ s/\./-/g;
my $metricpath = $path . ".$key"; my $metricpath = $path . ".$key";
@ -143,13 +136,11 @@ sub write_graphite {
if (ref($value) eq 'HASH') { if (ref($value) eq 'HASH') {
$assemble_graphite_data->($value, $metricpath); $assemble_graphite_data->($value, $metricpath);
} elsif ($value =~ m/^[+-]?[0-9]*\.?[0-9]+$/ && !$key_blacklist->{$key}) { } elsif ($value =~ m/^[+-]?[0-9]*\.?[0-9]+$/ && !$key_blacklist->{$key}) {
$graphite_data .= "$metricpath $value $ctime\n"; $txn->{data} .= "$metricpath $value $ctime\n";
} }
} }
}; };
$assemble_graphite_data->($d, $path); $assemble_graphite_data->($data, $path);
$carbon_socket->send($graphite_data) if $graphite_data ne '';
} }
PVE::JSONSchema::register_format('graphite-path', \&pve_verify_graphite_path); PVE::JSONSchema::register_format('graphite-path', \&pve_verify_graphite_path);

View File

@ -33,16 +33,15 @@ sub options {
# Plugin implementation # Plugin implementation
sub update_node_status { sub update_node_status {
my ($class, $plugin_config, $node, $data, $ctime) = @_; my ($class, $txn, $node, $data, $ctime) = @_;
$ctime *= 1000000000; $ctime *= 1000000000;
write_influxdb_hash($plugin_config, $data, $ctime, "object=nodes,host=$node"); build_influxdb_payload(\$txn->{data}, $data, $ctime, "object=nodes,host=$node");
} }
sub update_qemu_status { sub update_qemu_status {
my ($class, $plugin_config, $vmid, $data, $ctime, $nodename) = @_; my ($class, $txn, $vmid, $data, $ctime, $nodename) = @_;
$ctime *= 1000000000; $ctime *= 1000000000;
@ -51,11 +50,12 @@ sub update_qemu_status {
$object .= ",host=$data->{name}"; $object .= ",host=$data->{name}";
} }
$object =~ s/\s/\\ /g; $object =~ s/\s/\\ /g;
write_influxdb_hash($plugin_config, $data, $ctime, $object);
build_influxdb_payload(\$txn->{data}, $data, $ctime, $object);
} }
sub update_lxc_status { sub update_lxc_status {
my ($class, $plugin_config, $vmid, $data, $ctime, $nodename) = @_; my ($class, $txn, $vmid, $data, $ctime, $nodename) = @_;
$ctime *= 1000000000; $ctime *= 1000000000;
@ -65,11 +65,11 @@ sub update_lxc_status {
} }
$object =~ s/\s/\\ /g; $object =~ s/\s/\\ /g;
write_influxdb_hash($plugin_config, $data, $ctime, $object); build_influxdb_payload(\$txn->{data}, $data, $ctime, $object);
} }
sub update_storage_status { sub update_storage_status {
my ($class, $plugin_config, $nodename, $storeid, $data, $ctime) = @_; my ($class, $txn, $nodename, $storeid, $data, $ctime) = @_;
$ctime *= 1000000000; $ctime *= 1000000000;
@ -79,7 +79,7 @@ sub update_storage_status {
} }
$object =~ s/\s/\\ /g; $object =~ s/\s/\\ /g;
write_influxdb_hash($plugin_config, $data, $ctime, $object); build_influxdb_payload(\$txn->{data}, $data, $ctime, $object);
} }
sub _connect { sub _connect {
@ -97,20 +97,6 @@ sub _connect {
return $socket; return $socket;
} }
sub write_influxdb_hash {
my ($plugin_config, $d, $ctime, $tags) = @_;
my $payload = {};
build_influxdb_payload($payload, $d, $ctime, $tags);
my $socket = __PACKAGE__->_connect($plugin_config);
$socket->send($payload->{string});
$socket->close() if $socket;
}
sub build_influxdb_payload { sub build_influxdb_payload {
my ($payload, $data, $ctime, $tags, $measurement, $instance) = @_; my ($payload, $data, $ctime, $tags, $measurement, $instance) = @_;
@ -144,7 +130,7 @@ sub build_influxdb_payload {
my $tagstring = $tags; my $tagstring = $tags;
$tagstring .= ",instance=$instance" if defined($instance); $tagstring .= ",instance=$instance" if defined($instance);
my $valuestr = join(',', @values); my $valuestr = join(',', @values);
$payload->{string} .= "$mm,$tagstring $valuestr $ctime\n"; $$payload .= "$mm,$tagstring $valuestr $ctime\n";
} }
} }

View File

@ -57,31 +57,39 @@ sub parse_section_header {
sub _connect { sub _connect {
my ($class, $cfg) = @_; my ($class, $cfg) = @_;
die "please implement inside plugin"; die "please implement inside plugin";
} }
sub update_node_status { sub _disconnect {
my ($class, $plugin_config, $node, $data, $ctime) = @_; my ($class, $connection) = @_;
$connection->close(); # overwrite if not a simple socket
}
sub send {
my ($class, $connection, $data) = @_;
defined($connection->send($data))
or die "failed to send metrics: $!\n";
}
sub update_node_status {
my ($class, $txn, $node, $data, $ctime) = @_;
die "please implement inside plugin"; die "please implement inside plugin";
} }
sub update_qemu_status { sub update_qemu_status {
my ($class, $plugin_config, $vmid, $data, $ctime, $nodename) = @_; my ($class, $txn, $vmid, $data, $ctime, $nodename) = @_;
die "please implement inside plugin"; die "please implement inside plugin";
} }
sub update_lxc_status { sub update_lxc_status {
my ($class, $plugin_config, $vmid, $data, $ctime, $nodename) = @_; my ($class, $txn, $vmid, $data, $ctime, $nodename) = @_;
die "please implement inside plugin"; die "please implement inside plugin";
} }
sub update_storage_status { sub update_storage_status {
my ($class, $plugin_config, $nodename, $storeid, $data, $ctime) = @_; my ($class, $txn, $nodename, $storeid, $data, $ctime) = @_;
die "please implement inside plugin"; die "please implement inside plugin";
} }