Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reclaim from join table directly from hash join bridge #11300

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 12 additions & 42 deletions velox/exec/HashBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ HashBuild::HashBuild(
}

tableType_ = ROW(std::move(names), std::move(types));
joinBridge_->maybeSetTableType(tableType_);
joinBridge_->maybeSetSpillConfig(spillConfig());
joinBridge_->maybeSetJoinNode(joinNode_);
setupTable();
setupSpiller();
stateCleared_ = false;
Expand Down Expand Up @@ -780,6 +783,7 @@ bool HashBuild::finishHashBuild() {
addRuntimeStats();
joinBridge_->setHashTable(
std::move(table_), std::move(spillPartitions), joinHasNullKeys_);
joinBridge_->maybeSetSpillStatsRecorder(&spillStats_);
if (canSpill()) {
stateCleared_ = true;
}
Expand Down Expand Up @@ -1091,6 +1095,7 @@ void HashBuild::reclaim(
VELOX_CHECK(task->pauseRequested());
const std::vector<Operator*> operators =
task->findPeerOperators(operatorCtx_->driverCtx()->pipelineId, this);

for (auto* op : operators) {
HashBuild* buildOp = dynamic_cast<HashBuild*>(op);
VELOX_CHECK_NOT_NULL(buildOp);
Expand All @@ -1108,53 +1113,18 @@ void HashBuild::reclaim(
}
}

struct SpillResult {
const std::exception_ptr error{nullptr};

explicit SpillResult(std::exception_ptr _error) : error(_error) {}
};

std::vector<std::shared_ptr<AsyncSource<SpillResult>>> spillTasks;
auto* spillExecutor = spillConfig()->executor;
std::vector<Spiller*> spillers;
for (auto* op : operators) {
HashBuild* buildOp = static_cast<HashBuild*>(op);
spillTasks.push_back(
memory::createAsyncMemoryReclaimTask<SpillResult>([buildOp]() {
try {
buildOp->spiller_->spill();
buildOp->table_->clear();
// Release the minimum reserved memory.
buildOp->pool()->release();
return std::make_unique<SpillResult>(nullptr);
} catch (const std::exception& e) {
LOG(ERROR) << "Spill from hash build pool "
<< buildOp->pool()->name() << " failed: " << e.what();
// The exception is captured and thrown by the caller.
return std::make_unique<SpillResult>(std::current_exception());
}
}));
if ((operators.size() > 1) && (spillExecutor != nullptr)) {
spillExecutor->add([source = spillTasks.back()]() { source->prepare(); });
}
spillers.push_back(buildOp->spiller_.get());
}

auto syncGuard = folly::makeGuard([&]() {
for (auto& spillTask : spillTasks) {
// We consume the result for the pending tasks. This is a cleanup in the
// guard and must not throw. The first error is already captured before
// this runs.
try {
spillTask->move();
} catch (const std::exception&) {
}
}
});
joinBridge_->spillTableFromSpillers(spillers);

for (auto& spillTask : spillTasks) {
const auto result = spillTask->move();
if (result->error) {
std::rethrow_exception(result->error);
}
for (auto* op : operators) {
HashBuild* buildOp = static_cast<HashBuild*>(op);
buildOp->table_->clear();
buildOp->pool()->release();
}
}

Expand Down
208 changes: 207 additions & 1 deletion velox/exec/HashJoinBridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

#include "velox/exec/HashJoinBridge.h"
#include "velox/common/memory/MemoryArbitrator.h"

namespace facebook::velox::exec {
namespace {
Expand All @@ -33,6 +34,205 @@ void HashJoinBridge::addBuilder() {
++numBuilders_;
}

void HashJoinBridge::maybeSetTableType(const RowTypePtr& tableType) {
std::lock_guard<std::mutex> l(mutex_);
if (tableType_ == nullptr) {
tableType_ = tableType;
} else {
VELOX_CHECK(*tableType_ == *tableType);
}
}

// Stores a copy of '*spillConfig' on the bridge. The call is a no-op when
// 'spillConfig' is null or when a config has already been stored.
void HashJoinBridge::maybeSetSpillConfig(
    const common::SpillConfig* spillConfig) {
  std::lock_guard<std::mutex> guard(mutex_);
  if (spillConfig == nullptr || spillConfig_.has_value()) {
    return;
  }
  spillConfig_ = *spillConfig;
}

// Records the hash join plan node on the bridge. 'joinNode' must not be null.
// The first call stores the pointer; later calls only verify that the same
// pointer is passed again.
void HashJoinBridge::maybeSetJoinNode(
    const std::shared_ptr<const core::HashJoinNode>& joinNode) {
  std::lock_guard<std::mutex> guard(mutex_);
  VELOX_CHECK_NOT_NULL(joinNode);
  if (joinNode_ != nullptr) {
    // Already set: the comparison is on the shared pointers themselves.
    VELOX_CHECK(joinNode_ == joinNode);
    return;
  }
  joinNode_ = joinNode;
}

// Stores 'stats' as the spill stats object used by the bridge, replacing any
// pointer stored by an earlier call. 'stats' must not be null.
//
// NOTE(review): only the raw pointer is kept, so the pointee presumably has to
// outlive any later use by the bridge -- confirm against the caller.
void HashJoinBridge::maybeSetSpillStatsRecorder(
    folly::Synchronized<common::SpillStats>* stats) {
  std::lock_guard<std::mutex> guard(mutex_);
  VELOX_CHECK_NOT_NULL(stats);
  spillStats_ = stats;
}

// Returns true only when all of the following hold: a spill config has been
// set, the probe-started flag is not set, a build result with a non-null
// table is present, and that table has at least one distinct entry.
//
// NOTE(review): the state is read without taking 'mutex_' -- presumably the
// caller guarantees no concurrent mutation; confirm.
bool HashJoinBridge::canReclaim() const {
  if (!spillConfig_.has_value() || probeStarted_.load()) {
    return false;
  }
  if (!buildResult_.has_value() || buildResult_->table == nullptr) {
    return false;
  }
  return buildResult_->table->numDistinct() != 0;
}

// Spills the hash table held in the build result, clears the table, and
// records the produced spill partitions on the bridge.
//
// Requires that a build result with a non-null table exists and that a spill
// stats object has been set.
//
// Returns the drop in reserved bytes, summed over the pools of the table's row
// containers, between the start of the call and after the table is cleared.
// Returns 0 if the reservation did not shrink.
//
// NOTE(review): this mutates bridge state without taking 'mutex_' --
// presumably the caller guarantees exclusive access; confirm.
uint64_t HashJoinBridge::reclaim() {
  VELOX_CHECK(buildResult_.has_value());
  VELOX_CHECK_NOT_NULL(buildResult_->table);
  VELOX_CHECK_NOT_NULL(spillStats_);

  // Take the row containers by const reference to avoid copying the vector on
  // every measurement.
  auto computeTableReservedBytes =
      [](const std::vector<RowContainer*>& allRows) {
        uint64_t totalReservedBytes{0};
        for (const auto* rowContainer : allRows) {
          totalReservedBytes += rowContainer->pool()->reservedBytes();
        }
        return totalReservedBytes;
      };
  const uint64_t oldMemUsage =
      computeTableReservedBytes(buildResult_->table->allRows());

  auto spillPartitionSet = spillTable(buildResult_->table, spillStats_);
  buildResult_->table->clear(true);

  const uint64_t newMemUsage =
      computeTableReservedBytes(buildResult_->table->allRows());
  // Both values are unsigned: clamp at zero so an unchanged or larger
  // reservation does not wrap around into a huge reclaimed byte count.
  const uint64_t reclaimedBytes =
      oldMemUsage > newMemUsage ? oldMemUsage - newMemUsage : 0;

  auto spillPartitionIdSet = toSpillPartitionIdSet(spillPartitionSet);
  if (restoringSpillPartitionId_.has_value()) {
    // Every newly produced partition must use a higher partition bit offset
    // than the partition being restored.
    for (const auto& id : spillPartitionIdSet) {
      VELOX_DCHECK_LT(
          restoringSpillPartitionId_->partitionBitOffset(),
          id.partitionBitOffset());
    }
  }
  for (auto& partitionEntry : spillPartitionSet) {
    const auto id = partitionEntry.first;
    // A partition id must not already be registered on the bridge.
    VELOX_CHECK_EQ(spillPartitionSets_.count(id), 0);
    spillPartitionSets_.emplace(id, std::move(partitionEntry.second));
  }
  buildResult_->restoredPartitionId = restoringSpillPartitionId_;
  // The id set is not used after this point, so hand it over without a copy.
  buildResult_->spillPartitionIds = std::move(spillPartitionIdSet);
  restoringSpillPartitionId_.reset();

  return reclaimedBytes;
}

// Returns a pointer to the spill config stored on the bridge, or nullptr if
// none has been set.
const common::SpillConfig* HashJoinBridge::spillConfig() const {
  if (!spillConfig_.has_value()) {
    return nullptr;
  }
  return &spillConfig_.value();
}

// Computes the hash bit range used to partition the table when it is spilled.
//
// The range is 'numPartitionBits' wide. It starts at the config's
// 'startPartitionBit', unless the build result carries a restored partition
// id; in that case it starts at that partition's bit offset plus
// 'numPartitionBits'.
//
// Requires that a spill config has been set and that a build result exists.
HashBitRange HashJoinBridge::tableSpillHashBitRange() const {
  const auto* config = spillConfig();
  // spillConfig() returns nullptr when no config was set, and 'buildResult_'
  // is optional; fail with a check instead of dereferencing either blindly.
  VELOX_CHECK_NOT_NULL(config);
  VELOX_CHECK(buildResult_.has_value());

  uint8_t startPartitionBit = config->startPartitionBit;
  if (buildResult_->restoredPartitionId.has_value()) {
    startPartitionBit =
        buildResult_->restoredPartitionId->partitionBitOffset() +
        config->numPartitionBits;
  }
  return HashBitRange(
      startPartitionBit, startPartitionBit + config->numPartitionBits);
}

// Creates a hash-join-build spiller over 'subTableRows'.
//
// The spiller is configured with the bridge's join type, the spill row type
// derived from the table type and join type, the bridge's table spill hash bit
// range, and the bridge's spill config. 'stats' is passed through to the
// spiller.
//
// Requires that both the table type and the join node have been set on the
// bridge.
std::unique_ptr<Spiller> HashJoinBridge::createSpiller(
    RowContainer* subTableRows,
    folly::Synchronized<common::SpillStats>* stats) {
  VELOX_CHECK_NOT_NULL(tableType_);
  // 'joinNode_' is dereferenced below just like 'tableType_' is used, so it
  // gets the same precondition check.
  VELOX_CHECK_NOT_NULL(joinNode_);
  const auto joinType = joinNode_->joinType();
  return std::make_unique<Spiller>(
      Spiller::Type::kHashJoinBuild,
      joinType,
      subTableRows,
      hashJoinTableSpillType(tableType_, joinType),
      tableSpillHashBitRange(),
      spillConfig(),
      stats);
}

// Runs spill() on every spiller in 'spillers' and returns one SpillResult per
// spiller, in input order.
//
// One async memory reclaim task is created per spiller. Every task except the
// first is also handed to the spill executor when the spill config provides
// one; a task that has not been handed to the executor is only consumed
// through move() below.
//
// A std::exception thrown by a spiller is logged and captured in its result,
// and the first captured error in input order is rethrown to the caller.
// Requires that a spill config has been set.
std::vector<std::unique_ptr<HashJoinBridge::SpillResult>>
HashJoinBridge::spillTableFromSpillers(const std::vector<Spiller*>& spillers) {
  const auto* config = spillConfig();
  VELOX_CHECK_NOT_NULL(config);
  auto* executor = config->executor;

  std::vector<std::shared_ptr<AsyncSource<HashJoinBridge::SpillResult>>>
      pendingTasks;
  for (auto* spiller : spillers) {
    auto task =
        memory::createAsyncMemoryReclaimTask<HashJoinBridge::SpillResult>(
            [spiller]() {
              try {
                spiller->spill();
                return std::make_unique<SpillResult>(spiller);
              } catch (const std::exception& e) {
                LOG(ERROR) << "Spill from hash join bridge failed: "
                           << e.what();
                // Hand the error back through the result; the caller
                // rethrows it.
                return std::make_unique<SpillResult>(std::current_exception());
              }
            });
    pendingTasks.push_back(std::move(task));

    const bool scheduleOnExecutor =
        (pendingTasks.size() > 1) && (executor != nullptr);
    if (scheduleOnExecutor) {
      executor->add([source = pendingTasks.back()]() { source->prepare(); });
    }
  }

  // On every exit path, including the rethrow below, consume the result of
  // each task. This runs as cleanup and must not throw, so exceptions raised
  // while consuming are swallowed; the first error has already been captured
  // by the time this runs.
  auto syncGuard = folly::makeGuard([&]() {
    for (auto& pending : pendingTasks) {
      try {
        pending->move();
      } catch (const std::exception&) {
      }
    }
  });

  std::vector<std::unique_ptr<HashJoinBridge::SpillResult>> collected;
  for (auto& pending : pendingTasks) {
    auto outcome = pending->move();
    if (outcome->error) {
      std::rethrow_exception(outcome->error);
    }
    collected.push_back(std::move(outcome));
  }
  return collected;
}

// Spills the rows of 'table' and returns the resulting spill partitions.
//
// One spiller is created for each row container of the table that holds at
// least one row. Returns an empty set when the table has no distinct entries
// or when every row container is empty. Requires a non-null 'table' and a
// spill config set on the bridge. 'stats' is passed through to the spillers.
SpillPartitionSet HashJoinBridge::spillTable(
    std::shared_ptr<BaseHashTable> table,
    folly::Synchronized<common::SpillStats>* stats) {
  VELOX_CHECK_NOT_NULL(table);
  VELOX_CHECK(spillConfig_.has_value());

  SpillPartitionSet spillPartitions;
  if (table->numDistinct() == 0) {
    // The build side is empty, so there is nothing to spill.
    return spillPartitions;
  }

  // 'ownedSpillers' keeps the spillers alive; 'spillers' is the raw-pointer
  // view that is handed to spillTableFromSpillers().
  std::vector<std::unique_ptr<Spiller>> ownedSpillers;
  std::vector<Spiller*> spillers;
  const std::vector<RowContainer*> rowContainers = table->allRows();
  for (auto* rows : rowContainers) {
    if (rows->numRows() != 0) {
      ownedSpillers.push_back(createSpiller(rows, stats));
      spillers.push_back(ownedSpillers.back().get());
    }
  }
  if (ownedSpillers.empty()) {
    return spillPartitions;
  }

  auto spillResults = spillTableFromSpillers(spillers);
  for (auto& spillResult : spillResults) {
    VELOX_CHECK_NULL(spillResult->error);
    spillResult->spiller->finishSpill(spillPartitions);
  }

  // Drop the spilled partitions that turned out empty, so that the hash probe
  // side does not trigger unnecessary spilling for them.
  removeEmptyPartitions(spillPartitions);
  return spillPartitions;
}

void HashJoinBridge::setHashTable(
std::unique_ptr<BaseHashTable> table,
SpillPartitionSet spillPartitionSet,
Expand Down Expand Up @@ -114,7 +314,7 @@ std::optional<HashJoinBridge::HashBuildResult> HashJoinBridge::tableOrFuture(
!buildResult_.has_value() ||
(!restoringSpillPartitionId_.has_value() &&
restoringSpillShards_.empty()));

probeStarted_ = true;
if (buildResult_.has_value()) {
return buildResult_.value();
}
Expand All @@ -134,6 +334,7 @@ bool HashJoinBridge::probeFinished() {
!restoringSpillPartitionId_.has_value() &&
restoringSpillShards_.empty());
VELOX_CHECK_GT(numBuilders_, 0);
probeStarted_ = false;

// NOTE: we are clearing the hash table as it has been fully processed and
// not needed anymore. We'll wait for the HashBuild operator to build a new
Expand Down Expand Up @@ -222,6 +423,11 @@ uint64_t HashJoinMemoryReclaimer::reclaim(
}
return !hasReclaimedFromBuild;
});
auto joinBridge = joinBridge_.lock();
VELOX_CHECK_NOT_NULL(joinBridge);
if (reclaimedBytes == 0 && joinBridge->canReclaim()) {
reclaimedBytes = joinBridge->reclaim();
}
return reclaimedBytes;
}

Expand Down
Loading
Loading