mirror of
				https://git.proxmox.com/git/mirror_zfs
				synced 2025-10-26 16:21:07 +00:00 
			
		
		
		
	 577d41d3b2
			
		
	
	
		577d41d3b2
		
			
		
	
	
	
	
		
			
			- Some optimizations for bqueue enqueue/dequeue. - Added a fix to prevent deadlock when both bqueue_enqueue_impl() and bqueue_dequeue() waits for signal to be triggered. Reviewed-by: Alexander Motin <mav@FreeBSD.org> Reviewed-by: Ryan Moeller <ryan@iXsystems.com> Signed-off-by: Ameer Hamza <ahamza@ixsystems.com> Closes #13855
		
			
				
	
	
		
			159 lines
		
	
	
		
			4.8 KiB
		
	
	
	
		
			C
		
	
	
	
	
	
			
		
		
	
	
			159 lines
		
	
	
		
			4.8 KiB
		
	
	
	
		
			C
		
	
	
	
	
	
| /*
 | |
|  * CDDL HEADER START
 | |
|  *
 | |
|  * This file and its contents are supplied under the terms of the
 | |
|  * Common Development and Distribution License ("CDDL"), version 1.0.
 | |
|  * You may only use this file in accordance with the terms of version
 | |
|  * 1.0 of the CDDL.
 | |
|  *
 | |
|  * A full copy of the text of the CDDL should have accompanied this
 | |
|  * source.  A copy of the CDDL is also available via the Internet at
 | |
|  * http://www.illumos.org/license/CDDL.
 | |
|  *
 | |
|  * CDDL HEADER END
 | |
|  */
 | |
| /*
 | |
|  * Copyright (c) 2014, 2018 by Delphix. All rights reserved.
 | |
|  */
 | |
| 
 | |
| #include	<sys/bqueue.h>
 | |
| #include	<sys/zfs_context.h>
 | |
| 
 | |
| static inline bqueue_node_t *
 | |
| obj2node(bqueue_t *q, void *data)
 | |
| {
 | |
| 	return ((bqueue_node_t *)((char *)data + q->bq_node_offset));
 | |
| }
 | |
| 
 | |
| /*
 | |
|  * Initialize a blocking queue  The maximum capacity of the queue is set to
 | |
|  * size.  Types that are stored in a bqueue must contain a bqueue_node_t,
 | |
|  * and node_offset must be its offset from the start of the struct.
 | |
|  * fill_fraction is a performance tuning value; when the queue is full, any
 | |
|  * threads attempting to enqueue records will block.  They will block until
 | |
|  * they're signaled, which will occur when the queue is at least 1/fill_fraction
 | |
|  * empty.  Similar behavior occurs on dequeue; if the queue is empty, threads
 | |
|  * block.  They will be signalled when the queue has 1/fill_fraction full, or
 | |
|  * when bqueue_flush is called.  As a result, you must call bqueue_flush when
 | |
|  * you enqueue your final record on a thread, in case the dequeueing threads are
 | |
|  * currently blocked and that enqueue does not cause them to be awoken.
 | |
|  * Alternatively, this behavior can be disabled (causing signaling to happen
 | |
|  * immediately) by setting fill_fraction to any value larger than size.
 | |
|  * Return 0 on success, or -1 on failure.
 | |
|  */
 | |
| int
 | |
| bqueue_init(bqueue_t *q, uint_t fill_fraction, size_t size, size_t node_offset)
 | |
| {
 | |
| 	if (fill_fraction == 0) {
 | |
| 		return (-1);
 | |
| 	}
 | |
| 	list_create(&q->bq_list, node_offset + sizeof (bqueue_node_t),
 | |
| 	    node_offset + offsetof(bqueue_node_t, bqn_node));
 | |
| 	cv_init(&q->bq_add_cv, NULL, CV_DEFAULT, NULL);
 | |
| 	cv_init(&q->bq_pop_cv, NULL, CV_DEFAULT, NULL);
 | |
| 	mutex_init(&q->bq_lock, NULL, MUTEX_DEFAULT, NULL);
 | |
| 	q->bq_node_offset = node_offset;
 | |
| 	q->bq_size = 0;
 | |
| 	q->bq_maxsize = size;
 | |
| 	q->bq_fill_fraction = fill_fraction;
 | |
| 	return (0);
 | |
| }
 | |
| 
 | |
| /*
 | |
|  * Destroy a blocking queue.  This function asserts that there are no
 | |
|  * elements in the queue, and no one is blocked on the condition
 | |
|  * variables.
 | |
|  */
 | |
| void
 | |
