pve-manager/PVE/Status/InfluxDB.pm
Markus Frank 6ea437b237 fix #3815: metrics: influxdb: coerce guest name always to string
InfluxDB interprets the guest name '66601' as a number and the guest
name 'vm42' as a String. This leads to problematic metrics, that will
be dropped by influxdb. Whichever comes first decides how the
"schema" is defined.

To change that add a $to_quote hashmap to define which value
shouldn't get interpreted as number by always send it quoted.

For now only quote 'name'.

Note, that while the 'nodename' and 'host' properties would have the
same problem, they are tags in InfluxDB which are always interpreted
as strings:
https://docs.influxdata.com/influxdb/v2.1/reference/syntax/line-protocol/

Signed-off-by: Markus Frank <m.frank@proxmox.com>
Signed-off-by: Thomas Lamprecht <t.lamprecht@proxmox.com>
(cherry picked from commit 988150664900d447e7bdb5d0e23edadb4bd82f35)
Signed-off-by: Thomas Lamprecht <t.lamprecht@proxmox.com>
2022-03-14 10:44:20 +01:00

407 lines
9.6 KiB
Perl

package PVE::Status::InfluxDB;
use strict;
use warnings;
use POSIX qw(isnan isinf);
use Scalar::Util 'looks_like_number';
use IO::Socket::IP;
use LWP::UserAgent;
use HTTP::Request;
use PVE::SafeSyslog;
use PVE::Status::Plugin;
use base('PVE::Status::Plugin');
sub type {
return 'influxdb';
}
sub properties {
return {
organization => {
description => "The InfluxDB organization. Only necessary when using the http v2 api."
." Has no meaning when using v2 compatibility api.",
type => 'string',
optional => 1,
},
bucket => {
description => "The InfluxDB bucket/db. Only necessary when using the http v2 api.",
type => 'string',
optional => 1,
},
token => {
description => "The InfluxDB access token. Only necessary when using the http v2 api."
." If the v2 compatibility api is used, use 'user:password' instead.",
type => 'string',
optional => 1,
},
'api-path-prefix' => {
description => "An API path prefix inserted between '<host>:<port>/' and '/api2/'."
." Can be useful if the InfluxDB service runs behind a reverse proxy.",
type => 'string',
optional => 1,
},
influxdbproto => {
type => 'string',
enum => ['udp', 'http', 'https'],
default => 'udp',
optional => 1,
},
'max-body-size' => {
description => "InfluxDB max-body-size in bytes. Requests are batched up to this size.",
type => 'integer',
minimum => 1,
default => 25_000_000,
}
};
}
sub options {
return {
server => {},
port => {},
mtu => { optional => 1 },
disable => { optional => 1 },
organization => { optional => 1},
bucket => { optional => 1},
token => { optional => 1},
influxdbproto => { optional => 1},
timeout => { optional => 1},
'max-body-size' => { optional => 1 },
'api-path-prefix' => { optional => 1 },
};
}
# Plugin implementation
sub update_node_status {
my ($class, $txn, $node, $data, $ctime) = @_;
$ctime *= 1000000000;
build_influxdb_payload($class, $txn, $data, $ctime, "object=nodes,host=$node");
}
sub update_qemu_status {
my ($class, $txn, $vmid, $data, $ctime, $nodename) = @_;
$ctime *= 1000000000;
my $object = "object=qemu,vmid=$vmid,nodename=$nodename";
if($data->{name} && $data->{name} ne '') {
$object .= ",host=$data->{name}";
}
$object =~ s/\s/\\ /g;
# VMID is already added in base $object above, so exclude it from being re-added
build_influxdb_payload($class, $txn, $data, $ctime, $object, { 'vmid' => 1 });
}
sub update_lxc_status {
my ($class, $txn, $vmid, $data, $ctime, $nodename) = @_;
$ctime *= 1000000000;
my $object = "object=lxc,vmid=$vmid,nodename=$nodename";
if($data->{name} && $data->{name} ne '') {
$object .= ",host=$data->{name}";
}
$object =~ s/\s/\\ /g;
# VMID is already added in base $object above, so exclude it from being re-added
build_influxdb_payload($class, $txn, $data, $ctime, $object, { 'vmid' => 1 });
}
sub update_storage_status {
my ($class, $txn, $nodename, $storeid, $data, $ctime) = @_;
$ctime *= 1000000000;
my $object = "object=storages,nodename=$nodename,host=$storeid";
if($data->{type} && $data->{type} ne '') {
$object .= ",type=$data->{type}";
}
$object =~ s/\s/\\ /g;
build_influxdb_payload($class, $txn, $data, $ctime, $object);
}
sub _send_batch_size {
my ($class, $cfg) = @_;
my $proto = $cfg->{influxdbproto} // 'udp';
if ($proto ne 'udp') {
return $cfg->{'max-body-size'} // 25_000_000;
}
return $class->SUPER::_send_batch_size($cfg);
}
sub send {
my ($class, $connection, $data, $cfg) = @_;
my $proto = $cfg->{influxdbproto} // 'udp';
if ($proto eq 'udp') {
return $class->SUPER::send($connection, $data, $cfg);
} elsif ($proto =~ m/^https?$/) {
my $ua = LWP::UserAgent->new();
$ua->timeout($cfg->{timeout} // 1);
$connection->content($data);
my $response = $ua->request($connection);
if (!$response->is_success) {
my $err = $response->status_line;
die "$err\n";
}
} else {
die "invalid protocol\n";
}
return;
}
sub _disconnect {
my ($class, $connection, $cfg) = @_;
my $proto = $cfg->{influxdbproto} // 'udp';
if ($proto eq 'udp') {
return $class->SUPER::_disconnect($connection, $cfg);
}
return;
}
sub _get_v2url {
my ($cfg, $api_path) = @_;
my ($proto, $host, $port) = $cfg->@{qw(influxdbproto server port)};
my $api_prefix = $cfg->{'api-path-prefix'} // '/';
if ($api_prefix ne '/' && $api_prefix =~ m!^/*(.+)/*$!) {
$api_prefix = "/$1/";
}
if ($api_path ne 'health') {
$api_path = "api/v2/${api_path}";
}
return "${proto}://${host}:${port}${api_prefix}${api_path}";
}
sub _connect {
my ($class, $cfg, $id) = @_;
my $host = $cfg->{server};
my $port = $cfg->{port};
my $proto = $cfg->{influxdbproto} // 'udp';
if ($proto eq 'udp') {
my $socket = IO::Socket::IP->new(
PeerAddr => $host,
PeerPort => $port,
Proto => 'udp',
) || die "couldn't create influxdb socket [$host]:$port - $@\n";
$socket->blocking(0);
return $socket;
} elsif ($proto =~ m/^https?$/) {
my $token = get_credentials($id);
my $org = $cfg->{organization} // 'proxmox';
my $bucket = $cfg->{bucket} // 'proxmox';
my $url = _get_v2url($cfg, "write?org=${org}&bucket=${bucket}");
my $req = HTTP::Request->new(POST => $url);
if (defined($token)) {
$req->header( "Authorization", "Token $token");
}
return $req;
}
die "cannot connect to InfluxDB: invalid protocol '$proto'\n";
}
sub test_connection {
my ($class, $cfg, $id) = @_;
my $proto = $cfg->{influxdbproto} // 'udp';
if ($proto eq 'udp') {
return $class->SUPER::test_connection($cfg, $id);
} elsif ($proto =~ m/^https?$/) {
my $url = _get_v2url($cfg, "health");
my $ua = LWP::UserAgent->new();
$ua->timeout($cfg->{timeout} // 1);
# in the initial add connection test, the token may still be in $cfg
my $token = $cfg->{token} // get_credentials($id, 1);
if (defined($token)) {
$ua->default_header("Authorization" => "Token $token");
}
my $response = $ua->get($url);
if (!$response->is_success) {
my $err = $response->status_line;
die "$err\n";
}
} else {
die "invalid protocol\n";
}
return;
}
sub build_influxdb_payload {
my ($class, $txn, $data, $ctime, $tags, $excluded, $measurement, $instance) = @_;
# 'abc' and '123' are both valid hostnames, that confuses influx's type detection
my $to_quote = { name => 1 };
my @values = ();
foreach my $key (sort keys %$data) {
next if defined($excluded) && $excluded->{$key};
my $value = $data->{$key};
next if !defined($value);
if (!ref($value) && $value ne '') {
# value is scalar
if (defined(my $v = prepare_value($value, $to_quote->{$key}))) {
push @values, "$key=$v";
}
} elsif (ref($value) eq 'HASH') {
# value is a hash
if (!defined($measurement)) {
build_influxdb_payload($class, $txn, $value, $ctime, $tags, $excluded, $key);
} elsif(!defined($instance)) {
build_influxdb_payload($class, $txn, $value, $ctime, $tags, $excluded, $measurement, $key);
} else {
push @values, get_recursive_values($value);
}
}
}
if (@values > 0) {
my $mm = $measurement // 'system';
my $tagstring = $tags;
$tagstring .= ",instance=$instance" if defined($instance);
my $valuestr = join(',', @values);
$class->add_metric_data($txn, "$mm,$tagstring $valuestr $ctime\n");
}
}
sub get_recursive_values {
my ($hash) = @_;
my @values = ();
foreach my $key (keys %$hash) {
my $value = $hash->{$key};
if(ref($value) eq 'HASH') {
push(@values, get_recursive_values($value));
} elsif (!ref($value) && $value ne '') {
if (defined(my $v = prepare_value($value))) {
push @values, "$key=$v";
}
}
}
return @values;
}
sub prepare_value {
my ($value, $quote) = @_;
# don't treat value like a number if quote is 1
if (looks_like_number($value) && !$quote) {
if (isnan($value) || isinf($value)) {
# we cannot send influxdb NaN or Inf
return undef;
}
# influxdb also accepts 1.0e+10, etc.
return $value;
}
# non-numeric values require to be quoted, so escape " with \"
$value =~ s/\"/\\\"/g;
$value = "\"$value\"";
return $value;
}
my $priv_dir = "/etc/pve/priv/metricserver";
sub cred_file_name {
my ($id) = @_;
return "${priv_dir}/${id}.pw";
}
sub delete_credentials {
my ($id) = @_;
if (my $cred_file = cred_file_name($id)) {
unlink($cred_file)
or warn "removing influxdb credentials file '$cred_file' failed: $!\n";
}
return;
}
sub set_credentials {
my ($id, $token) = @_;
my $cred_file = cred_file_name($id);
mkdir $priv_dir;
PVE::Tools::file_set_contents($cred_file, "$token");
}
sub get_credentials {
my ($id, $silent) = @_;
my $cred_file = cred_file_name($id);
my $creds = eval { PVE::Tools::file_get_contents($cred_file) };
warn "could not load credentials for '$id': $@\n" if $@ && !$silent;
return $creds;
}
sub on_add_hook {
my ($class, $id, $opts, $sensitive_opts) = @_;
my $token = $sensitive_opts->{token};
if (defined($token)) {
set_credentials($id, $token);
} else {
delete_credentials($id);
}
return undef;
}
sub on_update_hook {
my ($class, $id, $opts, $sensitive_opts) = @_;
return if !exists($sensitive_opts->{token});
my $token = $sensitive_opts->{token};
if (defined($token)) {
set_credentials($id, $token);
} else {
delete_credentials($id);
}
return undef;
}
sub on_delete_hook {
my ($class, $id, $opts) = @_;
delete_credentials($id);
return undef;
}
1;