migration: refine the compression code

The current multi-thread compression code is not clear, especially
in how it uses locks. Refine the code to make it clearer.

Signed-off-by: Liang Li <liang.z.li@intel.com>
Message-Id: <1462433579-13691-8-git-send-email-liang.z.li@intel.com>
Signed-off-by: Amit Shah <amit.shah@redhat.com>
Liang Li, 2016-05-05 15:32:57 +08:00 (committed by Amit Shah)
parent 90e56fb46d
commit a7a9a88f9d

@@ -253,7 +253,6 @@ static struct BitmapRcu {
 } *migration_bitmap_rcu;
 
 struct CompressParam {
-    bool start;
     bool done;
     bool quit;
     QEMUFile *file;
@@ -293,34 +292,36 @@ static QemuThread *decompress_threads;
 static QemuMutex decomp_done_lock;
 static QemuCond decomp_done_cond;
 
-static int do_compress_ram_page(CompressParam *param);
+static int do_compress_ram_page(QEMUFile *f, RAMBlock *block,
+                                ram_addr_t offset);
 
 static void *do_data_compress(void *opaque)
 {
     CompressParam *param = opaque;
+    RAMBlock *block;
+    ram_addr_t offset;
 
-    while (!param->quit) {
-        qemu_mutex_lock(&param->mutex);
-        /* Re-check the quit flag in case of
-         * terminate_compression_threads is called just before
-         * qemu_mutex_lock(&param->mutex) and after
-         * while(!param->quit), re-check it here can make
-         * sure the compression thread terminate as expected.
-         */
-        while (!param->start && !param->quit) {
-            qemu_cond_wait(&param->cond, &param->mutex);
-        }
-        if (!param->quit) {
-            do_compress_ram_page(param);
-        }
-        param->start = false;
-        qemu_mutex_unlock(&param->mutex);
+    qemu_mutex_lock(&param->mutex);
+    while (!param->quit) {
+        if (param->block) {
+            block = param->block;
+            offset = param->offset;
+            param->block = NULL;
+            qemu_mutex_unlock(&param->mutex);
 
-        qemu_mutex_lock(comp_done_lock);
-        param->done = true;
-        qemu_cond_signal(comp_done_cond);
-        qemu_mutex_unlock(comp_done_lock);
+            do_compress_ram_page(param->file, block, offset);
+
+            qemu_mutex_lock(comp_done_lock);
+            param->done = true;
+            qemu_cond_signal(comp_done_cond);
+            qemu_mutex_unlock(comp_done_lock);
+
+            qemu_mutex_lock(&param->mutex);
+        } else {
+            qemu_cond_wait(&param->cond, &param->mutex);
+        }
     }
+    qemu_mutex_unlock(&param->mutex);
 
     return NULL;
 }
@@ -808,18 +809,15 @@ static int ram_save_page(QEMUFile *f, PageSearchStatus *pss,
     return pages;
 }
 
-static int do_compress_ram_page(CompressParam *param)
+static int do_compress_ram_page(QEMUFile *f, RAMBlock *block,
+                                ram_addr_t offset)
 {
     int bytes_sent, blen;
-    uint8_t *p;
-    RAMBlock *block = param->block;
-    ram_addr_t offset = param->offset;
+    uint8_t *p = block->host + (offset & TARGET_PAGE_MASK);
 
-    p = block->host + (offset & TARGET_PAGE_MASK);
-
-    bytes_sent = save_page_header(param->file, block, offset |
+    bytes_sent = save_page_header(f, block, offset |
                                   RAM_SAVE_FLAG_COMPRESS_PAGE);
-    blen = qemu_put_compression_data(param->file, p, TARGET_PAGE_SIZE,
+    blen = qemu_put_compression_data(f, p, TARGET_PAGE_SIZE,
                                      migrate_compress_level());
     if (blen < 0) {
         bytes_sent = 0;
@@ -832,15 +830,6 @@ static int do_compress_ram_page(CompressParam *param)
     return bytes_sent;
 }
 
-static inline void start_compression(CompressParam *param)
-{
-    param->done = false;
-    qemu_mutex_lock(&param->mutex);
-    param->start = true;
-    qemu_cond_signal(&param->cond);
-    qemu_mutex_unlock(&param->mutex);
-}
-
 static inline void start_decompression(DecompressParam *param)
 {
     param->done = false;
@@ -860,18 +849,22 @@ static void flush_compressed_data(QEMUFile *f)
         return;
     }
     thread_count = migrate_compress_threads();
-    for (idx = 0; idx < thread_count; idx++) {
-        if (!comp_param[idx].done) {
-            qemu_mutex_lock(comp_done_lock);
-            while (!comp_param[idx].done && !comp_param[idx].quit) {
-                qemu_cond_wait(comp_done_cond, comp_done_lock);
-            }
-            qemu_mutex_unlock(comp_done_lock);
+
+    qemu_mutex_lock(comp_done_lock);
+    for (idx = 0; idx < thread_count; idx++) {
+        while (!comp_param[idx].done) {
+            qemu_cond_wait(comp_done_cond, comp_done_lock);
         }
+    }
+    qemu_mutex_unlock(comp_done_lock);
+
+    for (idx = 0; idx < thread_count; idx++) {
+        qemu_mutex_lock(&comp_param[idx].mutex);
         if (!comp_param[idx].quit) {
             len = qemu_put_qemu_file(f, comp_param[idx].file);
             bytes_transferred += len;
         }
+        qemu_mutex_unlock(&comp_param[idx].mutex);
     }
 }
 
@@ -893,9 +886,12 @@ static int compress_page_with_multi_thread(QEMUFile *f, RAMBlock *block,
     while (true) {
         for (idx = 0; idx < thread_count; idx++) {
            if (comp_param[idx].done) {
+                comp_param[idx].done = false;
                 bytes_xmit = qemu_put_qemu_file(f, comp_param[idx].file);
+                qemu_mutex_lock(&comp_param[idx].mutex);
                 set_compress_params(&comp_param[idx], block, offset);
-                start_compression(&comp_param[idx]);
+                qemu_cond_signal(&comp_param[idx].cond);
+                qemu_mutex_unlock(&comp_param[idx].mutex);
                 pages = 1;
                 acct_info.norm_pages++;
                 *bytes_transferred += bytes_xmit;
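
As an aid to reading the diff, below is a minimal standalone sketch of the handshake this patch introduces: the compression worker sleeps on its condition variable until param->block is set (or quit is requested), takes the request, drops its own lock while compressing, and reports completion under the separate "done" lock so the main thread can hand out the next page. The sketch uses plain pthreads rather than QEMU's qemu_mutex/qemu_cond wrappers, and the simplified CompressParam fields and the fake_compress() helper are hypothetical stand-ins for the real migration code, not QEMU's actual implementation.

#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

/* Simplified, hypothetical stand-in for QEMU's CompressParam. */
typedef struct CompressParam {
    bool done;              /* set by the worker when a request is finished */
    bool quit;              /* set by the main thread to stop the worker    */
    const char *block;      /* stands in for RAMBlock *block                */
    unsigned long offset;   /* stands in for ram_addr_t offset              */
    pthread_mutex_t mutex;  /* protects block/offset/quit                   */
    pthread_cond_t cond;    /* "new work" signal from the main thread       */
} CompressParam;

static pthread_mutex_t comp_done_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t comp_done_cond = PTHREAD_COND_INITIALIZER;

/* Hypothetical placeholder for do_compress_ram_page(). */
static void fake_compress(const char *block, unsigned long offset)
{
    printf("compressing %s+%#lx\n", block, offset);
}

/* Worker loop, mirroring the shape of the new do_data_compress(). */
static void *do_data_compress(void *opaque)
{
    CompressParam *param = opaque;
    const char *block;
    unsigned long offset;

    pthread_mutex_lock(&param->mutex);
    while (!param->quit) {
        if (param->block) {
            /* Take the request and drop param->mutex while compressing. */
            block = param->block;
            offset = param->offset;
            param->block = NULL;
            pthread_mutex_unlock(&param->mutex);

            fake_compress(block, offset);

            /* Report completion under the separate "done" lock. */
            pthread_mutex_lock(&comp_done_lock);
            param->done = true;
            pthread_cond_signal(&comp_done_cond);
            pthread_mutex_unlock(&comp_done_lock);

            pthread_mutex_lock(&param->mutex);
        } else {
            pthread_cond_wait(&param->cond, &param->mutex);
        }
    }
    pthread_mutex_unlock(&param->mutex);
    return NULL;
}

int main(void)
{
    CompressParam param = { .done = true };  /* idle workers start as "done" */
    pthread_t tid;

    pthread_mutex_init(&param.mutex, NULL);
    pthread_cond_init(&param.cond, NULL);
    pthread_create(&tid, NULL, do_data_compress, &param);

    /* Producer side, mirroring compress_page_with_multi_thread(). */
    for (unsigned long page = 0; page < 3; page++) {
        pthread_mutex_lock(&comp_done_lock);
        while (!param.done) {
            pthread_cond_wait(&comp_done_cond, &comp_done_lock);
        }
        param.done = false;
        pthread_mutex_unlock(&comp_done_lock);

        /* Hand over the next request and wake the worker. */
        pthread_mutex_lock(&param.mutex);
        param.block = "ram-block";
        param.offset = page << 12;
        pthread_cond_signal(&param.cond);
        pthread_mutex_unlock(&param.mutex);
    }

    /* Shutdown: set quit and wake the worker, as the terminate path would. */
    pthread_mutex_lock(&param.mutex);
    param.quit = true;
    pthread_cond_signal(&param.cond);
    pthread_mutex_unlock(&param.mutex);
    pthread_join(tid, NULL);
    return 0;
}

Compile with e.g. cc -pthread sketch.c. The design point is that the worker re-checks param->block whenever it holds the mutex instead of relying on the signal alone, so a wake-up that arrives while it is busy compressing is never lost; that is what lets the patch drop the old start flag and the "re-check the quit flag" comment.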