package PVE::RADOS; use 5.014002; use strict; use warnings; use Carp; use JSON; use POSIX; use Socket; use PVE::Tools; use PVE::INotify; use PVE::RPCEnvironment; require Exporter; my $rados_default_timeout = 5; my $ceph_default_conf = '/etc/pve/ceph.conf'; my $ceph_default_user = 'admin'; our $VERSION = '1.0'; require XSLoader; XSLoader::load('PVE::RADOS', $VERSION); my $writedata = sub { my ($fh, $cmd, $data) = @_; local $SIG{PIPE} = 'IGNORE'; my $bin = pack "a L/a*", $cmd, $data || ''; my $res = syswrite $fh, $bin; die "write data failed - $!\n" if !defined($res); }; my $readdata = sub { my ($fh, $allow_eof) = @_; my $head = ''; local $SIG{PIPE} = 'IGNORE'; while (length($head) < 5) { last if !sysread $fh, $head, 5 - length($head), length($head); } return undef if $allow_eof && length($head) == 0; die "partial read\n" if length($head) < 5; my ($cmd, $len) = unpack "a L", $head; my $data = ''; while (length($data) < $len) { last if !sysread $fh, $data, $len - length($data), length($data); } die "partial data read\n" if length($data) < $len; return wantarray ? ($cmd, $data) : $data; }; my $kill_worker = sub { my ($self) = @_; return if !$self->{cpid}; return if $self->{__already_killed}; $self->{__already_killed} = 1; close($self->{child}) if defined($self->{child}); # only kill if we created the process return if $self->{pid} != $$; kill(9, $self->{cpid}); waitpid($self->{cpid}, 0); }; my $sendcmd = sub { my ($self, $cmd, $data, $expect_tag) = @_; $expect_tag = '>' if !$expect_tag; die "detected forked connection\n" if $self->{pid} != $$; my ($restag, $raw); my $code = sub { &$writedata($self->{child}, $cmd, $data) if $expect_tag ne 'S'; ($restag, $raw) = &$readdata($self->{child}); }; eval { PVE::Tools::run_with_timeout($self->{timeout}, $code); }; if (my $err = $@) { &$kill_worker($self); die $err; } if ($restag eq 'E') { die $raw if $raw; die "unknown error\n"; } die "got unexpected result\n" if $restag ne $expect_tag; return $raw; }; sub pve_rados_work { my ($self, $parent, $timeout, %params) = @_; my $conn; eval { my $ceph_user = delete $params{userid} || $ceph_default_user; $conn = pve_rados_create($ceph_user) || die "unable to create RADOS object\n"; if (defined($params{ceph_conf}) && (!-e $params{ceph_conf})) { die "Supplied ceph config doesn't exist, $params{ceph_conf}\n"; } my $ceph_conf = delete $params{ceph_conf} || $ceph_default_conf; if (-e $ceph_conf) { pve_rados_conf_read_file($conn, $ceph_conf); } pve_rados_conf_set($conn, 'client_mount_timeout', $timeout); foreach my $k (keys %params) { pve_rados_conf_set($conn, $k, $params{$k}); } pve_rados_connect($conn); }; if (my $err = $@) { &$writedata($parent, 'E', $err); die $err; } &$writedata($parent, 'S'); $self->{conn} = $conn; for (;;) { my ($cmd, $data) = &$readdata($parent, 1); last if !$cmd || $cmd eq 'Q'; my $res; eval { if ($cmd eq 'M') { # rados monitor commands $res = encode_json(pve_rados_mon_command($self->{conn}, [ $data ])); } elsif ($cmd eq 'C') { # class methods my $aref = decode_json($data); my $method = shift @$aref; $res = encode_json($self->$method(@$aref)); } else { die "invalid command\n"; } }; if (my $err = $@) { &$writedata($parent, 'E', $err); die $err; } &$writedata($parent, '>', $res); } } sub new { my ($class, %params) = @_; my $rpcenv = PVE::RPCEnvironment::get(); socketpair(my $child, my $parent, AF_UNIX, SOCK_STREAM, PF_UNSPEC) || die "socketpair: $!\n"; my $cpid = fork(); die "unable to fork - $!\n" if !defined($cpid); my $self = bless {}, $class; my $timeout = delete $params{timeout} || $rados_default_timeout; $self->{timeout} = $timeout; $self->{pid} = $$; if ($cpid) { # parent close $parent; $self->{cpid} = $cpid; $self->{child} = $child; &$sendcmd($self, undef, undef, 'S'); # wait for sync } else { # child $0 = 'pverados'; eval { PVE::INotify::inotify_close(); if (my $atfork = $rpcenv->{atfork}) { &$atfork(); } # override signal handlers inherited from the parent local $SIG{HUP} = $SIG{INT} = $SIG{QUIT} = $SIG{TERM} = sub { pve_rados_shutdown($self->{conn}) if $self->{conn}; POSIX::_exit(1); }; # fixme: timeout? close $child; $self->pve_rados_work($parent, $timeout, %params); }; my $err = $@; warn $err if $err; pve_rados_shutdown($self->{conn}) if $self->{conn}; POSIX::_exit($err ? 1 : 0); } return $self; } sub timeout { my ($self, $new_timeout) = @_; $self->{timeout} = $new_timeout if $new_timeout; return $self->{timeout}; } sub DESTROY { my ($self) = @_; if ($self->{cpid}) { #print "$$: DESTROY WAIT0\n"; &$kill_worker($self); #print "$$: DESTROY WAIT\n"; } } sub cluster_stat { my ($self, @args) = @_; if ($self->{cpid}) { my $data = encode_json(['cluster_stat', @args]); my $raw = &$sendcmd($self, 'C', $data); return decode_json($raw); } else { return pve_rados_cluster_stat($self->{conn}); } } # example1: { prefix => 'get_command_descriptions'}) # example2: { prefix => 'mon dump', format => 'json' } sub mon_cmd { my ($self, $cmd, $no_result_catch) = @_; $cmd->{format} = 'json' if !$cmd->{format}; my $json = encode_json($cmd); my $ret = $sendcmd->($self, 'M', $json); die "error with '$cmd->{prefix}': $@" if $@; my $raw = decode_json($ret); die "error with '$cmd->{prefix}': mon_cmd failed - $raw->{status_message}\n" if !$no_result_catch && $raw->{return_code} < 0; my $data = ''; if ($cmd->{format} && $cmd->{format} eq 'json') { $data = length($raw->{data}) ? decode_json($raw->{data}) : undef; } else { $data = $raw->{data}; } return { return_code => $raw->{return_code}, status_message => $raw->{status_message}, data => $data, }; } # for backward compatibillity or if one just doesn't care about actual RADOS error details sub mon_command { my ($self, $cmd) = @_; my $res = mon_cmd($self, $cmd); return $res->{data}; } 1; __END__ =head1 NAME PVE::RADOS - Perl bindings for librados =head1 SYNOPSIS use PVE::RADOS; my $rados = PVE::RADOS->new(); my $stat = $rados->cluster_stat(); my $res = $rados->mon_command({ prefix => 'mon dump', format => 'json' }); =head1 DESCRIPTION Perl bindings for librados. =head2 EXPORT None by default. =head1 AUTHOR Dietmar Maurer, Edietmar@proxmox.com Thomas Lamprecht Et.lamprecht@proxmox.com Wolfgang Bumiller Ew.bumiller@proxmox.com =cut