mirror of
https://git.proxmox.com/git/pve-manager
synced 2025-07-24 16:34:17 +00:00
status/influxdb: implement influxdb 2.x http api
needs an organization/bucket (previously db) and an optional token the http client does not fit exactly in the connect/send/disconnect scheme, so it simply creates a request in 'connect', does the actual http connection in 'send' and nothing in 'disconnect' max-body-size is set to 25.000.000 bytes by default (the influxdb default) and the timeout to 1 second (same as default graphite tcp timeout) the token (if given) gets saved in /etc/pve/priv/metricserver/$ID.pw it is optional, because the 1.8.x compatibility api does not need authentication (in contrast to influxdb 2.x) Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
This commit is contained in:
parent
27bc5e8e02
commit
ccb614311d
@ -6,6 +6,8 @@ use warnings;
|
||||
use POSIX qw(isnan isinf);
|
||||
use Scalar::Util 'looks_like_number';
|
||||
use IO::Socket::IP;
|
||||
use LWP::UserAgent;
|
||||
use HTTP::Request;
|
||||
|
||||
use PVE::SafeSyslog;
|
||||
|
||||
@ -24,12 +26,51 @@ sub type {
|
||||
return 'influxdb';
|
||||
}
|
||||
|
||||
sub properties {
|
||||
return {
|
||||
organization => {
|
||||
description => "The influxdb organization. Only necessary when using the http v2 api. ".
|
||||
"Has no meaning when using v2 compatibility api.",
|
||||
type => 'string',
|
||||
optional => 1,
|
||||
},
|
||||
bucket => {
|
||||
description => "The influxdb bucket/db. Only necessary when using the http v2 api.",
|
||||
type => 'string',
|
||||
optional => 1,
|
||||
},
|
||||
token => {
|
||||
description => "The influxdb access token. Only necessary when using the http v2 api. ".
|
||||
"If the v2 compatibility api is used, use 'user:password' instead.",
|
||||
type => 'string',
|
||||
optional => 1,
|
||||
},
|
||||
influxdbproto => {
|
||||
type => 'string',
|
||||
enum => ['udp', 'http', 'https'],
|
||||
default => 'udp',
|
||||
optional => 1,
|
||||
},
|
||||
'max-body-size' => {
|
||||
description => "Influxdb max-body-size. Requests are batched up to this size.",
|
||||
type => 'integer',
|
||||
minimum => 1,
|
||||
default => 25_000_000,
|
||||
}
|
||||
};
|
||||
}
|
||||
sub options {
|
||||
return {
|
||||
server => {},
|
||||
port => {},
|
||||
mtu => { optional => 1 },
|
||||
disable => { optional => 1 },
|
||||
organization => { optional => 1},
|
||||
bucket => { optional => 1},
|
||||
token => { optional => 1},
|
||||
influxdbproto => { optional => 1},
|
||||
timeout => { optional => 1},
|
||||
'max-body-size' => { optional => 1 },
|
||||
};
|
||||
}
|
||||
|
||||
@ -84,21 +125,106 @@ sub update_storage_status {
|
||||
build_influxdb_payload($class, $txn, $data, $ctime, $object);
|
||||
}
|
||||
|
||||
sub _connect {
|
||||
sub _send_batch_size {
|
||||
my ($class, $cfg) = @_;
|
||||
my $proto = $cfg->{influxdbproto} // 'udp';
|
||||
if ($proto ne 'udp') {
|
||||
return $cfg->{'max-body-size'} // 25_000_000;
|
||||
}
|
||||
|
||||
return $class->SUPER::_send_batch_size($cfg);
|
||||
}
|
||||
|
||||
sub send {
|
||||
my ($class, $connection, $data, $cfg) = @_;
|
||||
|
||||
my $proto = $cfg->{influxdbproto} // 'udp';
|
||||
if ($proto eq 'udp') {
|
||||
return $class->SUPER::send($connection, $data, $cfg);
|
||||
} elsif ($proto =~ m/^https?$/) {
|
||||
my $ua = LWP::UserAgent->new();
|
||||
$ua->timeout($cfg->{timeout} // 1);
|
||||
$connection->content($data);
|
||||
my $response = $ua->request($connection);
|
||||
|
||||
if (!$response->is_success) {
|
||||
my $err = $response->status_line;
|
||||
die "$err\n";
|
||||
}
|
||||
} else {
|
||||
die "invalid protocol\n";
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
sub _disconnect {
|
||||
my ($class, $connection, $cfg) = @_;
|
||||
my $proto = $cfg->{influxdbproto} // 'udp';
|
||||
if ($proto eq 'udp') {
|
||||
return $class->SUPER::_disconnect($connection, $cfg);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
sub _connect {
|
||||
my ($class, $cfg, $id) = @_;
|
||||
|
||||
my $host = $cfg->{server};
|
||||
my $port = $cfg->{port};
|
||||
my $proto = $cfg->{influxdbproto} // 'udp';
|
||||
|
||||
my $socket = IO::Socket::IP->new(
|
||||
PeerAddr => $host,
|
||||
PeerPort => $port,
|
||||
Proto => 'udp',
|
||||
) || die "couldn't create influxdb socket [$host]:$port - $@\n";
|
||||
if ($proto eq 'udp') {
|
||||
my $socket = IO::Socket::IP->new(
|
||||
PeerAddr => $host,
|
||||
PeerPort => $port,
|
||||
Proto => 'udp',
|
||||
) || die "couldn't create influxdb socket [$host]:$port - $@\n";
|
||||
|
||||
$socket->blocking(0);
|
||||
$socket->blocking(0);
|
||||
|
||||
return $socket;
|
||||
return $socket;
|
||||
} elsif ($proto =~ m/^https?$/) {
|
||||
my $token = get_credentials($id);
|
||||
my $org = $cfg->{organization} // 'proxmox';
|
||||
my $bucket = $cfg->{bucket} // 'proxmox';
|
||||
my $url = "${proto}://${host}:${port}/api/v2/write?org=${org}&bucket=${bucket}";
|
||||
|
||||
my $req = HTTP::Request->new(POST => $url);
|
||||
if (defined($token)) {
|
||||
$req->header( "Authorization", "Token $token");
|
||||
}
|
||||
|
||||
return $req;
|
||||
}
|
||||
|
||||
die "cannot connect to influxdb: invalid protocol\n";
|
||||
}
|
||||
|
||||
sub test_connection {
|
||||
my ($class, $cfg, $id) = @_;
|
||||
|
||||
my $proto = $cfg->{influxdbproto} // 'udp';
|
||||
if ($proto eq 'udp') {
|
||||
return $class->SUPER::test_connection($cfg, $id);
|
||||
} elsif ($proto =~ m/^https?$/) {
|
||||
my $host = $cfg->{server};
|
||||
my $port = $cfg->{port};
|
||||
my $url = "${proto}://${host}:${port}/health";
|
||||
my $ua = LWP::UserAgent->new();
|
||||
$ua->timeout($cfg->{timeout} // 1);
|
||||
my $response = $ua->get($url);
|
||||
|
||||
if (!$response->is_success) {
|
||||
my $err = $response->status_line;
|
||||
die "$err\n";
|
||||
}
|
||||
} else {
|
||||
die "invalid protocol\n";
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
sub build_influxdb_payload {
|
||||
@ -177,4 +303,77 @@ sub prepare_value {
|
||||
return $value;
|
||||
}
|
||||
|
||||
my $priv_dir = "/etc/pve/priv/metricserver";
|
||||
|
||||
sub cred_file_name {
|
||||
my ($id) = @_;
|
||||
return "${priv_dir}/${id}.pw";
|
||||
}
|
||||
|
||||
sub delete_credentials {
|
||||
my ($id) = @_;
|
||||
|
||||
if (my $cred_file = cred_file_name($id)) {
|
||||
unlink($cred_file)
|
||||
or warn "removing influxdb credentials file '$cred_file' failed: $!\n";
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
sub set_credentials {
|
||||
my ($id, $token) = @_;
|
||||
|
||||
my $cred_file = cred_file_name($id);
|
||||
|
||||
mkdir $priv_dir;
|
||||
|
||||
PVE::Tools::file_set_contents($cred_file, "$token");
|
||||
}
|
||||
|
||||
sub get_credentials {
|
||||
my ($id) = @_;
|
||||
|
||||
my $cred_file = cred_file_name($id);
|
||||
|
||||
return PVE::Tools::file_get_contents($cred_file);
|
||||
}
|
||||
|
||||
sub on_add_hook {
|
||||
my ($class, $id, $opts, $sensitive_opts) = @_;
|
||||
|
||||
my $token = $sensitive_opts->{token};
|
||||
|
||||
if (defined($token)) {
|
||||
set_credentials($id, $token);
|
||||
} else {
|
||||
delete_credenetials($id);
|
||||
}
|
||||
|
||||
return undef;
|
||||
}
|
||||
|
||||
sub on_update_hook {
|
||||
my ($class, $id, $opts, $sensitive_opts) = @_;
|
||||
return if !exists($sensitive_opts->{token});
|
||||
|
||||
my $token = $sensitive_opts->{token};
|
||||
if (defined($token)) {
|
||||
set_credentials($id, $token);
|
||||
} else {
|
||||
delete_credenetials($id);
|
||||
}
|
||||
|
||||
return undef;
|
||||
}
|
||||
|
||||
sub on_delete_hook {
|
||||
my ($class, $id, $opts) = @_;
|
||||
|
||||
delete_credentials($id);
|
||||
|
||||
return undef;
|
||||
}
|
||||
|
||||
|
||||
1;
|
||||
|
Loading…
Reference in New Issue
Block a user