mirror of
https://salsa.debian.org/ha-team/libqb
synced 2025-12-31 02:32:41 +00:00
228 lines
7.0 KiB
C
228 lines
7.0 KiB
C
/*
|
|
* Copyright (C) 2006, 2010 Red Hat, Inc.
|
|
*
|
|
* Author: Steven Dake <sdake@redhat.com>
|
|
*
|
|
* This file is part of libqb.
|
|
*
|
|
* libqb is free software: you can redistribute it and/or modify
|
|
* it under the terms of the GNU Lesser General Public License as published by
|
|
* the Free Software Foundation, either version 2.1 of the License, or
|
|
* (at your option) any later version.
|
|
*
|
|
* libqb is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
* GNU Lesser General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU Lesser General Public License
|
|
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
|
|
*/
|
|
|
|
/*
|
|
* Add work to a work group and have threads process the work
|
|
* Provide blocking for all work to complete
|
|
*/
|
|
|
|
#include <config.h>
|
|
|
|
#include <stdlib.h>
|
|
#include <pthread.h>
|
|
#include <errno.h>
|
|
#include <qb/qbqueue.h>
|
|
#include <qb/qbwthread.h>
|
|
|
|
struct thread_data {
|
|
void *thread_state;
|
|
void *data;
|
|
};
|
|
|
|
struct qb_wthread_t {
|
|
struct qb_wthread_group *worker_thread_group;
|
|
pthread_mutex_t new_work_mutex;
|
|
pthread_cond_t new_work_cond;
|
|
pthread_cond_t cond;
|
|
pthread_mutex_t done_work_mutex;
|
|
pthread_cond_t done_work_cond;
|
|
pthread_t thread_id;
|
|
struct qb_queue queue;
|
|
void *thread_state;
|
|
struct thread_data thread_data;
|
|
};
|
|
|
|
static void *start_worker_thread(void *thread_data_in)
|
|
{
|
|
struct thread_data *thread_data = (struct thread_data *)thread_data_in;
|
|
struct qb_wthread_t *worker_thread =
|
|
(struct qb_wthread_t *)thread_data->data;
|
|
void *data_for_worker_fn;
|
|
|
|
for (;;) {
|
|
pthread_mutex_lock(&worker_thread->new_work_mutex);
|
|
if (qb_queue_is_empty(&worker_thread->queue) == 1) {
|
|
pthread_cond_wait(&worker_thread->new_work_cond,
|
|
&worker_thread->new_work_mutex);
|
|
}
|
|
|
|
/*
|
|
* We unlock then relock the new_work_mutex to allow the
|
|
* worker function to execute and also allow new work to be
|
|
* added to the work queue
|
|
*/
|
|
data_for_worker_fn = qb_queue_item_get(&worker_thread->queue);
|
|
pthread_mutex_unlock(&worker_thread->new_work_mutex);
|
|
worker_thread->worker_thread_group->worker_fn(worker_thread->
|
|
thread_state,
|
|
data_for_worker_fn);
|
|
pthread_mutex_lock(&worker_thread->new_work_mutex);
|
|
qb_queue_item_remove(&worker_thread->queue);
|
|
pthread_mutex_unlock(&worker_thread->new_work_mutex);
|
|
pthread_mutex_lock(&worker_thread->done_work_mutex);
|
|
if (qb_queue_is_empty(&worker_thread->queue) == 1) {
|
|
pthread_cond_signal(&worker_thread->done_work_cond);
|
|
}
|
|
pthread_mutex_unlock(&worker_thread->done_work_mutex);
|
|
}
|
|
return (NULL);
|
|
}
|
|
|
|
int qb_wthread_group_init(struct qb_wthread_group *worker_thread_group,
|
|
int threads,
|
|
int items_max,
|
|
int item_size,
|
|
int thread_state_size,
|
|
void (*thread_state_constructor) (void *),
|
|
void (*worker_fn) (void *thread_state,
|
|
void *work_item))
|
|
{
|
|
int i;
|
|
|
|
worker_thread_group->threadcount = threads;
|
|
worker_thread_group->last_scheduled = 0;
|
|
worker_thread_group->worker_fn = worker_fn;
|
|
worker_thread_group->threads = malloc(sizeof(struct qb_wthread_t) *
|
|
threads);
|
|
if (worker_thread_group->threads == 0) {
|
|
return (-1);
|
|
}
|
|
|
|
for (i = 0; i < threads; i++) {
|
|
if (thread_state_size) {
|
|
worker_thread_group->threads[i].thread_state =
|
|
malloc(thread_state_size);
|
|
} else {
|
|
worker_thread_group->threads[i].thread_state = NULL;
|
|
}
|
|
if (thread_state_constructor) {
|
|
thread_state_constructor(worker_thread_group->threads
|
|
[i].thread_state);
|
|
}
|
|
worker_thread_group->threads[i].worker_thread_group =
|
|
worker_thread_group;
|
|
pthread_mutex_init(&worker_thread_group->threads[i].
|
|
new_work_mutex, NULL);
|
|
pthread_cond_init(&worker_thread_group->threads[i].
|
|
new_work_cond, NULL);
|
|
pthread_mutex_init(&worker_thread_group->threads[i].
|
|
done_work_mutex, NULL);
|
|
pthread_cond_init(&worker_thread_group->threads[i].
|
|
done_work_cond, NULL);
|
|
qb_queue_init(&worker_thread_group->threads[i].queue, items_max,
|
|
item_size);
|
|
|
|
worker_thread_group->threads[i].thread_data.thread_state =
|
|
worker_thread_group->threads[i].thread_state;
|
|
worker_thread_group->threads[i].thread_data.data =
|
|
&worker_thread_group->threads[i];
|
|
pthread_create(&worker_thread_group->threads[i].thread_id, NULL,
|
|
start_worker_thread,
|
|
&worker_thread_group->threads[i].thread_data);
|
|
}
|
|
return (0);
|
|
}
|
|
|
|
int qb_wthread_group_work_add(struct qb_wthread_group *worker_thread_group,
|
|
void *item)
|
|
{
|
|
int schedule;
|
|
|
|
schedule =
|
|
(worker_thread_group->last_scheduled +
|
|
1) % (worker_thread_group->threadcount);
|
|
worker_thread_group->last_scheduled = schedule;
|
|
|
|
pthread_mutex_lock(&worker_thread_group->threads[schedule].
|
|
new_work_mutex);
|
|
if (qb_queue_is_full(&worker_thread_group->threads[schedule].queue)) {
|
|
pthread_mutex_unlock(&worker_thread_group->threads[schedule].
|
|
new_work_mutex);
|
|
return (-1);
|
|
}
|
|
qb_queue_item_add(&worker_thread_group->threads[schedule].queue, item);
|
|
pthread_cond_signal(&worker_thread_group->threads[schedule].
|
|
new_work_cond);
|
|
pthread_mutex_unlock(&worker_thread_group->threads[schedule].
|
|
new_work_mutex);
|
|
return (0);
|
|
}
|
|
|
|
void qb_wthread_group_wait(struct qb_wthread_group *worker_thread_group)
|
|
{
|
|
int i;
|
|
|
|
for (i = 0; i < worker_thread_group->threadcount; i++) {
|
|
pthread_mutex_lock(&worker_thread_group->threads[i].
|
|
done_work_mutex);
|
|
if (qb_queue_is_empty(&worker_thread_group->threads[i].queue) ==
|
|
0) {
|
|
pthread_cond_wait(&worker_thread_group->threads[i].
|
|
done_work_cond,
|
|
&worker_thread_group->threads[i].
|
|
done_work_mutex);
|
|
}
|
|
pthread_mutex_unlock(&worker_thread_group->threads[i].
|
|
done_work_mutex);
|
|
}
|
|
}
|
|
|
|
void qb_wthread_group_exit(struct qb_wthread_group *worker_thread_group)
|
|
{
|
|
int i;
|
|
|
|
for (i = 0; i < worker_thread_group->threadcount; i++) {
|
|
pthread_cancel(worker_thread_group->threads[i].thread_id);
|
|
|
|
/* Wait for worker thread to exit gracefully before destroying
|
|
* mutexes and processing items in the queue etc.
|
|
*/
|
|
pthread_join(worker_thread_group->threads[i].thread_id, NULL);
|
|
pthread_mutex_destroy(&worker_thread_group->threads[i].
|
|
new_work_mutex);
|
|
pthread_cond_destroy(&worker_thread_group->threads[i].
|
|
new_work_cond);
|
|
pthread_mutex_destroy(&worker_thread_group->threads[i].
|
|
done_work_mutex);
|
|
pthread_cond_destroy(&worker_thread_group->threads[i].
|
|
done_work_cond);
|
|
}
|
|
}
|
|
|
|
void qb_wthread_group_atsegv(struct qb_wthread_group *worker_thread_group)
|
|
{
|
|
void *data_for_worker_fn;
|
|
struct qb_wthread_t *worker_thread;
|
|
unsigned int i;
|
|
|
|
for (i = 0; i < worker_thread_group->threadcount; i++) {
|
|
worker_thread = &worker_thread_group->threads[i];
|
|
while (qb_queue_is_empty(&worker_thread->queue) == 0) {
|
|
data_for_worker_fn =
|
|
qb_queue_item_get(&worker_thread->queue);
|
|
worker_thread->worker_thread_group->
|
|
worker_fn(worker_thread->thread_state,
|
|
data_for_worker_fn);
|
|
qb_queue_item_remove(&worker_thread->queue);
|
|
}
|
|
}
|
|
}
|