mirror of
https://git.proxmox.com/git/pve-manager
synced 2025-08-04 09:28:40 +00:00
add PVE/Jobs to handle VZDump jobs
this adds a SectionConfig handling for jobs (only 'vzdump' for now) that represents a job that will be handled by pvescheduler and a basic 'job-state' handling for reading/writing state json files this has some intersections with pvesrs state handling, but does not use a single state file for all jobs, but seperate ones, like we do it in the backup-server. Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
This commit is contained in:
parent
6385fb8183
commit
76c6ee8a6f
286
PVE/Jobs.pm
Normal file
286
PVE/Jobs.pm
Normal file
@ -0,0 +1,286 @@
|
||||
package PVE::Jobs;
|
||||
|
||||
use strict;
|
||||
use warnings;
|
||||
use JSON;
|
||||
|
||||
use PVE::Cluster qw(cfs_read_file cfs_lock_file);
|
||||
use PVE::Jobs::Plugin;
|
||||
use PVE::Jobs::VZDump;
|
||||
use PVE::Tools;
|
||||
|
||||
PVE::Jobs::VZDump->register();
|
||||
PVE::Jobs::Plugin->init();
|
||||
|
||||
my $state_dir = "/var/lib/pve-manager/jobs";
|
||||
my $lock_dir = "/var/lock/pve-manager";
|
||||
|
||||
my $get_state_file = sub {
|
||||
my ($jobid, $type) = @_;
|
||||
return "$state_dir/$type-$jobid.json";
|
||||
};
|
||||
|
||||
my $default_state = {
|
||||
state => 'created',
|
||||
time => 0,
|
||||
};
|
||||
|
||||
# lockless, since we use file_get_contents, which is atomic
|
||||
sub read_job_state {
|
||||
my ($jobid, $type) = @_;
|
||||
my $path = $get_state_file->($jobid, $type);
|
||||
return if ! -e $path;
|
||||
|
||||
my $raw = PVE::Tools::file_get_contents($path);
|
||||
|
||||
return $default_state if $raw eq '';
|
||||
|
||||
# untaint $raw
|
||||
if ($raw =~ m/^(\{.*\})$/) {
|
||||
return decode_json($1);
|
||||
}
|
||||
|
||||
die "invalid json data in '$path'\n";
|
||||
}
|
||||
|
||||
sub lock_job_state {
|
||||
my ($jobid, $type, $sub) = @_;
|
||||
|
||||
my $filename = "$lock_dir/$type-$jobid.lck";
|
||||
|
||||
my $res = PVE::Tools::lock_file($filename, 10, $sub);
|
||||
die $@ if $@;
|
||||
|
||||
return $res;
|
||||
}
|
||||
|
||||
my $get_job_status = sub {
|
||||
my ($state) = @_;
|
||||
|
||||
if (!defined($state->{upid})) {
|
||||
return; # not started
|
||||
}
|
||||
|
||||
my ($task, $filename) = PVE::Tools::upid_decode($state->{upid}, 1);
|
||||
die "unable to parse worker upid - $state->{upid}\n" if !$task;
|
||||
die "no such task\n" if ! -f $filename;
|
||||
|
||||
my $pstart = PVE::ProcFSTools::read_proc_starttime($task->{pid});
|
||||
if ($pstart && $pstart == $task->{pstart}) {
|
||||
return; # still running
|
||||
}
|
||||
|
||||
return PVE::Tools::upid_read_status($state->{upid});
|
||||
};
|
||||
|
||||
# checks if the job is already finished if it was started before and
|
||||
# updates the statefile accordingly
|
||||
sub update_job_stopped {
|
||||
my ($jobid, $type) = @_;
|
||||
|
||||
# first check unlocked to save time,
|
||||
my $state = read_job_state($jobid, $type);
|
||||
return if !defined($state) || $state->{state} ne 'started'; # removed or not started
|
||||
|
||||
if (defined($get_job_status->($state))) {
|
||||
lock_job_state($jobid, $type, sub {
|
||||
my $state = read_job_state($jobid, $type);
|
||||
return if !defined($state) || $state->{state} ne 'started'; # removed or not started
|
||||
|
||||
my $status = $get_job_status->($state);
|
||||
|
||||
my $new_state = {
|
||||
state => 'stopped',
|
||||
msg => $status,
|
||||
upid => $state->{upid}
|
||||
};
|
||||
|
||||
if ($state->{updated}) { # save updated time stamp
|
||||
$new_state->{updated} = $state->{updated};
|
||||
}
|
||||
|
||||
my $path = $get_state_file->($jobid, $type);
|
||||
PVE::Tools::file_set_contents($path, encode_json($new_state));
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
# must be called when the job is first created
|
||||
sub create_job {
|
||||
my ($jobid, $type) = @_;
|
||||
|
||||
lock_job_state($jobid, $type, sub {
|
||||
my $state = read_job_state($jobid, $type) // $default_state;
|
||||
|
||||
if ($state->{state} ne 'created') {
|
||||
die "job state already exists\n";
|
||||
}
|
||||
|
||||
$state->{time} = time();
|
||||
|
||||
my $path = $get_state_file->($jobid, $type);
|
||||
PVE::Tools::file_set_contents($path, encode_json($state));
|
||||
});
|
||||
}
|
||||
|
||||
# to be called when the job is removed
|
||||
sub remove_job {
|
||||
my ($jobid, $type) = @_;
|
||||
my $path = $get_state_file->($jobid, $type);
|
||||
unlink $path;
|
||||
}
|
||||
|
||||
# checks if the job can be started and sets the state to 'starting'
|
||||
# returns 1 if the job can be started, 0 otherwise
|
||||
sub starting_job {
|
||||
my ($jobid, $type) = @_;
|
||||
|
||||
# first check unlocked to save time
|
||||
my $state = read_job_state($jobid, $type);
|
||||
return 0 if !defined($state) || $state->{state} eq 'started'; # removed or already started
|
||||
|
||||
lock_job_state($jobid, $type, sub {
|
||||
my $state = read_job_state($jobid, $type);
|
||||
return 0 if !defined($state) || $state->{state} eq 'started'; # removed or already started
|
||||
|
||||
my $new_state = {
|
||||
state => 'starting',
|
||||
time => time(),
|
||||
};
|
||||
|
||||
my $path = $get_state_file->($jobid, $type);
|
||||
PVE::Tools::file_set_contents($path, encode_json($new_state));
|
||||
});
|
||||
return 1;
|
||||
}
|
||||
|
||||
sub started_job {
|
||||
my ($jobid, $type, $upid, $err) = @_;
|
||||
lock_job_state($jobid, $type, sub {
|
||||
my $state = read_job_state($jobid, $type);
|
||||
return if !defined($state); # job was removed, do not update
|
||||
die "unexpected state '$state->{state}'\n" if $state->{state} ne 'starting';
|
||||
|
||||
my $new_state;
|
||||
if (defined($err)) {
|
||||
$new_state = {
|
||||
state => 'stopped',
|
||||
msg => $err,
|
||||
time => time(),
|
||||
};
|
||||
} else {
|
||||
$new_state = {
|
||||
state => 'started',
|
||||
upid => $upid,
|
||||
};
|
||||
}
|
||||
|
||||
my $path = $get_state_file->($jobid, $type);
|
||||
PVE::Tools::file_set_contents($path, encode_json($new_state));
|
||||
});
|
||||
}
|
||||
|
||||
# will be called when the job schedule is updated
|
||||
sub updated_job_schedule {
|
||||
my ($jobid, $type) = @_;
|
||||
lock_job_state($jobid, $type, sub {
|
||||
my $old_state = read_job_state($jobid, $type) // $default_state;
|
||||
|
||||
$old_state->{updated} = time();
|
||||
|
||||
my $path = $get_state_file->($jobid, $type);
|
||||
PVE::Tools::file_set_contents($path, encode_json($old_state));
|
||||
});
|
||||
}
|
||||
|
||||
sub get_last_runtime {
|
||||
my ($jobid, $type) = @_;
|
||||
|
||||
my $state = read_job_state($jobid, $type) // $default_state;
|
||||
|
||||
return $state->{updated} if defined($state->{updated});
|
||||
|
||||
if (my $upid = $state->{upid}) {
|
||||
my ($task) = PVE::Tools::upid_decode($upid, 1);
|
||||
die "unable to parse worker upid\n" if !$task;
|
||||
return $task->{starttime};
|
||||
}
|
||||
|
||||
return $state->{time} // 0;
|
||||
}
|
||||
|
||||
sub run_jobs {
|
||||
synchronize_job_states_with_config();
|
||||
|
||||
my $jobs_cfg = cfs_read_file('jobs.cfg');
|
||||
my $nodename = PVE::INotify::nodename();
|
||||
|
||||
foreach my $id (sort keys %{$jobs_cfg->{ids}}) {
|
||||
my $cfg = $jobs_cfg->{ids}->{$id};
|
||||
my $type = $cfg->{type};
|
||||
my $schedule = delete $cfg->{schedule};
|
||||
|
||||
# only schedule local jobs
|
||||
next if defined($cfg->{node}) && $cfg->{node} ne $nodename;
|
||||
|
||||
eval {
|
||||
update_job_stopped($id, $type);
|
||||
};
|
||||
if (my $err = $@) {
|
||||
warn "could not update job state, skipping - $err\n";
|
||||
next;
|
||||
}
|
||||
|
||||
# only schedule enabled jobs
|
||||
next if defined($cfg->{enabled}) && !$cfg->{enabled};
|
||||
|
||||
my $last_run = get_last_runtime($id, $type);
|
||||
my $calspec = PVE::CalendarEvent::parse_calendar_event($schedule);
|
||||
my $next_sync = PVE::CalendarEvent::compute_next_event($calspec, $last_run) // 0;
|
||||
|
||||
if (time() >= $next_sync) {
|
||||
my $plugin = PVE::Jobs::Plugin->lookup($type);
|
||||
if (starting_job($id, $type)) {
|
||||
my $upid = eval { $plugin->run($cfg) };
|
||||
if (my $err = $@) {
|
||||
warn $@ if $@;
|
||||
started_job($id, $type, undef, $err);
|
||||
} elsif ($upid eq 'OK') { # some jobs return OK immediatly
|
||||
started_job($id, $type, undef, 'OK');
|
||||
} else {
|
||||
started_job($id, $type, $upid);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
# creates and removes statefiles for job configs
|
||||
sub synchronize_job_states_with_config {
|
||||
cfs_lock_file('jobs.cfg', undef, sub {
|
||||
my $data = cfs_read_file('jobs.cfg');
|
||||
|
||||
for my $id (keys $data->{ids}->%*) {
|
||||
my $job = $data->{ids}->{$id};
|
||||
my $type = $job->{type};
|
||||
my $jobstate = read_job_state($id, $type);
|
||||
create_job($id, $type) if !defined($jobstate);
|
||||
}
|
||||
|
||||
PVE::Tools::dir_glob_foreach($state_dir, '(.*?)-(.*).json', sub {
|
||||
my ($path, $type, $id) = @_;
|
||||
|
||||
if (!defined($data->{ids}->{$id})) {
|
||||
remove_job($id, $type);
|
||||
}
|
||||
});
|
||||
});
|
||||
die $@ if $@;
|
||||
}
|
||||
|
||||
sub setup_dirs {
|
||||
mkdir $state_dir;
|
||||
mkdir $lock_dir;
|
||||
}
|
||||
|
||||
1;
|
16
PVE/Jobs/Makefile
Normal file
16
PVE/Jobs/Makefile
Normal file
@ -0,0 +1,16 @@
|
||||
include ../../defines.mk
|
||||
|
||||
PERLSOURCE = \
|
||||
Plugin.pm\
|
||||
VZDump.pm
|
||||
|
||||
all:
|
||||
|
||||
.PHONY: clean
|
||||
clean:
|
||||
rm -rf *~
|
||||
|
||||
.PHONY: install
|
||||
install: ${PERLSOURCE}
|
||||
install -d ${PERLLIBDIR}/PVE/Jobs
|
||||
install -m 0644 ${PERLSOURCE} ${PERLLIBDIR}/PVE/Jobs
|
61
PVE/Jobs/Plugin.pm
Normal file
61
PVE/Jobs/Plugin.pm
Normal file
@ -0,0 +1,61 @@
|
||||
package PVE::Jobs::Plugin;
|
||||
|
||||
use strict;
|
||||
use warnings;
|
||||
|
||||
use PVE::Cluster qw(cfs_register_file);
|
||||
|
||||
use base qw(PVE::SectionConfig);
|
||||
|
||||
cfs_register_file('jobs.cfg',
|
||||
sub { __PACKAGE__->parse_config(@_); },
|
||||
sub { __PACKAGE__->write_config(@_); });
|
||||
|
||||
my $defaultData = {
|
||||
propertyList => {
|
||||
type => { description => "Section type." },
|
||||
id => {
|
||||
description => "The ID of the VZDump job.",
|
||||
type => 'string',
|
||||
format => 'pve-configid',
|
||||
},
|
||||
enabled => {
|
||||
description => "Determines if the job is enabled.",
|
||||
type => 'boolean',
|
||||
default => 1,
|
||||
optional => 1,
|
||||
},
|
||||
schedule => {
|
||||
description => "Backup schedule. The format is a subset of `systemd` calendar events.",
|
||||
type => 'string', format => 'pve-calendar-event',
|
||||
maxLength => 128,
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
sub private {
|
||||
return $defaultData;
|
||||
}
|
||||
|
||||
sub parse_config {
|
||||
my ($class, $filename, $raw) = @_;
|
||||
|
||||
my $cfg = $class->SUPER::parse_config($filename, $raw);
|
||||
|
||||
foreach my $id (sort keys %{$cfg->{ids}}) {
|
||||
my $data = $cfg->{ids}->{$id};
|
||||
|
||||
$data->{id} = $id;
|
||||
$data->{enabled} //= 1;
|
||||
}
|
||||
|
||||
return $cfg;
|
||||
}
|
||||
|
||||
sub run {
|
||||
my ($class, $cfg) = @_;
|
||||
# implement in subclass
|
||||
die "not implemented";
|
||||
}
|
||||
|
||||
1;
|
54
PVE/Jobs/VZDump.pm
Normal file
54
PVE/Jobs/VZDump.pm
Normal file
@ -0,0 +1,54 @@
|
||||
package PVE::Jobs::VZDump;
|
||||
|
||||
use strict;
|
||||
use warnings;
|
||||
|
||||
use PVE::INotify;
|
||||
use PVE::VZDump::Common;
|
||||
use PVE::API2::VZDump;
|
||||
use PVE::Cluster;
|
||||
|
||||
use base qw(PVE::Jobs::Plugin);
|
||||
|
||||
sub type {
|
||||
return 'vzdump';
|
||||
}
|
||||
|
||||
my $props = PVE::VZDump::Common::json_config_properties();
|
||||
|
||||
sub properties {
|
||||
return $props;
|
||||
}
|
||||
|
||||
sub options {
|
||||
my $options = {
|
||||
enabled => { optional => 1 },
|
||||
schedule => {},
|
||||
};
|
||||
foreach my $opt (keys %$props) {
|
||||
if ($props->{$opt}->{optional}) {
|
||||
$options->{$opt} = { optional => 1 };
|
||||
} else {
|
||||
$options->{$opt} = {};
|
||||
}
|
||||
}
|
||||
|
||||
return $options;
|
||||
}
|
||||
|
||||
sub run {
|
||||
my ($class, $conf) = @_;
|
||||
|
||||
# remove all non vzdump related options
|
||||
foreach my $opt (keys %$conf) {
|
||||
delete $conf->{$opt} if !defined($props->{$opt});
|
||||
}
|
||||
|
||||
$conf->{quiet} = 1; # do not write to stdout/stderr
|
||||
|
||||
PVE::Cluster::cfs_update(); # refresh vmlist
|
||||
|
||||
return PVE::API2::VZDump->vzdump($conf);
|
||||
}
|
||||
|
||||
1;
|
@ -1,6 +1,6 @@
|
||||
include ../defines.mk
|
||||
|
||||
SUBDIRS=API2 Status CLI Service Ceph
|
||||
SUBDIRS=API2 Status CLI Service Ceph Jobs
|
||||
|
||||
PERLSOURCE = \
|
||||
API2.pm \
|
||||
@ -11,6 +11,7 @@ PERLSOURCE = \
|
||||
CertHelpers.pm \
|
||||
ExtMetric.pm \
|
||||
HTTPServer.pm \
|
||||
Jobs.pm \
|
||||
NodeConfig.pm \
|
||||
Report.pm \
|
||||
VZDump.pm
|
||||
|
Loading…
Reference in New Issue
Block a user