aboutsummaryrefslogtreecommitdiff
path: root/net
diff options
context:
space:
mode:
authorAlex Elder <elder@inktank.com>2013-03-11 23:34:24 -0500
committerSage Weil <sage@inktank.com>2013-05-01 21:17:37 -0700
commit6644ed7b7e04f8e588aebdaa58cededb9416ab95 (patch)
tree2770308c0a48245b81754238f0c37e87605fde3b /net
parent8ea299bcbc85aeaf5348d99614b35433287bec6b (diff)
libceph: make message data be a pointer
Begin the transition from a single message data item to a list of them by replacing the "data" structure in a message with a pointer to a ceph_msg_data structure. A null pointer will indicate the message has no data; replace the use of ceph_msg_has_data() with a simple check for a null pointer. Create functions ceph_msg_data_create() and ceph_msg_data_destroy() to dynamically allocate and free a data item structure of a given type. When a message has its data item "set," allocate one of these to hold the data description, and free it when the last reference to the message is dropped. This partially resolves: http://tracker.ceph.com/issues/4429 Signed-off-by: Alex Elder <elder@inktank.com> Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
Diffstat (limited to 'net')
-rw-r--r--net/ceph/messenger.c94
1 files changed, 61 insertions, 33 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index dd4b8226a48..d4e46d8a088 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -1086,7 +1086,7 @@ static void prepare_message_data(struct ceph_msg *msg)
/* Initialize data cursor */
- ceph_msg_data_cursor_init(&msg->data, data_len);
+ ceph_msg_data_cursor_init(msg->data, data_len);
}
/*
@@ -1406,13 +1406,13 @@ static u32 ceph_crc32c_page(u32 crc, struct page *page,
static int write_partial_message_data(struct ceph_connection *con)
{
struct ceph_msg *msg = con->out_msg;
- struct ceph_msg_data_cursor *cursor = &msg->data.cursor;
+ struct ceph_msg_data_cursor *cursor = &msg->data->cursor;
bool do_datacrc = !con->msgr->nocrc;
u32 crc;
dout("%s %p msg %p\n", __func__, con, msg);
- if (WARN_ON(!ceph_msg_has_data(msg)))
+ if (WARN_ON(!msg->data))
return -EINVAL;
/*
@@ -1432,7 +1432,7 @@ static int write_partial_message_data(struct ceph_connection *con)
bool need_crc;
int ret;
- page = ceph_msg_data_next(&msg->data, &page_offset, &length,
+ page = ceph_msg_data_next(msg->data, &page_offset, &length,
&last_piece);
ret = ceph_tcp_sendpage(con->sock, page, page_offset,
length, last_piece);
@@ -1444,7 +1444,7 @@ static int write_partial_message_data(struct ceph_connection *con)
}
if (do_datacrc && cursor->need_crc)
crc = ceph_crc32c_page(crc, page, page_offset, length);
- need_crc = ceph_msg_data_advance(&msg->data, (size_t) ret);
+ need_crc = ceph_msg_data_advance(msg->data, (size_t)ret);
}
dout("%s %p msg %p done\n", __func__, con, msg);
@@ -2104,7 +2104,7 @@ static int read_partial_message_section(struct ceph_connection *con,
static int read_partial_msg_data(struct ceph_connection *con)
{
struct ceph_msg *msg = con->in_msg;
- struct ceph_msg_data_cursor *cursor = &msg->data.cursor;
+ struct ceph_msg_data_cursor *cursor = &msg->data->cursor;
const bool do_datacrc = !con->msgr->nocrc;
struct page *page;
size_t page_offset;
@@ -2113,13 +2113,13 @@ static int read_partial_msg_data(struct ceph_connection *con)
int ret;
BUG_ON(!msg);
- if (WARN_ON(!ceph_msg_has_data(msg)))
+ if (!msg->data)
return -EIO;
if (do_datacrc)
crc = con->in_data_crc;
while (cursor->resid) {
- page = ceph_msg_data_next(&msg->data, &page_offset, &length,
+ page = ceph_msg_data_next(msg->data, &page_offset, &length,
NULL);
ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
if (ret <= 0) {
@@ -2131,7 +2131,7 @@ static int read_partial_msg_data(struct ceph_connection *con)
if (do_datacrc)
crc = ceph_crc32c_page(crc, page, page_offset, ret);
- (void) ceph_msg_data_advance(&msg->data, (size_t) ret);
+ (void) ceph_msg_data_advance(msg->data, (size_t)ret);
}
if (do_datacrc)
con->in_data_crc = crc;
@@ -2947,44 +2947,80 @@ void ceph_con_keepalive(struct ceph_connection *con)
}
EXPORT_SYMBOL(ceph_con_keepalive);
-static void ceph_msg_data_init(struct ceph_msg_data *data)
+static struct ceph_msg_data *ceph_msg_data_create(enum ceph_msg_data_type type)
{
- data->type = CEPH_MSG_DATA_NONE;
+ struct ceph_msg_data *data;
+
+ if (WARN_ON(!ceph_msg_data_type_valid(type)))
+ return NULL;
+
+ data = kzalloc(sizeof (*data), GFP_NOFS);
+ if (data)
+ data->type = type;
+
+ return data;
+}
+
+static void ceph_msg_data_destroy(struct ceph_msg_data *data)
+{
+ if (!data)
+ return;
+
+ if (data->type == CEPH_MSG_DATA_PAGELIST) {
+ ceph_pagelist_release(data->pagelist);
+ kfree(data->pagelist);
+ }
+ kfree(data);
}
void ceph_msg_data_set_pages(struct ceph_msg *msg, struct page **pages,
size_t length, size_t alignment)
{
+ struct ceph_msg_data *data;
+
BUG_ON(!pages);
BUG_ON(!length);
- BUG_ON(msg->data.type != CEPH_MSG_DATA_NONE);
+ BUG_ON(msg->data != NULL);
+
+ data = ceph_msg_data_create(CEPH_MSG_DATA_PAGES);
+ BUG_ON(!data);
+ data->pages = pages;
+ data->length = length;
+ data->alignment = alignment & ~PAGE_MASK;
- msg->data.type = CEPH_MSG_DATA_PAGES;
- msg->data.pages = pages;
- msg->data.length = length;
- msg->data.alignment = alignment & ~PAGE_MASK;
+ msg->data = data;
}
EXPORT_SYMBOL(ceph_msg_data_set_pages);
void ceph_msg_data_set_pagelist(struct ceph_msg *msg,
struct ceph_pagelist *pagelist)
{
+ struct ceph_msg_data *data;
+
BUG_ON(!pagelist);
BUG_ON(!pagelist->length);
- BUG_ON(msg->data.type != CEPH_MSG_DATA_NONE);
+ BUG_ON(msg->data != NULL);
- msg->data.type = CEPH_MSG_DATA_PAGELIST;
- msg->data.pagelist = pagelist;
+ data = ceph_msg_data_create(CEPH_MSG_DATA_PAGELIST);
+ BUG_ON(!data);
+ data->pagelist = pagelist;
+
+ msg->data = data;
}
EXPORT_SYMBOL(ceph_msg_data_set_pagelist);
void ceph_msg_data_set_bio(struct ceph_msg *msg, struct bio *bio)
{
+ struct ceph_msg_data *data;
+
BUG_ON(!bio);
- BUG_ON(msg->data.type != CEPH_MSG_DATA_NONE);
+ BUG_ON(msg->data != NULL);
- msg->data.type = CEPH_MSG_DATA_BIO;
- msg->data.bio = bio;
+ data = ceph_msg_data_create(CEPH_MSG_DATA_BIO);
+ BUG_ON(!data);
+ data->bio = bio;
+
+ msg->data = data;
}
EXPORT_SYMBOL(ceph_msg_data_set_bio);
@@ -3008,8 +3044,6 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
INIT_LIST_HEAD(&m->list_head);
kref_init(&m->kref);
- ceph_msg_data_init(&m->data);
-
/* front */
m->front_max = front_len;
if (front_len) {
@@ -3163,14 +3197,8 @@ void ceph_msg_last_put(struct kref *kref)
ceph_buffer_put(m->middle);
m->middle = NULL;
}
- if (ceph_msg_has_data(m)) {
- if (m->data.type == CEPH_MSG_DATA_PAGELIST) {
- ceph_pagelist_release(m->data.pagelist);
- kfree(m->data.pagelist);
- }
- memset(&m->data, 0, sizeof m->data);
- ceph_msg_data_init(&m->data);
- }
+ ceph_msg_data_destroy(m->data);
+ m->data = NULL;
if (m->pool)
ceph_msgpool_put(m->pool, m);
@@ -3182,7 +3210,7 @@ EXPORT_SYMBOL(ceph_msg_last_put);
void ceph_msg_dump(struct ceph_msg *msg)
{
pr_debug("msg_dump %p (front_max %d length %zd)\n", msg,
- msg->front_max, msg->data.length);
+ msg->front_max, msg->data->length);
print_hex_dump(KERN_DEBUG, "header: ",
DUMP_PREFIX_OFFSET, 16, 1,
&msg->hdr, sizeof(msg->hdr), true);