aboutsummaryrefslogtreecommitdiff
path: root/buffered_file.c
diff options
context:
space:
mode:
Diffstat (limited to 'buffered_file.c')
-rw-r--r--buffered_file.c152
1 files changed, 64 insertions, 88 deletions
diff --git a/buffered_file.c b/buffered_file.c
index f170aa0..bd0f61d 100644
--- a/buffered_file.c
+++ b/buffered_file.c
@@ -23,11 +23,7 @@
typedef struct QEMUFileBuffered
{
- BufferedPutFunc *put_buffer;
- BufferedPutReadyFunc *put_ready;
- BufferedWaitForUnfreezeFunc *wait_for_unfreeze;
- BufferedCloseFunc *close;
- void *opaque;
+ MigrationState *migration_state;
QEMUFile *file;
int freeze_output;
size_t bytes_xfer;
@@ -50,70 +46,60 @@ static void buffered_append(QEMUFileBuffered *s,
const uint8_t *buf, size_t size)
{
if (size > (s->buffer_capacity - s->buffer_size)) {
- void *tmp;
-
DPRINTF("increasing buffer capacity from %zu by %zu\n",
s->buffer_capacity, size + 1024);
s->buffer_capacity += size + 1024;
- tmp = g_realloc(s->buffer, s->buffer_capacity);
- if (tmp == NULL) {
- fprintf(stderr, "qemu file buffer expansion failed\n");
- exit(1);
- }
-
- s->buffer = tmp;
+ s->buffer = g_realloc(s->buffer, s->buffer_capacity);
}
memcpy(s->buffer + s->buffer_size, buf, size);
s->buffer_size += size;
}
-static void buffered_flush(QEMUFileBuffered *s)
+static ssize_t buffered_flush(QEMUFileBuffered *s)
{
size_t offset = 0;
- int error;
-
- error = qemu_file_get_error(s->file);
- if (error != 0) {
- DPRINTF("flush when error, bailing: %s\n", strerror(-error));
- return;
- }
+ ssize_t ret = 0;
DPRINTF("flushing %zu byte(s) of data\n", s->buffer_size);
- while (offset < s->buffer_size) {
- ssize_t ret;
+ while (s->bytes_xfer < s->xfer_limit && offset < s->buffer_size) {
- ret = s->put_buffer(s->opaque, s->buffer + offset,
- s->buffer_size - offset);
+ ret = migrate_fd_put_buffer(s->migration_state, s->buffer + offset,
+ s->buffer_size - offset);
if (ret == -EAGAIN) {
DPRINTF("backend not ready, freezing\n");
+ ret = 0;
s->freeze_output = 1;
break;
}
if (ret <= 0) {
DPRINTF("error flushing data, %zd\n", ret);
- qemu_file_set_error(s->file, ret);
break;
} else {
DPRINTF("flushed %zd byte(s)\n", ret);
offset += ret;
+ s->bytes_xfer += ret;
}
}
DPRINTF("flushed %zu of %zu byte(s)\n", offset, s->buffer_size);
memmove(s->buffer, s->buffer + offset, s->buffer_size - offset);
s->buffer_size -= offset;
+
+ if (ret < 0) {
+ return ret;
+ }
+ return offset;
}
static int buffered_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, int size)
{
QEMUFileBuffered *s = opaque;
- int offset = 0, error;
- ssize_t ret;
+ ssize_t error;
DPRINTF("putting %d bytes at %" PRId64 "\n", size, pos);
@@ -126,65 +112,54 @@ static int buffered_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, in
DPRINTF("unfreezing output\n");
s->freeze_output = 0;
- buffered_flush(s);
-
- while (!s->freeze_output && offset < size) {
- if (s->bytes_xfer > s->xfer_limit) {
- DPRINTF("transfer limit exceeded when putting\n");
- break;
- }
-
- ret = s->put_buffer(s->opaque, buf + offset, size - offset);
- if (ret == -EAGAIN) {
- DPRINTF("backend not ready, freezing\n");
- s->freeze_output = 1;
- break;
- }
-
- if (ret <= 0) {
- DPRINTF("error putting\n");
- qemu_file_set_error(s->file, ret);
- offset = -EINVAL;
- break;
- }
-
- DPRINTF("put %zd byte(s)\n", ret);
- offset += ret;
- s->bytes_xfer += ret;
+ if (size > 0) {
+ DPRINTF("buffering %d bytes\n", size - offset);
+ buffered_append(s, buf, size);
}
- if (offset >= 0) {
- DPRINTF("buffering %d bytes\n", size - offset);
- buffered_append(s, buf + offset, size - offset);
- offset = size;
+ error = buffered_flush(s);
+ if (error < 0) {
+ DPRINTF("buffered flush error. bailing: %s\n", strerror(-error));
+ return error;
}
if (pos == 0 && size == 0) {
DPRINTF("file is ready\n");
- if (s->bytes_xfer <= s->xfer_limit) {
+ if (!s->freeze_output && s->bytes_xfer < s->xfer_limit) {
DPRINTF("notifying client\n");
- s->put_ready(s->opaque);
+ migrate_fd_put_ready(s->migration_state);
}
}
- return offset;
+ return size;
}
static int buffered_close(void *opaque)
{
QEMUFileBuffered *s = opaque;
- int ret;
+ ssize_t ret = 0;
+ int ret2;
DPRINTF("closing\n");
+ s->xfer_limit = INT_MAX;
while (!qemu_file_get_error(s->file) && s->buffer_size) {
- buffered_flush(s);
- if (s->freeze_output)
- s->wait_for_unfreeze(s->opaque);
+ ret = buffered_flush(s);
+ if (ret < 0) {
+ break;
+ }
+ if (s->freeze_output) {
+ ret = migrate_fd_wait_for_unfreeze(s->migration_state);
+ if (ret < 0) {
+ break;
+ }
+ }
}
- ret = s->close(s->opaque);
-
+ ret2 = migrate_fd_close(s->migration_state);
+ if (ret >= 0) {
+ ret = ret2;
+ }
qemu_del_timer(s->timer);
qemu_free_timer(s->timer);
g_free(s->buffer);
@@ -199,6 +174,13 @@ static int buffered_close(void *opaque)
* 1: Time to stop
* negative: There has been an error
*/
+static int buffered_get_fd(void *opaque)
+{
+ QEMUFileBuffered *s = opaque;
+
+ return qemu_get_fd(s->file);
+}
+
static int buffered_rate_limit(void *opaque)
{
QEMUFileBuffered *s = opaque;
@@ -256,34 +238,28 @@ static void buffered_rate_tick(void *opaque)
s->bytes_xfer = 0;
- buffered_flush(s);
-
- /* Add some checks around this */
- s->put_ready(s->opaque);
+ buffered_put_buffer(s, NULL, 0, 0);
}
-QEMUFile *qemu_fopen_ops_buffered(void *opaque,
- size_t bytes_per_sec,
- BufferedPutFunc *put_buffer,
- BufferedPutReadyFunc *put_ready,
- BufferedWaitForUnfreezeFunc *wait_for_unfreeze,
- BufferedCloseFunc *close)
+static const QEMUFileOps buffered_file_ops = {
+ .get_fd = buffered_get_fd,
+ .put_buffer = buffered_put_buffer,
+ .close = buffered_close,
+ .rate_limit = buffered_rate_limit,
+ .get_rate_limit = buffered_get_rate_limit,
+ .set_rate_limit = buffered_set_rate_limit,
+};
+
+QEMUFile *qemu_fopen_ops_buffered(MigrationState *migration_state)
{
QEMUFileBuffered *s;
s = g_malloc0(sizeof(*s));
- s->opaque = opaque;
- s->xfer_limit = bytes_per_sec / 10;
- s->put_buffer = put_buffer;
- s->put_ready = put_ready;
- s->wait_for_unfreeze = wait_for_unfreeze;
- s->close = close;
-
- s->file = qemu_fopen_ops(s, buffered_put_buffer, NULL,
- buffered_close, buffered_rate_limit,
- buffered_set_rate_limit,
- buffered_get_rate_limit);
+ s->migration_state = migration_state;
+ s->xfer_limit = migration_state->bandwidth_limit / 10;
+
+ s->file = qemu_fopen_ops(s, &buffered_file_ops);
s->timer = qemu_new_timer_ms(rt_clock, buffered_rate_tick, s);