| bqueue_destroy(bqueue_t *q)
 | |
| {
 | |
| 	mutex_enter(&q->bq_lock);
 | |
| 	ASSERT0(q->bq_size);
 | |
| 	cv_destroy(&q->bq_add_cv);
 | |
| 	cv_destroy(&q->bq_pop_cv);
 | |
| 	list_destroy(&q->bq_list);
 | |
| 	mutex_exit(&q->bq_lock);
 | |
| 	mutex_destroy(&q->bq_lock);
 | |
| }
 | |
| 
 | |
| static void
 | |
| bqueue_enqueue_impl(bqueue_t *q, void *data, size_t item_size, boolean_t flush)
 | |
| {
 | |
| 	ASSERT3U(item_size, >, 0);
 | |
| 	ASSERT3U(item_size, <=, q->bq_maxsize);
 | |
| 	mutex_enter(&q->bq_lock);
 | |
| 	obj2node(q, data)->bqn_size = item_size;
 | |
| 	while (q->bq_size && q->bq_size + item_size > q->bq_maxsize) {
 | |
| 		/*
 | |
| 		 * Wake up bqueue_dequeue() thread if already sleeping in order
 | |
| 		 * to prevent the deadlock condition
 | |
| 		 */
 | |
| 		cv_signal(&q->bq_pop_cv);
 | |
| 		cv_wait_sig(&q->bq_add_cv, &q->bq_lock);
 | |
| 	}
 | |
| 	q->bq_size += item_size;
 | |
| 	list_insert_tail(&q->bq_list, data);
 | |
| 	if (flush)
 | |
| 		cv_broadcast(&q->bq_pop_cv);
 | |
| 	else if (q->bq_size >= q->bq_maxsize / q->bq_fill_fraction)
 | |
| 		cv_signal(&q->bq_pop_cv);
 | |
| 	mutex_exit(&q->bq_lock);
 | |
| }
 | |
| 
 | |
| /*
 | |
|  * Add data to q, consuming size units of capacity.  If there is insufficient
 | |
|  * capacity to consume size units, block until capacity exists.  Asserts size is
 | |
|  * > 0.
 | |
|  */
 | |
| void
 | |
| bqueue_enqueue(bqueue_t *q, void *data, size_t item_size)
 | |
| {
 | |
| 	bqueue_enqueue_impl(q, data, item_size, B_FALSE);
 | |
| }
 | |
| 
 | |
| /*
 | |
|  * Enqueue an entry, and then flush the queue.  This forces the popping threads
 | |
|  * to wake up, even if we're below the fill fraction.  We have this in a single
 | |
|  * function, rather than having a separate call, because it prevents race
 | |
|  * conditions between the enqueuing thread and the dequeueing thread, where the
 | |
|  * enqueueing thread will wake up the dequeueing thread, that thread will
 | |
|  * destroy the condvar before the enqueuing thread is done.
 | |
|  */
 | |
| void
 | |
| bqueue_enqueue_flush(bqueue_t *q, void *data, size_t item_size)
 | |
| {
 | |
| 	bqueue_enqueue_impl(q, data, item_size, B_TRUE);
 | |
| }
 | |
| 
 | |
| /*
 | |
|  * Take the first element off of q.  If there are no elements on the queue, wait
 | |
|  * until one is put there.  Return the removed element.
 | |
|  */
 | |
| void *
 | |
| bqueue_dequeue(bqueue_t *q)
 | |
| {
 | |
| 	void *ret = NULL;
 | |
| 	size_t item_size;
 | |
| 	mutex_enter(&q->bq_lock);
 | |
| 	while (q->bq_size == 0) {
 | |
| 		cv_wait_sig(&q->bq_pop_cv, &q->bq_lock);
 | |
| 	}
 | |
| 	ret = list_remove_head(&q->bq_list);
 | |
| 	ASSERT3P(ret, !=, NULL);
 | |
| 	item_size = obj2node(q, ret)->bqn_size;
 | |
| 	q->bq_size -= item_size;
 | |
| 	if (q->bq_size <= q->bq_maxsize - (q->bq_maxsize / q->bq_fill_fraction))
 | |
| 		cv_signal(&q->bq_add_cv);
 | |
| 	mutex_exit(&q->bq_lock);
 | |
| 	return (ret);
 | |
| }
 | |
| 
 | |
| /*
 | |
|  * Returns true if the space used is 0.
 | |
|  */
 | |
| boolean_t
 | |
| bqueue_empty(bqueue_t *q)
 | |
| {
 | |
| 	return (q->bq_size == 0);
 | |
| }
 |