From f4116f35f0163fed28660e3b2ecfddeb241cc7f5 Mon Sep 17 00:00:00 2001 From: dmchen Date: Thu, 28 Nov 2024 14:50:45 +0800 Subject: [PATCH] fix/refactor-vnode-management-open-vnode --- source/dnode/mgmt/mgmt_vnode/inc/vmInt.h | 5 +- source/dnode/mgmt/mgmt_vnode/src/vmFile.c | 24 +- source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 32 +-- source/dnode/mgmt/mgmt_vnode/src/vmInt.c | 302 +++++++++++--------- 4 files changed, 196 insertions(+), 167 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index 7842077d88f..164be636a95 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -35,7 +35,7 @@ typedef struct SVnodeMgmt { SWWorkerPool fetchPool; SSingleWorker mgmtWorker; SSingleWorker mgmtMultiWorker; - SHashObj *hash; + SHashObj *runngingHash; SHashObj *closedHash; SHashObj *creatingHash; TdThreadRwlock lock; @@ -98,7 +98,8 @@ SVnodeObj *vmAcquireVnodeImpl(SVnodeMgmt *pMgmt, int32_t vgId, bool strict); void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode); int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl); void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal, bool keepClosed); -void vmRemoveFromCreatingHash(SVnodeMgmt *pMgmt, int32_t vgId); +void vmCleanPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId); +void vmCloseFailedVnode(SVnodeMgmt *pMgmt, int32_t vgId); // vmHandle.c SArray *vmGetMsgHandles(); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmFile.c b/source/dnode/mgmt/mgmt_vnode/src/vmFile.c index b4453ad6fc8..dbef048c23d 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmFile.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmFile.c @@ -23,7 +23,7 @@ int32_t vmGetAllVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnod (void)taosThreadRwlockRdlock(&pMgmt->lock); int32_t num = 0; - int32_t size = taosHashGetSize(pMgmt->hash); + int32_t size = taosHashGetSize(pMgmt->runngingHash); int32_t closedSize = taosHashGetSize(pMgmt->closedHash); size += closedSize; SVnodeObj **pVnodes = taosMemoryCalloc(size, sizeof(SVnodeObj *)); @@ -32,7 +32,7 @@ int32_t vmGetAllVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnod return terrno; } - void *pIter = taosHashIterate(pMgmt->hash, NULL); + void *pIter = taosHashIterate(pMgmt->runngingHash, NULL); while (pIter) { SVnodeObj **ppVnode = pIter; SVnodeObj *pVnode = *ppVnode; @@ -40,9 +40,9 @@ int32_t vmGetAllVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnod int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1); dTrace("vgId:%d,acquire vnode, vnode:%p, ref:%d", pVnode->vgId, pVnode, refCount); pVnodes[num++] = (*ppVnode); - pIter = taosHashIterate(pMgmt->hash, pIter); + pIter = taosHashIterate(pMgmt->runngingHash, pIter); } else { - taosHashCancelIterate(pMgmt->hash, pIter); + taosHashCancelIterate(pMgmt->runngingHash, pIter); } } @@ -71,7 +71,7 @@ int32_t vmGetAllVnodeListFromHashWithCreating(SVnodeMgmt *pMgmt, int32_t *numOfV (void)taosThreadRwlockRdlock(&pMgmt->lock); int32_t num = 0; - int32_t size = taosHashGetSize(pMgmt->hash); + int32_t size = taosHashGetSize(pMgmt->runngingHash); int32_t creatingSize = taosHashGetSize(pMgmt->creatingHash); size += creatingSize; SVnodeObj **pVnodes = taosMemoryCalloc(size, sizeof(SVnodeObj *)); @@ -80,7 +80,7 @@ int32_t vmGetAllVnodeListFromHashWithCreating(SVnodeMgmt *pMgmt, int32_t *numOfV return terrno; } - void *pIter = taosHashIterate(pMgmt->hash, NULL); + void *pIter = taosHashIterate(pMgmt->runngingHash, NULL); while (pIter) { SVnodeObj **ppVnode = pIter; SVnodeObj *pVnode = *ppVnode; @@ -88,9 +88,9 @@ int32_t vmGetAllVnodeListFromHashWithCreating(SVnodeMgmt *pMgmt, int32_t *numOfV int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1); dTrace("vgId:%d,acquire vnode, vnode:%p, ref:%d", pVnode->vgId, pVnode, refCount); pVnodes[num++] = (*ppVnode); - pIter = taosHashIterate(pMgmt->hash, pIter); + pIter = taosHashIterate(pMgmt->runngingHash, pIter); } else { - taosHashCancelIterate(pMgmt->hash, pIter); + taosHashCancelIterate(pMgmt->runngingHash, pIter); } } @@ -119,14 +119,14 @@ int32_t vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeOb (void)taosThreadRwlockRdlock(&pMgmt->lock); int32_t num = 0; - int32_t size = taosHashGetSize(pMgmt->hash); + int32_t size = taosHashGetSize(pMgmt->runngingHash); SVnodeObj **pVnodes = taosMemoryCalloc(size, sizeof(SVnodeObj *)); if (pVnodes == NULL) { (void)taosThreadRwlockUnlock(&pMgmt->lock); return terrno; } - void *pIter = taosHashIterate(pMgmt->hash, NULL); + void *pIter = taosHashIterate(pMgmt->runngingHash, NULL); while (pIter) { SVnodeObj **ppVnode = pIter; SVnodeObj *pVnode = *ppVnode; @@ -134,9 +134,9 @@ int32_t vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeOb int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1); dTrace("vgId:%d, acquire vnode, vnode:%p, ref:%d", pVnode->vgId, pVnode, refCount); pVnodes[num++] = (*ppVnode); - pIter = taosHashIterate(pMgmt->hash, pIter); + pIter = taosHashIterate(pMgmt->runngingHash, pIter); } else { - taosHashCancelIterate(pMgmt->hash, pIter); + taosHashCancelIterate(pMgmt->runngingHash, pIter); } } diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 90b3f0025d9..fd30555c3be 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -27,7 +27,7 @@ void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo, bool isReset) { (void)taosThreadRwlockRdlock(&pMgmt->lock); - void *pIter = taosHashIterate(pMgmt->hash, NULL); + void *pIter = taosHashIterate(pMgmt->runngingHash, NULL); while (pIter) { SVnodeObj **ppVnode = pIter; if (ppVnode == NULL || *ppVnode == NULL) continue; @@ -43,7 +43,7 @@ void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo, bool isReset) { if (taosArrayPush(pInfo->pVloads, &vload) == NULL) { dError("failed to push vnode load"); } - pIter = taosHashIterate(pMgmt->hash, pIter); + pIter = taosHashIterate(pMgmt->runngingHash, pIter); } (void)taosThreadRwlockUnlock(&pMgmt->lock); @@ -55,7 +55,7 @@ void vmGetVnodeLoadsLite(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) { (void)taosThreadRwlockRdlock(&pMgmt->lock); - void *pIter = taosHashIterate(pMgmt->hash, NULL); + void *pIter = taosHashIterate(pMgmt->runngingHash, NULL); while (pIter) { SVnodeObj **ppVnode = pIter; if (ppVnode == NULL || *ppVnode == NULL) continue; @@ -71,7 +71,7 @@ void vmGetVnodeLoadsLite(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) { } } } - pIter = taosHashIterate(pMgmt->hash, pIter); + pIter = taosHashIterate(pMgmt->runngingHash, pIter); } (void)taosThreadRwlockUnlock(&pMgmt->lock); @@ -140,7 +140,7 @@ void vmCleanExpriedSamples(SVnodeMgmt *pMgmt) { (void)taosThreadRwlockRdlock(&pMgmt->lock); for (int i = 0; i < list_size; i++) { int32_t vgroup_id = vgroup_ids[i]; - void *vnode = taosHashGet(pMgmt->hash, &vgroup_id, sizeof(int32_t)); + void *vnode = taosHashGet(pMgmt->runngingHash, &vgroup_id, sizeof(int32_t)); if (vnode == NULL) { r = taos_counter_delete(tsInsertCounter, keys[i]); if (r) { @@ -381,7 +381,7 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { if (vnodeCreate(path, &vnodeCfg, diskPrimary, pMgmt->pTfs) < 0) { dError("vgId:%d, failed to create vnode since %s", req.vgId, terrstr()); vmReleaseVnode(pMgmt, pVnode); - vmRemoveFromCreatingHash(pMgmt, req.vgId); + vmCleanPrimaryDisk(pMgmt, req.vgId); (void)tFreeSCreateVnodeReq(&req); code = terrno != 0 ? terrno : -1; return code; @@ -423,25 +423,11 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { } _OVER: - vmRemoveFromCreatingHash(pMgmt, req.vgId); + vmCleanPrimaryDisk(pMgmt, req.vgId); if (code != 0) { - int32_t r = 0; - r = taosThreadRwlockWrlock(&pMgmt->lock); - if (r != 0) { - dError("vgId:%d, failed to lock since %s", req.vgId, tstrerror(r)); - } - if (r == 0) { - dInfo("vgId:%d, remove from hash", req.vgId); - r = taosHashRemove(pMgmt->hash, &req.vgId, sizeof(int32_t)); - if (r != 0) { - dError("vgId:%d, failed to remove vnode since %s", req.vgId, tstrerror(r)); - } - } - r = taosThreadRwlockUnlock(&pMgmt->lock); - if (r != 0) { - dError("vgId:%d, failed to unlock since %s", req.vgId, tstrerror(r)); - } + vmCloseFailedVnode(pMgmt, req.vgId); + vnodeClose(pImpl); vnodeDestroy(0, path, pMgmt->pTfs, 0); } else { diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index c0f15b88774..6a61cc38597 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -25,7 +25,7 @@ int32_t vmGetPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) { SVnodeObj *pVnode = NULL; (void)taosThreadRwlockRdlock(&pMgmt->lock); - int32_t r = taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pVnode); + int32_t r = taosHashGetDup(pMgmt->runngingHash, &vgId, sizeof(int32_t), (void *)&pVnode); if (pVnode != NULL) { diskId = pVnode->diskPrimary; } @@ -33,6 +33,82 @@ int32_t vmGetPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) { return diskId; } +static void vmFreeVnodeObj(SVnodeObj **ppVnode) { + if (!ppVnode || !(*ppVnode)) return; + + SVnodeObj *pVnode = *ppVnode; + + int32_t refCount = atomic_load_32(&pVnode->refCount); + while (refCount > 0) { + dWarn("vgId:%d, vnode is refenced, retry to free in 200ms, vnode:%p, ref:%d", pVnode->vgId, pVnode, refCount); + taosMsleep(200); + refCount = atomic_load_32(&pVnode->refCount); + } + + taosMemoryFree(pVnode->path); + taosMemoryFree(pVnode); + ppVnode[0] = NULL; +} + +static int32_t vmRegisterCreatingState(SVnodeMgmt *pMgmt, int32_t vgId, int32_t diskId) { + int32_t code = 0; + SVnodeObj *pCreatingVnode = taosMemoryCalloc(1, sizeof(SVnodeObj)); + if (pCreatingVnode == NULL) { + dError("failed to alloc vnode since %s", terrstr()); + return terrno; + } + (void)memset(pCreatingVnode, 0, sizeof(SVnodeObj)); + + pCreatingVnode->vgId = vgId; + pCreatingVnode->diskPrimary = diskId; + + code = taosThreadRwlockWrlock(&pMgmt->lock); + if (code != 0) { + taosMemoryFree(pCreatingVnode); + return code; + } + + dTrace("vgId:%d, put vnode into creating hash, pCreatingVnode:%p", vgId, pCreatingVnode); + code = taosHashPut(pMgmt->creatingHash, &vgId, sizeof(int32_t), &pCreatingVnode, sizeof(SVnodeObj *)); + if (code != 0) { + dError("vgId:%d, failed to put vnode to creatingHash", vgId); + taosMemoryFree(pCreatingVnode); + } + + int32_t r = taosThreadRwlockUnlock(&pMgmt->lock); + if (r != 0) { + dError("vgId:%d, failed to unlock since %s", vgId, tstrerror(r)); + } + + return code; +} + +static void vmUnRegisterCreatingState(SVnodeMgmt *pMgmt, int32_t vgId) { + SVnodeObj *pOld = NULL; + + (void)taosThreadRwlockWrlock(&pMgmt->lock); + int32_t r = taosHashGetDup(pMgmt->creatingHash, &vgId, sizeof(int32_t), (void *)&pOld); + if (r != 0) { + dError("vgId:%d, failed to get vnode from creating Hash", vgId); + } + dTrace("vgId:%d, remove from creating Hash", vgId); + r = taosHashRemove(pMgmt->creatingHash, &vgId, sizeof(int32_t)); + if (r != 0) { + dError("vgId:%d, failed to remove vnode from hash", vgId); + } + (void)taosThreadRwlockUnlock(&pMgmt->lock); + + if (pOld) { + dTrace("vgId:%d, free vnode pOld:%p", vgId, &pOld); + vmFreeVnodeObj(&pOld); + } + +_OVER: + if (r != 0) { + dError("vgId:%d, failed to remove vnode from creatingHash since %s", vgId, tstrerror(r)); + } +} + int32_t vmAllocPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) { int32_t code = 0; STfs *pTfs = pMgmt->pTfs; @@ -91,45 +167,15 @@ int32_t vmAllocPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) { diskId = id; } } - - SVnodeObj *pCreatingVnode = taosMemoryCalloc(1, sizeof(SVnodeObj)); - if (pCreatingVnode == NULL) { - code = -1; - if (terrno != 0) code = terrno; - dError("failed to alloc vnode since %s", tstrerror(code)); - int32_t r = taosThreadMutexUnlock(&pMgmt->mutex); - if (r != 0) { - dError("vgId:%d, failed to unlock mutex since %s", vgId, tstrerror(r)); - } - goto _OVER; - } - (void)memset(pCreatingVnode, 0, sizeof(SVnodeObj)); - - pCreatingVnode->vgId = vgId; - pCreatingVnode->diskPrimary = diskId; - - code = taosThreadRwlockWrlock(&pMgmt->lock); + code = vmRegisterCreatingState(pMgmt, vgId, diskId); if (code != 0) { int32_t r = taosThreadMutexUnlock(&pMgmt->mutex); if (r != 0) { dError("vgId:%d, failed to unlock mutex since %s", vgId, tstrerror(r)); } - taosMemoryFree(pCreatingVnode); goto _OVER; } - dTrace("vgId:%d, put vnode into creating hash, pCreatingVnode:%p", vgId, pCreatingVnode); - code = taosHashPut(pMgmt->creatingHash, &vgId, sizeof(int32_t), &pCreatingVnode, sizeof(SVnodeObj *)); - if (code != 0) { - dError("vgId:%d, failed to put vnode to creatingHash", vgId); - taosMemoryFree(pCreatingVnode); - } - - int32_t r = taosThreadRwlockUnlock(&pMgmt->lock); - if (r != 0) { - dError("vgId:%d, failed to unlock since %s", vgId, tstrerror(r)); - } - code = taosThreadMutexUnlock(&pMgmt->mutex); if (code != 0) { goto _OVER; @@ -154,11 +200,13 @@ int32_t vmAllocPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) { } } +void vmCleanPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) { vmUnRegisterCreatingState(pMgmt, vgId); } + SVnodeObj *vmAcquireVnodeImpl(SVnodeMgmt *pMgmt, int32_t vgId, bool strict) { SVnodeObj *pVnode = NULL; (void)taosThreadRwlockRdlock(&pMgmt->lock); - int32_t r = taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pVnode); + int32_t r = taosHashGetDup(pMgmt->runngingHash, &vgId, sizeof(int32_t), (void *)&pVnode); if (pVnode == NULL || strict && (pVnode->dropped || pVnode->failed)) { terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; pVnode = NULL; @@ -182,21 +230,75 @@ void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { //(void)taosThreadRwlockUnlock(&pMgmt->lock); } -static void vmFreeVnodeObj(SVnodeObj **ppVnode) { - if (!ppVnode || !(*ppVnode)) return; +static int32_t vmRegisterRunningState(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { + SVnodeObj *pOld = NULL; - SVnodeObj *pVnode = *ppVnode; + int32_t r = taosHashGetDup(pMgmt->runngingHash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld); + if (r != 0) { + dError("vgId:%d, failed to get vnode from hash", pVnode->vgId); + } + if (pOld) { + vmFreeVnodeObj(&pOld); + } + int32_t code = taosHashPut(pMgmt->runngingHash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *)); - int32_t refCount = atomic_load_32(&pVnode->refCount); - while (refCount > 0) { - dWarn("vgId:%d, vnode is refenced, retry to free in 200ms, vnode:%p, ref:%d", pVnode->vgId, pVnode, refCount); - taosMsleep(200); - refCount = atomic_load_32(&pVnode->refCount); + return code; +} + +static void vmUnRegisterRunningState(SVnodeMgmt *pMgmt, int32_t vgId) { + dInfo("vgId:%d, remove from hash", vgId); + int32_t r = taosHashRemove(pMgmt->runngingHash, &vgId, sizeof(int32_t)); + if (r != 0) { + dError("vgId:%d, failed to remove vnode since %s", vgId, tstrerror(r)); } +} - taosMemoryFree(pVnode->path); - taosMemoryFree(pVnode); - ppVnode[0] = NULL; +static int32_t vmRegisterClosedState(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { + int32_t code = 0; + SVnodeObj *pClosedVnode = taosMemoryCalloc(1, sizeof(SVnodeObj)); + if (pClosedVnode == NULL) { + dError("failed to alloc vnode since %s", terrstr()); + return terrno; + } + (void)memset(pClosedVnode, 0, sizeof(SVnodeObj)); + + pClosedVnode->vgId = pVnode->vgId; + pClosedVnode->dropped = pVnode->dropped; + pClosedVnode->vgVersion = pVnode->vgVersion; + pClosedVnode->diskPrimary = pVnode->diskPrimary; + pClosedVnode->toVgId = pVnode->toVgId; + + SVnodeObj *pOld = NULL; + int32_t r = taosHashGetDup(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld); + if (r != 0) { + dError("vgId:%d, failed to get vnode from closedHash", pVnode->vgId); + } + if (pOld) { + vmFreeVnodeObj(&pOld); + } + dInfo("vgId:%d, put vnode to closedHash", pVnode->vgId); + r = taosHashPut(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), &pClosedVnode, sizeof(SVnodeObj *)); + if (r != 0) { + dError("vgId:%d, failed to put vnode to closedHash", pVnode->vgId); + } + + return code; +} + +static void vmUnRegisterClosedState(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { + SVnodeObj *pOld = NULL; + int32_t r = taosHashGetDup(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld); + if (r != 0) { + dError("vgId:%d, failed to get vnode from closedHash", pVnode->vgId); + } + if (pOld != NULL) { + vmFreeVnodeObj(&pOld); + dInfo("vgId:%d, remove from closedHash", pVnode->vgId); + r = taosHashRemove(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t)); + if (r != 0) { + dError("vgId:%d, failed to remove vnode from hash", pVnode->vgId); + } + } } int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) { @@ -233,32 +335,8 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) { } (void)taosThreadRwlockWrlock(&pMgmt->lock); - - SVnodeObj *pOld = NULL; - - int32_t r = taosHashGetDup(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld); - if (r != 0) { - dError("vgId:%d, failed to get vnode from hash", pVnode->vgId); - } - if (pOld) { - vmFreeVnodeObj(&pOld); - } - int32_t code = taosHashPut(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *)); - - pOld = NULL; - r = taosHashGetDup(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld); - if (r != 0) { - dError("vgId:%d, failed to get vnode from closedHash", pVnode->vgId); - } - if (pOld != NULL) { - vmFreeVnodeObj(&pOld); - dInfo("vgId:%d, remove from closedHash", pVnode->vgId); - r = taosHashRemove(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t)); - if (r != 0) { - dError("vgId:%d, failed to remove vnode from hash", pVnode->vgId); - } - } - + int32_t code = vmRegisterRunningState(pMgmt, pVnode); + vmUnRegisterClosedState(pMgmt, pVnode); (void)taosThreadRwlockUnlock(&pMgmt->lock); return code; @@ -273,38 +351,12 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal, } (void)taosThreadRwlockWrlock(&pMgmt->lock); - int32_t r = taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t)); - if (r != 0) { - dError("vgId:%d, failed to remove vnode from hash", pVnode->vgId); - } + vmUnRegisterRunningState(pMgmt, pVnode->vgId); if (keepClosed) { - SVnodeObj *pClosedVnode = taosMemoryCalloc(1, sizeof(SVnodeObj)); - if (pClosedVnode == NULL) { - dError("failed to alloc vnode since %s", terrstr()); + if (vmRegisterClosedState(pMgmt, pVnode) != 0) { (void)taosThreadRwlockUnlock(&pMgmt->lock); return; - } - (void)memset(pClosedVnode, 0, sizeof(SVnodeObj)); - - pClosedVnode->vgId = pVnode->vgId; - pClosedVnode->dropped = pVnode->dropped; - pClosedVnode->vgVersion = pVnode->vgVersion; - pClosedVnode->diskPrimary = pVnode->diskPrimary; - pClosedVnode->toVgId = pVnode->toVgId; - - SVnodeObj *pOld = NULL; - r = taosHashGetDup(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld); - if (r != 0) { - dError("vgId:%d, failed to get vnode from closedHash", pVnode->vgId); - } - if (pOld) { - vmFreeVnodeObj(&pOld); - } - dInfo("vgId:%d, put vnode to closedHash", pVnode->vgId); - r = taosHashPut(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), &pClosedVnode, sizeof(SVnodeObj *)); - if (r != 0) { - dError("vgId:%d, failed to put vnode to closedHash", pVnode->vgId); - } + }; } (void)taosThreadRwlockUnlock(&pMgmt->lock); @@ -391,6 +443,21 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal, vmFreeVnodeObj(&pVnode); } +void vmCloseFailedVnode(SVnodeMgmt *pMgmt, int32_t vgId) { + int32_t r = 0; + r = taosThreadRwlockWrlock(&pMgmt->lock); + if (r != 0) { + dError("vgId:%d, failed to lock since %s", vgId, tstrerror(r)); + } + if (r == 0) { + vmUnRegisterRunningState(pMgmt, vgId); + } + r = taosThreadRwlockUnlock(&pMgmt->lock); + if (r != 0) { + dError("vgId:%d, failed to unlock since %s", vgId, tstrerror(r)); + } +} + static int32_t vmRestoreVgroupId(SWrapperCfg *pCfg, STfs *pTfs) { int32_t srcVgId = pCfg->vgId; int32_t dstVgId = pCfg->toVgId; @@ -482,8 +549,9 @@ static void *vmOpenVnodeInThread(void *param) { } static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) { - pMgmt->hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); - if (pMgmt->hash == NULL) { + pMgmt->runngingHash = + taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + if (pMgmt->runngingHash == NULL) { dError("failed to init vnode hash since %s", terrstr()); return TSDB_CODE_OUT_OF_MEMORY; } @@ -579,32 +647,6 @@ static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) { return 0; } -void vmRemoveFromCreatingHash(SVnodeMgmt *pMgmt, int32_t vgId) { - SVnodeObj *pOld = NULL; - - (void)taosThreadRwlockWrlock(&pMgmt->lock); - int32_t r = taosHashGetDup(pMgmt->creatingHash, &vgId, sizeof(int32_t), (void *)&pOld); - if (r != 0) { - dError("vgId:%d, failed to get vnode from creating Hash", vgId); - } - dTrace("vgId:%d, remove from creating Hash", vgId); - r = taosHashRemove(pMgmt->creatingHash, &vgId, sizeof(int32_t)); - if (r != 0) { - dError("vgId:%d, failed to remove vnode from hash", vgId); - } - (void)taosThreadRwlockUnlock(&pMgmt->lock); - - if (pOld) { - dTrace("vgId:%d, free vnode pOld:%p", vgId, &pOld); - vmFreeVnodeObj(&pOld); - } - -_OVER: - if (r != 0) { - dError("vgId:%d, failed to remove vnode from creatingHash since %s", vgId, tstrerror(r)); - } -} - static void *vmCloseVnodeInThread(void *param) { SVnodeThread *pThread = param; SVnodeMgmt *pMgmt = pThread->pMgmt; @@ -693,9 +735,9 @@ static void vmCloseVnodes(SVnodeMgmt *pMgmt) { taosMemoryFree(ppVnodes); } - if (pMgmt->hash != NULL) { - taosHashCleanup(pMgmt->hash); - pMgmt->hash = NULL; + if (pMgmt->runngingHash != NULL) { + taosHashCleanup(pMgmt->runngingHash); + pMgmt->runngingHash = NULL; } void *pIter = taosHashIterate(pMgmt->closedHash, NULL);