From a41bad1a9b9f9982eb9b451165724c5f81096683 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Fri, 30 Nov 2012 13:49:51 +0800 Subject: ceph: re-calculate truncate_size for strip object Otherwise osd may truncate the object to larger size. Signed-off-by: Yan, Zheng Reviewed-by: Sage Weil --- net/ceph/osd_client.c | 8 ++++++++ 1 file changed, 8 insertions(+) (limited to 'net') diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index eb9a4447876..267f183b801 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -76,8 +76,16 @@ int ceph_calc_raw_layout(struct ceph_osd_client *osdc, orig_len - *plen, off, *plen); if (op_has_extent(op->op)) { + u32 osize = le32_to_cpu(layout->fl_object_size); op->extent.offset = objoff; op->extent.length = objlen; + if (op->extent.truncate_size <= off - objoff) { + op->extent.truncate_size = 0; + } else { + op->extent.truncate_size -= off - objoff; + if (op->extent.truncate_size > osize) + op->extent.truncate_size = osize; + } } req->r_num_pages = calc_pages_for(off, *plen); req->r_page_alignment = off & ~PAGE_MASK; -- cgit v1.2.3 From 1604f488ac2dcce33c8218e75a000e8c5fb57e61 Mon Sep 17 00:00:00 2001 From: Jim Schutt Date: Fri, 30 Nov 2012 09:15:25 -0700 Subject: libceph: for chooseleaf rules, retry CRUSH map descent from root if leaf is failed Add libceph support for a new CRUSH tunable recently added to Ceph servers. Consider the CRUSH rule step chooseleaf firstn 0 type This rule means that replicas will be chosen in a manner such that each chosen leaf's branch will contain a unique instance of . When an object is re-replicated after a leaf failure, if the CRUSH map uses a chooseleaf rule the remapped replica ends up under the bucket that held the failed leaf. This causes uneven data distribution across the storage cluster, to the point that when all the leaves but one fail under a particular bucket, that remaining leaf holds all the data from its failed peers. This behavior also limits the number of peers that can participate in the re-replication of the data held by the failed leaf, which increases the time required to re-replicate after a failure. For a chooseleaf CRUSH rule, the tree descent has two steps: call them the inner and outer descents. If the tree descent down to is the outer descent, and the descent from down to a leaf is the inner descent, the issue is that a down leaf is detected on the inner descent, so only the inner descent is retried. In order to disperse re-replicated data as widely as possible across a storage cluster after a failure, we want to retry the outer descent. So, fix up crush_choose() to allow the inner descent to return immediately on choosing a failed leaf. Wire this up as a new CRUSH tunable. Note that after this change, for a chooseleaf rule, if the primary OSD in a placement group has failed, choosing a replacement may result in one of the other OSDs in the PG colliding with the new primary. This requires that OSD's data for that PG to need moving as well. This seems unavoidable but should be relatively rare. This corresponds to ceph.git commit 88f218181a9e6d2292e2697fc93797d0f6d6e5dc. Signed-off-by: Jim Schutt Reviewed-by: Sage Weil --- net/ceph/crush/mapper.c | 13 ++++++++++--- net/ceph/osdmap.c | 6 ++++++ 2 files changed, 16 insertions(+), 3 deletions(-) (limited to 'net') diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c index 35fce755ce1..96c8a58937d 100644 --- a/net/ceph/crush/mapper.c +++ b/net/ceph/crush/mapper.c @@ -287,6 +287,7 @@ static int is_out(const struct crush_map *map, const __u32 *weight, int item, in * @outpos: our position in that vector * @firstn: true if choosing "first n" items, false if choosing "indep" * @recurse_to_leaf: true if we want one device under each item of given type + * @descend_once: true if we should only try one descent before giving up * @out2: second output vector for leaf items (if @recurse_to_leaf) */ static int crush_choose(const struct crush_map *map, @@ -295,7 +296,7 @@ static int crush_choose(const struct crush_map *map, int x, int numrep, int type, int *out, int outpos, int firstn, int recurse_to_leaf, - int *out2) + int descend_once, int *out2) { int rep; unsigned int ftotal, flocal; @@ -399,6 +400,7 @@ static int crush_choose(const struct crush_map *map, x, outpos+1, 0, out2, outpos, firstn, 0, + map->chooseleaf_descend_once, NULL) <= outpos) /* didn't get leaf */ reject = 1; @@ -422,7 +424,10 @@ reject: ftotal++; flocal++; - if (collide && flocal <= map->choose_local_tries) + if (reject && descend_once) + /* let outer call try again */ + skip_rep = 1; + else if (collide && flocal <= map->choose_local_tries) /* retry locally a few times */ retry_bucket = 1; else if (map->choose_local_fallback_tries > 0 && @@ -485,6 +490,7 @@ int crush_do_rule(const struct crush_map *map, int i, j; int numrep; int firstn; + const int descend_once = 0; if ((__u32)ruleno >= map->max_rules) { dprintk(" bad ruleno %d\n", ruleno); @@ -544,7 +550,8 @@ int crush_do_rule(const struct crush_map *map, curstep->arg2, o+osize, j, firstn, - recurse_to_leaf, c+osize); + recurse_to_leaf, + descend_once, c+osize); } if (recurse_to_leaf) diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index de73214b5d2..ca05871635b 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -170,6 +170,7 @@ static struct crush_map *crush_decode(void *pbyval, void *end) c->choose_local_tries = 2; c->choose_local_fallback_tries = 5; c->choose_total_tries = 19; + c->chooseleaf_descend_once = 0; ceph_decode_need(p, end, 4*sizeof(u32), bad); magic = ceph_decode_32(p); @@ -336,6 +337,11 @@ static struct crush_map *crush_decode(void *pbyval, void *end) dout("crush decode tunable choose_total_tries = %d", c->choose_total_tries); + ceph_decode_need(p, end, sizeof(u32), done); + c->chooseleaf_descend_once = ceph_decode_32(p); + dout("crush decode tunable chooseleaf_descend_once = %d", + c->chooseleaf_descend_once); + done: dout("crush_decode success\n"); return c; -- cgit v1.2.3 From 7d7c1f6136bac00174842f845babe7fb3483724e Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Tue, 15 Jan 2013 18:49:09 -0800 Subject: crush: avoid recursion if we have already collided This saves us some cycles, but does not affect the placement result at all. This corresponds to ceph.git commit 4abb53d4f. Signed-off-by: Sage Weil --- net/ceph/crush/mapper.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'net') diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c index 96c8a58937d..cbd06a91941 100644 --- a/net/ceph/crush/mapper.c +++ b/net/ceph/crush/mapper.c @@ -392,7 +392,7 @@ static int crush_choose(const struct crush_map *map, } reject = 0; - if (recurse_to_leaf) { + if (!collide && recurse_to_leaf) { if (item < 0) { if (crush_choose(map, map->buckets[-1-item], -- cgit v1.2.3 From c3acb18196cf3d7d3db6a5121c1bc674c3fba31f Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Fri, 7 Dec 2012 09:57:58 -0600 Subject: libceph: reformat __reset_osd() Reformat __reset_osd() into three distinct blocks of code handling the three return cases. Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- net/ceph/osd_client.c | 32 ++++++++++++++++++-------------- 1 file changed, 18 insertions(+), 14 deletions(-) (limited to 'net') diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 267f183b801..eade41bb710 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -747,31 +747,35 @@ static void remove_old_osds(struct ceph_osd_client *osdc) */ static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) { - struct ceph_osd_request *req; - int ret = 0; + struct ceph_entity_addr *peer_addr; dout("__reset_osd %p osd%d\n", osd, osd->o_osd); if (list_empty(&osd->o_requests) && list_empty(&osd->o_linger_requests)) { __remove_osd(osdc, osd); - ret = -ENODEV; - } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd], - &osd->o_con.peer_addr, - sizeof(osd->o_con.peer_addr)) == 0 && - !ceph_con_opened(&osd->o_con)) { + + return -ENODEV; + } + + peer_addr = &osdc->osdmap->osd_addr[osd->o_osd]; + if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) && + !ceph_con_opened(&osd->o_con)) { + struct ceph_osd_request *req; + dout(" osd addr hasn't changed and connection never opened," " letting msgr retry"); /* touch each r_stamp for handle_timeout()'s benfit */ list_for_each_entry(req, &osd->o_requests, r_osd_item) req->r_stamp = jiffies; - ret = -EAGAIN; - } else { - ceph_con_close(&osd->o_con); - ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, - &osdc->osdmap->osd_addr[osd->o_osd]); - osd->o_incarnation++; + + return -EAGAIN; } - return ret; + + ceph_con_close(&osd->o_con); + ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr); + osd->o_incarnation++; + + return 0; } static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new) -- cgit v1.2.3 From af77f26caa35a95af09d1dac5c513b3901de7e37 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Fri, 9 Nov 2012 08:43:15 -0600 Subject: rbd: drop oid parameters from ceph_osdc_build_request() The last two parameters to ceph_osd_build_request() describe the object id, but the values passed always come from the osd request structure whose address is also provided. Get rid of those last two parameters. Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- net/ceph/osd_client.c | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) (limited to 'net') diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index eade41bb710..7d38327a8e8 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -376,9 +376,7 @@ void ceph_osdc_build_request(struct ceph_osd_request *req, u64 off, u64 *plen, struct ceph_osd_req_op *src_ops, struct ceph_snap_context *snapc, - struct timespec *mtime, - const char *oid, - int oid_len) + struct timespec *mtime) { struct ceph_msg *msg = req->r_request; struct ceph_osd_request_head *head; @@ -405,9 +403,9 @@ void ceph_osdc_build_request(struct ceph_osd_request *req, /* fill in oid */ - head->object_len = cpu_to_le32(oid_len); - memcpy(p, oid, oid_len); - p += oid_len; + head->object_len = cpu_to_le32(req->r_oid_len); + memcpy(p, req->r_oid, req->r_oid_len); + p += req->r_oid_len; src_op = src_ops; while (src_op->op) { @@ -506,8 +504,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, ceph_osdc_build_request(req, off, plen, ops, snapc, - mtime, - req->r_oid, req->r_oid_len); + mtime); return req; } -- cgit v1.2.3 From c885837f7d4f8c4f5cb2a744cc6929bc078e9dc0 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Tue, 13 Nov 2012 21:11:15 -0600 Subject: libceph: always allow trail in osd request An osd request structure contains an optional trail portion, which if present will contain data to be passed in the payload portion of the message containing the request. The trail field is a ceph_pagelist pointer, and if null it indicates there is no trail. A ceph_pagelist structure contains a length field, and it can legitimately hold value 0. Make use of this to change the interpretation of the "trail" of an osd request so that every osd request has trailing data, it just might have length 0. This means we change the r_trail field in a ceph_osd_request structure from a pointer to a structure that is always initialized. Note that in ceph_osdc_start_request(), the trail pointer (or now address of that structure) is assigned to a ceph message's trail field. Here's why that's still OK (looking at net/ceph/messenger.c): - What would have resulted in a null pointer previously will now refer to a 0-length page list. That message trail pointer is used in two functions, write_partial_msg_pages() and out_msg_pos_next(). - In write_partial_msg_pages(), a null page list pointer is handled the same as a message with 0-length trail, and both result in a "in_trail" variable set to false. The trail pointer is only used if in_trail is true. - The only other place the message trail pointer is used is out_msg_pos_next(). That function is only called by write_partial_msg_pages() and only touches the trail pointer if the in_trail value it is passed is true. Therefore a null ceph_msg->trail pointer is equivalent to a non-null pointer referring to a 0-length page list structure. Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- net/ceph/osd_client.c | 43 ++++++++++++------------------------------- 1 file changed, 12 insertions(+), 31 deletions(-) (limited to 'net') diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 7d38327a8e8..2be50d82ccb 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -171,10 +171,7 @@ void ceph_osdc_release_request(struct kref *kref) bio_put(req->r_bio); #endif ceph_put_snap_context(req->r_snapc); - if (req->r_trail) { - ceph_pagelist_release(req->r_trail); - kfree(req->r_trail); - } + ceph_pagelist_release(&req->r_trail); if (req->r_mempool) mempool_free(req, req->r_osdc->req_mempool); else @@ -208,8 +205,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, { struct ceph_osd_request *req; struct ceph_msg *msg; - int needs_trail; - int num_op = get_num_ops(ops, &needs_trail); + int num_op = get_num_ops(ops, NULL); size_t msg_size = sizeof(struct ceph_osd_request_head); msg_size += num_op*sizeof(struct ceph_osd_op); @@ -252,15 +248,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, } req->r_reply = msg; - /* allocate space for the trailing data */ - if (needs_trail) { - req->r_trail = kmalloc(sizeof(struct ceph_pagelist), gfp_flags); - if (!req->r_trail) { - ceph_osdc_put_request(req); - return NULL; - } - ceph_pagelist_init(req->r_trail); - } + ceph_pagelist_init(&req->r_trail); /* create request message; allow space for oid */ msg_size += MAX_OBJ_NAME_SIZE; @@ -312,29 +300,25 @@ static void osd_req_encode_op(struct ceph_osd_request *req, case CEPH_OSD_OP_GETXATTR: case CEPH_OSD_OP_SETXATTR: case CEPH_OSD_OP_CMPXATTR: - BUG_ON(!req->r_trail); - dst->xattr.name_len = cpu_to_le32(src->xattr.name_len); dst->xattr.value_len = cpu_to_le32(src->xattr.value_len); dst->xattr.cmp_op = src->xattr.cmp_op; dst->xattr.cmp_mode = src->xattr.cmp_mode; - ceph_pagelist_append(req->r_trail, src->xattr.name, + ceph_pagelist_append(&req->r_trail, src->xattr.name, src->xattr.name_len); - ceph_pagelist_append(req->r_trail, src->xattr.val, + ceph_pagelist_append(&req->r_trail, src->xattr.val, src->xattr.value_len); break; case CEPH_OSD_OP_CALL: - BUG_ON(!req->r_trail); - dst->cls.class_len = src->cls.class_len; dst->cls.method_len = src->cls.method_len; dst->cls.indata_len = cpu_to_le32(src->cls.indata_len); - ceph_pagelist_append(req->r_trail, src->cls.class_name, + ceph_pagelist_append(&req->r_trail, src->cls.class_name, src->cls.class_len); - ceph_pagelist_append(req->r_trail, src->cls.method_name, + ceph_pagelist_append(&req->r_trail, src->cls.method_name, src->cls.method_len); - ceph_pagelist_append(req->r_trail, src->cls.indata, + ceph_pagelist_append(&req->r_trail, src->cls.indata, src->cls.indata_len); break; case CEPH_OSD_OP_ROLLBACK: @@ -347,11 +331,9 @@ static void osd_req_encode_op(struct ceph_osd_request *req, __le32 prot_ver = cpu_to_le32(src->watch.prot_ver); __le32 timeout = cpu_to_le32(src->watch.timeout); - BUG_ON(!req->r_trail); - - ceph_pagelist_append(req->r_trail, + ceph_pagelist_append(&req->r_trail, &prot_ver, sizeof(prot_ver)); - ceph_pagelist_append(req->r_trail, + ceph_pagelist_append(&req->r_trail, &timeout, sizeof(timeout)); } case CEPH_OSD_OP_NOTIFY_ACK: @@ -414,8 +396,7 @@ void ceph_osdc_build_request(struct ceph_osd_request *req, op++; } - if (req->r_trail) - data_len += req->r_trail->length; + data_len += req->r_trail.length; if (snapc) { head->snap_seq = cpu_to_le64(snapc->seq); @@ -1715,7 +1696,7 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc, #ifdef CONFIG_BLOCK req->r_request->bio = req->r_bio; #endif - req->r_request->trail = req->r_trail; + req->r_request->trail = &req->r_trail; register_request(osdc, req); -- cgit v1.2.3 From 5b9d1b1cd46aa6c8abf891f25c15aee31538da7e Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Tue, 13 Nov 2012 21:11:15 -0600 Subject: libceph: kill op_needs_trail() Since every osd message is now prepared to include trailing data, there's no need to check ahead of time whether any operations will make use of the trail portion of the message. We can drop the second argument to get_num_ops(), and as a result we can also get rid of op_needs_trail() which is no longer used. Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- net/ceph/osd_client.c | 27 ++++----------------------- 1 file changed, 4 insertions(+), 23 deletions(-) (limited to 'net') diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 2be50d82ccb..37d43d5b828 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -32,20 +32,6 @@ static void __unregister_linger_request(struct ceph_osd_client *osdc, static void __send_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req); -static int op_needs_trail(int op) -{ - switch (op) { - case CEPH_OSD_OP_GETXATTR: - case CEPH_OSD_OP_SETXATTR: - case CEPH_OSD_OP_CMPXATTR: - case CEPH_OSD_OP_CALL: - case CEPH_OSD_OP_NOTIFY: - return 1; - default: - return 0; - } -} - static int op_has_extent(int op) { return (op == CEPH_OSD_OP_READ || @@ -179,17 +165,12 @@ void ceph_osdc_release_request(struct kref *kref) } EXPORT_SYMBOL(ceph_osdc_release_request); -static int get_num_ops(struct ceph_osd_req_op *ops, int *needs_trail) +static int get_num_ops(struct ceph_osd_req_op *ops) { int i = 0; - if (needs_trail) - *needs_trail = 0; - while (ops[i].op) { - if (needs_trail && op_needs_trail(ops[i].op)) - *needs_trail = 1; + while (ops[i].op) i++; - } return i; } @@ -205,7 +186,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, { struct ceph_osd_request *req; struct ceph_msg *msg; - int num_op = get_num_ops(ops, NULL); + int num_op = get_num_ops(ops); size_t msg_size = sizeof(struct ceph_osd_request_head); msg_size += num_op*sizeof(struct ceph_osd_op); @@ -365,7 +346,7 @@ void ceph_osdc_build_request(struct ceph_osd_request *req, struct ceph_osd_req_op *src_op; struct ceph_osd_op *op; void *p; - int num_op = get_num_ops(src_ops, NULL); + int num_op = get_num_ops(src_ops); size_t msg_size = sizeof(*head) + num_op*sizeof(*op); int flags = req->r_flags; u64 data_len = 0; -- cgit v1.2.3 From 0120be3c60d46d6d55f4bf7a3d654cc705eb0c54 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Wed, 14 Nov 2012 09:38:19 -0600 Subject: libceph: pass length to ceph_osdc_build_request() The len argument to ceph_osdc_build_request() is set up to be passed by address, but that function never updates its value so there's no need to do this. Tighten up the interface by passing the length directly. Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- net/ceph/osd_client.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'net') diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 37d43d5b828..e29a3ed9295 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -336,7 +336,7 @@ static void osd_req_encode_op(struct ceph_osd_request *req, * */ void ceph_osdc_build_request(struct ceph_osd_request *req, - u64 off, u64 *plen, + u64 off, u64 len, struct ceph_osd_req_op *src_ops, struct ceph_snap_context *snapc, struct timespec *mtime) @@ -390,7 +390,7 @@ void ceph_osdc_build_request(struct ceph_osd_request *req, if (flags & CEPH_OSD_FLAG_WRITE) { req->r_request->hdr.data_off = cpu_to_le16(off); - req->r_request->hdr.data_len = cpu_to_le32(*plen + data_len); + req->r_request->hdr.data_len = cpu_to_le32(len + data_len); } else if (data_len) { req->r_request->hdr.data_off = 0; req->r_request->hdr.data_len = cpu_to_le32(data_len); @@ -464,7 +464,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, req->r_num_pages = calc_pages_for(page_align, *plen); req->r_page_alignment = page_align; - ceph_osdc_build_request(req, off, plen, ops, + ceph_osdc_build_request(req, off, *plen, ops, snapc, mtime); -- cgit v1.2.3 From e8afad656cbcd06d02a7bacd4b318fa0e2907de0 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Wed, 14 Nov 2012 09:38:19 -0600 Subject: libceph: pass length to ceph_calc_file_object_mapping() ceph_calc_file_object_mapping() takes (among other things) a "file" offset and length, and based on the layout, determines the object number ("bno") backing the affected portion of the file's data and the offset into that object where the desired range begins. It also computes the size that should be used for the request--either the amount requested or something less if that would exceed the end of the object. This patch changes the input length parameter in this function so it is used only for input. That is, the argument will be passed by value rather than by address, so the value provided won't get updated by the function. The value would only get updated if the length would surpass the current object, and in that case the value it got updated to would be exactly that returned in *oxlen. Only one of the two callers is affected by this change. Update ceph_calc_raw_layout() so it records any updated value. Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- net/ceph/osd_client.c | 6 ++++-- net/ceph/osdmap.c | 9 ++++----- 2 files changed, 8 insertions(+), 7 deletions(-) (limited to 'net') diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index e29a3ed9295..47e5f5b1f94 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -53,13 +53,15 @@ int ceph_calc_raw_layout(struct ceph_osd_client *osdc, reqhead->snapid = cpu_to_le64(snapid); /* object extent? */ - r = ceph_calc_file_object_mapping(layout, off, plen, bno, + r = ceph_calc_file_object_mapping(layout, off, orig_len, bno, &objoff, &objlen); if (r < 0) return r; - if (*plen < orig_len) + if (objlen < orig_len) { + *plen = objlen; dout(" skipping last %llu, final file extent %llu~%llu\n", orig_len - *plen, off, *plen); + } if (op_has_extent(op->op)) { u32 osize = le32_to_cpu(layout->fl_object_size); diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index ca05871635b..369f03ba9ee 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -1016,7 +1016,7 @@ bad: * pass a stride back to the caller. */ int ceph_calc_file_object_mapping(struct ceph_file_layout *layout, - u64 off, u64 *plen, + u64 off, u64 len, u64 *ono, u64 *oxoff, u64 *oxlen) { @@ -1027,7 +1027,7 @@ int ceph_calc_file_object_mapping(struct ceph_file_layout *layout, u32 su_per_object; u64 t, su_offset; - dout("mapping %llu~%llu osize %u fl_su %u\n", off, *plen, + dout("mapping %llu~%llu osize %u fl_su %u\n", off, len, osize, su); if (su == 0 || sc == 0) goto invalid; @@ -1060,11 +1060,10 @@ int ceph_calc_file_object_mapping(struct ceph_file_layout *layout, /* * Calculate the length of the extent being written to the selected - * object. This is the minimum of the full length requested (plen) or + * object. This is the minimum of the full length requested (len) or * the remainder of the current stripe being written to. */ - *oxlen = min_t(u64, *plen, su - su_offset); - *plen = *oxlen; + *oxlen = min_t(u64, len, su - su_offset); dout(" obj extent %llu~%llu\n", *oxoff, *oxlen); return 0; -- cgit v1.2.3 From 4d6b250bf18d44571d69a0f4afec4b6a1969729f Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Tue, 13 Nov 2012 21:11:15 -0600 Subject: libceph: drop snapid in ceph_calc_raw_layout() A snapshot id must be provided to ceph_calc_raw_layout() even though it is not needed at all for calculating the layout. Where the snapshot id *is* needed is when building the request message for an osd operation. Drop the snapid parameter from ceph_calc_raw_layout() and pass that value instead in ceph_osdc_build_request(). Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- net/ceph/osd_client.c | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) (limited to 'net') diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 47e5f5b1f94..b5a4b2875e8 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -40,18 +40,14 @@ static int op_has_extent(int op) int ceph_calc_raw_layout(struct ceph_osd_client *osdc, struct ceph_file_layout *layout, - u64 snapid, u64 off, u64 *plen, u64 *bno, struct ceph_osd_request *req, struct ceph_osd_req_op *op) { - struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base; u64 orig_len = *plen; u64 objoff, objlen; /* extent in object */ int r; - reqhead->snapid = cpu_to_le64(snapid); - /* object extent? */ r = ceph_calc_file_object_mapping(layout, off, orig_len, bno, &objoff, &objlen); @@ -121,8 +117,7 @@ static int calc_layout(struct ceph_osd_client *osdc, u64 bno; int r; - r = ceph_calc_raw_layout(osdc, layout, vino.snap, off, - plen, &bno, req, op); + r = ceph_calc_raw_layout(osdc, layout, off, plen, &bno, req, op); if (r < 0) return r; @@ -340,7 +335,7 @@ static void osd_req_encode_op(struct ceph_osd_request *req, void ceph_osdc_build_request(struct ceph_osd_request *req, u64 off, u64 len, struct ceph_osd_req_op *src_ops, - struct ceph_snap_context *snapc, + struct ceph_snap_context *snapc, u64 snap_id, struct timespec *mtime) { struct ceph_msg *msg = req->r_request; @@ -355,6 +350,7 @@ void ceph_osdc_build_request(struct ceph_osd_request *req, int i; head = msg->front.iov_base; + head->snapid = cpu_to_le64(snap_id); op = (void *)(head + 1); p = (void *)(op + num_op); @@ -466,9 +462,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, req->r_num_pages = calc_pages_for(page_align, *plen); req->r_page_alignment = page_align; - ceph_osdc_build_request(req, off, *plen, ops, - snapc, - mtime); + ceph_osdc_build_request(req, off, *plen, ops, snapc, vino.snap, mtime); return req; } -- cgit v1.2.3 From e75b45cf36565fd8ba206a9d80f670a86e61ba2f Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Tue, 13 Nov 2012 21:11:14 -0600 Subject: libceph: drop osdc from ceph_calc_raw_layout() The osdc parameter to ceph_calc_raw_layout() is not used, so get rid of it. Consequently, the corresponding parameter in calc_layout() becomes unused, so get rid of that as well. Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- net/ceph/osd_client.c | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) (limited to 'net') diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index b5a4b2875e8..cd9c28387de 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -38,8 +38,7 @@ static int op_has_extent(int op) op == CEPH_OSD_OP_WRITE); } -int ceph_calc_raw_layout(struct ceph_osd_client *osdc, - struct ceph_file_layout *layout, +int ceph_calc_raw_layout(struct ceph_file_layout *layout, u64 off, u64 *plen, u64 *bno, struct ceph_osd_request *req, struct ceph_osd_req_op *op) @@ -107,8 +106,7 @@ EXPORT_SYMBOL(ceph_calc_raw_layout); * * fill osd op in request message. */ -static int calc_layout(struct ceph_osd_client *osdc, - struct ceph_vino vino, +static int calc_layout(struct ceph_vino vino, struct ceph_file_layout *layout, u64 off, u64 *plen, struct ceph_osd_request *req, @@ -117,7 +115,7 @@ static int calc_layout(struct ceph_osd_client *osdc, u64 bno; int r; - r = ceph_calc_raw_layout(osdc, layout, off, plen, &bno, req, op); + r = ceph_calc_raw_layout(layout, off, plen, &bno, req, op); if (r < 0) return r; @@ -452,7 +450,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, return ERR_PTR(-ENOMEM); /* calculate max write size */ - r = calc_layout(osdc, vino, layout, off, plen, req, ops); + r = calc_layout(vino, layout, off, plen, req, ops); if (r < 0) return ERR_PTR(r); req->r_file_layout = *layout; /* keep a copy */ -- cgit v1.2.3 From d178a9e74006e80f568d87e29f2a68f14fc7cbb1 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Tue, 13 Nov 2012 21:11:15 -0600 Subject: libceph: don't set flags in ceph_osdc_alloc_request() The only thing ceph_osdc_alloc_request() really does with the flags value it is passed is assign it to the newly-created osd request structure. Do that in the caller instead. Both callers subsequently call ceph_osdc_build_request(), so have that function (instead of ceph_osdc_alloc_request()) issue a warning if a request comes through with neither the read nor write flags set. Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- net/ceph/osd_client.c | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) (limited to 'net') diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index cd9c28387de..77ce1edaa07 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -171,7 +171,6 @@ static int get_num_ops(struct ceph_osd_req_op *ops) } struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, - int flags, struct ceph_snap_context *snapc, struct ceph_osd_req_op *ops, bool use_mempool, @@ -208,10 +207,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, INIT_LIST_HEAD(&req->r_req_lru_item); INIT_LIST_HEAD(&req->r_osd_item); - req->r_flags = flags; - - WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0); - /* create reply message */ if (use_mempool) msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); @@ -347,6 +342,8 @@ void ceph_osdc_build_request(struct ceph_osd_request *req, u64 data_len = 0; int i; + WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0); + head = msg->front.iov_base; head->snapid = cpu_to_le64(snap_id); op = (void *)(head + 1); @@ -442,12 +439,12 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, } else ops[1].op = 0; - req = ceph_osdc_alloc_request(osdc, flags, - snapc, ops, + req = ceph_osdc_alloc_request(osdc, snapc, ops, use_mempool, GFP_NOFS, NULL, NULL); if (!req) return ERR_PTR(-ENOMEM); + req->r_flags = flags; /* calculate max write size */ r = calc_layout(vino, layout, off, plen, req, ops); -- cgit v1.2.3 From 54a5400721da7fa5a16cea151aade5bdfee74111 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Tue, 13 Nov 2012 21:11:15 -0600 Subject: libceph: don't set pages or bio in ceph_osdc_alloc_request() Only one of the two callers of ceph_osdc_alloc_request() provides page or bio data for its payload. And essentially all that function was doing with those arguments was assigning them to fields in the osd request structure. Simplify ceph_osdc_alloc_request() by having the caller take care of making those assignments Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- net/ceph/osd_client.c | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) (limited to 'net') diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 77ce1edaa07..bdc3bb12bfd 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -174,9 +174,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, struct ceph_snap_context *snapc, struct ceph_osd_req_op *ops, bool use_mempool, - gfp_t gfp_flags, - struct page **pages, - struct bio *bio) + gfp_t gfp_flags) { struct ceph_osd_request *req; struct ceph_msg *msg; @@ -237,13 +235,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, memset(msg->front.iov_base, 0, msg->front.iov_len); req->r_request = msg; - req->r_pages = pages; -#ifdef CONFIG_BLOCK - if (bio) { - req->r_bio = bio; - bio_get(req->r_bio); - } -#endif return req; } @@ -439,9 +430,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, } else ops[1].op = 0; - req = ceph_osdc_alloc_request(osdc, snapc, ops, - use_mempool, - GFP_NOFS, NULL, NULL); + req = ceph_osdc_alloc_request(osdc, snapc, ops, use_mempool, GFP_NOFS); if (!req) return ERR_PTR(-ENOMEM); req->r_flags = flags; -- cgit v1.2.3 From ae7ca4a35b1f5df86e2c32b2cfc01a8d528c7b8c Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Tue, 13 Nov 2012 21:11:15 -0600 Subject: libceph: pass num_op with ops Both ceph_osdc_alloc_request() and ceph_osdc_build_request() are provided an array of ceph osd request operations. Rather than just passing the number of operations in the array, the caller is required append an additional zeroed operation structure to signal the end of the array. All callers know the number of operations at the time these functions are called, so drop the silly zero entry and supply that number directly. As a result, get_num_ops() is no longer needed. This also means that ceph_osdc_alloc_request() never uses its ops argument, so that can be dropped. Also rbd_create_rw_ops() no longer needs to add one to reserve room for the additional op. Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- net/ceph/osd_client.c | 43 +++++++++++++++---------------------------- 1 file changed, 15 insertions(+), 28 deletions(-) (limited to 'net') diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index bdc3bb12bfd..500ae8b4932 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -160,25 +160,14 @@ void ceph_osdc_release_request(struct kref *kref) } EXPORT_SYMBOL(ceph_osdc_release_request); -static int get_num_ops(struct ceph_osd_req_op *ops) -{ - int i = 0; - - while (ops[i].op) - i++; - - return i; -} - struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, struct ceph_snap_context *snapc, - struct ceph_osd_req_op *ops, + unsigned int num_op, bool use_mempool, gfp_t gfp_flags) { struct ceph_osd_request *req; struct ceph_msg *msg; - int num_op = get_num_ops(ops); size_t msg_size = sizeof(struct ceph_osd_request_head); msg_size += num_op*sizeof(struct ceph_osd_op); @@ -317,7 +306,7 @@ static void osd_req_encode_op(struct ceph_osd_request *req, * */ void ceph_osdc_build_request(struct ceph_osd_request *req, - u64 off, u64 len, + u64 off, u64 len, unsigned int num_op, struct ceph_osd_req_op *src_ops, struct ceph_snap_context *snapc, u64 snap_id, struct timespec *mtime) @@ -327,7 +316,6 @@ void ceph_osdc_build_request(struct ceph_osd_request *req, struct ceph_osd_req_op *src_op; struct ceph_osd_op *op; void *p; - int num_op = get_num_ops(src_ops); size_t msg_size = sizeof(*head) + num_op*sizeof(*op); int flags = req->r_flags; u64 data_len = 0; @@ -346,20 +334,17 @@ void ceph_osdc_build_request(struct ceph_osd_request *req, head->flags = cpu_to_le32(flags); if (flags & CEPH_OSD_FLAG_WRITE) ceph_encode_timespec(&head->mtime, mtime); + BUG_ON(num_op > (unsigned int) ((u16) -1)); head->num_ops = cpu_to_le16(num_op); - /* fill in oid */ head->object_len = cpu_to_le32(req->r_oid_len); memcpy(p, req->r_oid, req->r_oid_len); p += req->r_oid_len; src_op = src_ops; - while (src_op->op) { - osd_req_encode_op(req, op, src_op); - src_op++; - op++; - } + while (num_op--) + osd_req_encode_op(req, op++, src_op++); data_len += req->r_trail.length; @@ -414,23 +399,24 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, bool use_mempool, int num_reply, int page_align) { - struct ceph_osd_req_op ops[3]; + struct ceph_osd_req_op ops[2]; struct ceph_osd_request *req; + unsigned int num_op = 1; int r; + memset(&ops, 0, sizeof ops); + ops[0].op = opcode; ops[0].extent.truncate_seq = truncate_seq; ops[0].extent.truncate_size = truncate_size; - ops[0].payload_len = 0; if (do_sync) { ops[1].op = CEPH_OSD_OP_STARTSYNC; - ops[1].payload_len = 0; - ops[2].op = 0; - } else - ops[1].op = 0; + num_op++; + } - req = ceph_osdc_alloc_request(osdc, snapc, ops, use_mempool, GFP_NOFS); + req = ceph_osdc_alloc_request(osdc, snapc, num_op, use_mempool, + GFP_NOFS); if (!req) return ERR_PTR(-ENOMEM); req->r_flags = flags; @@ -446,7 +432,8 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, req->r_num_pages = calc_pages_for(page_align, *plen); req->r_page_alignment = page_align; - ceph_osdc_build_request(req, off, *plen, ops, snapc, vino.snap, mtime); + ceph_osdc_build_request(req, off, *plen, num_op, ops, + snapc, vino.snap, mtime); return req; } -- cgit v1.2.3 From 1ec3911dbd19076bcdfe5540096ff67f91a6ec02 Mon Sep 17 00:00:00 2001 From: Cong Ding Date: Fri, 25 Jan 2013 17:48:59 -0600 Subject: libceph: fix undefined behavior when using snprintf() The variable "str" is used as both the source and destination in function snprintf(), which is undefined behavior based on C11. The original description in C11 is: "If copying takes place between objects that overlap, the behavior is undefined." And, the function of ceph_osdmap_state_str() is to return the osdmap state, so it should return "doesn't exist" when all the conditions are not satisfied. I fix it in this patch. [elder@inktank.com: shortened the commit message] Signed-off-by: Cong Ding Reviewed-by: Alex Elder --- net/ceph/osdmap.c | 28 ++++++++++------------------ 1 file changed, 10 insertions(+), 18 deletions(-) (limited to 'net') diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index 369f03ba9ee..3c61e21611d 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -13,26 +13,18 @@ char *ceph_osdmap_state_str(char *str, int len, int state) { - int flag = 0; - if (!len) - goto done; - - *str = '\0'; - if (state) { - if (state & CEPH_OSD_EXISTS) { - snprintf(str, len, "exists"); - flag = 1; - } - if (state & CEPH_OSD_UP) { - snprintf(str, len, "%s%s%s", str, (flag ? ", " : ""), - "up"); - flag = 1; - } - } else { + return str; + + if ((state & CEPH_OSD_EXISTS) && (state & CEPH_OSD_UP)) + snprintf(str, len, "exists, up"); + else if (state & CEPH_OSD_EXISTS) + snprintf(str, len, "exists"); + else if (state & CEPH_OSD_UP) + snprintf(str, len, "up"); + else snprintf(str, len, "doesn't exist"); - } -done: + return str; } -- cgit v1.2.3 From 3ebc21f7bc2f9c0145bbbf0f12430b766a200f9f Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Thu, 31 Jan 2013 16:02:01 -0600 Subject: libceph: fix messenger CONFIG_BLOCK dependencies The ceph messenger has a few spots that are only used when bio messages are supported, and that's only when CONFIG_BLOCK is defined. This surrounds a couple of spots with #ifdef's that would cause a problem if CONFIG_BLOCK were not present in the kernel configuration. This resolves: http://tracker.ceph.com/issues/3976 Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- net/ceph/messenger.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 5ccf87ed8d6..8a62a559a2a 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -9,8 +9,9 @@ #include #include #include +#ifdef CONFIG_BLOCK #include -#include +#endif /* CONFIG_BLOCK */ #include #include @@ -2651,9 +2652,11 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags, m->page_alignment = 0; m->pages = NULL; m->pagelist = NULL; +#ifdef CONFIG_BLOCK m->bio = NULL; m->bio_iter = NULL; m->bio_seg = 0; +#endif /* CONFIG_BLOCK */ m->trail = NULL; /* front */ -- cgit v1.2.3 From 72fe25e3460c8673984370208e0e6261101372d6 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Wed, 30 Jan 2013 11:13:33 -0600 Subject: libceph: add a compatibility check interface An upcoming change implements semantic change that could lead to a crash if an old version of the libceph kernel module is used with a new version of the rbd kernel module. In order to preclude that possibility, this adds a compatibilty check interface. If this interface doesn't exist, the modules are obviously not compatible. But if it does exist, this provides a way of letting the caller know whether it will operate properly with this libceph module. Perhaps confusingly, it returns false right now. The semantic change mentioned above will make it return true. This resolves: http://tracker.ceph.com/issues/3800 Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- net/ceph/ceph_common.c | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) (limited to 'net') diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c index ee71ea26777..a98c03ff853 100644 --- a/net/ceph/ceph_common.c +++ b/net/ceph/ceph_common.c @@ -26,6 +26,22 @@ #include "crypto.h" +/* + * Module compatibility interface. For now it doesn't do anything, + * but its existence signals a certain level of functionality. + * + * The data buffer is used to pass information both to and from + * libceph. The return value indicates whether libceph determines + * it is compatible with the caller (from another kernel module), + * given the provided data. + * + * The data pointer can be null. + */ +bool libceph_compatible(void *data) +{ + return false; +} +EXPORT_SYMBOL(libceph_compatible); /* * find filename portion of a path (/foo/bar/baz -> baz) -- cgit v1.2.3 From 1e32d34cfa6759df58b5f4002664241f2a0fef6a Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Wed, 30 Jan 2013 11:13:33 -0600 Subject: rbd: don't take extra bio reference for osd client Currently, if the OSD client finds an osd request has had a bio list attached to it, it drops a reference to it (or rather, to the first entry on that list) when the request is released. The code that added that reference (i.e., the rbd client) is therefore required to take an extra reference to that first bio structure. The osd client doesn't really do anything with the bio pointer other than transfer it from the osd request structure to outgoing (for writes) and ingoing (for reads) messages. So it really isn't the right place to be taking or dropping references. Furthermore, the rbd client already holds references to all bio structures it passes to the osd client, and holds them until the request is completed. So there's no need for this extra reference whatsoever. So remove the bio_put() call in ceph_osdc_release_request(), as well as its matching bio_get() call in rbd_osd_req_create(). This change could lead to a crash if old libceph.ko was used with new rbd.ko. Add a compatibility check at rbd initialization time to avoid this possibilty. This resolves: http://tracker.ceph.com/issues/3798 and http://tracker.ceph.com/issues/3799 Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- net/ceph/ceph_common.c | 2 +- net/ceph/osd_client.c | 4 ---- 2 files changed, 1 insertion(+), 5 deletions(-) (limited to 'net') diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c index a98c03ff853..c236c235c4a 100644 --- a/net/ceph/ceph_common.c +++ b/net/ceph/ceph_common.c @@ -39,7 +39,7 @@ */ bool libceph_compatible(void *data) { - return false; + return true; } EXPORT_SYMBOL(libceph_compatible); diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 500ae8b4932..ba03648533c 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -147,10 +147,6 @@ void ceph_osdc_release_request(struct kref *kref) if (req->r_own_pages) ceph_release_page_vector(req->r_pages, req->r_num_pages); -#ifdef CONFIG_BLOCK - if (req->r_bio) - bio_put(req->r_bio); -#endif ceph_put_snap_context(req->r_snapc); ceph_pagelist_release(&req->r_trail); if (req->r_mempool) -- cgit v1.2.3 From 9cbb1d7268afa997a7f96d779470cc57d28e1a13 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Thu, 31 Jan 2013 16:02:00 -0600 Subject: libceph: don't require r_num_pages for bio requests There is a check in the completion path for osd requests that ensures the number of pages allocated is enough to hold the amount of incoming data expected. For bio requests coming from rbd the "number of pages" is not really meaningful (although total length would be). So stop requiring that nr_pages be supplied for bio requests. This is done by checking whether the pages pointer is null before checking the value of nr_pages. Note that this value is passed on to the messenger, but there it's only used for debugging--it's never used for validation. While here, change another spot that used r_pages in a debug message inappropriately, and also invalidate the r_con_filling_msg pointer after dropping a reference to it. This resolves: http://tracker.ceph.com/issues/3875 Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- net/ceph/osd_client.c | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) (limited to 'net') diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index ba03648533c..d9d58bbe9f9 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -137,10 +137,11 @@ void ceph_osdc_release_request(struct kref *kref) if (req->r_request) ceph_msg_put(req->r_request); if (req->r_con_filling_msg) { - dout("%s revoking pages %p from con %p\n", __func__, - req->r_pages, req->r_con_filling_msg); + dout("%s revoking msg %p from con %p\n", __func__, + req->r_reply, req->r_con_filling_msg); ceph_msg_revoke_incoming(req->r_reply); req->r_con_filling_msg->ops->put(req->r_con_filling_msg); + req->r_con_filling_msg = NULL; } if (req->r_reply) ceph_msg_put(req->r_reply); @@ -1981,7 +1982,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, if (data_len > 0) { int want = calc_pages_for(req->r_page_alignment, data_len); - if (unlikely(req->r_num_pages < want)) { + if (req->r_pages && unlikely(req->r_num_pages < want)) { pr_warning("tid %lld reply has %d bytes %d pages, we" " had only %d pages ready\n", tid, data_len, want, req->r_num_pages); -- cgit v1.2.3 From 87f979d390f9ecfa3d0038a9f9a002a62f8a1895 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Fri, 15 Feb 2013 11:42:29 -0600 Subject: ceph: kill ceph_osdc_writepages() "nofail" parameter There is only one caller of ceph_osdc_writepages(), and it always passes the value true as its "nofail" argument. Get rid of that argument and replace its use in ceph_osdc_writepages() with the constant value true. This and a number of cleanup patches that follow resolve: http://tracker.ceph.com/issues/4126 Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- net/ceph/osd_client.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'net') diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index d9d58bbe9f9..dd01b1340e9 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -1867,7 +1867,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, u32 truncate_seq, u64 truncate_size, struct timespec *mtime, struct page **pages, int num_pages, - int flags, int do_sync, bool nofail) + int flags, int do_sync) { struct ceph_osd_request *req; int rc = 0; @@ -1880,7 +1880,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, CEPH_OSD_FLAG_WRITE, snapc, do_sync, truncate_seq, truncate_size, mtime, - nofail, 1, page_align); + true, 1, page_align); if (IS_ERR(req)) return PTR_ERR(req); @@ -1889,7 +1889,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, dout("writepages %llu~%llu (%d pages)\n", off, len, req->r_num_pages); - rc = ceph_osdc_start_request(osdc, req, nofail); + rc = ceph_osdc_start_request(osdc, req, true); if (!rc) rc = ceph_osdc_wait_request(osdc, req); -- cgit v1.2.3 From fbf8685fb155e12a9f4d4b966c7b3442ed557687 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Fri, 15 Feb 2013 11:42:29 -0600 Subject: ceph: kill ceph_osdc_writepages() "dosync" parameter There is only one caller of ceph_osdc_writepages(), and it always passes 0 as its "dosync" argument. Get rid of that argument and replace its use in ceph_osdc_writepages() with 0. Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- net/ceph/osd_client.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'net') diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index dd01b1340e9..ac186b7c998 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -1867,7 +1867,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, u32 truncate_seq, u64 truncate_size, struct timespec *mtime, struct page **pages, int num_pages, - int flags, int do_sync) + int flags) { struct ceph_osd_request *req; int rc = 0; @@ -1878,7 +1878,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, CEPH_OSD_OP_WRITE, flags | CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE, - snapc, do_sync, + snapc, 0, truncate_seq, truncate_size, mtime, true, 1, page_align); if (IS_ERR(req)) -- cgit v1.2.3 From 2480882611e3ab844563dd3d0a822227604ab8fe Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Fri, 15 Feb 2013 11:42:29 -0600 Subject: ceph: kill ceph_osdc_writepages() "flags" parameter There is only one caller of ceph_osdc_writepages(), and it always passes 0 as its "flags" argument. Get rid of that argument and replace its use in ceph_osdc_writepages() with 0. Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- net/ceph/osd_client.c | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) (limited to 'net') diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index ac186b7c998..d4e3812bceb 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -1866,8 +1866,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, u64 off, u64 len, u32 truncate_seq, u64 truncate_size, struct timespec *mtime, - struct page **pages, int num_pages, - int flags) + struct page **pages, int num_pages) { struct ceph_osd_request *req; int rc = 0; @@ -1876,8 +1875,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, BUG_ON(vino.snap != CEPH_NOSNAP); req = ceph_osdc_new_request(osdc, layout, vino, off, &len, CEPH_OSD_OP_WRITE, - flags | CEPH_OSD_FLAG_ONDISK | - CEPH_OSD_FLAG_WRITE, + CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE, snapc, 0, truncate_seq, truncate_size, mtime, true, 1, page_align); -- cgit v1.2.3 From a3bea47e8bdd51d921e5b2045720d60140612c7c Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Fri, 15 Feb 2013 11:42:29 -0600 Subject: ceph: kill ceph_osdc_new_request() "num_reply" parameter The "num_reply" parameter to ceph_osdc_new_request() is never used inside that function, so get rid of it. Note that ceph_sync_write() passes 2 for that argument, while all other callers pass 1. It doesn't matter, but perhaps someone should verify this doesn't indicate a problem. Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- net/ceph/osd_client.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'net') diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index d4e3812bceb..d3e75138506 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -393,7 +393,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, u32 truncate_seq, u64 truncate_size, struct timespec *mtime, - bool use_mempool, int num_reply, + bool use_mempool, int page_align) { struct ceph_osd_req_op ops[2]; @@ -1837,7 +1837,7 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc, req = ceph_osdc_new_request(osdc, layout, vino, off, plen, CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, NULL, 0, truncate_seq, truncate_size, NULL, - false, 1, page_align); + false, page_align); if (IS_ERR(req)) return PTR_ERR(req); @@ -1878,7 +1878,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE, snapc, 0, truncate_seq, truncate_size, mtime, - true, 1, page_align); + true, page_align); if (IS_ERR(req)) return PTR_ERR(req); -- cgit v1.2.3 From f9d251994522fed06f47855b534f21c07ecf7181 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Fri, 15 Feb 2013 11:42:29 -0600 Subject: libceph: lock outside send_queued() Two of the three callers of the osd client's send_queued() function already hold the osd client mutex and drop it before the call. Change send_queued() so it assumes the caller holds the mutex, and update all callers accordingly. Rename it __send_queued() to match the convention used elsewhere in the file with respect to the lock. Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- net/ceph/osd_client.c | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) (limited to 'net') diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index d3e75138506..edda0704f5a 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -23,7 +23,7 @@ static const struct ceph_connection_operations osd_con_ops; -static void send_queued(struct ceph_osd_client *osdc); +static void __send_queued(struct ceph_osd_client *osdc); static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd); static void __register_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req); @@ -554,8 +554,8 @@ static void osd_reset(struct ceph_connection *con) down_read(&osdc->map_sem); mutex_lock(&osdc->request_mutex); __kick_osd_requests(osdc, osd); + __send_queued(osdc); mutex_unlock(&osdc->request_mutex); - send_queued(osdc); up_read(&osdc->map_sem); } @@ -997,16 +997,13 @@ static void __send_request(struct ceph_osd_client *osdc, /* * Send any requests in the queue (req_unsent). */ -static void send_queued(struct ceph_osd_client *osdc) +static void __send_queued(struct ceph_osd_client *osdc) { struct ceph_osd_request *req, *tmp; - dout("send_queued\n"); - mutex_lock(&osdc->request_mutex); - list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) { + dout("__send_queued\n"); + list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) __send_request(osdc, req); - } - mutex_unlock(&osdc->request_mutex); } /* @@ -1058,8 +1055,8 @@ static void handle_timeout(struct work_struct *work) } __schedule_osd_timeout(osdc); + __send_queued(osdc); mutex_unlock(&osdc->request_mutex); - send_queued(osdc); up_read(&osdc->map_sem); } @@ -1397,7 +1394,9 @@ done: if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL)) ceph_monc_request_next_osdmap(&osdc->client->monc); - send_queued(osdc); + mutex_lock(&osdc->request_mutex); + __send_queued(osdc); + mutex_unlock(&osdc->request_mutex); up_read(&osdc->map_sem); wake_up_all(&osdc->client->auth_wq); return; -- cgit v1.2.3 From 60789380ae833061803030d51952a5a341e4dade Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Fri, 15 Feb 2013 11:42:29 -0600 Subject: libdeph: don't export ceph_osdc_init() or ceph_osdc_stop() The only callers of ceph_osdc_init() and ceph_osdc_stop() ceph_create_client() and ceph_destroy_client() (respectively) and they are in the same kernel module as those two functions. There's therefore no need to export those interfaces, so don't. Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- net/ceph/osd_client.c | 2 -- 1 file changed, 2 deletions(-) (limited to 'net') diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index edda0704f5a..0d67cd37173 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -1799,7 +1799,6 @@ out_mempool: out: return err; } -EXPORT_SYMBOL(ceph_osdc_init); void ceph_osdc_stop(struct ceph_osd_client *osdc) { @@ -1816,7 +1815,6 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc) ceph_msgpool_destroy(&osdc->msgpool_op); ceph_msgpool_destroy(&osdc->msgpool_op_reply); } -EXPORT_SYMBOL(ceph_osdc_stop); /* * Read some contiguous pages. If we cross a stripe boundary, shorten -- cgit v1.2.3 From 60e56f138180e72fa8487d4b9c1c916013494f46 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Fri, 15 Feb 2013 11:42:29 -0600 Subject: libceph: kill ceph_calc_raw_layout() There is no caller of ceph_calc_raw_layout() outside of libceph, so there's no need to export from the module. Furthermore, there is only one caller, in calc_layout(), and it is not much more than a simple wrapper for that function. So get rid of ceph_calc_raw_layout() and embed it instead within calc_layout(). While touching "osd_client.c", get rid of the unnecessary forward declaration of __send_request(). Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- net/ceph/osd_client.c | 77 +++++++++++++++++++++------------------------------ 1 file changed, 32 insertions(+), 45 deletions(-) (limited to 'net') diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 0d67cd37173..cd3a489b743 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -38,49 +38,6 @@ static int op_has_extent(int op) op == CEPH_OSD_OP_WRITE); } -int ceph_calc_raw_layout(struct ceph_file_layout *layout, - u64 off, u64 *plen, u64 *bno, - struct ceph_osd_request *req, - struct ceph_osd_req_op *op) -{ - u64 orig_len = *plen; - u64 objoff, objlen; /* extent in object */ - int r; - - /* object extent? */ - r = ceph_calc_file_object_mapping(layout, off, orig_len, bno, - &objoff, &objlen); - if (r < 0) - return r; - if (objlen < orig_len) { - *plen = objlen; - dout(" skipping last %llu, final file extent %llu~%llu\n", - orig_len - *plen, off, *plen); - } - - if (op_has_extent(op->op)) { - u32 osize = le32_to_cpu(layout->fl_object_size); - op->extent.offset = objoff; - op->extent.length = objlen; - if (op->extent.truncate_size <= off - objoff) { - op->extent.truncate_size = 0; - } else { - op->extent.truncate_size -= off - objoff; - if (op->extent.truncate_size > osize) - op->extent.truncate_size = osize; - } - } - req->r_num_pages = calc_pages_for(off, *plen); - req->r_page_alignment = off & ~PAGE_MASK; - if (op->op == CEPH_OSD_OP_WRITE) - op->payload_len = *plen; - - dout("calc_layout bno=%llx %llu~%llu (%d pages)\n", - *bno, objoff, objlen, req->r_num_pages); - return 0; -} -EXPORT_SYMBOL(ceph_calc_raw_layout); - /* * Implement client access to distributed object storage cluster. * @@ -112,12 +69,42 @@ static int calc_layout(struct ceph_vino vino, struct ceph_osd_request *req, struct ceph_osd_req_op *op) { - u64 bno; + u64 orig_len = *plen; + u64 bno = 0; + u64 objoff = 0; + u64 objlen = 0; int r; - r = ceph_calc_raw_layout(layout, off, plen, &bno, req, op); + /* object extent? */ + r = ceph_calc_file_object_mapping(layout, off, orig_len, &bno, + &objoff, &objlen); if (r < 0) return r; + if (objlen < orig_len) { + *plen = objlen; + dout(" skipping last %llu, final file extent %llu~%llu\n", + orig_len - *plen, off, *plen); + } + + if (op_has_extent(op->op)) { + u32 osize = le32_to_cpu(layout->fl_object_size); + op->extent.offset = objoff; + op->extent.length = objlen; + if (op->extent.truncate_size <= off - objoff) { + op->extent.truncate_size = 0; + } else { + op->extent.truncate_size -= off - objoff; + if (op->extent.truncate_size > osize) + op->extent.truncate_size = osize; + } + } + req->r_num_pages = calc_pages_for(off, *plen); + req->r_page_alignment = off & ~PAGE_MASK; + if (op->op == CEPH_OSD_OP_WRITE) + op->payload_len = *plen; + + dout("calc_layout bno=%llx %llu~%llu (%d pages)\n", + bno, objoff, objlen, req->r_num_pages); snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", vino.ino, bno); req->r_oid_len = strlen(req->r_oid); -- cgit v1.2.3 From 3c663bbdcdf9296e0fe3362acb9e81f49d7b72c6 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Fri, 15 Feb 2013 11:42:30 -0600 Subject: libceph: kill ceph_osdc_create_event() "one_shot" parameter There is only one caller of ceph_osdc_create_event(), and it provides 0 as its "one_shot" argument. Get rid of that argument and just use 0 in its place. Replace the code in handle_watch_notify() that executes if one_shot is nonzero in the event with a BUG_ON() call. While modifying "osd_client.c", give handle_watch_notify() static scope. Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- net/ceph/osd_client.c | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) (limited to 'net') diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index cd3a489b743..4322faa7c81 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -1477,8 +1477,7 @@ static void __remove_event(struct ceph_osd_event *event) int ceph_osdc_create_event(struct ceph_osd_client *osdc, void (*event_cb)(u64, u64, u8, void *), - int one_shot, void *data, - struct ceph_osd_event **pevent) + void *data, struct ceph_osd_event **pevent) { struct ceph_osd_event *event; @@ -1488,7 +1487,7 @@ int ceph_osdc_create_event(struct ceph_osd_client *osdc, dout("create_event %p\n", event); event->cb = event_cb; - event->one_shot = one_shot; + event->one_shot = 0; event->data = data; event->osdc = osdc; INIT_LIST_HEAD(&event->osd_node); @@ -1541,7 +1540,8 @@ static void do_event_work(struct work_struct *work) /* * Process osd watch notifications */ -void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg) +static void handle_watch_notify(struct ceph_osd_client *osdc, + struct ceph_msg *msg) { void *p, *end; u8 proto_ver; @@ -1562,9 +1562,8 @@ void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg) spin_lock(&osdc->event_lock); event = __find_event(osdc, cookie); if (event) { + BUG_ON(event->one_shot); get_event(event); - if (event->one_shot) - __remove_event(event); } spin_unlock(&osdc->event_lock); dout("handle_watch_notify cookie %lld ver %lld event %p\n", -- cgit v1.2.3 From 2d2f522699fe8b827087941eb31b9a12cf465f17 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Fri, 15 Feb 2013 11:42:30 -0600 Subject: libceph: kill ceph_osdc_wait_event() There are no actual users of ceph_osdc_wait_event(). This would have been one-shot events, but we no longer support those so just get rid of this function. Since this leaves nothing else that waits for the completion of an event, we can get rid of the completion in a struct ceph_osd_event. Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- net/ceph/osd_client.c | 18 ------------------ 1 file changed, 18 deletions(-) (limited to 'net') diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 4322faa7c81..ad6b8b35f5c 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -1494,7 +1494,6 @@ int ceph_osdc_create_event(struct ceph_osd_client *osdc, RB_CLEAR_NODE(&event->node); kref_init(&event->kref); /* one ref for us */ kref_get(&event->kref); /* one ref for the caller */ - init_completion(&event->completion); spin_lock(&osdc->event_lock); event->cookie = ++osdc->event_count; @@ -1530,7 +1529,6 @@ static void do_event_work(struct work_struct *work) dout("do_event_work completing %p\n", event); event->cb(ver, notify_id, opcode, event->data); - complete(&event->completion); dout("do_event_work completed %p\n", event); ceph_osdc_put_event(event); kfree(event_work); @@ -1588,7 +1586,6 @@ static void handle_watch_notify(struct ceph_osd_client *osdc, return; done_err: - complete(&event->completion); ceph_osdc_put_event(event); return; @@ -1597,21 +1594,6 @@ bad: return; } -int ceph_osdc_wait_event(struct ceph_osd_event *event, unsigned long timeout) -{ - int err; - - dout("wait_event %p\n", event); - err = wait_for_completion_interruptible_timeout(&event->completion, - timeout * HZ); - ceph_osdc_put_event(event); - if (err > 0) - err = 0; - dout("wait_event %p returns %d\n", event, err); - return err; -} -EXPORT_SYMBOL(ceph_osdc_wait_event); - /* * Register request, send initial attempt. */ -- cgit v1.2.3 From 4b568b1aaf23d0ce64b98d01d5ad1bcc7694440a Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Fri, 15 Feb 2013 11:42:30 -0600 Subject: libceph: add ceph_osd_state_name() Add the definition of ceph_osd_state_name(), to match its counterpart in user space. Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- net/ceph/ceph_strings.c | 15 +++++++++++++++ 1 file changed, 15 insertions(+) (limited to 'net') diff --git a/net/ceph/ceph_strings.c b/net/ceph/ceph_strings.c index 3fbda04de29..833075cff4e 100644 --- a/net/ceph/ceph_strings.c +++ b/net/ceph/ceph_strings.c @@ -68,6 +68,21 @@ const char *ceph_osd_op_name(int op) return "???"; } +const char *ceph_osd_state_name(int s) +{ + switch (s) { + case CEPH_OSD_EXISTS: + return "exists"; + case CEPH_OSD_UP: + return "up"; + case CEPH_OSD_AUTOOUT: + return "autoout"; + case CEPH_OSD_NEW: + return "new"; + default: + return "???"; + } +} const char *ceph_pool_op_name(int op) { -- cgit v1.2.3 From 2979ddb11befcd757a6ab5a04fad9e264560385b Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Fri, 15 Feb 2013 11:42:30 -0600 Subject: libceph: update ceph_osd_op_name() Update ceph_osd_op_name() to include the newly-added definitions in "rados.h", and to match its counterpart in the user space code. Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- net/ceph/ceph_strings.c | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) (limited to 'net') diff --git a/net/ceph/ceph_strings.c b/net/ceph/ceph_strings.c index 833075cff4e..1348df96fe1 100644 --- a/net/ceph/ceph_strings.c +++ b/net/ceph/ceph_strings.c @@ -21,9 +21,15 @@ const char *ceph_osd_op_name(int op) switch (op) { case CEPH_OSD_OP_READ: return "read"; case CEPH_OSD_OP_STAT: return "stat"; + case CEPH_OSD_OP_MAPEXT: return "mapext"; + case CEPH_OSD_OP_SPARSE_READ: return "sparse-read"; + case CEPH_OSD_OP_NOTIFY: return "notify"; + case CEPH_OSD_OP_NOTIFY_ACK: return "notify-ack"; + case CEPH_OSD_OP_ASSERT_VER: return "assert-version"; case CEPH_OSD_OP_MASKTRUNC: return "masktrunc"; + case CEPH_OSD_OP_CREATE: return "create"; case CEPH_OSD_OP_WRITE: return "write"; case CEPH_OSD_OP_DELETE: return "delete"; case CEPH_OSD_OP_TRUNCATE: return "truncate"; @@ -39,6 +45,11 @@ const char *ceph_osd_op_name(int op) case CEPH_OSD_OP_TMAPUP: return "tmapup"; case CEPH_OSD_OP_TMAPGET: return "tmapget"; case CEPH_OSD_OP_TMAPPUT: return "tmapput"; + case CEPH_OSD_OP_WATCH: return "watch"; + + case CEPH_OSD_OP_CLONERANGE: return "clonerange"; + case CEPH_OSD_OP_ASSERT_SRC_VERSION: return "assert-src-version"; + case CEPH_OSD_OP_SRC_CMPXATTR: return "src-cmpxattr"; case CEPH_OSD_OP_GETXATTR: return "getxattr"; case CEPH_OSD_OP_GETXATTRS: return "getxattrs"; @@ -53,6 +64,10 @@ const char *ceph_osd_op_name(int op) case CEPH_OSD_OP_BALANCEREADS: return "balance-reads"; case CEPH_OSD_OP_UNBALANCEREADS: return "unbalance-reads"; case CEPH_OSD_OP_SCRUB: return "scrub"; + case CEPH_OSD_OP_SCRUB_RESERVE: return "scrub-reserve"; + case CEPH_OSD_OP_SCRUB_UNRESERVE: return "scrub-unreserve"; + case CEPH_OSD_OP_SCRUB_STOP: return "scrub-stop"; + case CEPH_OSD_OP_SCRUB_MAP: return "scrub-map"; case CEPH_OSD_OP_WRLOCK: return "wrlock"; case CEPH_OSD_OP_WRUNLOCK: return "wrunlock"; @@ -64,6 +79,15 @@ const char *ceph_osd_op_name(int op) case CEPH_OSD_OP_CALL: return "call"; case CEPH_OSD_OP_PGLS: return "pgls"; + case CEPH_OSD_OP_PGLS_FILTER: return "pgls-filter"; + case CEPH_OSD_OP_OMAPGETKEYS: return "omap-get-keys"; + case CEPH_OSD_OP_OMAPGETVALS: return "omap-get-vals"; + case CEPH_OSD_OP_OMAPGETHEADER: return "omap-get-header"; + case CEPH_OSD_OP_OMAPGETVALSBYKEYS: return "omap-get-vals-by-keys"; + case CEPH_OSD_OP_OMAPSETVALS: return "omap-set-vals"; + case CEPH_OSD_OP_OMAPSETHEADER: return "omap-set-header"; + case CEPH_OSD_OP_OMAPCLEAR: return "omap-clear"; + case CEPH_OSD_OP_OMAPRMKEYS: return "omap-rm-keys"; } return "???"; } -- cgit v1.2.3 From 4c46459cae3b945e6e167f3f3a12b68f55cc5937 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Fri, 15 Feb 2013 11:42:30 -0600 Subject: libceph: report defined but unsupported osd ops If osd_req_encode_op() is given any opcode it doesn't recognize it reports an error. This patch fleshes out that routine to distinguish between well-defined but unsupported values and values that are simply bogus. This and the next commit are related to: http://tracker.ceph.com/issues/4126 Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- net/ceph/osd_client.c | 54 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) (limited to 'net') diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index ad6b8b35f5c..ac7bcbf1957 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -281,6 +281,60 @@ static void osd_req_encode_op(struct ceph_osd_request *req, pr_err("unrecognized osd opcode %d\n", dst->op); WARN_ON(1); break; + case CEPH_OSD_OP_STAT: + case CEPH_OSD_OP_MAPEXT: + case CEPH_OSD_OP_MASKTRUNC: + case CEPH_OSD_OP_SPARSE_READ: + case CEPH_OSD_OP_ASSERT_VER: + case CEPH_OSD_OP_WRITEFULL: + case CEPH_OSD_OP_TRUNCATE: + case CEPH_OSD_OP_ZERO: + case CEPH_OSD_OP_DELETE: + case CEPH_OSD_OP_APPEND: + case CEPH_OSD_OP_SETTRUNC: + case CEPH_OSD_OP_TRIMTRUNC: + case CEPH_OSD_OP_TMAPUP: + case CEPH_OSD_OP_TMAPPUT: + case CEPH_OSD_OP_TMAPGET: + case CEPH_OSD_OP_CREATE: + case CEPH_OSD_OP_OMAPGETKEYS: + case CEPH_OSD_OP_OMAPGETVALS: + case CEPH_OSD_OP_OMAPGETHEADER: + case CEPH_OSD_OP_OMAPGETVALSBYKEYS: + case CEPH_OSD_OP_MODE_RD: + case CEPH_OSD_OP_OMAPSETVALS: + case CEPH_OSD_OP_OMAPSETHEADER: + case CEPH_OSD_OP_OMAPCLEAR: + case CEPH_OSD_OP_OMAPRMKEYS: + case CEPH_OSD_OP_OMAP_CMP: + case CEPH_OSD_OP_CLONERANGE: + case CEPH_OSD_OP_ASSERT_SRC_VERSION: + case CEPH_OSD_OP_SRC_CMPXATTR: + case CEPH_OSD_OP_GETXATTRS: + case CEPH_OSD_OP_SETXATTRS: + case CEPH_OSD_OP_RESETXATTRS: + case CEPH_OSD_OP_RMXATTR: + case CEPH_OSD_OP_PULL: + case CEPH_OSD_OP_PUSH: + case CEPH_OSD_OP_BALANCEREADS: + case CEPH_OSD_OP_UNBALANCEREADS: + case CEPH_OSD_OP_SCRUB: + case CEPH_OSD_OP_SCRUB_RESERVE: + case CEPH_OSD_OP_SCRUB_UNRESERVE: + case CEPH_OSD_OP_SCRUB_STOP: + case CEPH_OSD_OP_SCRUB_MAP: + case CEPH_OSD_OP_WRLOCK: + case CEPH_OSD_OP_WRUNLOCK: + case CEPH_OSD_OP_RDLOCK: + case CEPH_OSD_OP_RDUNLOCK: + case CEPH_OSD_OP_UPLOCK: + case CEPH_OSD_OP_DNLOCK: + case CEPH_OSD_OP_PGLS: + case CEPH_OSD_OP_PGLS_FILTER: + pr_err("unsupported osd opcode %s\n", + ceph_osd_op_name(dst->op)); + WARN_ON(1); + break; } dst->payload_len = cpu_to_le32(src->payload_len); } -- cgit v1.2.3 From a9f36c3ed48dbfbad4cf8ba92142873b62923300 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Fri, 15 Feb 2013 11:42:30 -0600 Subject: libceph: remove dead code in osd_req_encode_op() In osd_req_encode_op() there are a few cases that handle osd opcodes that are never used in the kernel. The presence of this code gives the impression it's correct (which really can't be assumed), and may impose some unnecessary restrictions on some upcoming refactoring of this code. So delete this effectively dead code, and report uses of the previously handled cases as unsupported. Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- net/ceph/osd_client.c | 31 +++++-------------------------- 1 file changed, 5 insertions(+), 26 deletions(-) (limited to 'net') diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index ac7bcbf1957..bbd157592c6 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -231,19 +231,6 @@ static void osd_req_encode_op(struct ceph_osd_request *req, dst->extent.truncate_seq = cpu_to_le32(src->extent.truncate_seq); break; - - case CEPH_OSD_OP_GETXATTR: - case CEPH_OSD_OP_SETXATTR: - case CEPH_OSD_OP_CMPXATTR: - dst->xattr.name_len = cpu_to_le32(src->xattr.name_len); - dst->xattr.value_len = cpu_to_le32(src->xattr.value_len); - dst->xattr.cmp_op = src->xattr.cmp_op; - dst->xattr.cmp_mode = src->xattr.cmp_mode; - ceph_pagelist_append(&req->r_trail, src->xattr.name, - src->xattr.name_len); - ceph_pagelist_append(&req->r_trail, src->xattr.val, - src->xattr.value_len); - break; case CEPH_OSD_OP_CALL: dst->cls.class_len = src->cls.class_len; dst->cls.method_len = src->cls.method_len; @@ -256,21 +243,8 @@ static void osd_req_encode_op(struct ceph_osd_request *req, ceph_pagelist_append(&req->r_trail, src->cls.indata, src->cls.indata_len); break; - case CEPH_OSD_OP_ROLLBACK: - dst->snap.snapid = cpu_to_le64(src->snap.snapid); - break; case CEPH_OSD_OP_STARTSYNC: break; - case CEPH_OSD_OP_NOTIFY: - { - __le32 prot_ver = cpu_to_le32(src->watch.prot_ver); - __le32 timeout = cpu_to_le32(src->watch.timeout); - - ceph_pagelist_append(&req->r_trail, - &prot_ver, sizeof(prot_ver)); - ceph_pagelist_append(&req->r_trail, - &timeout, sizeof(timeout)); - } case CEPH_OSD_OP_NOTIFY_ACK: case CEPH_OSD_OP_WATCH: dst->watch.cookie = cpu_to_le64(src->watch.cookie); @@ -285,6 +259,7 @@ static void osd_req_encode_op(struct ceph_osd_request *req, case CEPH_OSD_OP_MAPEXT: case CEPH_OSD_OP_MASKTRUNC: case CEPH_OSD_OP_SPARSE_READ: + case CEPH_OSD_OP_NOTIFY: case CEPH_OSD_OP_ASSERT_VER: case CEPH_OSD_OP_WRITEFULL: case CEPH_OSD_OP_TRUNCATE: @@ -297,6 +272,7 @@ static void osd_req_encode_op(struct ceph_osd_request *req, case CEPH_OSD_OP_TMAPPUT: case CEPH_OSD_OP_TMAPGET: case CEPH_OSD_OP_CREATE: + case CEPH_OSD_OP_ROLLBACK: case CEPH_OSD_OP_OMAPGETKEYS: case CEPH_OSD_OP_OMAPGETVALS: case CEPH_OSD_OP_OMAPGETHEADER: @@ -310,7 +286,10 @@ static void osd_req_encode_op(struct ceph_osd_request *req, case CEPH_OSD_OP_CLONERANGE: case CEPH_OSD_OP_ASSERT_SRC_VERSION: case CEPH_OSD_OP_SRC_CMPXATTR: + case CEPH_OSD_OP_GETXATTR: case CEPH_OSD_OP_GETXATTRS: + case CEPH_OSD_OP_CMPXATTR: + case CEPH_OSD_OP_SETXATTR: case CEPH_OSD_OP_SETXATTRS: case CEPH_OSD_OP_RESETXATTRS: case CEPH_OSD_OP_RMXATTR: -- cgit v1.2.3 From f44246e394eadf0754bd6716f8ba1fabf362a87d Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Thu, 14 Feb 2013 12:16:43 -0600 Subject: libceph: simplify data length calculation Simplify the way the data length recorded in a message header is calculated in ceph_osdc_build_request(). Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- net/ceph/osd_client.c | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) (limited to 'net') diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index bbd157592c6..b58748ec405 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -335,7 +335,7 @@ void ceph_osdc_build_request(struct ceph_osd_request *req, void *p; size_t msg_size = sizeof(*head) + num_op*sizeof(*op); int flags = req->r_flags; - u64 data_len = 0; + u64 data_len; int i; WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0); @@ -363,8 +363,6 @@ void ceph_osdc_build_request(struct ceph_osd_request *req, while (num_op--) osd_req_encode_op(req, op++, src_op++); - data_len += req->r_trail.length; - if (snapc) { head->snap_seq = cpu_to_le64(snapc->seq); head->num_snaps = cpu_to_le32(snapc->num_snaps); @@ -374,14 +372,12 @@ void ceph_osdc_build_request(struct ceph_osd_request *req, } } + data_len = req->r_trail.length; if (flags & CEPH_OSD_FLAG_WRITE) { req->r_request->hdr.data_off = cpu_to_le16(off); - req->r_request->hdr.data_len = cpu_to_le32(len + data_len); - } else if (data_len) { - req->r_request->hdr.data_off = 0; - req->r_request->hdr.data_len = cpu_to_le32(data_len); + data_len += len; } - + req->r_request->hdr.data_len = cpu_to_le32(data_len); req->r_request->page_alignment = req->r_page_alignment; BUG_ON(p > msg->front.iov_base + msg->front.iov_len); -- cgit v1.2.3 From fbfab53966b279f9cdb36b96ffa1e22f042c96ff Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Fri, 8 Feb 2013 09:55:48 -0600 Subject: libceph: allow STAT osd operations Add support for CEPH_OSD_OP_STAT operations in the osd client and in rbd. This operation sends no data to the osd; everything required is encoded in identity of the target object. The result will be ENOENT if the object doesn't exist. If it does exist and no other error occurs the server returns the size and last modification time of the target object as output data (in little endian format). The size is a 64 bit unsigned and the time is ceph_timespec structure (two unsigned 32-bit integers, representing a seconds and nanoseconds value). This resolves: http://tracker.ceph.com/issues/4007 Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- net/ceph/osd_client.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'net') diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index b58748ec405..39629b66f3b 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -220,6 +220,8 @@ static void osd_req_encode_op(struct ceph_osd_request *req, dst->op = cpu_to_le16(src->op); switch (src->op) { + case CEPH_OSD_OP_STAT: + break; case CEPH_OSD_OP_READ: case CEPH_OSD_OP_WRITE: dst->extent.offset = @@ -255,7 +257,6 @@ static void osd_req_encode_op(struct ceph_osd_request *req, pr_err("unrecognized osd opcode %d\n", dst->op); WARN_ON(1); break; - case CEPH_OSD_OP_STAT: case CEPH_OSD_OP_MAPEXT: case CEPH_OSD_OP_MASKTRUNC: case CEPH_OSD_OP_SPARSE_READ: -- cgit v1.2.3 From b324814e8436772cb3367b14149ba003a9954525 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Wed, 6 Feb 2013 13:11:38 -0600 Subject: libceph: use void pointers in page vector functions The functions used for working with ceph page vectors are defined with char pointers, but they're really intended to operate on untyped data. Change the types of these function parameters to (void *) to reflect this. (Note that the functions now assume void pointer arithmetic works like arithmetic on char pointers.) Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- net/ceph/pagevec.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'net') diff --git a/net/ceph/pagevec.c b/net/ceph/pagevec.c index cd9c21df87d..5b20be979c1 100644 --- a/net/ceph/pagevec.c +++ b/net/ceph/pagevec.c @@ -12,7 +12,7 @@ /* * build a vector of user pages */ -struct page **ceph_get_direct_page_vector(const char __user *data, +struct page **ceph_get_direct_page_vector(const void __user *data, int num_pages, bool write_page) { struct page **pages; @@ -93,7 +93,7 @@ EXPORT_SYMBOL(ceph_alloc_page_vector); * copy user data into a page vector */ int ceph_copy_user_to_page_vector(struct page **pages, - const char __user *data, + const void __user *data, loff_t off, size_t len) { int i = 0; @@ -119,7 +119,7 @@ int ceph_copy_user_to_page_vector(struct page **pages, EXPORT_SYMBOL(ceph_copy_user_to_page_vector); int ceph_copy_to_page_vector(struct page **pages, - const char *data, + const void *data, loff_t off, size_t len) { int i = 0; @@ -143,7 +143,7 @@ int ceph_copy_to_page_vector(struct page **pages, EXPORT_SYMBOL(ceph_copy_to_page_vector); int ceph_copy_from_page_vector(struct page **pages, - char *data, + void *data, loff_t off, size_t len) { int i = 0; @@ -170,7 +170,7 @@ EXPORT_SYMBOL(ceph_copy_from_page_vector); * copy user data from a page vector into a user pointer */ int ceph_copy_page_vector_to_user(struct page **pages, - char __user *data, + void __user *data, loff_t off, size_t len) { int i = 0; -- cgit v1.2.3 From 903bb32e890237ca43ab847e561e5377cfe0fdb3 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Wed, 6 Feb 2013 13:11:38 -0600 Subject: libceph: drop return value from page vector copy routines The return values provided for ceph_copy_to_page_vector() and ceph_copy_from_page_vector() serve no purpose, so get rid of them. Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- net/ceph/pagevec.c | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) (limited to 'net') diff --git a/net/ceph/pagevec.c b/net/ceph/pagevec.c index 5b20be979c1..815a2249cfa 100644 --- a/net/ceph/pagevec.c +++ b/net/ceph/pagevec.c @@ -118,17 +118,17 @@ int ceph_copy_user_to_page_vector(struct page **pages, } EXPORT_SYMBOL(ceph_copy_user_to_page_vector); -int ceph_copy_to_page_vector(struct page **pages, +void ceph_copy_to_page_vector(struct page **pages, const void *data, loff_t off, size_t len) { int i = 0; size_t po = off & ~PAGE_CACHE_MASK; size_t left = len; - size_t l; while (left > 0) { - l = min_t(size_t, PAGE_CACHE_SIZE-po, left); + size_t l = min_t(size_t, PAGE_CACHE_SIZE-po, left); + memcpy(page_address(pages[i]) + po, data, l); data += l; left -= l; @@ -138,21 +138,20 @@ int ceph_copy_to_page_vector(struct page **pages, i++; } } - return len; } EXPORT_SYMBOL(ceph_copy_to_page_vector); -int ceph_copy_from_page_vector(struct page **pages, +void ceph_copy_from_page_vector(struct page **pages, void *data, loff_t off, size_t len) { int i = 0; size_t po = off & ~PAGE_CACHE_MASK; size_t left = len; - size_t l; while (left > 0) { - l = min_t(size_t, PAGE_CACHE_SIZE-po, left); + size_t l = min_t(size_t, PAGE_CACHE_SIZE-po, left); + memcpy(data, page_address(pages[i]) + po, l); data += l; left -= l; @@ -162,7 +161,6 @@ int ceph_copy_from_page_vector(struct page **pages, i++; } } - return len; } EXPORT_SYMBOL(ceph_copy_from_page_vector); -- cgit v1.2.3 From c9ffc77adebf9dfe3026ede6c8b3c61586b485b7 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Wed, 20 Feb 2013 10:25:12 -0600 Subject: libceph: define connection flag helpers Define and use functions that encapsulate operations performed on a connection's flags. This resolves: http://tracker.ceph.com/issues/4234 Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- net/ceph/messenger.c | 107 +++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 78 insertions(+), 29 deletions(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 8a62a559a2a..771d4c90446 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -98,6 +98,57 @@ #define CON_FLAG_SOCK_CLOSED 3 /* socket state changed to closed */ #define CON_FLAG_BACKOFF 4 /* need to retry queuing delayed work */ +static bool con_flag_valid(unsigned long con_flag) +{ + switch (con_flag) { + case CON_FLAG_LOSSYTX: + case CON_FLAG_KEEPALIVE_PENDING: + case CON_FLAG_WRITE_PENDING: + case CON_FLAG_SOCK_CLOSED: + case CON_FLAG_BACKOFF: + return true; + default: + return false; + } +} + +static void con_flag_clear(struct ceph_connection *con, unsigned long con_flag) +{ + BUG_ON(!con_flag_valid(con_flag)); + + clear_bit(con_flag, &con->flags); +} + +static void con_flag_set(struct ceph_connection *con, unsigned long con_flag) +{ + BUG_ON(!con_flag_valid(con_flag)); + + set_bit(con_flag, &con->flags); +} + +static bool con_flag_test(struct ceph_connection *con, unsigned long con_flag) +{ + BUG_ON(!con_flag_valid(con_flag)); + + return test_bit(con_flag, &con->flags); +} + +static bool con_flag_test_and_clear(struct ceph_connection *con, + unsigned long con_flag) +{ + BUG_ON(!con_flag_valid(con_flag)); + + return test_and_clear_bit(con_flag, &con->flags); +} + +static bool con_flag_test_and_set(struct ceph_connection *con, + unsigned long con_flag) +{ + BUG_ON(!con_flag_valid(con_flag)); + + return test_and_set_bit(con_flag, &con->flags); +} + /* static tag bytes (protocol control messages) */ static char tag_msg = CEPH_MSGR_TAG_MSG; static char tag_ack = CEPH_MSGR_TAG_ACK; @@ -309,7 +360,7 @@ static void ceph_sock_write_space(struct sock *sk) * buffer. See net/ipv4/tcp_input.c:tcp_check_space() * and net/core/stream.c:sk_stream_write_space(). */ - if (test_bit(CON_FLAG_WRITE_PENDING, &con->flags)) { + if (con_flag_test(con, CON_FLAG_WRITE_PENDING)) { if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) { dout("%s %p queueing write work\n", __func__, con); clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags); @@ -334,7 +385,7 @@ static void ceph_sock_state_change(struct sock *sk) case TCP_CLOSE_WAIT: dout("%s TCP_CLOSE_WAIT\n", __func__); con_sock_state_closing(con); - set_bit(CON_FLAG_SOCK_CLOSED, &con->flags); + con_flag_set(con, CON_FLAG_SOCK_CLOSED); queue_con(con); break; case TCP_ESTABLISHED: @@ -475,7 +526,7 @@ static int con_close_socket(struct ceph_connection *con) * received a socket close event before we had the chance to * shut the socket down. */ - clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags); + con_flag_clear(con, CON_FLAG_SOCK_CLOSED); con_sock_state_closed(con); return rc; @@ -539,11 +590,10 @@ void ceph_con_close(struct ceph_connection *con) ceph_pr_addr(&con->peer_addr.in_addr)); con->state = CON_STATE_CLOSED; - clear_bit(CON_FLAG_LOSSYTX, &con->flags); /* so we retry next connect */ - clear_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags); - clear_bit(CON_FLAG_WRITE_PENDING, &con->flags); - clear_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags); - clear_bit(CON_FLAG_BACKOFF, &con->flags); + con_flag_clear(con, CON_FLAG_LOSSYTX); /* so we retry next connect */ + con_flag_clear(con, CON_FLAG_KEEPALIVE_PENDING); + con_flag_clear(con, CON_FLAG_WRITE_PENDING); + con_flag_clear(con, CON_FLAG_BACKOFF); reset_connection(con); con->peer_global_seq = 0; @@ -799,7 +849,7 @@ static void prepare_write_message(struct ceph_connection *con) /* no, queue up footer too and be done */ prepare_write_message_footer(con); - set_bit(CON_FLAG_WRITE_PENDING, &con->flags); + con_flag_set(con, CON_FLAG_WRITE_PENDING); } /* @@ -820,7 +870,7 @@ static void prepare_write_ack(struct ceph_connection *con) &con->out_temp_ack); con->out_more = 1; /* more will follow.. eventually.. */ - set_bit(CON_FLAG_WRITE_PENDING, &con->flags); + con_flag_set(con, CON_FLAG_WRITE_PENDING); } /* @@ -831,7 +881,7 @@ static void prepare_write_keepalive(struct ceph_connection *con) dout("prepare_write_keepalive %p\n", con); con_out_kvec_reset(con); con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive); - set_bit(CON_FLAG_WRITE_PENDING, &con->flags); + con_flag_set(con, CON_FLAG_WRITE_PENDING); } /* @@ -874,7 +924,7 @@ static void prepare_write_banner(struct ceph_connection *con) &con->msgr->my_enc_addr); con->out_more = 0; - set_bit(CON_FLAG_WRITE_PENDING, &con->flags); + con_flag_set(con, CON_FLAG_WRITE_PENDING); } static int prepare_write_connect(struct ceph_connection *con) @@ -924,7 +974,7 @@ static int prepare_write_connect(struct ceph_connection *con) auth->authorizer_buf); con->out_more = 0; - set_bit(CON_FLAG_WRITE_PENDING, &con->flags); + con_flag_set(con, CON_FLAG_WRITE_PENDING); return 0; } @@ -1644,7 +1694,7 @@ static int process_connect(struct ceph_connection *con) le32_to_cpu(con->in_reply.connect_seq)); if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY) - set_bit(CON_FLAG_LOSSYTX, &con->flags); + con_flag_set(con, CON_FLAG_LOSSYTX); con->delay = 0; /* reset backoff memory */ @@ -2081,15 +2131,14 @@ do_next: prepare_write_ack(con); goto more; } - if (test_and_clear_bit(CON_FLAG_KEEPALIVE_PENDING, - &con->flags)) { + if (con_flag_test_and_clear(con, CON_FLAG_KEEPALIVE_PENDING)) { prepare_write_keepalive(con); goto more; } } /* Nothing to do! */ - clear_bit(CON_FLAG_WRITE_PENDING, &con->flags); + con_flag_clear(con, CON_FLAG_WRITE_PENDING); dout("try_write nothing else to write.\n"); ret = 0; out: @@ -2269,7 +2318,7 @@ static void queue_con(struct ceph_connection *con) static bool con_sock_closed(struct ceph_connection *con) { - if (!test_and_clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags)) + if (!con_flag_test_and_clear(con, CON_FLAG_SOCK_CLOSED)) return false; #define CASE(x) \ @@ -2310,14 +2359,14 @@ restart: if (con_sock_closed(con)) goto fault; - if (test_and_clear_bit(CON_FLAG_BACKOFF, &con->flags)) { + if (con_flag_test_and_clear(con, CON_FLAG_BACKOFF)) { dout("con_work %p backing off\n", con); ret = queue_con_delay(con, round_jiffies_relative(con->delay)); if (ret) { dout("con_work %p FAILED to back off %lu\n", con, con->delay); BUG_ON(ret == -ENOENT); - set_bit(CON_FLAG_BACKOFF, &con->flags); + con_flag_set(con, CON_FLAG_BACKOFF); } goto done; } @@ -2382,7 +2431,7 @@ static void ceph_fault(struct ceph_connection *con) con_close_socket(con); - if (test_bit(CON_FLAG_LOSSYTX, &con->flags)) { + if (con_flag_test(con, CON_FLAG_LOSSYTX)) { dout("fault on LOSSYTX channel, marking CLOSED\n"); con->state = CON_STATE_CLOSED; goto out_unlock; @@ -2402,9 +2451,9 @@ static void ceph_fault(struct ceph_connection *con) /* If there are no messages queued or keepalive pending, place * the connection in a STANDBY state */ if (list_empty(&con->out_queue) && - !test_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags)) { + !con_flag_test(con, CON_FLAG_KEEPALIVE_PENDING)) { dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con); - clear_bit(CON_FLAG_WRITE_PENDING, &con->flags); + con_flag_clear(con, CON_FLAG_WRITE_PENDING); con->state = CON_STATE_STANDBY; } else { /* retry after a delay. */ @@ -2413,7 +2462,7 @@ static void ceph_fault(struct ceph_connection *con) con->delay = BASE_DELAY_INTERVAL; else if (con->delay < MAX_DELAY_INTERVAL) con->delay *= 2; - set_bit(CON_FLAG_BACKOFF, &con->flags); + con_flag_set(con, CON_FLAG_BACKOFF); queue_con(con); } @@ -2470,8 +2519,8 @@ static void clear_standby(struct ceph_connection *con) dout("clear_standby %p and ++connect_seq\n", con); con->state = CON_STATE_PREOPEN; con->connect_seq++; - WARN_ON(test_bit(CON_FLAG_WRITE_PENDING, &con->flags)); - WARN_ON(test_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags)); + WARN_ON(con_flag_test(con, CON_FLAG_WRITE_PENDING)); + WARN_ON(con_flag_test(con, CON_FLAG_KEEPALIVE_PENDING)); } } @@ -2512,7 +2561,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) /* if there wasn't anything waiting to send before, queue * new work */ - if (test_and_set_bit(CON_FLAG_WRITE_PENDING, &con->flags) == 0) + if (con_flag_test_and_set(con, CON_FLAG_WRITE_PENDING) == 0) queue_con(con); } EXPORT_SYMBOL(ceph_con_send); @@ -2601,8 +2650,8 @@ void ceph_con_keepalive(struct ceph_connection *con) mutex_lock(&con->mutex); clear_standby(con); mutex_unlock(&con->mutex); - if (test_and_set_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags) == 0 && - test_and_set_bit(CON_FLAG_WRITE_PENDING, &con->flags) == 0) + if (con_flag_test_and_set(con, CON_FLAG_KEEPALIVE_PENDING) == 0 && + con_flag_test_and_set(con, CON_FLAG_WRITE_PENDING) == 0) queue_con(con); } EXPORT_SYMBOL(ceph_con_keepalive); -- cgit v1.2.3 From 154171678989950f6c392e126fa8006a145ed1cc Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Tue, 19 Feb 2013 12:25:56 -0600 Subject: libceph: eliminate sparse warnings Eliminate most of the problems in the libceph code that cause sparse to issue warnings. - Convert functions that are never referenced externally to have static scope. - Pass NULL rather than 0 for a pointer argument in one spot in ceph_monc_delete_snapid() This partially resolves: http://tracker.ceph.com/issues/4184 Reported-by: Fengguang Wu Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- net/ceph/crypto.c | 7 ++++--- net/ceph/messenger.c | 2 +- net/ceph/mon_client.c | 2 +- 3 files changed, 6 insertions(+), 5 deletions(-) (limited to 'net') diff --git a/net/ceph/crypto.c b/net/ceph/crypto.c index af14cb42516..6e7a236525b 100644 --- a/net/ceph/crypto.c +++ b/net/ceph/crypto.c @@ -423,7 +423,8 @@ int ceph_encrypt2(struct ceph_crypto_key *secret, void *dst, size_t *dst_len, } } -int ceph_key_instantiate(struct key *key, struct key_preparsed_payload *prep) +static int ceph_key_instantiate(struct key *key, + struct key_preparsed_payload *prep) { struct ceph_crypto_key *ckey; size_t datalen = prep->datalen; @@ -458,12 +459,12 @@ err: return ret; } -int ceph_key_match(const struct key *key, const void *description) +static int ceph_key_match(const struct key *key, const void *description) { return strcmp(key->description, description) == 0; } -void ceph_key_destroy(struct key *key) { +static void ceph_key_destroy(struct key *key) { struct ceph_crypto_key *ckey = key->payload.data; ceph_crypto_key_destroy(ckey); diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 771d4c90446..ed9e237d967 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -223,7 +223,7 @@ static void encode_my_addr(struct ceph_messenger *msgr) */ static struct workqueue_struct *ceph_msgr_wq; -void _ceph_msgr_exit(void) +static void _ceph_msgr_exit(void) { if (ceph_msgr_wq) { destroy_workqueue(ceph_msgr_wq); diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c index 812eb3b46c1..aef5b1062be 100644 --- a/net/ceph/mon_client.c +++ b/net/ceph/mon_client.c @@ -697,7 +697,7 @@ int ceph_monc_delete_snapid(struct ceph_mon_client *monc, u32 pool, u64 snapid) { return do_poolop(monc, POOL_OP_CREATE_UNMANAGED_SNAP, - pool, snapid, 0, 0); + pool, snapid, NULL, 0); } -- cgit v1.2.3 From f20a39fd6e6356b4cf3c1650c4dc6c66c99d8bae Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Tue, 19 Feb 2013 12:25:57 -0600 Subject: libceph: encapsulate connection backoff Collect the code that tests for and implements a backoff delay for a ceph connection into a new function, ceph_backoff(). Make the debug output messages in that part of the code report things consistently by reporting a message in the socket closed case, and by making the one for PREOPEN state report the connection pointer like the rest. Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- net/ceph/messenger.c | 37 ++++++++++++++++++++++++------------- 1 file changed, 24 insertions(+), 13 deletions(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index ed9e237d967..9a29d8a4bad 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -2345,6 +2345,24 @@ static bool con_sock_closed(struct ceph_connection *con) return true; } +static bool con_backoff(struct ceph_connection *con) +{ + int ret; + + if (!con_flag_test_and_clear(con, CON_FLAG_BACKOFF)) + return false; + + ret = queue_con_delay(con, round_jiffies_relative(con->delay)); + if (ret) { + dout("%s: con %p FAILED to back off %lu\n", __func__, + con, con->delay); + BUG_ON(ret == -ENOENT); + con_flag_set(con, CON_FLAG_BACKOFF); + } + + return true; +} + /* * Do some work on a connection. Drop a connection ref when we're done. */ @@ -2356,21 +2374,14 @@ static void con_work(struct work_struct *work) mutex_lock(&con->mutex); restart: - if (con_sock_closed(con)) + if (con_sock_closed(con)) { + dout("%s: con %p SOCK_CLOSED\n", __func__, con); goto fault; - - if (con_flag_test_and_clear(con, CON_FLAG_BACKOFF)) { - dout("con_work %p backing off\n", con); - ret = queue_con_delay(con, round_jiffies_relative(con->delay)); - if (ret) { - dout("con_work %p FAILED to back off %lu\n", con, - con->delay); - BUG_ON(ret == -ENOENT); - con_flag_set(con, CON_FLAG_BACKOFF); - } + } + if (con_backoff(con)) { + dout("%s: con %p BACKOFF\n", __func__, con); goto done; } - if (con->state == CON_STATE_STANDBY) { dout("con_work %p STANDBY\n", con); goto done; @@ -2381,7 +2392,7 @@ restart: goto done; } if (con->state == CON_STATE_PREOPEN) { - dout("con_work OPENING\n"); + dout("%s: con %p OPENING\n", __func__, con); BUG_ON(con->sock); } -- cgit v1.2.3 From 93209264203987cdd2c69d34df6eaa2cd184e283 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Tue, 19 Feb 2013 12:25:57 -0600 Subject: libceph: separate non-locked fault handling An error occurring on a ceph connection is treated as a fault, causing the connection to be reset. The initial part of this fault handling has to be done while holding the connection mutex, but it must then be dropped for the last part. Separate the part of this fault handling that executes without the lock into its own function, con_fault_finish(). Move the call to this new function, as well as call that drops the connection mutex, into ceph_fault(). Rename that function con_fault() to reflect that it's only handling the connection part of the fault handling. The motivation for this was a warning from sparse about the locking being done here. Rearranging things this way keeps all the mutex manipulation within ceph_fault(), and this stops sparse from complaining. This partially resolves: http://tracker.ceph.com/issues/4184 Reported-by: Fengguang Wu Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- net/ceph/messenger.c | 42 +++++++++++++++++++++++------------------- 1 file changed, 23 insertions(+), 19 deletions(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 9a29d8a4bad..c3b9060d484 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -166,7 +166,7 @@ static struct lock_class_key socket_class; static void queue_con(struct ceph_connection *con); static void con_work(struct work_struct *); -static void ceph_fault(struct ceph_connection *con); +static void con_fault(struct ceph_connection *con); /* * Nicely render a sockaddr as a string. An array of formatted @@ -2363,6 +2363,23 @@ static bool con_backoff(struct ceph_connection *con) return true; } +/* Finish fault handling; con->mutex must *not* be held here */ + +static void con_fault_finish(struct ceph_connection *con) +{ + /* + * in case we faulted due to authentication, invalidate our + * current tickets so that we can get new ones. + */ + if (con->auth_retry && con->ops->invalidate_authorizer) { + dout("calling invalidate_authorizer()\n"); + con->ops->invalidate_authorizer(con); + } + + if (con->ops->fault) + con->ops->fault(con); +} + /* * Do some work on a connection. Drop a connection ref when we're done. */ @@ -2419,7 +2436,9 @@ done_unlocked: return; fault: - ceph_fault(con); /* error/fault path */ + con_fault(con); + mutex_unlock(&con->mutex); + con_fault_finish(con); goto done_unlocked; } @@ -2428,8 +2447,7 @@ fault: * Generic error/fault handler. A retry mechanism is used with * exponential backoff */ -static void ceph_fault(struct ceph_connection *con) - __releases(con->mutex) +static void con_fault(struct ceph_connection *con) { pr_warning("%s%lld %s %s\n", ENTITY_NAME(con->peer_name), ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg); @@ -2445,7 +2463,7 @@ static void ceph_fault(struct ceph_connection *con) if (con_flag_test(con, CON_FLAG_LOSSYTX)) { dout("fault on LOSSYTX channel, marking CLOSED\n"); con->state = CON_STATE_CLOSED; - goto out_unlock; + return; } if (con->in_msg) { @@ -2476,20 +2494,6 @@ static void ceph_fault(struct ceph_connection *con) con_flag_set(con, CON_FLAG_BACKOFF); queue_con(con); } - -out_unlock: - mutex_unlock(&con->mutex); - /* - * in case we faulted due to authentication, invalidate our - * current tickets so that we can get new ones. - */ - if (con->auth_retry && con->ops->invalidate_authorizer) { - dout("calling invalidate_authorizer()\n"); - con->ops->invalidate_authorizer(con); - } - - if (con->ops->fault) - con->ops->fault(con); } -- cgit v1.2.3 From b6e7b6a11923bda6102b4e3e196693567944869c Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Tue, 19 Feb 2013 12:25:57 -0600 Subject: libceph: use a flag to indicate a fault has occurred This just rearranges the logic in con_work() a little bit so that a flag is used to indicate a fault has occurred. This allows both the fault and non-fault case to be handled the same way and avoids a couple of nearly consecutive gotos. Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- net/ceph/messenger.c | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index c3b9060d484..18eb788bbb9 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -2387,13 +2387,15 @@ static void con_work(struct work_struct *work) { struct ceph_connection *con = container_of(work, struct ceph_connection, work.work); + bool fault = false; int ret; mutex_lock(&con->mutex); restart: if (con_sock_closed(con)) { dout("%s: con %p SOCK_CLOSED\n", __func__, con); - goto fault; + fault = true; + goto done; } if (con_backoff(con)) { dout("%s: con %p BACKOFF\n", __func__, con); @@ -2418,7 +2420,8 @@ restart: goto restart; if (ret < 0) { con->error_msg = "socket error on read"; - goto fault; + fault = true; + goto done; } ret = try_write(con); @@ -2426,20 +2429,17 @@ restart: goto restart; if (ret < 0) { con->error_msg = "socket error on write"; - goto fault; + fault = true; } - done: + if (fault) + con_fault(con); mutex_unlock(&con->mutex); -done_unlocked: - con->ops->put(con); - return; -fault: - con_fault(con); - mutex_unlock(&con->mutex); - con_fault_finish(con); - goto done_unlocked; + if (fault) + con_fault_finish(con); + + con->ops->put(con); } -- cgit v1.2.3 From 49659416ba4fa8308bd29e453f54c3bcf8a0fbf1 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Tue, 19 Feb 2013 12:25:57 -0600 Subject: libceph: use a do..while loop in con_work() This just converts a manually-implemented loop into a do..while loop in con_work(). It also moves handling of EAGAIN inside the blocks where it's already been determined an error code was returned. Also update a few dout() calls near the affected code for consistency. Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- net/ceph/messenger.c | 83 ++++++++++++++++++++++++++-------------------------- 1 file changed, 42 insertions(+), 41 deletions(-) (limited to 'net') diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 18eb788bbb9..2c0669fb54e 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -2387,51 +2387,53 @@ static void con_work(struct work_struct *work) { struct ceph_connection *con = container_of(work, struct ceph_connection, work.work); - bool fault = false; - int ret; + bool fault; mutex_lock(&con->mutex); -restart: - if (con_sock_closed(con)) { - dout("%s: con %p SOCK_CLOSED\n", __func__, con); - fault = true; - goto done; - } - if (con_backoff(con)) { - dout("%s: con %p BACKOFF\n", __func__, con); - goto done; - } - if (con->state == CON_STATE_STANDBY) { - dout("con_work %p STANDBY\n", con); - goto done; - } - if (con->state == CON_STATE_CLOSED) { - dout("con_work %p CLOSED\n", con); - BUG_ON(con->sock); - goto done; - } - if (con->state == CON_STATE_PREOPEN) { - dout("%s: con %p OPENING\n", __func__, con); - BUG_ON(con->sock); - } + while (true) { + int ret; - ret = try_read(con); - if (ret == -EAGAIN) - goto restart; - if (ret < 0) { - con->error_msg = "socket error on read"; - fault = true; - goto done; - } + if ((fault = con_sock_closed(con))) { + dout("%s: con %p SOCK_CLOSED\n", __func__, con); + break; + } + if (con_backoff(con)) { + dout("%s: con %p BACKOFF\n", __func__, con); + break; + } + if (con->state == CON_STATE_STANDBY) { + dout("%s: con %p STANDBY\n", __func__, con); + break; + } + if (con->state == CON_STATE_CLOSED) { + dout("%s: con %p CLOSED\n", __func__, con); + BUG_ON(con->sock); + break; + } + if (con->state == CON_STATE_PREOPEN) { + dout("%s: con %p PREOPEN\n", __func__, con); + BUG_ON(con->sock); + } - ret = try_write(con); - if (ret == -EAGAIN) - goto restart; - if (ret < 0) { - con->error_msg = "socket error on write"; - fault = true; + ret = try_read(con); + if (ret < 0) { + if (ret == -EAGAIN) + continue; + con->error_msg = "socket error on read"; + fault = true; + break; + } + + ret = try_write(con); + if (ret < 0) { + if (ret == -EAGAIN) + continue; + con->error_msg = "socket error on write"; + fault = true; + } + + break; /* If we make it to here, we're done */ } -done: if (fault) con_fault(con); mutex_unlock(&con->mutex); @@ -2442,7 +2444,6 @@ done: con->ops->put(con); } - /* * Generic error/fault handler. A retry mechanism is used with * exponential backoff -- cgit v1.2.3 From 12979354a1d6ef25d86f381e4d5f9e103f29913a Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Tue, 8 Jan 2013 09:15:10 -0800 Subject: libceph: rename ceph_pg -> ceph_pg_v1 Rename the old version this type to distinguish it from the new version. Signed-off-by: Sage Weil Reviewed-by: Alex Elder --- net/ceph/osd_client.c | 2 +- net/ceph/osdmap.c | 18 +++++++++--------- 2 files changed, 10 insertions(+), 10 deletions(-) (limited to 'net') diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 39629b66f3b..e3ab8d60d08 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -914,7 +914,7 @@ static int __map_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req, int force_resend) { struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base; - struct ceph_pg pgid; + struct ceph_pg_v1 pgid; int acting[CEPH_PG_MAX_SIZE]; int o = -1, num = 0; int err; diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index 3c61e21611d..8c89ac25081 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -350,7 +350,7 @@ bad: * rbtree of pg_mapping for handling pg_temp (explicit mapping of pgid * to a set of osds) */ -static int pgid_cmp(struct ceph_pg l, struct ceph_pg r) +static int pgid_cmp(struct ceph_pg_v1 l, struct ceph_pg_v1 r) { u64 a = *(u64 *)&l; u64 b = *(u64 *)&r; @@ -389,7 +389,7 @@ static int __insert_pg_mapping(struct ceph_pg_mapping *new, } static struct ceph_pg_mapping *__lookup_pg_mapping(struct rb_root *root, - struct ceph_pg pgid) + struct ceph_pg_v1 pgid) { struct rb_node *n = root->rb_node; struct ceph_pg_mapping *pg; @@ -411,7 +411,7 @@ static struct ceph_pg_mapping *__lookup_pg_mapping(struct rb_root *root, return NULL; } -static int __remove_pg_mapping(struct rb_root *root, struct ceph_pg pgid) +static int __remove_pg_mapping(struct rb_root *root, struct ceph_pg_v1 pgid) { struct ceph_pg_mapping *pg = __lookup_pg_mapping(root, pgid); @@ -721,7 +721,7 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end) ceph_decode_32_safe(p, end, len, bad); for (i = 0; i < len; i++) { int n, j; - struct ceph_pg pgid; + struct ceph_pg_v1 pgid; struct ceph_pg_mapping *pg; ceph_decode_need(p, end, sizeof(u32) + sizeof(u64), bad); @@ -944,7 +944,7 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, while (len--) { struct ceph_pg_mapping *pg; int j; - struct ceph_pg pgid; + struct ceph_pg_v1 pgid; u32 pglen; ceph_decode_need(p, end, sizeof(u64) + sizeof(u32), bad); ceph_decode_copy(p, &pgid, sizeof(pgid)); @@ -1079,7 +1079,7 @@ int ceph_calc_object_layout(struct ceph_object_layout *ol, struct ceph_osdmap *osdmap) { unsigned int num, num_mask; - struct ceph_pg pgid; + struct ceph_pg_v1 pgid; int poolid = le32_to_cpu(fl->fl_pg_pool); struct ceph_pg_pool_info *pool; unsigned int ps; @@ -1108,7 +1108,7 @@ EXPORT_SYMBOL(ceph_calc_object_layout); * Calculate raw osd vector for the given pgid. Return pointer to osd * array, or NULL on failure. */ -static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid, +static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg_v1 pgid, int *osds, int *num) { struct ceph_pg_mapping *pg; @@ -1163,7 +1163,7 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid, /* * Return acting set for given pgid. */ -int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, struct ceph_pg pgid, +int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, struct ceph_pg_v1 pgid, int *acting) { int rawosds[CEPH_PG_MAX_SIZE], *osds; @@ -1184,7 +1184,7 @@ int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, struct ceph_pg pgid, /* * Return primary osd for given pgid, or -1 if none. */ -int ceph_calc_pg_primary(struct ceph_osdmap *osdmap, struct ceph_pg pgid) +int ceph_calc_pg_primary(struct ceph_osdmap *osdmap, struct ceph_pg_v1 pgid) { int rawosds[CEPH_PG_MAX_SIZE], *osds; int i, num = CEPH_PG_MAX_SIZE; -- cgit v1.2.3 From 5b191d9914eb68257f47de9d5bfe099b77f0687c Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Sat, 23 Feb 2013 10:38:16 -0800 Subject: libceph: decode into cpu-native ceph_pg type Always decode data into our cpu-native ceph_pg type that has the correct field widths. Limit any remaining uses of ceph_pg_v1 to dealing with the legacy protocol. Signed-off-by: Sage Weil Reviewed-by: Alex Elder --- net/ceph/debugfs.c | 5 ++-- net/ceph/osd_client.c | 9 +++--- net/ceph/osdmap.c | 78 ++++++++++++++++++++++++++++----------------------- 3 files changed, 50 insertions(+), 42 deletions(-) (limited to 'net') diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c index 38b5dc1823d..61a9af634f8 100644 --- a/net/ceph/debugfs.c +++ b/net/ceph/debugfs.c @@ -131,10 +131,9 @@ static int osdc_show(struct seq_file *s, void *pp) req = rb_entry(p, struct ceph_osd_request, r_node); - seq_printf(s, "%lld\tosd%d\t%d.%x\t", req->r_tid, + seq_printf(s, "%lld\tosd%d\t%lld.%x\t", req->r_tid, req->r_osd ? req->r_osd->o_osd : -1, - le32_to_cpu(req->r_pgid.pool), - le16_to_cpu(req->r_pgid.ps)); + req->r_pgid.pool, req->r_pgid.seed); head = req->r_request->front.iov_base; op = (void *)(head + 1); diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index e3ab8d60d08..1990834e518 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -914,7 +914,7 @@ static int __map_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req, int force_resend) { struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base; - struct ceph_pg_v1 pgid; + struct ceph_pg pgid; int acting[CEPH_PG_MAX_SIZE]; int o = -1, num = 0; int err; @@ -926,7 +926,8 @@ static int __map_request(struct ceph_osd_client *osdc, list_move(&req->r_req_lru_item, &osdc->req_notarget); return err; } - pgid = reqhead->layout.ol_pgid; + pgid.pool = le32_to_cpu(reqhead->layout.ol_pgid.pool); + pgid.seed = le16_to_cpu(reqhead->layout.ol_pgid.ps); req->r_pgid = pgid; err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting); @@ -943,8 +944,8 @@ static int __map_request(struct ceph_osd_client *osdc, (req->r_osd == NULL && o == -1)) return 0; /* no change */ - dout("map_request tid %llu pgid %d.%x osd%d (was osd%d)\n", - req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o, + dout("map_request tid %llu pgid %lld.%x osd%d (was osd%d)\n", + req->r_tid, pgid.pool, pgid.seed, o, req->r_osd ? req->r_osd->o_osd : -1); /* record full pg acting set */ diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index 8c89ac25081..81118db5bd1 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -350,14 +350,15 @@ bad: * rbtree of pg_mapping for handling pg_temp (explicit mapping of pgid * to a set of osds) */ -static int pgid_cmp(struct ceph_pg_v1 l, struct ceph_pg_v1 r) +static int pgid_cmp(struct ceph_pg l, struct ceph_pg r) { - u64 a = *(u64 *)&l; - u64 b = *(u64 *)&r; - - if (a < b) + if (l.pool < r.pool) + return -1; + if (l.pool > r.pool) + return 1; + if (l.seed < r.seed) return -1; - if (a > b) + if (l.seed > r.seed) return 1; return 0; } @@ -389,7 +390,7 @@ static int __insert_pg_mapping(struct ceph_pg_mapping *new, } static struct ceph_pg_mapping *__lookup_pg_mapping(struct rb_root *root, - struct ceph_pg_v1 pgid) + struct ceph_pg pgid) { struct rb_node *n = root->rb_node; struct ceph_pg_mapping *pg; @@ -403,25 +404,26 @@ static struct ceph_pg_mapping *__lookup_pg_mapping(struct rb_root *root, } else if (c > 0) { n = n->rb_right; } else { - dout("__lookup_pg_mapping %llx got %p\n", - *(u64 *)&pgid, pg); + dout("__lookup_pg_mapping %lld.%x got %p\n", + pgid.pool, pgid.seed, pg); return pg; } } return NULL; } -static int __remove_pg_mapping(struct rb_root *root, struct ceph_pg_v1 pgid) +static int __remove_pg_mapping(struct rb_root *root, struct ceph_pg pgid) { struct ceph_pg_mapping *pg = __lookup_pg_mapping(root, pgid); if (pg) { - dout("__remove_pg_mapping %llx %p\n", *(u64 *)&pgid, pg); + dout("__remove_pg_mapping %lld.%x %p\n", pgid.pool, pgid.seed, + pg); rb_erase(&pg->node, root); kfree(pg); return 0; } - dout("__remove_pg_mapping %llx dne\n", *(u64 *)&pgid); + dout("__remove_pg_mapping %lld.%x dne\n", pgid.pool, pgid.seed); return -ENOENT; } @@ -721,11 +723,14 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end) ceph_decode_32_safe(p, end, len, bad); for (i = 0; i < len; i++) { int n, j; - struct ceph_pg_v1 pgid; + struct ceph_pg pgid; + struct ceph_pg_v1 pgid_v1; struct ceph_pg_mapping *pg; ceph_decode_need(p, end, sizeof(u32) + sizeof(u64), bad); - ceph_decode_copy(p, &pgid, sizeof(pgid)); + ceph_decode_copy(p, &pgid_v1, sizeof(pgid_v1)); + pgid.pool = le32_to_cpu(pgid_v1.pool); + pgid.seed = le16_to_cpu(pgid_v1.ps); n = ceph_decode_32(p); err = -EINVAL; if (n > (UINT_MAX - sizeof(*pg)) / sizeof(u32)) @@ -743,7 +748,8 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end) err = __insert_pg_mapping(pg, &map->pg_temp); if (err) goto bad; - dout(" added pg_temp %llx len %d\n", *(u64 *)&pgid, len); + dout(" added pg_temp %lld.%x len %d\n", pgid.pool, pgid.seed, + len); } /* crush */ @@ -944,10 +950,13 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, while (len--) { struct ceph_pg_mapping *pg; int j; - struct ceph_pg_v1 pgid; + struct ceph_pg_v1 pgid_v1; + struct ceph_pg pgid; u32 pglen; ceph_decode_need(p, end, sizeof(u64) + sizeof(u32), bad); - ceph_decode_copy(p, &pgid, sizeof(pgid)); + ceph_decode_copy(p, &pgid_v1, sizeof(pgid_v1)); + pgid.pool = le32_to_cpu(pgid_v1.pool); + pgid.seed = le16_to_cpu(pgid_v1.ps); pglen = ceph_decode_32(p); if (pglen) { @@ -973,8 +982,8 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, kfree(pg); goto bad; } - dout(" added pg_temp %llx len %d\n", *(u64 *)&pgid, - pglen); + dout(" added pg_temp %lld.%x len %d\n", pgid.pool, + pgid.seed, pglen); } else { /* remove */ __remove_pg_mapping(&map->pg_temp, pgid); @@ -1079,26 +1088,25 @@ int ceph_calc_object_layout(struct ceph_object_layout *ol, struct ceph_osdmap *osdmap) { unsigned int num, num_mask; - struct ceph_pg_v1 pgid; - int poolid = le32_to_cpu(fl->fl_pg_pool); + struct ceph_pg pgid; struct ceph_pg_pool_info *pool; - unsigned int ps; BUG_ON(!osdmap); - pool = __lookup_pg_pool(&osdmap->pg_pools, poolid); + pgid.pool = le32_to_cpu(fl->fl_pg_pool); + pool = __lookup_pg_pool(&osdmap->pg_pools, pgid.pool); if (!pool) return -EIO; - ps = ceph_str_hash(pool->v.object_hash, oid, strlen(oid)); + pgid.seed = ceph_str_hash(pool->v.object_hash, oid, strlen(oid)); num = le32_to_cpu(pool->v.pg_num); num_mask = pool->pg_num_mask; - pgid.ps = cpu_to_le16(ps); - pgid.preferred = cpu_to_le16(-1); - pgid.pool = fl->fl_pg_pool; - dout("calc_object_layout '%s' pgid %d.%x\n", oid, poolid, ps); + dout("calc_object_layout '%s' pgid %lld.%x\n", oid, pgid.pool, + pgid.seed); - ol->ol_pgid = pgid; + ol->ol_pgid.ps = cpu_to_le16(pgid.seed); + ol->ol_pgid.pool = fl->fl_pg_pool; + ol->ol_pgid.preferred = cpu_to_le16(-1); ol->ol_stripe_unit = fl->fl_object_stripe_unit; return 0; } @@ -1108,7 +1116,7 @@ EXPORT_SYMBOL(ceph_calc_object_layout); * Calculate raw osd vector for the given pgid. Return pointer to osd * array, or NULL on failure. */ -static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg_v1 pgid, +static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid, int *osds, int *num) { struct ceph_pg_mapping *pg; @@ -1116,8 +1124,8 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg_v1 pgid, int ruleno; unsigned int poolid, ps, pps, t, r; - poolid = le32_to_cpu(pgid.pool); - ps = le16_to_cpu(pgid.ps); + poolid = pgid.pool; + ps = pgid.seed; pool = __lookup_pg_pool(&osdmap->pg_pools, poolid); if (!pool) @@ -1126,7 +1134,7 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg_v1 pgid, /* pg_temp? */ t = ceph_stable_mod(ps, le32_to_cpu(pool->v.pg_num), pool->pgp_num_mask); - pgid.ps = cpu_to_le16(t); + pgid.seed = t; pg = __lookup_pg_mapping(&osdmap->pg_temp, pgid); if (pg) { *num = pg->len; @@ -1163,7 +1171,7 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg_v1 pgid, /* * Return acting set for given pgid. */ -int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, struct ceph_pg_v1 pgid, +int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, struct ceph_pg pgid, int *acting) { int rawosds[CEPH_PG_MAX_SIZE], *osds; @@ -1184,7 +1192,7 @@ int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, struct ceph_pg_v1 pgid, /* * Return primary osd for given pgid, or -1 if none. */ -int ceph_calc_pg_primary(struct ceph_osdmap *osdmap, struct ceph_pg_v1 pgid) +int ceph_calc_pg_primary(struct ceph_osdmap *osdmap, struct ceph_pg pgid) { int rawosds[CEPH_PG_MAX_SIZE], *osds; int i, num = CEPH_PG_MAX_SIZE; -- cgit v1.2.3 From 4f6a7e5ee1393ec4b243b39dac9f36992d161540 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Sat, 23 Feb 2013 10:41:09 -0800 Subject: ceph: update support for PGID64, PGPOOL3, OSDENC protocol features Support (and require) the PGID64, PGPOOL3, and OSDENC protocol features. These have been present in ceph.git since v0.42, Feb 2012. Require these features to simplify support; nobody is running older userspace. Note that the new request and reply encoding is still not in place, so the new code is not yet functional. Signed-off-by: Sage Weil Reviewed-by: Alex Elder --- net/ceph/ceph_common.c | 6 +- net/ceph/debugfs.c | 6 +- net/ceph/osdmap.c | 162 ++++++++++++++++++++++++++----------------------- 3 files changed, 91 insertions(+), 83 deletions(-) (limited to 'net') diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c index c236c235c4a..c5605ae9671 100644 --- a/net/ceph/ceph_common.c +++ b/net/ceph/ceph_common.c @@ -601,10 +601,8 @@ static int __init init_ceph_lib(void) if (ret < 0) goto out_crypto; - pr_info("loaded (mon/osd proto %d/%d, osdmap %d/%d %d/%d)\n", - CEPH_MONC_PROTOCOL, CEPH_OSDC_PROTOCOL, - CEPH_OSDMAP_VERSION, CEPH_OSDMAP_VERSION_EXT, - CEPH_OSDMAP_INC_VERSION, CEPH_OSDMAP_INC_VERSION_EXT); + pr_info("loaded (mon/osd proto %d/%d)\n", + CEPH_MONC_PROTOCOL, CEPH_OSDC_PROTOCOL); return 0; diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c index 61a9af634f8..f4d4b27d602 100644 --- a/net/ceph/debugfs.c +++ b/net/ceph/debugfs.c @@ -66,9 +66,9 @@ static int osdmap_show(struct seq_file *s, void *p) for (n = rb_first(&client->osdc.osdmap->pg_pools); n; n = rb_next(n)) { struct ceph_pg_pool_info *pool = rb_entry(n, struct ceph_pg_pool_info, node); - seq_printf(s, "pg_pool %d pg_num %d / %d, lpg_num %d / %d\n", - pool->id, pool->v.pg_num, pool->pg_num_mask, - pool->v.lpg_num, pool->lpg_num_mask); + seq_printf(s, "pg_pool %llu pg_num %d / %d\n", + (unsigned long long)pool->id, pool->pg_num, + pool->pg_num_mask); } for (i = 0; i < client->osdc.osdmap->max_osd; i++) { struct ceph_entity_addr *addr = diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index 81118db5bd1..911919320d2 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -45,13 +45,8 @@ static int calc_bits_of(unsigned int t) */ static void calc_pg_masks(struct ceph_pg_pool_info *pi) { - pi->pg_num_mask = (1 << calc_bits_of(le32_to_cpu(pi->v.pg_num)-1)) - 1; - pi->pgp_num_mask = - (1 << calc_bits_of(le32_to_cpu(pi->v.pgp_num)-1)) - 1; - pi->lpg_num_mask = - (1 << calc_bits_of(le32_to_cpu(pi->v.lpg_num)-1)) - 1; - pi->lpgp_num_mask = - (1 << calc_bits_of(le32_to_cpu(pi->v.lpgp_num)-1)) - 1; + pi->pg_num_mask = (1 << calc_bits_of(pi->pg_num-1)) - 1; + pi->pgp_num_mask = (1 << calc_bits_of(pi->pgp_num-1)) - 1; } /* @@ -452,7 +447,7 @@ static int __insert_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *new) return 0; } -static struct ceph_pg_pool_info *__lookup_pg_pool(struct rb_root *root, int id) +static struct ceph_pg_pool_info *__lookup_pg_pool(struct rb_root *root, u64 id) { struct ceph_pg_pool_info *pi; struct rb_node *n = root->rb_node; @@ -508,24 +503,57 @@ static void __remove_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *pi) static int __decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi) { - unsigned int n, m; + u8 ev, cv; + unsigned len, num; + void *pool_end; + + ceph_decode_need(p, end, 2 + 4, bad); + ev = ceph_decode_8(p); /* encoding version */ + cv = ceph_decode_8(p); /* compat version */ + if (ev < 5) { + pr_warning("got v %d < 5 cv %d of ceph_pg_pool\n", ev, cv); + return -EINVAL; + } + if (cv > 7) { + pr_warning("got v %d cv %d > 7 of ceph_pg_pool\n", ev, cv); + return -EINVAL; + } + len = ceph_decode_32(p); + ceph_decode_need(p, end, len, bad); + pool_end = *p + len; - ceph_decode_copy(p, &pi->v, sizeof(pi->v)); - calc_pg_masks(pi); + pi->type = ceph_decode_8(p); + pi->size = ceph_decode_8(p); + pi->crush_ruleset = ceph_decode_8(p); + pi->object_hash = ceph_decode_8(p); - /* num_snaps * snap_info_t */ - n = le32_to_cpu(pi->v.num_snaps); - while (n--) { - ceph_decode_need(p, end, sizeof(u64) + 1 + sizeof(u64) + - sizeof(struct ceph_timespec), bad); - *p += sizeof(u64) + /* key */ - 1 + sizeof(u64) + /* u8, snapid */ - sizeof(struct ceph_timespec); - m = ceph_decode_32(p); /* snap name */ - *p += m; + pi->pg_num = ceph_decode_32(p); + pi->pgp_num = ceph_decode_32(p); + + *p += 4 + 4; /* skip lpg* */ + *p += 4; /* skip last_change */ + *p += 8 + 4; /* skip snap_seq, snap_epoch */ + + /* skip snaps */ + num = ceph_decode_32(p); + while (num--) { + *p += 8; /* snapid key */ + *p += 1 + 1; /* versions */ + len = ceph_decode_32(p); + *p += len; } - *p += le32_to_cpu(pi->v.num_removed_snap_intervals) * sizeof(u64) * 2; + /* skip removed snaps */ + num = ceph_decode_32(p); + *p += num * (8 + 8); + + *p += 8; /* skip auid */ + pi->flags = ceph_decode_64(p); + + /* ignore the rest */ + + *p = pool_end; + calc_pg_masks(pi); return 0; bad: @@ -535,14 +563,15 @@ bad: static int __decode_pool_names(void **p, void *end, struct ceph_osdmap *map) { struct ceph_pg_pool_info *pi; - u32 num, len, pool; + u32 num, len; + u64 pool; ceph_decode_32_safe(p, end, num, bad); dout(" %d pool names\n", num); while (num--) { - ceph_decode_32_safe(p, end, pool, bad); + ceph_decode_64_safe(p, end, pool, bad); ceph_decode_32_safe(p, end, len, bad); - dout(" pool %d len %d\n", pool, len); + dout(" pool %llu len %d\n", pool, len); ceph_decode_need(p, end, len, bad); pi = __lookup_pg_pool(&map->pg_pools, pool); if (pi) { @@ -633,7 +662,6 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end) struct ceph_osdmap *map; u16 version; u32 len, max, i; - u8 ev; int err = -EINVAL; void *start = *p; struct ceph_pg_pool_info *pi; @@ -646,9 +674,12 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end) map->pg_temp = RB_ROOT; ceph_decode_16_safe(p, end, version, bad); - if (version > CEPH_OSDMAP_VERSION) { - pr_warning("got unknown v %d > %d of osdmap\n", version, - CEPH_OSDMAP_VERSION); + if (version > 6) { + pr_warning("got unknown v %d > 6 of osdmap\n", version); + goto bad; + } + if (version < 6) { + pr_warning("got old v %d < 6 of osdmap\n", version); goto bad; } @@ -660,20 +691,12 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end) ceph_decode_32_safe(p, end, max, bad); while (max--) { - ceph_decode_need(p, end, 4 + 1 + sizeof(pi->v), bad); + ceph_decode_need(p, end, 8 + 2, bad); err = -ENOMEM; pi = kzalloc(sizeof(*pi), GFP_NOFS); if (!pi) goto bad; - pi->id = ceph_decode_32(p); - err = -EINVAL; - ev = ceph_decode_8(p); /* encoding version */ - if (ev > CEPH_PG_POOL_VERSION) { - pr_warning("got unknown v %d > %d of ceph_pg_pool\n", - ev, CEPH_PG_POOL_VERSION); - kfree(pi); - goto bad; - } + pi->id = ceph_decode_64(p); err = __decode_pool(p, end, pi); if (err < 0) { kfree(pi); @@ -682,12 +705,10 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end) __insert_pg_pool(&map->pg_pools, pi); } - if (version >= 5) { - err = __decode_pool_names(p, end, map); - if (err < 0) { - dout("fail to decode pool names"); - goto bad; - } + err = __decode_pool_names(p, end, map); + if (err < 0) { + dout("fail to decode pool names"); + goto bad; } ceph_decode_32_safe(p, end, map->pool_max, bad); @@ -788,16 +809,17 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, struct ceph_fsid fsid; u32 epoch = 0; struct ceph_timespec modified; - u32 len, pool; - __s32 new_pool_max, new_flags, max; + s32 len; + u64 pool; + __s64 new_pool_max; + __s32 new_flags, max; void *start = *p; int err = -EINVAL; u16 version; ceph_decode_16_safe(p, end, version, bad); - if (version > CEPH_OSDMAP_INC_VERSION) { - pr_warning("got unknown v %d > %d of inc osdmap\n", version, - CEPH_OSDMAP_INC_VERSION); + if (version > 6) { + pr_warning("got unknown v %d > %d of inc osdmap\n", version, 6); goto bad; } @@ -807,7 +829,7 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, epoch = ceph_decode_32(p); BUG_ON(epoch != map->epoch+1); ceph_decode_copy(p, &modified, sizeof(modified)); - new_pool_max = ceph_decode_32(p); + new_pool_max = ceph_decode_64(p); new_flags = ceph_decode_32(p); /* full map? */ @@ -857,18 +879,9 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, /* new_pool */ ceph_decode_32_safe(p, end, len, bad); while (len--) { - __u8 ev; struct ceph_pg_pool_info *pi; - ceph_decode_32_safe(p, end, pool, bad); - ceph_decode_need(p, end, 1 + sizeof(pi->v), bad); - ev = ceph_decode_8(p); /* encoding version */ - if (ev > CEPH_PG_POOL_VERSION) { - pr_warning("got unknown v %d > %d of ceph_pg_pool\n", - ev, CEPH_PG_POOL_VERSION); - err = -EINVAL; - goto bad; - } + ceph_decode_64_safe(p, end, pool, bad); pi = __lookup_pg_pool(&map->pg_pools, pool); if (!pi) { pi = kzalloc(sizeof(*pi), GFP_NOFS); @@ -894,7 +907,7 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, while (len--) { struct ceph_pg_pool_info *pi; - ceph_decode_32_safe(p, end, pool, bad); + ceph_decode_64_safe(p, end, pool, bad); pi = __lookup_pg_pool(&map->pg_pools, pool); if (pi) __remove_pg_pool(&map->pg_pools, pi); @@ -1097,8 +1110,8 @@ int ceph_calc_object_layout(struct ceph_object_layout *ol, pool = __lookup_pg_pool(&osdmap->pg_pools, pgid.pool); if (!pool) return -EIO; - pgid.seed = ceph_str_hash(pool->v.object_hash, oid, strlen(oid)); - num = le32_to_cpu(pool->v.pg_num); + pgid.seed = ceph_str_hash(pool->object_hash, oid, strlen(oid)); + num = pool->pg_num; num_mask = pool->pg_num_mask; dout("calc_object_layout '%s' pgid %lld.%x\n", oid, pgid.pool, @@ -1132,8 +1145,7 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid, return NULL; /* pg_temp? */ - t = ceph_stable_mod(ps, le32_to_cpu(pool->v.pg_num), - pool->pgp_num_mask); + t = ceph_stable_mod(ps, pool->pg_num, pool->pgp_num_mask); pgid.seed = t; pg = __lookup_pg_mapping(&osdmap->pg_temp, pgid); if (pg) { @@ -1142,26 +1154,24 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid, } /* crush */ - ruleno = crush_find_rule(osdmap->crush, pool->v.crush_ruleset, - pool->v.type, pool->v.size); + ruleno = crush_find_rule(osdmap->crush, pool->crush_ruleset, + pool->type, pool->size); if (ruleno < 0) { pr_err("no crush rule pool %d ruleset %d type %d size %d\n", - poolid, pool->v.crush_ruleset, pool->v.type, - pool->v.size); + poolid, pool->crush_ruleset, pool->type, + pool->size); return NULL; } - pps = ceph_stable_mod(ps, - le32_to_cpu(pool->v.pgp_num), - pool->pgp_num_mask); + pps = ceph_stable_mod(ps, pool->pgp_num, pool->pgp_num_mask); pps += poolid; r = crush_do_rule(osdmap->crush, ruleno, pps, osds, - min_t(int, pool->v.size, *num), + min_t(int, pool->size, *num), osdmap->osd_weight); if (r < 0) { pr_err("error %d from crush rule: pool %d ruleset %d type %d" - " size %d\n", r, poolid, pool->v.crush_ruleset, - pool->v.type, pool->v.size); + " size %d\n", r, poolid, pool->crush_ruleset, + pool->type, pool->size); return NULL; } *num = r; -- cgit v1.2.3 From 2169aea649c08374bec7d220a3b8f64712275356 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 25 Feb 2013 16:13:08 -0800 Subject: libceph: calculate placement based on the internal data types Instead of using the old ceph_object_layout struct, update our internal ceph_calc_object_layout method to use the ceph_pg type. This allows us to pass the full 32-bit precision of the pgid.seed to the callers. It also allows some callers to avoid reaching into the request structures for the struct ceph_object_layout fields. Signed-off-by: Sage Weil Reviewed-by: Alex Elder --- net/ceph/osd_client.c | 11 +++++++---- net/ceph/osdmap.c | 18 +++++------------- 2 files changed, 12 insertions(+), 17 deletions(-) (limited to 'net') diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 1990834e518..5584f0a08e2 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -913,21 +913,18 @@ EXPORT_SYMBOL(ceph_osdc_set_request_linger); static int __map_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req, int force_resend) { - struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base; struct ceph_pg pgid; int acting[CEPH_PG_MAX_SIZE]; int o = -1, num = 0; int err; dout("map_request %p tid %lld\n", req, req->r_tid); - err = ceph_calc_object_layout(&reqhead->layout, req->r_oid, + err = ceph_calc_object_layout(&pgid, req->r_oid, &req->r_file_layout, osdc->osdmap); if (err) { list_move(&req->r_req_lru_item, &osdc->req_notarget); return err; } - pgid.pool = le32_to_cpu(reqhead->layout.ol_pgid.pool); - pgid.seed = le16_to_cpu(reqhead->layout.ol_pgid.ps); req->r_pgid = pgid; err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting); @@ -1000,10 +997,16 @@ static void __send_request(struct ceph_osd_client *osdc, req, req->r_tid, req->r_osd->o_osd, req->r_flags); reqhead = req->r_request->front.iov_base; + reqhead->snapid = cpu_to_le64(req->r_snapid); reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch); reqhead->flags |= cpu_to_le32(req->r_flags); /* e.g., RETRY */ reqhead->reassert_version = req->r_reassert_version; + reqhead->layout.ol_pgid.ps = cpu_to_le16(req->r_pgid.seed); + reqhead->layout.ol_pgid.pool = cpu_to_le32(req->r_pgid.pool); + reqhead->layout.ol_pgid.preferred = cpu_to_le16(-1); + reqhead->layout.ol_stripe_unit = 0; + req->r_stamp = jiffies; list_move_tail(&req->r_req_lru_item, &osdc->req_lru); diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index 911919320d2..37847164450 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -1095,32 +1095,24 @@ EXPORT_SYMBOL(ceph_calc_file_object_mapping); * calculate an object layout (i.e. pgid) from an oid, * file_layout, and osdmap */ -int ceph_calc_object_layout(struct ceph_object_layout *ol, +int ceph_calc_object_layout(struct ceph_pg *pg, const char *oid, struct ceph_file_layout *fl, struct ceph_osdmap *osdmap) { unsigned int num, num_mask; - struct ceph_pg pgid; struct ceph_pg_pool_info *pool; BUG_ON(!osdmap); - - pgid.pool = le32_to_cpu(fl->fl_pg_pool); - pool = __lookup_pg_pool(&osdmap->pg_pools, pgid.pool); + pg->pool = le32_to_cpu(fl->fl_pg_pool); + pool = __lookup_pg_pool(&osdmap->pg_pools, pg->pool); if (!pool) return -EIO; - pgid.seed = ceph_str_hash(pool->object_hash, oid, strlen(oid)); + pg->seed = ceph_str_hash(pool->object_hash, oid, strlen(oid)); num = pool->pg_num; num_mask = pool->pg_num_mask; - dout("calc_object_layout '%s' pgid %lld.%x\n", oid, pgid.pool, - pgid.seed); - - ol->ol_pgid.ps = cpu_to_le16(pgid.seed); - ol->ol_pgid.pool = fl->fl_pg_pool; - ol->ol_pgid.preferred = cpu_to_le16(-1); - ol->ol_stripe_unit = fl->fl_object_stripe_unit; + dout("calc_object_layout '%s' pgid %lld.%x\n", oid, pg->pool, pg->seed); return 0; } EXPORT_SYMBOL(ceph_calc_object_layout); -- cgit v1.2.3 From 1b83bef24c6746a146d39915a18fb5425f2facb0 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 25 Feb 2013 16:11:12 -0800 Subject: libceph: update osd request/reply encoding Use the new version of the encoding for osd requests and replies. In the process, update the way we are tracking request ops and reply lengths and results in the struct ceph_osd_request. Update the rbd and fs/ceph users appropriately. The main changes are: - we keep pointers into the request memory for fields we need to update each time the request is sent out over the wire - we keep information about the result in an array in the request struct where the users can easily get at it. Signed-off-by: Sage Weil Reviewed-by: Alex Elder --- net/ceph/debugfs.c | 18 +--- net/ceph/osd_client.c | 233 ++++++++++++++++++++++++++++++++++++-------------- 2 files changed, 174 insertions(+), 77 deletions(-) (limited to 'net') diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c index f4d4b27d602..00d051f4894 100644 --- a/net/ceph/debugfs.c +++ b/net/ceph/debugfs.c @@ -123,10 +123,7 @@ static int osdc_show(struct seq_file *s, void *pp) mutex_lock(&osdc->request_mutex); for (p = rb_first(&osdc->requests); p; p = rb_next(p)) { struct ceph_osd_request *req; - struct ceph_osd_request_head *head; - struct ceph_osd_op *op; - int num_ops; - int opcode, olen; + int opcode; int i; req = rb_entry(p, struct ceph_osd_request, r_node); @@ -135,13 +132,7 @@ static int osdc_show(struct seq_file *s, void *pp) req->r_osd ? req->r_osd->o_osd : -1, req->r_pgid.pool, req->r_pgid.seed); - head = req->r_request->front.iov_base; - op = (void *)(head + 1); - - num_ops = le16_to_cpu(head->num_ops); - olen = le32_to_cpu(head->object_len); - seq_printf(s, "%.*s", olen, - (const char *)(head->ops + num_ops)); + seq_printf(s, "%.*s", req->r_oid_len, req->r_oid); if (req->r_reassert_version.epoch) seq_printf(s, "\t%u'%llu", @@ -150,10 +141,9 @@ static int osdc_show(struct seq_file *s, void *pp) else seq_printf(s, "\t"); - for (i = 0; i < num_ops; i++) { - opcode = le16_to_cpu(op->op); + for (i = 0; i < req->r_num_ops; i++) { + opcode = le16_to_cpu(req->r_request_ops[i].op); seq_printf(s, "\t%s", ceph_osd_op_name(opcode)); - op++; } seq_printf(s, "\n"); diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 5584f0a08e2..d730dd4d8eb 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -146,15 +146,23 @@ EXPORT_SYMBOL(ceph_osdc_release_request); struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, struct ceph_snap_context *snapc, - unsigned int num_op, + unsigned int num_ops, bool use_mempool, gfp_t gfp_flags) { struct ceph_osd_request *req; struct ceph_msg *msg; - size_t msg_size = sizeof(struct ceph_osd_request_head); - - msg_size += num_op*sizeof(struct ceph_osd_op); + size_t msg_size; + + msg_size = 4 + 4 + 8 + 8 + 4+8; + msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */ + msg_size += 1 + 8 + 4 + 4; /* pg_t */ + msg_size += 4 + MAX_OBJ_NAME_SIZE; + msg_size += 2 + num_ops*sizeof(struct ceph_osd_op); + msg_size += 8; /* snapid */ + msg_size += 8; /* snap_seq */ + msg_size += 8 * (snapc ? snapc->num_snaps : 0); /* snaps */ + msg_size += 4; if (use_mempool) { req = mempool_alloc(osdc->req_mempool, gfp_flags); @@ -193,9 +201,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, ceph_pagelist_init(&req->r_trail); /* create request message; allow space for oid */ - msg_size += MAX_OBJ_NAME_SIZE; - if (snapc) - msg_size += sizeof(u64) * snapc->num_snaps; if (use_mempool) msg = ceph_msgpool_get(&osdc->msgpool_op, 0); else @@ -324,55 +329,80 @@ static void osd_req_encode_op(struct ceph_osd_request *req, * */ void ceph_osdc_build_request(struct ceph_osd_request *req, - u64 off, u64 len, unsigned int num_op, + u64 off, u64 len, unsigned int num_ops, struct ceph_osd_req_op *src_ops, struct ceph_snap_context *snapc, u64 snap_id, struct timespec *mtime) { struct ceph_msg *msg = req->r_request; - struct ceph_osd_request_head *head; struct ceph_osd_req_op *src_op; - struct ceph_osd_op *op; void *p; - size_t msg_size = sizeof(*head) + num_op*sizeof(*op); + size_t msg_size; int flags = req->r_flags; u64 data_len; int i; - WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0); - - head = msg->front.iov_base; - head->snapid = cpu_to_le64(snap_id); - op = (void *)(head + 1); - p = (void *)(op + num_op); - + req->r_num_ops = num_ops; + req->r_snapid = snap_id; req->r_snapc = ceph_get_snap_context(snapc); - head->client_inc = cpu_to_le32(1); /* always, for now. */ - head->flags = cpu_to_le32(flags); - if (flags & CEPH_OSD_FLAG_WRITE) - ceph_encode_timespec(&head->mtime, mtime); - BUG_ON(num_op > (unsigned int) ((u16) -1)); - head->num_ops = cpu_to_le16(num_op); + /* encode request */ + msg->hdr.version = cpu_to_le16(4); - /* fill in oid */ - head->object_len = cpu_to_le32(req->r_oid_len); + p = msg->front.iov_base; + ceph_encode_32(&p, 1); /* client_inc is always 1 */ + req->r_request_osdmap_epoch = p; + p += 4; + req->r_request_flags = p; + p += 4; + if (req->r_flags & CEPH_OSD_FLAG_WRITE) + ceph_encode_timespec(p, mtime); + p += sizeof(struct ceph_timespec); + req->r_request_reassert_version = p; + p += sizeof(struct ceph_eversion); /* will get filled in */ + + /* oloc */ + ceph_encode_8(&p, 4); + ceph_encode_8(&p, 4); + ceph_encode_32(&p, 8 + 4 + 4); + req->r_request_pool = p; + p += 8; + ceph_encode_32(&p, -1); /* preferred */ + ceph_encode_32(&p, 0); /* key len */ + + ceph_encode_8(&p, 1); + req->r_request_pgid = p; + p += 8 + 4; + ceph_encode_32(&p, -1); /* preferred */ + + /* oid */ + ceph_encode_32(&p, req->r_oid_len); memcpy(p, req->r_oid, req->r_oid_len); + dout("oid '%.*s' len %d\n", req->r_oid_len, req->r_oid, req->r_oid_len); p += req->r_oid_len; + /* ops */ + ceph_encode_16(&p, num_ops); src_op = src_ops; - while (num_op--) - osd_req_encode_op(req, op++, src_op++); + req->r_request_ops = p; + for (i = 0; i < num_ops; i++, src_op++) { + osd_req_encode_op(req, p, src_op); + p += sizeof(struct ceph_osd_op); + } - if (snapc) { - head->snap_seq = cpu_to_le64(snapc->seq); - head->num_snaps = cpu_to_le32(snapc->num_snaps); + /* snaps */ + ceph_encode_64(&p, req->r_snapid); + ceph_encode_64(&p, req->r_snapc ? req->r_snapc->seq : 0); + ceph_encode_32(&p, req->r_snapc ? req->r_snapc->num_snaps : 0); + if (req->r_snapc) { for (i = 0; i < snapc->num_snaps; i++) { - put_unaligned_le64(snapc->snaps[i], p); - p += sizeof(u64); + ceph_encode_64(&p, req->r_snapc->snaps[i]); } } + req->r_request_attempts = p; + p += 4; + data_len = req->r_trail.length; if (flags & CEPH_OSD_FLAG_WRITE) { req->r_request->hdr.data_off = cpu_to_le16(off); @@ -385,6 +415,9 @@ void ceph_osdc_build_request(struct ceph_osd_request *req, msg_size = p - msg->front.iov_base; msg->front.iov_len = msg_size; msg->hdr.front_len = cpu_to_le32(msg_size); + + dout("build_request msg_size was %d num_ops %d\n", (int)msg_size, + num_ops); return; } EXPORT_SYMBOL(ceph_osdc_build_request); @@ -991,21 +1024,22 @@ out: static void __send_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req) { - struct ceph_osd_request_head *reqhead; - - dout("send_request %p tid %llu to osd%d flags %d\n", - req, req->r_tid, req->r_osd->o_osd, req->r_flags); - - reqhead = req->r_request->front.iov_base; - reqhead->snapid = cpu_to_le64(req->r_snapid); - reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch); - reqhead->flags |= cpu_to_le32(req->r_flags); /* e.g., RETRY */ - reqhead->reassert_version = req->r_reassert_version; + void *p; - reqhead->layout.ol_pgid.ps = cpu_to_le16(req->r_pgid.seed); - reqhead->layout.ol_pgid.pool = cpu_to_le32(req->r_pgid.pool); - reqhead->layout.ol_pgid.preferred = cpu_to_le16(-1); - reqhead->layout.ol_stripe_unit = 0; + dout("send_request %p tid %llu to osd%d flags %d pg %lld.%x\n", + req, req->r_tid, req->r_osd->o_osd, req->r_flags, + (unsigned long long)req->r_pgid.pool, req->r_pgid.seed); + + /* fill in message content that changes each time we send it */ + put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch); + put_unaligned_le32(req->r_flags, req->r_request_flags); + put_unaligned_le64(req->r_pgid.pool, req->r_request_pool); + p = req->r_request_pgid; + ceph_encode_64(&p, req->r_pgid.pool); + ceph_encode_32(&p, req->r_pgid.seed); + put_unaligned_le64(1, req->r_request_attempts); /* FIXME */ + memcpy(req->r_request_reassert_version, &req->r_reassert_version, + sizeof(req->r_reassert_version)); req->r_stamp = jiffies; list_move_tail(&req->r_req_lru_item, &osdc->req_lru); @@ -1105,6 +1139,26 @@ static void complete_request(struct ceph_osd_request *req) complete_all(&req->r_safe_completion); /* fsync waiter */ } +static int __decode_pgid(void **p, void *end, struct ceph_pg *pgid) +{ + __u8 v; + + ceph_decode_need(p, end, 1 + 8 + 4 + 4, bad); + v = ceph_decode_8(p); + if (v > 1) { + pr_warning("do not understand pg encoding %d > 1", v); + return -EINVAL; + } + pgid->pool = ceph_decode_64(p); + pgid->seed = ceph_decode_32(p); + *p += 4; + return 0; + +bad: + pr_warning("incomplete pg encoding"); + return -EINVAL; +} + /* * handle osd op reply. either call the callback if it is specified, * or do the completion to wake up the waiting thread. @@ -1112,22 +1166,42 @@ static void complete_request(struct ceph_osd_request *req) static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, struct ceph_connection *con) { - struct ceph_osd_reply_head *rhead = msg->front.iov_base; + void *p, *end; struct ceph_osd_request *req; u64 tid; - int numops, object_len, flags; + int object_len; + int numops, payload_len, flags; s32 result; + s32 retry_attempt; + struct ceph_pg pg; + int err; + u32 reassert_epoch; + u64 reassert_version; + u32 osdmap_epoch; + int i; tid = le64_to_cpu(msg->hdr.tid); - if (msg->front.iov_len < sizeof(*rhead)) - goto bad; - numops = le32_to_cpu(rhead->num_ops); - object_len = le32_to_cpu(rhead->object_len); - result = le32_to_cpu(rhead->result); - if (msg->front.iov_len != sizeof(*rhead) + object_len + - numops * sizeof(struct ceph_osd_op)) + dout("handle_reply %p tid %llu\n", msg, tid); + + p = msg->front.iov_base; + end = p + msg->front.iov_len; + + ceph_decode_need(&p, end, 4, bad); + object_len = ceph_decode_32(&p); + ceph_decode_need(&p, end, object_len, bad); + p += object_len; + + err = __decode_pgid(&p, end, &pg); + if (err) goto bad; - dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result); + + ceph_decode_need(&p, end, 8 + 4 + 4 + 8 + 4, bad); + flags = ceph_decode_64(&p); + result = ceph_decode_32(&p); + reassert_epoch = ceph_decode_32(&p); + reassert_version = ceph_decode_64(&p); + osdmap_epoch = ceph_decode_32(&p); + /* lookup */ mutex_lock(&osdc->request_mutex); req = __lookup_request(osdc, tid); @@ -1137,7 +1211,38 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, return; } ceph_osdc_get_request(req); - flags = le32_to_cpu(rhead->flags); + + dout("handle_reply %p tid %llu req %p result %d\n", msg, tid, + req, result); + + ceph_decode_need(&p, end, 4, bad); + numops = ceph_decode_32(&p); + if (numops > CEPH_OSD_MAX_OP) + goto bad_put; + if (numops != req->r_num_ops) + goto bad_put; + payload_len = 0; + ceph_decode_need(&p, end, numops * sizeof(struct ceph_osd_op), bad); + for (i = 0; i < numops; i++) { + struct ceph_osd_op *op = p; + int len; + + len = le32_to_cpu(op->payload_len); + req->r_reply_op_len[i] = len; + dout(" op %d has %d bytes\n", i, len); + payload_len += len; + p += sizeof(*op); + } + if (payload_len != le32_to_cpu(msg->hdr.data_len)) { + pr_warning("sum of op payload lens %d != data_len %d", + payload_len, le32_to_cpu(msg->hdr.data_len)); + goto bad_put; + } + + ceph_decode_need(&p, end, 4 + numops * 4, bad); + retry_attempt = ceph_decode_32(&p); + for (i = 0; i < numops; i++) + req->r_reply_op_result[i] = ceph_decode_32(&p); /* * if this connection filled our message, drop our reference now, to @@ -1152,7 +1257,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, if (!req->r_got_reply) { unsigned int bytes; - req->r_result = le32_to_cpu(rhead->result); + req->r_result = result; bytes = le32_to_cpu(msg->hdr.data_len); dout("handle_reply result %d bytes %d\n", req->r_result, bytes); @@ -1160,7 +1265,8 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, req->r_result = bytes; /* in case this is a write and we need to replay, */ - req->r_reassert_version = rhead->reassert_version; + req->r_reassert_version.epoch = cpu_to_le32(reassert_epoch); + req->r_reassert_version.version = cpu_to_le64(reassert_version); req->r_got_reply = 1; } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) { @@ -1195,10 +1301,11 @@ done: ceph_osdc_put_request(req); return; +bad_put: + ceph_osdc_put_request(req); bad: - pr_err("corrupt osd_op_reply got %d %d expected %d\n", - (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len), - (int)sizeof(*rhead)); + pr_err("corrupt osd_op_reply got %d %d\n", + (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len)); ceph_msg_dump(msg); } -- cgit v1.2.3 From 83ca14fdd35821554058e5fd4fa7b118ee504a33 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Tue, 26 Feb 2013 10:39:09 -0800 Subject: libceph: add support for HASHPSPOOL pool flag The legacy behavior adds the pgid seed and pool together as the input for CRUSH. That is problematic because each pool's PGs end up mapping to the same OSDs: 1.5 == 2.4 == 3.3 == ... Instead, if the HASHPSPOOL flag is set, we has the ps and pool together and feed that into CRUSH. This ensures that two adjacent pools will map to an independent pseudorandom set of OSDs. Advertise our support for this via a protocol feature flag. Signed-off-by: Sage Weil Reviewed-by: Alex Elder --- net/ceph/osdmap.c | 39 ++++++++++++++++++++++++++------------- 1 file changed, 26 insertions(+), 13 deletions(-) (limited to 'net') diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index 37847164450..69bc4bf89e3 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -1127,18 +1127,16 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid, struct ceph_pg_mapping *pg; struct ceph_pg_pool_info *pool; int ruleno; - unsigned int poolid, ps, pps, t, r; + int r; + u32 pps; - poolid = pgid.pool; - ps = pgid.seed; - - pool = __lookup_pg_pool(&osdmap->pg_pools, poolid); + pool = __lookup_pg_pool(&osdmap->pg_pools, pgid.pool); if (!pool) return NULL; /* pg_temp? */ - t = ceph_stable_mod(ps, pool->pg_num, pool->pgp_num_mask); - pgid.seed = t; + pgid.seed = ceph_stable_mod(pgid.seed, pool->pg_num, + pool->pgp_num_mask); pg = __lookup_pg_mapping(&osdmap->pg_temp, pgid); if (pg) { *num = pg->len; @@ -1149,20 +1147,35 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid, ruleno = crush_find_rule(osdmap->crush, pool->crush_ruleset, pool->type, pool->size); if (ruleno < 0) { - pr_err("no crush rule pool %d ruleset %d type %d size %d\n", - poolid, pool->crush_ruleset, pool->type, + pr_err("no crush rule pool %lld ruleset %d type %d size %d\n", + pgid.pool, pool->crush_ruleset, pool->type, pool->size); return NULL; } - pps = ceph_stable_mod(ps, pool->pgp_num, pool->pgp_num_mask); - pps += poolid; + if (pool->flags & CEPH_POOL_FLAG_HASHPSPOOL) { + /* hash pool id and seed sothat pool PGs do not overlap */ + pps = crush_hash32_2(CRUSH_HASH_RJENKINS1, + ceph_stable_mod(pgid.seed, pool->pgp_num, + pool->pgp_num_mask), + pgid.pool); + } else { + /* + * legacy ehavior: add ps and pool together. this is + * not a great approach because the PGs from each pool + * will overlap on top of each other: 0.5 == 1.4 == + * 2.3 == ... + */ + pps = ceph_stable_mod(pgid.seed, pool->pgp_num, + pool->pgp_num_mask) + + (unsigned)pgid.pool; + } r = crush_do_rule(osdmap->crush, ruleno, pps, osds, min_t(int, pool->size, *num), osdmap->osd_weight); if (r < 0) { - pr_err("error %d from crush rule: pool %d ruleset %d type %d" - " size %d\n", r, poolid, pool->crush_ruleset, + pr_err("error %d from crush rule: pool %lld ruleset %d type %d" + " size %d\n", r, pgid.pool, pool->crush_ruleset, pool->type, pool->size); return NULL; } -- cgit v1.2.3