diff --git a/prov/cxi/include/cxip.h b/prov/cxi/include/cxip.h index 65b48da6db3..b8e42ca0a69 100644 --- a/prov/cxi/include/cxip.h +++ b/prov/cxi/include/cxip.h @@ -2957,6 +2957,9 @@ struct cxip_coll_mc { int next_red_id; // next available red_id int max_red_id; // limit total concurrency int seqno; // rolling seqno for packets + int close_state; // the state of the close operation + bool has_closed; // true after a mc close call + bool has_error; // true if any error bool is_multicast; // true if multicast address bool arm_disable; // arm-disable for testing bool retry_disable; // retry-disable for testing diff --git a/prov/cxi/src/cxip_coll.c b/prov/cxi/src/cxip_coll.c index 8d503c1c7b0..ae804e3779f 100644 --- a/prov/cxi/src/cxip_coll.c +++ b/prov/cxi/src/cxip_coll.c @@ -2704,13 +2704,17 @@ static void _curl_delete_mc_obj(struct cxip_coll_mc *mc_obj); static void _cxip_delete_mcast_cb(struct cxip_curl_handle *handle); /* Close multicast collective object */ -static void _close_mc(struct cxip_coll_mc *mc_obj, bool delete) +static void _close_mc(struct cxip_coll_mc *mc_obj, bool delete, bool has_error) { int count; if (!mc_obj) return; TRACE_JOIN("%s starting MC cleanup\n", __func__); + + mc_obj->has_closed = true; + mc_obj->has_error = has_error; + /* clear the mcast_addr -> mc_obj reference*/ ofi_idm_clear(&mc_obj->ep_obj->coll.mcast_map, mc_obj->mcast_addr); mc_obj->ep_obj->coll.is_hwroot = false; @@ -2739,10 +2743,18 @@ static void _close_mc(struct cxip_coll_mc *mc_obj, bool delete) cxip_env.coll_fm_timeout_msec/1000, (cxip_env.coll_fm_timeout_msec%1000)*1000000}; + if (!mc_obj->has_error) + mc_obj->close_state = -FI_EAGAIN; + _tsset(&mc_obj->curlexpires, &expires); _curl_delete_mc_obj(mc_obj); - } else - free(mc_obj); + } else { + if (mc_obj->has_error) { + free(mc_obj); + } else { + mc_obj->close_state = FI_SUCCESS; + } + } } /* The user can close an individual collective MC address. It must do so on @@ -2752,11 +2764,37 @@ static void _close_mc(struct cxip_coll_mc *mc_obj, bool delete) static int _fi_close_mc(struct fid *fid) { struct cxip_coll_mc *mc_obj; + int ret = FI_SUCCESS; TRACE_JOIN("%s: closing MC\n", __func__); mc_obj = container_of(fid, struct cxip_coll_mc, mc_fid.fid); - _close_mc(mc_obj, true); - return FI_SUCCESS; + if (!mc_obj) { + TRACE_JOIN("%s: MC object is null\n", __func__); + return ret; + } else if (mc_obj->has_closed) { + TRACE_JOIN("%s: close already called before\n", __func__); + return ret; + } else if (mc_obj->has_error) { + TRACE_JOIN("%s: encounted an error earlier\n", __func__); + return ret; + } + + _close_mc(mc_obj, true, false); + while (mc_obj && (ret = mc_obj->close_state) == -FI_EAGAIN) { + ret = cxip_curl_progress(NULL); + if (ret == -FI_EAGAIN) { + usleep(10); + continue; + } + if (ret < 0 && ret != -FI_ENODATA) { + TRACE_JOIN("%s: Curl progress failed, error=%d\n", __func__, ret); + break; + } + usleep(10); + } + free(mc_obj); + + return ret; } /* multicast object libfabric functions */ @@ -2986,6 +3024,11 @@ static int _initialize_mc(void *ptr) _coll_metrics.ep_data.isroot = mc_obj->hwroot_idx == mc_obj->mynode_idx; + /* Initially set close states to success */ + mc_obj->close_state = FI_SUCCESS; + mc_obj->has_closed = false; + mc_obj->has_error = false; + /* Return information to the caller */ jstate->mc_obj = mc_obj; *jstate->mc = &mc_obj->mc_fid; @@ -2996,7 +3039,7 @@ static int _initialize_mc(void *ptr) fail: jstate->prov_errno = FI_CXI_ERRNO_JOIN_FAIL_PTE; - _close_mc(mc_obj, true); + _close_mc(mc_obj, true, true); return ret; } @@ -3076,7 +3119,11 @@ static void _curl_delete_mc_obj(struct cxip_coll_mc *mc_obj) TRACE_JOIN("CURL delete mcast %d failed\n", mc_obj->mcast_addr); free(curl_usrptr); - free(mc_obj); + if (mc_obj->has_error) { + free(mc_obj); + } else { + mc_obj->close_state = ret; + } } } @@ -3102,7 +3149,11 @@ static void _cxip_delete_mcast_cb(struct cxip_curl_handle *handle) case 201: TRACE_JOIN("callback: %ld SUCCESS MCAST DELETED\n", handle->status); - free(mc_obj); + if (mc_obj->has_error) { + free(mc_obj); + } else { + mc_obj->close_state = FI_SUCCESS; + } break; case 409: TRACE_JOIN("callback: delete mcast failed: %ld '%s'\n", @@ -3110,7 +3161,11 @@ static void _cxip_delete_mcast_cb(struct cxip_curl_handle *handle) if (_tsexp(&mc_obj->curlexpires)) { TRACE_JOIN("callback: FM expired\n"); - free(mc_obj); + if (mc_obj->has_error) { + free(mc_obj); + } else { + mc_obj->close_state = FI_CXI_ERRNO_JOIN_CURL_TIMEOUT; + } break; } /* try again */ @@ -3118,7 +3173,11 @@ static void _cxip_delete_mcast_cb(struct cxip_curl_handle *handle) break; default: TRACE_JOIN("callback: %ld unknown status\n", handle->status); - free(mc_obj); + if (mc_obj->has_error) { + free(mc_obj); + } else { + mc_obj->close_state = FI_CXI_ERRNO_JOIN_CURL_FAILED; + } break; } /* free json memory */ @@ -4209,7 +4268,7 @@ void cxip_coll_close(struct cxip_ep_obj *ep_obj) while (!dlist_empty(&ep_obj->coll.mc_list)) { dlist_pop_front(&ep_obj->coll.mc_list, struct cxip_coll_mc, mc_obj, entry); - _close_mc(mc_obj, false); + _close_mc(mc_obj, false, true); } }