From 5aa9531ea6ec606ebac77b383a429356dbc8fc3e Mon Sep 17 00:00:00 2001
From: "zihe.liu"
Date: Wed, 27 Nov 2024 19:27:05 +0800
Subject: [PATCH] [BugFix] Fix getNextWorker overflow (#53213)

Signed-off-by: zihe.liu

(cherry picked from commit 4e70ef57dd556527c8a0076597888d73b4eefeb0)

# Conflicts:
#	fe/fe-core/src/main/java/com/starrocks/lake/qe/scheduler/DefaultSharedDataWorkerProvider.java
#	fe/fe-core/src/main/java/com/starrocks/qe/scheduler/DefaultWorkerProvider.java
#	fe/fe-core/src/test/java/com/starrocks/lake/qe/scheduler/DefaultSharedDataWorkerProviderTest.java
#	fe/fe-core/src/test/java/com/starrocks/qe/scheduler/DefaultWorkerProviderTest.java
---
 .../DefaultSharedDataWorkerProvider.java      | 289 ++++
 .../starrocks/qe/WorkerProviderHelper.java    |  34 +
 .../qe/scheduler/DefaultWorkerProvider.java   | 422 +++
 .../DefaultSharedDataWorkerProviderTest.java  | 678 ++++++
 .../scheduler/DefaultWorkerProviderTest.java  | 398 ++++
 5 files changed, 1821 insertions(+)
 create mode 100644 fe/fe-core/src/main/java/com/starrocks/lake/qe/scheduler/DefaultSharedDataWorkerProvider.java
 create mode 100644 fe/fe-core/src/main/java/com/starrocks/qe/WorkerProviderHelper.java
 create mode 100644 fe/fe-core/src/main/java/com/starrocks/qe/scheduler/DefaultWorkerProvider.java
 create mode 100644 fe/fe-core/src/test/java/com/starrocks/lake/qe/scheduler/DefaultSharedDataWorkerProviderTest.java
 create mode 100644 fe/fe-core/src/test/java/com/starrocks/qe/scheduler/DefaultWorkerProviderTest.java

diff --git a/fe/fe-core/src/main/java/com/starrocks/lake/qe/scheduler/DefaultSharedDataWorkerProvider.java b/fe/fe-core/src/main/java/com/starrocks/lake/qe/scheduler/DefaultSharedDataWorkerProvider.java
new file mode 100644
index 0000000000000..2b3f50744d267
--- /dev/null
+++ b/fe/fe-core/src/main/java/com/starrocks/lake/qe/scheduler/DefaultSharedDataWorkerProvider.java
@@ -0,0 +1,289 @@
+// Copyright 2021-present StarRocks, Inc. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.starrocks.lake.qe.scheduler;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.starrocks.common.ErrorCode;
+import com.starrocks.common.ErrorReportException;
+import com.starrocks.common.FeConstants;
+import com.starrocks.qe.SessionVariableConstants.ComputationFragmentSchedulingPolicy;
+import com.starrocks.qe.SimpleScheduler;
+import com.starrocks.qe.scheduler.NonRecoverableException;
+import com.starrocks.qe.scheduler.WorkerProvider;
+import com.starrocks.server.GlobalStateMgr;
+import com.starrocks.server.WarehouseManager;
+import com.starrocks.system.ComputeNode;
+import com.starrocks.system.SystemInfoService;
+import com.starrocks.warehouse.Warehouse;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import static com.starrocks.qe.WorkerProviderHelper.getNextWorker;
+
+/**
+ * WorkerProvider for SHARED_DATA mode. Compared to its counterpart for SHARED_NOTHING mode:
+ * 1. All Backends and ComputeNodes are treated the same as ComputeNodes.
+ * 2. A backup node may be used when any of the initial workers in a scan location is unavailable.
+ * Note that:
+ * - All worker nodes and available worker nodes are captured when this provider is created, so a worker
+ *   may no longer be available by the time the interfaces of this provider are called.
+ * - All nodes are considered available once the snapshot of node info is captured, even though that
+ *   may not hold all the time.
+ * - Specifically, when calling `selectBackupWorker()`, the selected node is checked again against
+ *   `SimpleScheduler.isInBlocklist()`, to make sure the backup node is in the available node list
+ *   and not in the block list.
+ * Also, in shared-data mode all nodes are treated as compute nodes, so the session variable @@prefer_compute_node
+ * is always true, and @@use_compute_nodes is always -1, which means all available compute nodes are used.
+ */ +public class DefaultSharedDataWorkerProvider implements WorkerProvider { + private static final Logger LOG = LogManager.getLogger(DefaultSharedDataWorkerProvider.class); + private static final AtomicInteger NEXT_COMPUTE_NODE_INDEX = new AtomicInteger(0); + + public static class Factory implements WorkerProvider.Factory { + @Override + public DefaultSharedDataWorkerProvider captureAvailableWorkers(SystemInfoService systemInfoService, + boolean preferComputeNode, + int numUsedComputeNodes, + ComputationFragmentSchedulingPolicy computationFragmentSchedulingPolicy, + long warehouseId) { + + WarehouseManager warehouseManager = GlobalStateMgr.getCurrentState().getWarehouseMgr(); + ImmutableMap.Builder builder = ImmutableMap.builder(); + List computeNodeIds = warehouseManager.getAllComputeNodeIds(warehouseId); + computeNodeIds.forEach(nodeId -> builder.put(nodeId, + GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().getBackendOrComputeNode(nodeId))); + ImmutableMap idToComputeNode = builder.build(); + if (LOG.isDebugEnabled()) { + LOG.debug("idToComputeNode: {}", idToComputeNode); + } + + ImmutableMap availableComputeNodes = filterAvailableWorkers(idToComputeNode); + if (availableComputeNodes.isEmpty()) { + Warehouse warehouse = warehouseManager.getWarehouse(warehouseId); + throw ErrorReportException.report(ErrorCode.ERR_NO_NODES_IN_WAREHOUSE, warehouse.getName()); + } + + return new DefaultSharedDataWorkerProvider(idToComputeNode, availableComputeNodes); + } + } + + /** + * All the compute nodes (including backends), including those that are not alive or in block list. + */ + private final ImmutableMap id2ComputeNode; + /** + * The available compute nodes, which are alive and not in the block list when creating the snapshot. It is still + * possible that the node becomes unavailable later, it will be checked again in some of the interfaces. + */ + private final ImmutableMap availableID2ComputeNode; + + /** + * List of the compute node ids, used to select buddy node in case some of the nodes are not available. 
+ */ + private ImmutableList allComputeNodeIds; + + private final Set selectedWorkerIds; + + @VisibleForTesting + public DefaultSharedDataWorkerProvider(ImmutableMap id2ComputeNode, + ImmutableMap availableID2ComputeNode + ) { + this.id2ComputeNode = id2ComputeNode; + this.availableID2ComputeNode = availableID2ComputeNode; + this.selectedWorkerIds = Sets.newConcurrentHashSet(); + this.allComputeNodeIds = null; + } + + @Override + public long selectNextWorker() throws NonRecoverableException { + ComputeNode worker; + worker = getNextWorker(availableID2ComputeNode, DefaultSharedDataWorkerProvider::getNextComputeNodeIndex); + + if (worker == null) { + reportWorkerNotFoundException(); + } + Preconditions.checkNotNull(worker); + selectWorkerUnchecked(worker.getId()); + return worker.getId(); + } + + @Override + public void selectWorker(long workerId) throws NonRecoverableException { + if (getWorkerById(workerId) == null) { + reportWorkerNotFoundException(workerId); + } + selectWorkerUnchecked(workerId); + } + + @Override + public List selectAllComputeNodes() { + List nodeIds = availableID2ComputeNode.values().stream() + .map(ComputeNode::getId) + .collect(Collectors.toList()); + nodeIds.forEach(this::selectWorkerUnchecked); + return nodeIds; + } + + @Override + public Collection getAllWorkers() { + return availableID2ComputeNode.values(); + } + + @Override + public ComputeNode getWorkerById(long workerId) { + return availableID2ComputeNode.get(workerId); + } + + @Override + public boolean isDataNodeAvailable(long dataNodeId) { + // DataNode and ComputeNode is exchangeable in SHARED_DATA mode + return availableID2ComputeNode.containsKey(dataNodeId); + } + + @Override + public void reportDataNodeNotFoundException() throws NonRecoverableException { + reportWorkerNotFoundException(); + } + + @Override + public boolean isWorkerSelected(long workerId) { + return selectedWorkerIds.contains(workerId); + } + + @Override + public List getSelectedWorkerIds() { + return new ArrayList<>(selectedWorkerIds); + } + + @Override + public List getAllAvailableNodes() { + return Lists.newArrayList(availableID2ComputeNode.keySet()); + } + + @Override + public boolean isPreferComputeNode() { + return true; + } + + @Override + public void selectWorkerUnchecked(long workerId) { + selectedWorkerIds.add(workerId); + } + + @Override + public void reportWorkerNotFoundException() throws NonRecoverableException { + reportWorkerNotFoundException(-1); + } + + private void reportWorkerNotFoundException(long workerId) throws NonRecoverableException { + throw new NonRecoverableException( + FeConstants.getNodeNotFoundError(true) + " nodeId: " + workerId + " " + computeNodesToString(false)); + } + + @Override + public boolean allowUsingBackupNode() { + return true; + } + + /** + * Try to select the next workerId in the sorted list just after the workerId, if the next one is not available, + * e.g. also in BlockList, then try the next one in the list, until all nodes have benn tried. 
+ */ + @Override + public long selectBackupWorker(long workerId) { + if (availableID2ComputeNode.isEmpty() || !id2ComputeNode.containsKey(workerId)) { + return -1; + } + if (allComputeNodeIds == null) { + createAvailableIdList(); + } + Preconditions.checkNotNull(allComputeNodeIds); + Preconditions.checkState(allComputeNodeIds.contains(workerId)); + + int startPos = allComputeNodeIds.indexOf(workerId); + int attempts = allComputeNodeIds.size(); + while (attempts-- > 0) { + // ensure the buddyId selection is stable, that is, giving the same input, the output is always the same. + // TODO: call StarOSAgent interface, let starmgr to choose a buddy node or trigger scheduling as necessary. + startPos = (startPos + 1) % allComputeNodeIds.size(); + long buddyId = allComputeNodeIds.get(startPos); + if (buddyId != workerId && availableID2ComputeNode.containsKey(buddyId) && + !SimpleScheduler.isInBlocklist(buddyId)) { + return buddyId; + } + } + return -1; + } + + @Override + public String toString() { + return computeNodesToString(true); + } + + private String computeNodesToString(boolean allowNormalNodes) { + StringBuilder out = new StringBuilder("compute node: "); + + id2ComputeNode.forEach((backendID, backend) -> { + if (allowNormalNodes || !backend.isAlive() || !availableID2ComputeNode.containsKey(backendID) || + SimpleScheduler.isInBlocklist(backendID)) { + out.append( + String.format("[%s alive: %b, available: %b, inBlacklist: %b] ", backend.getHost(), + backend.isAlive(), availableID2ComputeNode.containsKey(backendID), + SimpleScheduler.isInBlocklist(backendID))); + } + }); + return out.toString(); + } + + private void createAvailableIdList() { + List ids = new ArrayList<>(id2ComputeNode.keySet()); + Collections.sort(ids); + this.allComputeNodeIds = ImmutableList.copyOf(ids); + } + + @VisibleForTesting + static int getNextComputeNodeIndex() { + return NEXT_COMPUTE_NODE_INDEX.getAndIncrement(); + } + + @VisibleForTesting + static AtomicInteger getNextComputeNodeIndexer() { + return NEXT_COMPUTE_NODE_INDEX; + } + + private static ImmutableMap filterAvailableWorkers(ImmutableMap workers) { + ImmutableMap.Builder builder = new ImmutableMap.Builder<>(); + for (Map.Entry entry : workers.entrySet()) { + if (entry.getValue().isAlive() && !SimpleScheduler.isInBlocklist(entry.getKey())) { + builder.put(entry); + } + } + return builder.build(); + } +} diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/WorkerProviderHelper.java b/fe/fe-core/src/main/java/com/starrocks/qe/WorkerProviderHelper.java new file mode 100644 index 0000000000000..e55fc4aaa371a --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/qe/WorkerProviderHelper.java @@ -0,0 +1,34 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
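The probe loop in selectBackupWorker() above can be read in isolation: walk the sorted id list starting just after the failed worker and return the first candidate that is still usable, so the same input always yields the same buddy. Below is a simplified sketch with made-up ids and a plain availability set (illustrative only, not part of the patch; the real method also consults SimpleScheduler.isInBlocklist()):

    import java.util.List;
    import java.util.Set;

    class BackupWorkerSelectionSketch {
        // Mirrors the probe loop in selectBackupWorker(): scan the sorted id list,
        // starting just after the failed worker, and return the first usable node.
        static long selectBackup(List<Long> sortedIds, Set<Long> available, long workerId) {
            int pos = sortedIds.indexOf(workerId);
            if (pos < 0) {
                return -1;
            }
            int attempts = sortedIds.size();
            while (attempts-- > 0) {
                pos = (pos + 1) % sortedIds.size();     // wrap around the sorted list
                long buddy = sortedIds.get(pos);
                if (buddy != workerId && available.contains(buddy)) {
                    return buddy;                       // first available node after workerId
                }
            }
            return -1;                                  // nothing else is available
        }

        public static void main(String[] args) {
            // Node 2 is unavailable; its backup is deterministically node 3, the next id in sorted order.
            System.out.println(selectBackup(List.of(1L, 2L, 3L, 4L), Set.of(1L, 3L, 4L), 2L)); // 3
        }
    }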
+ +package com.starrocks.qe; + +import com.google.common.collect.ImmutableMap; +import com.starrocks.system.ComputeNode; + +import java.util.function.IntSupplier; + +public class WorkerProviderHelper { + public static C getNextWorker(ImmutableMap workers, + IntSupplier getNextWorkerNodeIndex) { + if (workers.isEmpty()) { + return null; + } + int index = getNextWorkerNodeIndex.getAsInt() % workers.size(); + if (index < 0) { + index = -index; + } + return workers.values().asList().get(index); + } +} diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/DefaultWorkerProvider.java b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/DefaultWorkerProvider.java new file mode 100644 index 0000000000000..2a1ff96da73d8 --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/DefaultWorkerProvider.java @@ -0,0 +1,422 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.qe.scheduler; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.starrocks.common.FeConstants; +import com.starrocks.qe.ConnectContext; +import com.starrocks.qe.SessionVariableConstants.ComputationFragmentSchedulingPolicy; +import com.starrocks.qe.SimpleScheduler; +import com.starrocks.server.GlobalStateMgr; +import com.starrocks.server.RunMode; +import com.starrocks.server.WarehouseManager; +import com.starrocks.system.ComputeNode; +import com.starrocks.system.SystemInfoService; +import org.apache.commons.collections4.MapUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import static com.starrocks.qe.WorkerProviderHelper.getNextWorker; + +/** + * DefaultWorkerProvider handles ComputeNode/Backend selection in SHARED_NOTHING mode. + * NOTE: remember to update DefaultSharedDataWorkerProvider if the change applies to both run modes. + */ +public class DefaultWorkerProvider implements WorkerProvider { + private static final Logger LOG = LogManager.getLogger(DefaultWorkerProvider.class); + + private static final AtomicInteger NEXT_COMPUTE_NODE_INDEX = new AtomicInteger(0); + private static final AtomicInteger NEXT_BACKEND_INDEX = new AtomicInteger(0); + + /** + * All the backend nodes, including those that are not alive or in blacklist. + */ + private final ImmutableMap id2Backend; + /** + * All the compute nodes, including those that are not alive or in blacklist. 
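The negative-index guard in WorkerProviderHelper.getNextWorker() above is the core of the overflow fix named in the subject line: the shared AtomicInteger counter eventually wraps past Integer.MAX_VALUE, and the remainder of a negative counter is negative. A minimal, self-contained sketch of that behavior (illustrative only; the class and node names are made up, not part of the patch):

    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;

    class GetNextWorkerOverflowSketch {
        // Start near the overflow point so the wrap happens within a few calls.
        private static final AtomicInteger NEXT_INDEX = new AtomicInteger(Integer.MAX_VALUE - 1);

        static <T> T pick(List<T> workers) {
            int index = NEXT_INDEX.getAndIncrement() % workers.size();
            if (index < 0) {
                // The counter has wrapped to a negative value; flip the sign so the
                // index stays inside [0, workers.size()).
                index = -index;
            }
            return workers.get(index);
        }

        public static void main(String[] args) {
            List<String> workers = List.of("cn-1", "cn-2", "cn-3");
            for (int i = 0; i < 5; i++) {
                // Without the sign flip, the wrapped counter would yield a negative index
                // and workers.get(index) would throw IndexOutOfBoundsException.
                System.out.println(pick(workers));
            }
        }
    }

Negating the remainder is safe here because its magnitude is always strictly smaller than the list size.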
+ */ + private final ImmutableMap id2ComputeNode; + /** + * The available backend nodes, which are alive and not in the blacklist. + */ + private final ImmutableMap availableID2Backend; + /** + * The available compute nodes, which are alive and not in the blacklist. + */ + private final ImmutableMap availableID2ComputeNode; + + /** + * These three members record the workers used by the related job. + * They are updated when calling {@code chooseXXX} methods. + */ + private final Set selectedWorkerIds; + + /** + * Indicates whether there are available compute nodes. + */ + private final boolean hasComputeNode; + /** + * Indicates whether compute nodes should be used for non-HDFS operators. + * The HDFS operator always prefers to use compute nodes even if {@code usedComputeNode} is false. + */ + private final boolean usedComputeNode; + + private final boolean preferComputeNode; + + public static class Factory implements WorkerProvider.Factory { + @Override + public DefaultWorkerProvider captureAvailableWorkers(SystemInfoService systemInfoService, + boolean preferComputeNode, int numUsedComputeNodes, + ComputationFragmentSchedulingPolicy computationFragmentSchedulingPolicy, + long warehouseId) { + + ImmutableMap idToComputeNode = + buildComputeNodeInfo(systemInfoService, numUsedComputeNodes, + computationFragmentSchedulingPolicy, warehouseId); + + ImmutableMap idToBackend = ImmutableMap.copyOf(systemInfoService.getIdToBackend()); + + if (LOG.isDebugEnabled()) { + LOG.debug("idToBackend size={}", idToBackend.size()); + for (Map.Entry entry : idToBackend.entrySet()) { + Long backendID = entry.getKey(); + ComputeNode backend = entry.getValue(); + LOG.debug("backend: {}-{}-{}", backendID, backend.getHost(), backend.getBePort()); + } + + LOG.debug("idToComputeNode: {}", idToComputeNode); + } + + return new DefaultWorkerProvider(idToBackend, idToComputeNode, + filterAvailableWorkers(idToBackend), filterAvailableWorkers(idToComputeNode), + preferComputeNode); + } + } + + @VisibleForTesting + public DefaultWorkerProvider(ImmutableMap id2Backend, + ImmutableMap id2ComputeNode, + ImmutableMap availableID2Backend, + ImmutableMap availableID2ComputeNode, + boolean preferComputeNode) { + this.id2Backend = id2Backend; + this.id2ComputeNode = id2ComputeNode; + + this.availableID2Backend = availableID2Backend; + this.availableID2ComputeNode = availableID2ComputeNode; + + this.selectedWorkerIds = Sets.newConcurrentHashSet(); + + this.hasComputeNode = MapUtils.isNotEmpty(availableID2ComputeNode); + if (MapUtils.isEmpty(availableID2Backend) && hasComputeNode) { + this.usedComputeNode = true; + } else { + this.usedComputeNode = hasComputeNode && preferComputeNode; + } + this.preferComputeNode = preferComputeNode; + } + + @VisibleForTesting + public DefaultWorkerProvider( + ImmutableMap id2ComputeNode, + ImmutableMap availableID2ComputeNode) { + this.id2Backend = ImmutableMap.of(); + this.id2ComputeNode = id2ComputeNode; + + this.availableID2Backend = ImmutableMap.of(); + this.availableID2ComputeNode = availableID2ComputeNode; + + this.selectedWorkerIds = Sets.newConcurrentHashSet(); + + this.hasComputeNode = true; + this.preferComputeNode = true; + this.usedComputeNode = true; + } + + @Override + public long selectNextWorker() throws NonRecoverableException { + ComputeNode worker; + if (usedComputeNode) { + worker = getNextWorker(availableID2ComputeNode, DefaultWorkerProvider::getNextComputeNodeIndex); + } else { + worker = getNextWorker(availableID2Backend, DefaultWorkerProvider::getNextBackendIndex); + } + + 
if (worker == null) { + reportWorkerNotFoundException(); + } + Preconditions.checkNotNull(worker); + + selectWorkerUnchecked(worker.getId()); + return worker.getId(); + } + + @Override + public void selectWorker(long workerId) throws NonRecoverableException { + if (getWorkerById(workerId) == null) { + reportWorkerNotFoundException(); + } + selectWorkerUnchecked(workerId); + } + + @Override + public List selectAllComputeNodes() { + if (!usedComputeNode) { + return Collections.emptyList(); + } + + List nodeIds = availableID2ComputeNode.values().stream() + .map(ComputeNode::getId) + .collect(Collectors.toList()); + nodeIds.forEach(this::selectWorkerUnchecked); + + return nodeIds; + } + + @Override + public Collection getAllWorkers() { + if (hasComputeNode) { + return availableID2ComputeNode.values(); + } else { + return ImmutableList.copyOf(availableID2Backend.values()); + } + } + + @Override + public ComputeNode getWorkerById(long workerId) { + ComputeNode worker = availableID2Backend.get(workerId); + if (worker != null) { + return worker; + } + return availableID2ComputeNode.get(workerId); + } + + @Override + public boolean isDataNodeAvailable(long dataNodeId) { + return getBackend(dataNodeId) != null || getComputeNode(dataNodeId) != null; + } + + @Override + public void reportDataNodeNotFoundException() throws NonRecoverableException { + reportWorkerNotFoundException(false); + } + + @Override + public void reportWorkerNotFoundException() throws NonRecoverableException { + reportWorkerNotFoundException(usedComputeNode); + } + + @Override + public boolean isWorkerSelected(long workerId) { + return selectedWorkerIds.contains(workerId); + } + + @Override + public List getSelectedWorkerIds() { + return new ArrayList<>(selectedWorkerIds); + } + + /** + * if usedComputeNode turns on or no backend, we add all compute nodes to the result. + * if perferComputeNode turns on, we just return computeNode set + * else add backend set and return + * + * @return + */ + @Override + public List getAllAvailableNodes() { + List nodeIds = Lists.newArrayList(); + if (usedComputeNode || availableID2Backend.isEmpty()) { + nodeIds.addAll(availableID2ComputeNode.keySet()); + } + + if (preferComputeNode) { + return nodeIds; + } + nodeIds.addAll(availableID2Backend.keySet()); + return nodeIds; + } + + @Override + public boolean isPreferComputeNode() { + return preferComputeNode; + } + + @Override + public void selectWorkerUnchecked(long workerId) { + selectedWorkerIds.add(workerId); + } + + @Override + public String toString() { + return toString(usedComputeNode, true); + } + + @VisibleForTesting + ComputeNode getBackend(Long backendID) { + return availableID2Backend.get(backendID); + } + + @VisibleForTesting + ComputeNode getComputeNode(Long computeNodeID) { + return availableID2ComputeNode.get(computeNodeID); + } + + @Override + public long selectBackupWorker(long workerId) { + // not allowed to have backup node + return -1; + } + + private String toString(boolean chooseComputeNode, boolean allowNormalNodes) { + return chooseComputeNode ? 
computeNodesToString(allowNormalNodes) : + backendsToString(allowNormalNodes); + } + + private void reportWorkerNotFoundException(boolean chooseComputeNode) throws NonRecoverableException { + throw new NonRecoverableException( + FeConstants.getNodeNotFoundError(chooseComputeNode) + toString(chooseComputeNode, false)); + } + + private String computeNodesToString(boolean allowNormalNodes) { + StringBuilder out = new StringBuilder("compute node: "); + + id2ComputeNode.forEach((backendID, backend) -> { + if (shouldIncludeNode(backend, backendID, allowNormalNodes)) { + out.append( + String.format("[%s alive: %b inBlacklist: %b] ", backend.getHost(), + backend.isAlive(), SimpleScheduler.isInBlocklist(backendID))); + } + }); + return out.toString(); + } + + private String backendsToString(boolean allowNormalNodes) { + StringBuilder out = new StringBuilder("backend: "); + id2Backend.forEach((backendID, backend) -> { + if (shouldIncludeNode(backend, backendID, allowNormalNodes)) { + out.append( + formatNodeInfo(backend.getHost(), backend.isAlive(), SimpleScheduler.isInBlocklist(backendID))); + } + }); + return out.toString(); + } + + private boolean shouldIncludeNode(ComputeNode node, Long nodeId, boolean allowNormalNodes) { + return allowNormalNodes || !node.isAlive() || SimpleScheduler.isInBlocklist(nodeId); + } + + private String formatNodeInfo(String host, boolean isAlive, boolean isInBlacklist) { + return String.format("[%s alive: %b inBlacklist: %b] ", + host, isAlive, isInBlacklist); + } + + @VisibleForTesting + static int getNextComputeNodeIndex() { + if (RunMode.getCurrentRunMode() == RunMode.SHARED_DATA) { + long currentWh = WarehouseManager.DEFAULT_WAREHOUSE_ID; + if (ConnectContext.get() != null) { + currentWh = ConnectContext.get().getCurrentWarehouseId(); + } + return GlobalStateMgr.getCurrentState().getWarehouseMgr(). 
+ getNextComputeNodeIndexFromWarehouse(currentWh).getAndIncrement(); + } + return NEXT_COMPUTE_NODE_INDEX.getAndIncrement(); + } + + @VisibleForTesting + static int getNextBackendIndex() { + return NEXT_BACKEND_INDEX.getAndIncrement(); + } + + private static ImmutableMap buildComputeNodeInfo(SystemInfoService systemInfoService, + int numUsedComputeNodes, + ComputationFragmentSchedulingPolicy computationFragmentSchedulingPolicy, + long warehouseId) { + //define Node Pool + Map computeNodes = new HashMap<>(); + + //get CN and BE from systemInfoService + ImmutableMap idToComputeNode + = ImmutableMap.copyOf(systemInfoService.getIdComputeNode()); + ImmutableMap idToBackend + = ImmutableMap.copyOf(systemInfoService.getIdToBackend()); + + //add CN and BE to Node Pool + if (numUsedComputeNodes <= 0) { + computeNodes.putAll(idToComputeNode); + if (computationFragmentSchedulingPolicy == ComputationFragmentSchedulingPolicy.ALL_NODES) { + computeNodes.putAll(idToBackend); + } + } else { + for (int i = 0; i < idToComputeNode.size() && computeNodes.size() < numUsedComputeNodes; i++) { + ComputeNode computeNode = + getNextWorker(idToComputeNode, DefaultWorkerProvider::getNextComputeNodeIndex); + Preconditions.checkNotNull(computeNode); + if (!isWorkerAvailable(computeNode)) { + continue; + } + computeNodes.put(computeNode.getId(), computeNode); + } + if (computationFragmentSchedulingPolicy == ComputationFragmentSchedulingPolicy.ALL_NODES) { + for (int i = 0; i < idToBackend.size() && computeNodes.size() < numUsedComputeNodes; i++) { + ComputeNode backend = + getNextWorker(idToBackend, DefaultWorkerProvider::getNextBackendIndex); + Preconditions.checkNotNull(backend); + if (!isWorkerAvailable(backend)) { + continue; + } + computeNodes.put(backend.getId(), backend); + } + + } + } + + //return Node Pool + return ImmutableMap.copyOf(computeNodes); + } + + public static boolean isWorkerAvailable(ComputeNode worker) { + return worker.isAlive() && !SimpleScheduler.isInBlocklist(worker.getId()); + } + + @VisibleForTesting + static AtomicInteger getNextComputeNodeIndexer() { + return NEXT_COMPUTE_NODE_INDEX; + } + + private static ImmutableMap filterAvailableWorkers(ImmutableMap workers) { + return ImmutableMap.copyOf( + workers.entrySet().stream() + .filter(entry -> isWorkerAvailable(entry.getValue())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)) + ); + } +} diff --git a/fe/fe-core/src/test/java/com/starrocks/lake/qe/scheduler/DefaultSharedDataWorkerProviderTest.java b/fe/fe-core/src/test/java/com/starrocks/lake/qe/scheduler/DefaultSharedDataWorkerProviderTest.java new file mode 100644 index 0000000000000..df67e30cd5c6c --- /dev/null +++ b/fe/fe-core/src/test/java/com/starrocks/lake/qe/scheduler/DefaultSharedDataWorkerProviderTest.java @@ -0,0 +1,678 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
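buildComputeNodeInfo() above caps the candidate pool when numUsedComputeNodes is positive by drawing nodes round-robin from the shared counter. A stripped-down sketch of that capping step (hypothetical node names; aliveness, blocklist checks, and the backend fallback for ALL_NODES are omitted):

    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.atomic.AtomicInteger;

    class NodePoolCapSketch {
        private static final AtomicInteger NEXT = new AtomicInteger(0);

        // Keep picking round-robin until the pool reaches the cap or every candidate was visited once.
        static Map<Long, String> buildPool(Map<Long, String> candidates, int numUsedComputeNodes) {
            if (numUsedComputeNodes <= 0) {
                return new LinkedHashMap<>(candidates);   // no cap: use every candidate
            }
            List<Map.Entry<Long, String>> entries = List.copyOf(candidates.entrySet());
            Map<Long, String> pool = new LinkedHashMap<>();
            for (int i = 0; i < entries.size() && pool.size() < numUsedComputeNodes; i++) {
                int index = NEXT.getAndIncrement() % entries.size();
                if (index < 0) {
                    index = -index;   // same wrap-around guard as WorkerProviderHelper.getNextWorker
                }
                Map.Entry<Long, String> picked = entries.get(index);
                pool.put(picked.getKey(), picked.getValue());
            }
            return pool;
        }

        public static void main(String[] args) {
            Map<Long, String> candidates = Map.of(10L, "cn-10", 11L, "cn-11", 12L, "cn-12");
            System.out.println(buildPool(candidates, 2).size());   // at most 2 nodes in the pool
        }
    }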
+ +package com.starrocks.lake.qe.scheduler; + +import com.google.api.client.util.Lists; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; +import com.starrocks.analysis.TupleDescriptor; +import com.starrocks.analysis.TupleId; +import com.starrocks.catalog.HashDistributionInfo; +import com.starrocks.catalog.OlapTable; +import com.starrocks.common.ExceptionChecker; +import com.starrocks.common.UserException; +import com.starrocks.planner.OlapScanNode; +import com.starrocks.planner.PlanNodeId; +import com.starrocks.qe.ColocatedBackendSelector; +import com.starrocks.qe.FragmentScanRangeAssignment; +import com.starrocks.qe.HostBlacklist; +import com.starrocks.qe.NormalBackendSelector; +import com.starrocks.qe.SessionVariableConstants.ComputationFragmentSchedulingPolicy; +import com.starrocks.qe.SimpleScheduler; +import com.starrocks.qe.scheduler.NonRecoverableException; +import com.starrocks.qe.scheduler.WorkerProvider; +import com.starrocks.server.GlobalStateMgr; +import com.starrocks.server.WarehouseManager; +import com.starrocks.system.Backend; +import com.starrocks.system.ComputeNode; +import com.starrocks.system.SystemInfoService; +import com.starrocks.thrift.TInternalScanRange; +import com.starrocks.thrift.TScanRange; +import com.starrocks.thrift.TScanRangeLocation; +import com.starrocks.thrift.TScanRangeLocations; +import mockit.Expectations; +import mockit.Mock; +import mockit.MockUp; +import org.assertj.core.util.Sets; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; + +public class DefaultSharedDataWorkerProviderTest { + private Map id2Backend; + private Map id2ComputeNode; + private Map id2AllNodes; + private DefaultSharedDataWorkerProvider.Factory factory; + + private static Map genWorkers(long startId, long endId, + Supplier factory) { + Map res = new HashMap<>(); + for (long i = startId; i < endId; i++) { + C worker = factory.get(); + worker.setId(i); + worker.setAlive(true); + worker.setHost("host#" + i); + worker.setBePort(80); + res.put(i, worker); + } + return res; + } + + @BeforeClass + public static void setUpTestSuite() { + SimpleScheduler.getHostBlacklist().disableAutoUpdate(); + } + + @Before + public void setUp() { + // clear the block list + SimpleScheduler.getHostBlacklist().hostBlacklist.clear(); + factory = new DefaultSharedDataWorkerProvider.Factory(); + + // Generate mock Workers + // BE, 1-10 + id2Backend = genWorkers(1, 11, Backend::new); + // CN, 11-15 + id2ComputeNode = genWorkers(11, 16, ComputeNode::new); + // all nodes + id2AllNodes = Maps.newHashMap(id2Backend); + id2AllNodes.putAll(id2ComputeNode); + + // Setup MockUp + WarehouseManager warehouseManager = GlobalStateMgr.getCurrentState().getWarehouseMgr(); + new Expectations(warehouseManager) { + { + warehouseManager.getAllComputeNodeIds(anyLong); + result = Lists.newArrayList(id2AllNodes.keySet()); + minTimes = 0; + } + }; + + new MockUp() { + @Mock + public 
ComputeNode getBackendOrComputeNode(long nodeId) { + ComputeNode node = id2ComputeNode.get(nodeId); + if (node == null) { + node = id2Backend.get(nodeId); + } + return node; + } + }; + } + + private WorkerProvider newWorkerProvider() { + return factory.captureAvailableWorkers( + GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo(), true, + -1, ComputationFragmentSchedulingPolicy.COMPUTE_NODES_ONLY, + WarehouseManager.DEFAULT_WAREHOUSE_ID); + } + + private static void testUsingWorkerHelper(WorkerProvider workerProvider, Long workerId) { + Assert.assertTrue(workerProvider.isWorkerSelected(workerId)); + Assert.assertTrue(workerProvider.getSelectedWorkerIds().contains(workerId)); + } + + private List prepareNodeAliveAndBlock(SystemInfoService sysInfo, HostBlacklist blockList) { + // for every even number of worker, take in turn to set to alive=false and inBlock=true. + // [0:alive=false, 1, 2:inBlock=true, 3, 4:alive=false, ...] + List availList = Lists.newArrayList(id2AllNodes.keySet()); + boolean flip = true; + for (int i = 0; i < id2AllNodes.size(); i += 2) { + ComputeNode node = sysInfo.getBackendOrComputeNode(i); + if (node != null) { + if (flip) { + node.setAlive(false); + } else { + blockList.add(node.getId()); + } + flip = !flip; + availList.remove(node.getId()); + } + } + return availList; + } + + @Test + public void testCaptureAvailableWorkers() { + long deadBEId = 1L; + long deadCNId = 11L; + long inBlacklistBEId = 3L; + long inBlacklistCNId = 13L; + + HostBlacklist blockList = SimpleScheduler.getHostBlacklist(); + + blockList.add(inBlacklistBEId); + blockList.add(inBlacklistCNId); + id2Backend.get(deadBEId).setAlive(false); + id2ComputeNode.get(deadCNId).setAlive(false); + + Set nonAvailableWorkerId = ImmutableSet.of(deadBEId, deadCNId, inBlacklistBEId, inBlacklistCNId); + WorkerProvider workerProvider = newWorkerProvider(); + + Optional maxId = id2Backend.keySet().stream().max(Comparator.naturalOrder()); + Assert.assertFalse(maxId.isEmpty()); + for (long id : id2AllNodes.keySet()) { + ComputeNode worker = workerProvider.getWorkerById(id); + if (nonAvailableWorkerId.contains(id)) { + Assert.assertNull(worker); + } else { + Assert.assertEquals(id, worker.getId()); + if (id <= maxId.get()) { + Assert.assertTrue(worker instanceof Backend); + } else { + Assert.assertFalse(worker instanceof Backend); + } + } + } + } + + @Test + public void testSelectWorker() throws UserException { + HostBlacklist blockList = SimpleScheduler.getHostBlacklist(); + SystemInfoService sysInfo = GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo(); + + List availList = prepareNodeAliveAndBlock(sysInfo, blockList); + WorkerProvider provider = newWorkerProvider(); + + // intend to iterate the id out of the actual range. 
+ for (long id = -1; id < id2AllNodes.size() + 5; id++) { + if (availList.contains(id)) { + provider.selectWorker(id); + testUsingWorkerHelper(provider, id); + } else { + long finalId = id; + Assert.assertThrows(NonRecoverableException.class, () -> provider.selectWorker(finalId)); + } + } + } + + @Test + public void testGetAllWorkers() { + HostBlacklist blockList = SimpleScheduler.getHostBlacklist(); + SystemInfoService sysInfo = GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo(); + + List availList = prepareNodeAliveAndBlock(sysInfo, blockList); + WorkerProvider provider = newWorkerProvider(); + // allWorkers returns only available workers + Collection allWorkers = provider.getAllWorkers(); + + Assert.assertEquals(availList.size(), allWorkers.size()); + for (ComputeNode node : allWorkers) { + Assert.assertTrue(availList.contains(node.getId())); + } + List allWorkerIds = allWorkers.stream().map(ComputeNode::getId).collect(Collectors.toList()); + for (long availId : availList) { + Assert.assertTrue(allWorkerIds.contains(availId)); + } + // strictly the same + List allAvailNodeIds = provider.getAllAvailableNodes(); + Assert.assertEquals(allWorkerIds, allAvailNodeIds); + } + + private static void testSelectNextWorkerHelper(WorkerProvider workerProvider, + Map id2Worker) + throws UserException { + Set selectedWorkers = new HashSet<>(id2Worker.size()); + for (int i = 0; i < id2Worker.size(); i++) { + long workerId = workerProvider.selectNextWorker(); + Assert.assertFalse(selectedWorkers.contains(workerId)); + selectedWorkers.add(workerId); + testUsingWorkerHelper(workerProvider, workerId); + } + } + + @Test + public void testSelectNextWorker() throws UserException { + HostBlacklist blockList = SimpleScheduler.getHostBlacklist(); + blockList.hostBlacklist.clear(); + SystemInfoService sysInfo = GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo(); + + List availList = prepareNodeAliveAndBlock(sysInfo, blockList); + { // test backend nodes only + ImmutableMap.Builder builder = new ImmutableMap.Builder<>(); + for (long backendId : id2Backend.keySet()) { + if (availList.contains(backendId)) { + builder.put(backendId, id2Backend.get(backendId)); + } + } + ImmutableMap availableId2ComputeNode = builder.build(); + WorkerProvider workerProvider = + new DefaultSharedDataWorkerProvider(ImmutableMap.copyOf(id2Backend), availableId2ComputeNode); + testSelectNextWorkerHelper(workerProvider, availableId2ComputeNode); + } + + { // test compute nodes only + ImmutableMap.Builder builder = new ImmutableMap.Builder<>(); + for (long backendId : id2ComputeNode.keySet()) { + if (availList.contains(backendId)) { + builder.put(backendId, id2ComputeNode.get(backendId)); + } + } + ImmutableMap availableId2ComputeNode = builder.build(); + WorkerProvider workerProvider = + new DefaultSharedDataWorkerProvider(ImmutableMap.copyOf(id2ComputeNode), availableId2ComputeNode); + testSelectNextWorkerHelper(workerProvider, availableId2ComputeNode); + } + + { // test both backends and compute nodes + ImmutableMap.Builder builder = new ImmutableMap.Builder<>(); + for (long backendId : id2AllNodes.keySet()) { + if (availList.contains(backendId)) { + builder.put(backendId, id2AllNodes.get(backendId)); + } + } + ImmutableMap availableId2ComputeNode = builder.build(); + WorkerProvider workerProvider = + new DefaultSharedDataWorkerProvider(ImmutableMap.copyOf(id2AllNodes), availableId2ComputeNode); + testSelectNextWorkerHelper(workerProvider, availableId2ComputeNode); + } + + { // test no available worker to select + 
WorkerProvider workerProvider = + new DefaultSharedDataWorkerProvider(ImmutableMap.copyOf(id2AllNodes), ImmutableMap.of()); + + Exception e = Assert.assertThrows(NonRecoverableException.class, workerProvider::selectNextWorker); + Assert.assertEquals( + "Compute node not found. Check if any compute node is down. nodeId: -1 " + + "compute node: [host#1 alive: true, available: false, inBlacklist: false] " + + "[host#2 alive: false, available: false, inBlacklist: false] " + + "[host#3 alive: true, available: false, inBlacklist: false] " + + "[host#4 alive: true, available: false, inBlacklist: true] " + + "[host#5 alive: true, available: false, inBlacklist: false] " + + "[host#6 alive: false, available: false, inBlacklist: false] " + + "[host#7 alive: true, available: false, inBlacklist: false] " + + "[host#8 alive: true, available: false, inBlacklist: true] " + + "[host#9 alive: true, available: false, inBlacklist: false] " + + "[host#10 alive: false, available: false, inBlacklist: false] " + + "[host#11 alive: true, available: false, inBlacklist: false] " + + "[host#12 alive: true, available: false, inBlacklist: true] " + + "[host#13 alive: true, available: false, inBlacklist: false] " + + "[host#14 alive: false, available: false, inBlacklist: false] " + + "[host#15 alive: true, available: false, inBlacklist: false] ", + e.getMessage()); + } + } + + @Test + public void testChooseAllComputedNodes() { + { // empty compute nodes + WorkerProvider workerProvider = + new DefaultSharedDataWorkerProvider(ImmutableMap.of(), ImmutableMap.of()); + Assert.assertTrue(workerProvider.selectAllComputeNodes().isEmpty()); + } + + { // both compute nodes and backend are treated as compute nodes + WorkerProvider workerProvider = + new DefaultSharedDataWorkerProvider(ImmutableMap.copyOf(id2AllNodes), + ImmutableMap.copyOf(id2AllNodes)); + + List computeNodeIds = workerProvider.selectAllComputeNodes(); + Assert.assertEquals(id2AllNodes.size(), computeNodeIds.size()); + Set computeNodeIdSet = new HashSet<>(computeNodeIds); + for (ComputeNode computeNode : id2AllNodes.values()) { + Assert.assertTrue(computeNodeIdSet.contains(computeNode.getId())); + testUsingWorkerHelper(workerProvider, computeNode.getId()); + } + } + } + + @Test + public void testIsDataNodeAvailable() { + HostBlacklist blockList = SimpleScheduler.getHostBlacklist(); + blockList.hostBlacklist.clear(); + SystemInfoService sysInfo = GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo(); + + List availList = prepareNodeAliveAndBlock(sysInfo, blockList); + WorkerProvider provider = newWorkerProvider(); + + for (long id = -1; id < 16; id++) { + boolean isAvail = provider.isDataNodeAvailable(id); + ComputeNode worker = provider.getWorkerById(id); + if (!availList.contains(id)) { + Assert.assertFalse(isAvail); + } else { + Assert.assertEquals(id2AllNodes.get(id), worker); + Assert.assertTrue(isAvail); + } + } + } + + @Test + public void testReportBackendNotFoundException() { + WorkerProvider workerProvider = newWorkerProvider(); + Assert.assertThrows(NonRecoverableException.class, workerProvider::reportDataNodeNotFoundException); + } + + @Test + public void testSelectBackupWorkersEvenlySelected() { + Set counters = Sets.newHashSet(); + WorkerProvider workerProvider = newWorkerProvider(); + Assert.assertTrue(workerProvider.allowUsingBackupNode()); + for (long id : id2AllNodes.keySet()) { + long backupId = -1; + for (int j = 0; j < 100; ++j) { + long selectedId = workerProvider.selectBackupWorker(id); + Assert.assertTrue(selectedId > 0); + // cannot 
choose itself + Assert.assertNotEquals(id, selectedId); + if (backupId == -1) { + backupId = selectedId; + } else { + // always get the same node + Assert.assertEquals(backupId, selectedId); + } + } + Assert.assertTrue(backupId != -1); + Assert.assertFalse(counters.contains(id)); + counters.add(id); + } + // every node is chosen as a backup node once + Assert.assertEquals(id2AllNodes.size(), counters.size()); + } + + @Test + public void testIsPreferComputeNode() { + WorkerProvider provider = newWorkerProvider(); + Assert.assertTrue(provider.isPreferComputeNode()); + } + + @Test + public void testSelectBackupWorkerStable() { + HostBlacklist blockList = SimpleScheduler.getHostBlacklist(); + SystemInfoService sysInfo = GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo(); + List availList = prepareNodeAliveAndBlock(sysInfo, blockList); + + Optional unavail = id2AllNodes.keySet().stream().filter(x -> !availList.contains(x)).findAny(); + Assert.assertTrue(unavail.isPresent()); + + long unavailWorkerId = unavail.get(); + WorkerProvider provider = newWorkerProvider(); + + int selectCount = 0; + List selectedNodeId = Lists.newArrayList(); + + while (selectCount < availList.size() * 2 + 1) { // make sure the while loop will stop + Assert.assertFalse(provider.isDataNodeAvailable(unavailWorkerId)); + long alterNodeId = provider.selectBackupWorker(unavailWorkerId); + if (alterNodeId == -1) { + break; + } + // the backup node is not itself + Assert.assertTrue(alterNodeId != unavailWorkerId); + // the backup node is not any of the node before + Assert.assertFalse(selectedNodeId.contains(alterNodeId)); + + for (int j = 0; j < 10; ++j) { + long selectAgainId = provider.selectBackupWorker(unavailWorkerId); + Assert.assertEquals(alterNodeId, selectAgainId); + } + ++selectCount; + // make it in blockList, so next time it will choose a different node + blockList.add(alterNodeId); + selectedNodeId.add(alterNodeId); + } + // all nodes are in block list, no nodes can be selected anymore + Assert.assertEquals(-1, provider.selectBackupWorker(unavailWorkerId)); + // all the nodes are selected ever + Assert.assertEquals(selectedNodeId.size(), availList.size()); + + // a random workerId that doesn't exist in workerProvider + Assert.assertEquals(-1, provider.selectBackupWorker(15678)); + } + + private OlapScanNode newOlapScanNode(int id, int numBuckets) { + // copy from fe/fe-core/src/test/java/com/starrocks/qe/ColocatedBackendSelectorTest.java + TupleDescriptor desc = new TupleDescriptor(new TupleId(0)); + OlapTable table = new OlapTable(); + table.setDefaultDistributionInfo(new HashDistributionInfo(numBuckets, Collections.emptyList())); + desc.setTable(table); + return new OlapScanNode(new PlanNodeId(id), desc, "OlapScanNode"); + } + + private ArrayListMultimap genBucketSeq2Locations( + Map> bucketSeqToBackends, + int numTabletsPerBucket) { + // copy from fe/fe-core/src/test/java/com/starrocks/qe/ColocatedBackendSelectorTest.java + ArrayListMultimap bucketSeq2locations = ArrayListMultimap.create(); + bucketSeqToBackends.forEach((bucketSeq, backends) -> { + for (int i = 0; i < numTabletsPerBucket; i++) { + TScanRangeLocations bucketLocations = new TScanRangeLocations(); + + bucketLocations.setScan_range(new TScanRange().setInternal_scan_range(new TInternalScanRange())); + + List locations = backends.stream() + .map(backendId -> new TScanRangeLocation().setBackend_id(backendId)) + .collect(Collectors.toList()); + bucketLocations.setLocations(locations); + + bucketSeq2locations.put(bucketSeq, 
bucketLocations); + } + }); + + return bucketSeq2locations; + } + + /** + * Generate a list of ScanRangeLocations, contains n element for bucketNum + * + * @param n number of ScanRangeLocations + * @param bucketNum number of buckets + * @return lists of ScanRangeLocations + */ + private List generateScanRangeLocations(Map nodes, int n, int bucketNum) { + List locations = Lists.newArrayList(); + int currentBucketIndex = 0; + Iterator> iterator = nodes.entrySet().iterator(); + for (int i = 0; i < n; ++i) { + if (!iterator.hasNext()) { + iterator = nodes.entrySet().iterator(); + } + TInternalScanRange internalRange = new TInternalScanRange(); + internalRange.setBucket_sequence(currentBucketIndex); + internalRange.setRow_count(1); + + TScanRange range = new TScanRange(); + range.setInternal_scan_range(internalRange); + + TScanRangeLocations loc = new TScanRangeLocations(); + loc.setScan_range(range); + + TScanRangeLocation location = new TScanRangeLocation(); + ComputeNode node = iterator.next().getValue(); + location.setBackend_id(node.getId()); + location.setServer(node.getAddress()); + loc.addToLocations(location); + + locations.add(loc); + currentBucketIndex = (currentBucketIndex + 1) % bucketNum; + } + return locations; + } + + @Test + public void testNormalBackendSelectorWithSharedDataWorkerProvider() { + HostBlacklist blockList = SimpleScheduler.getHostBlacklist(); + blockList.hostBlacklist.clear(); + SystemInfoService sysInfo = GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo(); + List availList = prepareNodeAliveAndBlock(sysInfo, blockList); + + int bucketNum = 3; + OlapScanNode scanNode = newOlapScanNode(1, bucketNum); + List scanLocations = generateScanRangeLocations(id2AllNodes, 10, bucketNum); + + WorkerProvider provider = newWorkerProvider(); + + int nonAvailNum = 0; + for (TScanRangeLocations locations : scanLocations) { + for (TScanRangeLocation location : locations.getLocations()) { + if (!provider.isDataNodeAvailable(location.getBackend_id())) { + ++nonAvailNum; + } + } + } + // the scanRangeLocations contains non-avail locations + Assert.assertTrue(nonAvailNum > 0); + + { // normal case + FragmentScanRangeAssignment assignment = new FragmentScanRangeAssignment(); + NormalBackendSelector selector = + new NormalBackendSelector(scanNode, scanLocations, assignment, provider, false); + // the computation will not fail even though there are non-available locations + ExceptionChecker.expectThrowsNoException(selector::computeScanRangeAssignment); + + // check the assignment, should be all in the availList + for (long id : assignment.keySet()) { + Assert.assertTrue(availList.contains(id)); + } + } + + { // make only one node available, the final assignment will be all on the single available node + ComputeNode availNode = id2AllNodes.get(availList.get(0)); + WorkerProvider provider1 = new DefaultSharedDataWorkerProvider(ImmutableMap.copyOf(id2AllNodes), + ImmutableMap.of(availNode.getId(), availNode)); + + FragmentScanRangeAssignment assignment = new FragmentScanRangeAssignment(); + NormalBackendSelector selector = + new NormalBackendSelector(scanNode, scanLocations, assignment, provider1, false); + // the computation will not fail even though there are non-available locations + ExceptionChecker.expectThrowsNoException(selector::computeScanRangeAssignment); + + Assert.assertEquals(1, assignment.size()); + // check the assignment, should be all in the availList + for (long id : assignment.keySet()) { + Assert.assertEquals(availNode.getId(), id); + } + } + + { // make no 
node available. Exception throws + WorkerProvider providerNoAvailNode = new DefaultSharedDataWorkerProvider(ImmutableMap.copyOf(id2AllNodes), + ImmutableMap.of()); + FragmentScanRangeAssignment assignment = new FragmentScanRangeAssignment(); + NormalBackendSelector selector = + new NormalBackendSelector(scanNode, scanLocations, assignment, providerNoAvailNode, false); + Assert.assertThrows(NonRecoverableException.class, selector::computeScanRangeAssignment); + } + } + + @Test + public void testCollocationBackendSelectorWithSharedDataWorkerProvider() { + HostBlacklist blockList = SimpleScheduler.getHostBlacklist(); + blockList.hostBlacklist.clear(); + SystemInfoService sysInfo = GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo(); + List availList = prepareNodeAliveAndBlock(sysInfo, blockList); + + int bucketNum = 6; + OlapScanNode scanNode = newOlapScanNode(10, bucketNum); + final Map> bucketSeqToBackends = ImmutableMap.of( + 0, ImmutableList.of(1L), + 1, ImmutableList.of(2L), + 2, ImmutableList.of(3L), + 3, ImmutableList.of(4L), + 4, ImmutableList.of(5L), + 5, ImmutableList.of(6L) + ); + scanNode.bucketSeq2locations = genBucketSeq2Locations(bucketSeqToBackends, 3); + List scanLocations = generateScanRangeLocations(id2AllNodes, 10, bucketNum); + WorkerProvider provider = newWorkerProvider(); + + int nonAvailNum = 0; + for (TScanRangeLocations locations : scanLocations) { + for (TScanRangeLocation location : locations.getLocations()) { + if (!provider.isDataNodeAvailable(location.getBackend_id())) { + ++nonAvailNum; + } + } + } + // the scanRangeLocations contains non-avail locations + Assert.assertTrue(nonAvailNum > 0); + + { // normal case + FragmentScanRangeAssignment assignment = new FragmentScanRangeAssignment(); + ColocatedBackendSelector.Assignment colAssignment = new ColocatedBackendSelector.Assignment(scanNode, 1); + ColocatedBackendSelector selector = + new ColocatedBackendSelector(scanNode, assignment, colAssignment, false, provider, 1); + // the computation will not fail even though there are non-available locations + ExceptionChecker.expectThrowsNoException(selector::computeScanRangeAssignment); + + // check the assignment, should be all in the availList + for (long id : assignment.keySet()) { + Assert.assertTrue(availList.contains(id)); + } + } + + { // make only one node available, the final assignment will be all on the single available node + ComputeNode availNode = id2AllNodes.get(availList.get(0)); + WorkerProvider provider1 = new DefaultSharedDataWorkerProvider(ImmutableMap.copyOf(id2AllNodes), + ImmutableMap.of(availNode.getId(), availNode)); + + FragmentScanRangeAssignment assignment = new FragmentScanRangeAssignment(); + ColocatedBackendSelector.Assignment colAssignment = new ColocatedBackendSelector.Assignment(scanNode, 1); + ColocatedBackendSelector selector = + new ColocatedBackendSelector(scanNode, assignment, colAssignment, false, provider1, 1); + // the computation will not fail even though there are non-available locations + ExceptionChecker.expectThrowsNoException(selector::computeScanRangeAssignment); + + Assert.assertEquals(1, assignment.size()); + // check the assignment, should be all in the availList + for (long id : assignment.keySet()) { + Assert.assertEquals(availNode.getId(), id); + } + } + + { // make no node available. 
Exception throws + WorkerProvider providerNoAvailNode = new DefaultSharedDataWorkerProvider(ImmutableMap.copyOf(id2AllNodes), + ImmutableMap.of()); + FragmentScanRangeAssignment assignment = new FragmentScanRangeAssignment(); + ColocatedBackendSelector.Assignment colAssignment = new ColocatedBackendSelector.Assignment(scanNode, 1); + ColocatedBackendSelector selector = + new ColocatedBackendSelector(scanNode, assignment, colAssignment, false, providerNoAvailNode, 1); + Assert.assertThrows(NonRecoverableException.class, selector::computeScanRangeAssignment); + } + } + + @Test + public void testNextWorkerOverflow() throws NonRecoverableException { + WorkerProvider provider = + new DefaultSharedDataWorkerProvider(ImmutableMap.copyOf(id2AllNodes), ImmutableMap.copyOf(id2AllNodes)); + for (int i = 0; i < 100; i++) { + Long workerId = provider.selectNextWorker(); + assertThat(workerId).isNotNegative(); + } + DefaultSharedDataWorkerProvider.getNextComputeNodeIndexer().set(Integer.MAX_VALUE); + for (int i = 0; i < 100; i++) { + Long workerId = provider.selectNextWorker(); + assertThat(workerId).isNotNegative(); + } + } +} diff --git a/fe/fe-core/src/test/java/com/starrocks/qe/scheduler/DefaultWorkerProviderTest.java b/fe/fe-core/src/test/java/com/starrocks/qe/scheduler/DefaultWorkerProviderTest.java new file mode 100644 index 0000000000000..bc4c3f84469bc --- /dev/null +++ b/fe/fe-core/src/test/java/com/starrocks/qe/scheduler/DefaultWorkerProviderTest.java @@ -0,0 +1,398 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package com.starrocks.qe.scheduler; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.starrocks.common.Reference; +import com.starrocks.common.UserException; +import com.starrocks.qe.SessionVariableConstants.ComputationFragmentSchedulingPolicy; +import com.starrocks.qe.SimpleScheduler; +import com.starrocks.server.GlobalStateMgr; +import com.starrocks.server.WarehouseManager; +import com.starrocks.system.Backend; +import com.starrocks.system.ComputeNode; +import com.starrocks.system.SystemInfoService; +import mockit.Mock; +import mockit.MockUp; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; + +public class DefaultWorkerProviderTest { + private final ImmutableMap id2Backend = genWorkers(0, 10, Backend::new, false); + private final ImmutableMap id2ComputeNode = genWorkers(10, 15, ComputeNode::new, false); + private final ImmutableMap availableId2Backend = ImmutableMap.of( + 0L, id2Backend.get(0L), + 2L, id2Backend.get(2L), + 3L, id2Backend.get(3L), + 5L, id2Backend.get(5L), + 7L, id2Backend.get(7L)); + private final ImmutableMap availableId2ComputeNode = ImmutableMap.of( + 10L, id2ComputeNode.get(10L), + 12L, id2ComputeNode.get(12L), + 14L, id2ComputeNode.get(14L)); + private final Map availableId2Worker = Stream.of(availableId2Backend, availableId2ComputeNode) + .map(Map::entrySet) + .flatMap(Collection::stream) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + private static ImmutableMap genWorkers(long startId, long endId, + Supplier factory, boolean halfDead) { + Map res = new TreeMap<>(); + for (long i = startId; i < endId; i++) { + C worker = factory.get(); + worker.setId(i); + if (halfDead && i % 2 == 0) { + worker.setAlive(false); + } else { + worker.setAlive(true); + } + worker.setHost("host#" + i); + worker.setBePort(80); + res.put(i, worker); + } + return ImmutableMap.copyOf(res); + } + + @Test + public void testCaptureAvailableWorkers() { + + long deadBEId = 1L; + long deadCNId = 11L; + long inBlacklistBEId = 3L; + long inBlacklistCNId = 13L; + Set nonAvailableWorkerId = ImmutableSet.of(deadBEId, deadCNId, inBlacklistBEId, inBlacklistCNId); + id2Backend.get(deadBEId).setAlive(false); + id2ComputeNode.get(deadCNId).setAlive(false); + new MockUp() { + @Mock + public boolean isInBlocklist(long backendId) { + return backendId == inBlacklistBEId || backendId == inBlacklistCNId; + } + }; + + Reference nextComputeNodeIndex = new Reference<>(0); + new MockUp() { + @Mock + int getNextComputeNodeIndex() { + int next = nextComputeNodeIndex.getRef(); + nextComputeNodeIndex.setRef(next + 1); + return next; + } + }; + + new MockUp() { + @Mock + public ImmutableMap getIdToBackend() { + return id2Backend; + } + + @Mock + public ImmutableMap getIdComputeNode() { + return id2ComputeNode; + } + }; + + DefaultWorkerProvider.Factory workerProviderFactory = new DefaultWorkerProvider.Factory(); + DefaultWorkerProvider workerProvider; + List numUsedComputeNodesList = ImmutableList.of(100, 0, -1, 1, 2, 3, 4, 5, 6); + for (Integer numUsedComputeNodes : numUsedComputeNodesList) { + // Reset nextComputeNodeIndex. 
+            nextComputeNodeIndex.setRef(0);
+
+            workerProvider =
+                    workerProviderFactory.captureAvailableWorkers(GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo(),
+                            true, numUsedComputeNodes, ComputationFragmentSchedulingPolicy.COMPUTE_NODES_ONLY,
+                            WarehouseManager.DEFAULT_WAREHOUSE_ID);
+
+            int numAvailableComputeNodes = 0;
+            for (long id = 0; id < 15; id++) {
+                ComputeNode worker = workerProvider.getWorkerById(id);
+                if (nonAvailableWorkerId.contains(id)
+                        // Exceeds the numUsedComputeNodes limit.
+                        || (numUsedComputeNodes > 0 && numAvailableComputeNodes >= numUsedComputeNodes)) {
+                    Assert.assertNull(worker);
+                } else {
+                    Assert.assertNotNull("numUsedComputeNodes=" + numUsedComputeNodes + ",id=" + id, worker);
+                    Assert.assertEquals(id, worker.getId());
+
+                    if (id2ComputeNode.containsKey(id)) {
+                        numAvailableComputeNodes++;
+                    }
+                }
+            }
+        }
+    }
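testCaptureAvailableWorkers drives the provider with different numUsedComputeNodes values and a mocked round-robin start index, then checks that dead or blocklisted nodes are skipped and that the number of chosen compute nodes never exceeds the limit (non-positive values meaning "no limit"). The following is only a rough sketch of that selection rule under the assumption of a simple offset-based scan; ComputeNodeSelectionSketch and chooseComputeNodes are made-up names, not the provider's API.

import com.starrocks.system.ComputeNode;

import java.util.ArrayList;
import java.util.List;

class ComputeNodeSelectionSketch {
    // Pick up to 'limit' node ids, starting at a rotating offset; limit <= 0 means all nodes.
    static List<Long> chooseComputeNodes(List<ComputeNode> aliveNodes, int limit, int startIndex) {
        int count = limit <= 0 ? aliveNodes.size() : Math.min(limit, aliveNodes.size());
        List<Long> chosen = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            // floorMod keeps the position valid even if the offset has wrapped.
            chosen.add(aliveNodes.get(Math.floorMod(startIndex + i, aliveNodes.size())).getId());
        }
        return chosen;
    }
}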
+
+    /**
+     * The scheduling policy tested here applies to shared-nothing mode.
+     */
+    @Test
+    public void testSelectBackendAndComputeNode() {
+        new MockUp<SystemInfoService>() {
+            @Mock
+            public ImmutableMap<Long, Backend> getIdToBackend() {
+                return availableId2Backend;
+            }
+
+            @Mock
+            public ImmutableMap<Long, ComputeNode> getIdComputeNode() {
+                return availableId2ComputeNode;
+            }
+        };
+
+        DefaultWorkerProvider.Factory workerProviderFactory = new DefaultWorkerProvider.Factory();
+        DefaultWorkerProvider workerProvider;
+        List<Integer> numUsedComputeNodesList = ImmutableList.of(-1, 0, 2, 3, 5, 8, 10);
+
+        // Test ComputeNode only.
+        for (Integer numUsedComputeNodes : numUsedComputeNodesList) {
+            workerProvider =
+                    workerProviderFactory.captureAvailableWorkers(GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo(),
+                            true, numUsedComputeNodes, ComputationFragmentSchedulingPolicy.COMPUTE_NODES_ONLY,
+                            WarehouseManager.DEFAULT_WAREHOUSE_ID);
+            List<Long> selectedWorkerIdsList = workerProvider.getAllAvailableNodes();
+            for (Long selectedWorkerId : selectedWorkerIdsList) {
+                Assert.assertTrue("selectedWorkerId:" + selectedWorkerId,
+                        availableId2ComputeNode.containsKey(selectedWorkerId));
+            }
+        }
+        // Test Backend only.
+        for (Integer numUsedComputeNodes : numUsedComputeNodesList) {
+            workerProvider =
+                    workerProviderFactory.captureAvailableWorkers(GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo(),
+                            false, numUsedComputeNodes, ComputationFragmentSchedulingPolicy.COMPUTE_NODES_ONLY,
+                            WarehouseManager.DEFAULT_WAREHOUSE_ID);
+            List<Long> selectedWorkerIdsList = workerProvider.getAllAvailableNodes();
+            Assert.assertEquals(availableId2Backend.size(), selectedWorkerIdsList.size());
+            for (Long selectedWorkerId : selectedWorkerIdsList) {
+                Assert.assertTrue("selectedWorkerId:" + selectedWorkerId,
+                        availableId2Backend.containsKey(selectedWorkerId));
+            }
+        }
+        // Test Backend and ComputeNode together.
+        for (Integer numUsedComputeNodes : numUsedComputeNodesList) {
+            workerProvider =
+                    workerProviderFactory.captureAvailableWorkers(GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo(),
+                            true, numUsedComputeNodes, ComputationFragmentSchedulingPolicy.ALL_NODES,
+                            WarehouseManager.DEFAULT_WAREHOUSE_ID);
+            List<Long> selectedWorkerIdsList = workerProvider.getAllAvailableNodes();
+            Collections.reverse(selectedWorkerIdsList); // Put the ComputeNode ids at the front and the Backend ids at the back.
+            // Check the ComputeNode part.
+            for (int i = 0; i < availableId2ComputeNode.size() && i < selectedWorkerIdsList.size(); i++) {
+                Assert.assertTrue("selectedWorkerId:" + selectedWorkerIdsList.get(i),
+                        availableId2ComputeNode.containsKey(selectedWorkerIdsList.get(i)));
+            }
+            // Check the Backend part.
+            for (int i = availableId2ComputeNode.size(); i < selectedWorkerIdsList.size(); i++) {
+                Assert.assertTrue("selectedWorkerId:" + selectedWorkerIdsList.get(i),
+                        availableId2Backend.containsKey(selectedWorkerIdsList.get(i)));
+            }
+        }
+    }
+
+    @Test
+    public void testSelectWorker() throws UserException {
+        DefaultWorkerProvider workerProvider =
+                new DefaultWorkerProvider(id2Backend, id2ComputeNode, availableId2Backend, availableId2ComputeNode,
+                        true);
+        for (long id = -1; id < 20; id++) {
+            if (availableId2Worker.containsKey(id)) {
+                workerProvider.selectWorker(id);
+                testUsingWorkerHelper(workerProvider, id);
+            } else {
+                long finalId = id;
+                Assert.assertThrows(NonRecoverableException.class, () -> workerProvider.selectWorker(finalId));
+            }
+        }
+    }
+
+    private static void testSelectNextWorkerHelper(DefaultWorkerProvider workerProvider,
+                                                   Map<Long, ? extends ComputeNode> id2Worker)
+            throws UserException {
+        Set<Long> selectedWorkers = new HashSet<>(id2Worker.size());
+        for (int i = 0; i < id2Worker.size(); i++) {
+            long workerId = workerProvider.selectNextWorker();
+
+            Assert.assertFalse(selectedWorkers.contains(workerId));
+            selectedWorkers.add(workerId);
+
+            testUsingWorkerHelper(workerProvider, workerId);
+        }
+    }
+
+    @Test
+    public void testSelectNextWorker() throws UserException {
+        DefaultWorkerProvider workerProvider;
+
+        workerProvider =
+                new DefaultWorkerProvider(id2Backend, id2ComputeNode, availableId2Backend, availableId2ComputeNode,
+                        true);
+        testSelectNextWorkerHelper(workerProvider, availableId2ComputeNode);
+
+        workerProvider =
+                new DefaultWorkerProvider(id2Backend, id2ComputeNode, availableId2Backend, ImmutableMap.of(), true);
+        testSelectNextWorkerHelper(workerProvider, availableId2Backend);
+
+        workerProvider =
+                new DefaultWorkerProvider(id2Backend, id2ComputeNode, availableId2Backend, availableId2ComputeNode,
+                        false);
+        testSelectNextWorkerHelper(workerProvider, availableId2Backend);
+
+        ImmutableMap<Long, Backend> id2BackendHalfDead = genWorkers(0, 10, Backend::new, true);
+        workerProvider =
+                new DefaultWorkerProvider(id2BackendHalfDead, id2ComputeNode, ImmutableMap.of(), ImmutableMap.of(),
+                        false);
+        DefaultWorkerProvider finalWorkerProvider = workerProvider;
+
+        SchedulerException e = Assert.assertThrows(SchedulerException.class, finalWorkerProvider::selectNextWorker);
+        Assert.assertEquals(
+                "Backend node not found. Check if any backend node is down.backend:"
+                        + " [host#0 alive: false inBlacklist: false] "
+                        + "[host#2 alive: false inBlacklist: false]"
+                        + " [host#4 alive: false inBlacklist: false]"
+                        + " [host#6 alive: false inBlacklist: false]"
+                        + " [host#8 alive: false inBlacklist: false] ",
+                e.getMessage());
+        ImmutableMap<Long, ComputeNode> id2ComputeNodeHalfDead = genWorkers(10, 15, ComputeNode::new, true);
+        workerProvider =
+                new DefaultWorkerProvider(id2ComputeNodeHalfDead, ImmutableMap.of());
+        finalWorkerProvider = workerProvider;
+        e = Assert.assertThrows(SchedulerException.class, finalWorkerProvider::selectNextWorker);
+        Assert.assertEquals(
+                "Compute node not found. Check if any compute node is down.compute node:"
+                        + " [host#10 alive: false inBlacklist: false]"
+                        + " [host#12 alive: false inBlacklist: false]"
+                        + " [host#14 alive: false inBlacklist: false] ",
+                e.getMessage());
+    }
+
+    @Test
+    public void testChooseAllComputedNodes() {
+        DefaultWorkerProvider workerProvider;
+        List<Long> computeNodes;
+
+        workerProvider =
+                new DefaultWorkerProvider(id2Backend, id2ComputeNode, availableId2Backend, availableId2ComputeNode,
+                        false);
+        computeNodes = workerProvider.selectAllComputeNodes();
+        Assert.assertTrue(computeNodes.isEmpty());
+
+        workerProvider =
+                new DefaultWorkerProvider(id2Backend, id2ComputeNode, availableId2Backend, availableId2ComputeNode,
+                        true);
+        computeNodes = workerProvider.selectAllComputeNodes();
+        Assert.assertEquals(availableId2ComputeNode.size(), computeNodes.size());
+        Set<Long> computeNodeSet = new HashSet<>(computeNodes);
+        for (ComputeNode computeNode : availableId2ComputeNode.values()) {
+            Assert.assertTrue(computeNodeSet.contains(computeNode.getId()));
+
+            testUsingWorkerHelper(workerProvider, computeNode.getId());
+        }
+    }
+
+    private static void testGetBackendHelper(DefaultWorkerProvider workerProvider,
+                                             Map<Long, Backend> availableId2Worker) {
+        // Using backup nodes is not allowed.
+        Assert.assertFalse(workerProvider.allowUsingBackupNode());
+        for (long id = -1; id < 10; id++) {
+            ComputeNode backend = workerProvider.getBackend(id);
+            boolean isContained = workerProvider.isDataNodeAvailable(id);
+            if (!availableId2Worker.containsKey(id)) {
+                Assert.assertNull(backend);
+                Assert.assertFalse(isContained);
+            } else {
+                Assert.assertNotNull("id=" + id, backend);
+                Assert.assertEquals(availableId2Worker.get(id), backend);
+                Assert.assertTrue(isContained);
+            }
+            // selectBackupWorker always returns -1.
+            Assert.assertEquals(-1, workerProvider.selectBackupWorker(id));
+        }
+    }
+
+    @Test
+    public void testGetBackend() {
+        DefaultWorkerProvider workerProvider =
+                new DefaultWorkerProvider(id2Backend, id2ComputeNode, availableId2Backend, availableId2ComputeNode,
+                        true);
+        testGetBackendHelper(workerProvider, availableId2Backend);
+    }
+
+    @Test
+    public void testGetWorkersPreferringComputeNode() {
+        DefaultWorkerProvider workerProvider;
+
+        workerProvider =
+                new DefaultWorkerProvider(id2Backend, id2ComputeNode, availableId2Backend, availableId2ComputeNode,
+                        true);
+        assertThat(workerProvider.getAllWorkers())
+                .containsOnlyOnceElementsOf(availableId2ComputeNode.values());
+
+        workerProvider =
+                new DefaultWorkerProvider(id2Backend, id2ComputeNode, availableId2Backend, ImmutableMap.of(), true);
+        assertThat(workerProvider.getAllWorkers())
+                .containsOnlyOnceElementsOf(availableId2Backend.values());
+
+        workerProvider =
+                new DefaultWorkerProvider(id2Backend, id2ComputeNode, availableId2Backend, availableId2ComputeNode,
+                        false);
+        assertThat(workerProvider.getAllWorkers())
+                .containsOnlyOnceElementsOf(availableId2ComputeNode.values());
+    }
+
+    @Test
+    public void testReportBackendNotFoundException() {
+        DefaultWorkerProvider workerProvider =
+                new DefaultWorkerProvider(id2Backend, id2ComputeNode, availableId2Backend, availableId2ComputeNode,
+                        true);
+        Assert.assertThrows(SchedulerException.class, workerProvider::reportDataNodeNotFoundException);
+    }
+
+    @Test
+    public void testNextWorkerOverflow() throws NonRecoverableException {
+        DefaultWorkerProvider workerProvider =
+                new DefaultWorkerProvider(id2Backend, id2ComputeNode, availableId2Backend, availableId2ComputeNode, true);
+        for (int i = 0; i < 100; i++) {
+            Long workerId = workerProvider.selectNextWorker();
+            assertThat(workerId).isNotNegative();
+        }
+        DefaultWorkerProvider.getNextComputeNodeIndexer().set(Integer.MAX_VALUE);
+        for (int i = 0; i < 100; i++) {
+            Long workerId = workerProvider.selectNextWorker();
+            assertThat(workerId).isNotNegative();
+        }
+    }
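Setting the shared AtomicInteger indexer to Integer.MAX_VALUE reproduces the state in which the old index computation could hand back a negative list position, because Java's int arithmetic wraps and the % operator keeps the dividend's sign. A tiny stand-alone illustration of that arithmetic (the list size 5 is chosen only for the example):

public class OverflowDemo {
    public static void main(String[] args) {
        int wrapped = Integer.MAX_VALUE + 1;           // wraps to Integer.MIN_VALUE (-2147483648)
        System.out.println(wrapped % 5);               // -3: an invalid index into a 5-element list
        System.out.println(Math.floorMod(wrapped, 5)); // 2: always within [0, 5)
    }
}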
+
+    public static void testUsingWorkerHelper(DefaultWorkerProvider workerProvider, Long workerId) {
+        Assert.assertTrue(workerProvider.isWorkerSelected(workerId));
+        assertThat(workerProvider.getSelectedWorkerIds()).contains(workerId);
+    }
+}