Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improvemment](profile) use one counter to represent MemoryUsage and MemoryUsagePeak #44645

Merged
merged 1 commit into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions be/src/pipeline/exec/aggregation_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,6 @@ void AggSinkLocalState::_update_memusage_with_serialized_key() {

COUNTER_SET(_memory_used_counter,
arena_memory_usage + hash_table_memory_usage);
COUNTER_SET(_peak_memory_usage_counter, _memory_used_counter->value());

COUNTER_SET(_serialize_key_arena_memory_usage, arena_memory_usage);
COUNTER_SET(_hash_table_memory_usage, hash_table_memory_usage);
Expand Down Expand Up @@ -415,7 +414,6 @@ Status AggSinkLocalState::_merge_without_key(vectorized::Block* block) {
void AggSinkLocalState::_update_memusage_without_key() {
int64_t arena_memory_usage = _agg_arena_pool->size();
COUNTER_SET(_memory_used_counter, arena_memory_usage);
COUNTER_SET(_peak_memory_usage_counter, arena_memory_usage);
COUNTER_SET(_serialize_key_arena_memory_usage, arena_memory_usage);
}

Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/analytic_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,6 @@ Status AnalyticSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block
}

COUNTER_UPDATE(local_state._memory_used_counter, input_block->allocated_bytes());
COUNTER_SET(local_state._peak_memory_usage_counter, local_state._memory_used_counter->value());

//TODO: if need improvement, the is a tips to maintain a free queue,
//so the memory could reuse, no need to new/delete again;
Expand Down
2 changes: 0 additions & 2 deletions be/src/pipeline/exec/exchange_sink_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,6 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) {
RETURN_IF_ERROR(
BeExecVersionManager::check_be_exec_version(request.block->be_exec_version()));
COUNTER_UPDATE(_parent->memory_used_counter(), request.block->ByteSizeLong());
COUNTER_SET(_parent->peak_memory_usage_counter(),
_parent->memory_used_counter()->value());
}
_instance_to_package_queue[ins_id].emplace(std::move(request));
_total_queue_size++;
Expand Down
10 changes: 0 additions & 10 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -385,10 +385,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
if (all_receiver_eof) {
return Status::EndOfFile("all data stream channels EOF");
}
Defer defer([&]() {
COUNTER_SET(local_state._peak_memory_usage_counter,
local_state._memory_used_counter->value());
});

if (_part_type == TPartitionType::UNPARTITIONED || local_state.channels.size() == 1) {
// 1. serialize depends on it is not local exchange
Expand Down Expand Up @@ -505,8 +501,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
}
COUNTER_UPDATE(local_state.memory_used_counter(),
new_channel_mem_usage - old_channel_mem_usage);
COUNTER_SET(local_state.peak_memory_usage_counter(),
local_state.memory_used_counter()->value());
} else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
int64_t old_channel_mem_usage = 0;
for (const auto& channel : local_state.channels) {
Expand Down Expand Up @@ -555,8 +549,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
}
COUNTER_UPDATE(local_state.memory_used_counter(),
new_channel_mem_usage - old_channel_mem_usage);
COUNTER_SET(local_state.peak_memory_usage_counter(),
local_state.memory_used_counter()->value());
} else if (_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) {
int64_t old_channel_mem_usage = 0;
for (const auto& channel : local_state.channels) {
Expand All @@ -581,8 +573,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
}
COUNTER_UPDATE(local_state.memory_used_counter(),
new_channel_mem_usage - old_channel_mem_usage);
COUNTER_SET(local_state.peak_memory_usage_counter(),
local_state.memory_used_counter()->value());
} else if (_part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) {
// Control the number of channels according to the flow, thereby controlling the number of table sink writers.
// 1. select channel
Expand Down
2 changes: 0 additions & 2 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,6 @@ Status HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state,
_build_blocks_memory_usage->value() +
(int64_t)(arg.hash_table->get_byte_size() +
arg.serialized_keys_size(true)));
COUNTER_SET(_peak_memory_usage_counter, _memory_used_counter->value());
return st;
}},
_shared_state->hash_table_variants->method_variant, _shared_state->join_op_variants,
Expand Down Expand Up @@ -486,7 +485,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
std::move(*in_block)));
int64_t blocks_mem_usage = local_state._build_side_mutable_block.allocated_bytes();
COUNTER_SET(local_state._memory_used_counter, blocks_mem_usage);
COUNTER_SET(local_state._peak_memory_usage_counter, blocks_mem_usage);
COUNTER_SET(local_state._build_blocks_memory_usage, blocks_mem_usage);
}
}
Expand Down
2 changes: 0 additions & 2 deletions be/src/pipeline/exec/hashjoin_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -483,8 +483,6 @@ Status HashJoinProbeOperatorX::push(RuntimeState* state, vectorized::Block* inpu
input_block->swap(local_state._probe_block);
COUNTER_SET(local_state._memory_used_counter,
(int64_t)local_state._probe_block.allocated_bytes());
COUNTER_SET(local_state._peak_memory_usage_counter,
local_state._memory_used_counter->value());
}
}
return Status::OK();
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/join/process_hash_table_probe_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,6 @@ typename HashTableType::State ProcessHashTableProbe<JoinOpType>::_init_probe_sid
int64_t arena_memory_usage = hash_table_ctx.serialized_keys_size(false);
COUNTER_SET(_parent->_probe_arena_memory_usage, arena_memory_usage);
COUNTER_UPDATE(_parent->_memory_used_counter, arena_memory_usage);
COUNTER_SET(_parent->_peak_memory_usage_counter, _parent->_memory_used_counter->value());
}

return typename HashTableType::State(_parent->_probe_columns);
Expand Down
22 changes: 4 additions & 18 deletions be/src/pipeline/exec/operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,6 @@ Status OperatorXBase::get_block_after_projects(RuntimeState* state, vectorized::
return status;
}
status = get_block(state, block, eos);
local_state->_peak_memory_usage_counter->set(local_state->_memory_used_counter->value());
return status;
}

Expand Down Expand Up @@ -441,11 +440,7 @@ PipelineXSinkLocalStateBase::PipelineXSinkLocalStateBase(DataSinkOperatorXBase*
}

PipelineXLocalStateBase::PipelineXLocalStateBase(RuntimeState* state, OperatorXBase* parent)
: _num_rows_returned(0),
_rows_returned_counter(nullptr),
_peak_memory_usage_counter(nullptr),
_parent(parent),
_state(state) {
: _num_rows_returned(0), _rows_returned_counter(nullptr), _parent(parent), _state(state) {
_query_statistics = std::make_shared<QueryStatistics>();
}

Expand Down Expand Up @@ -484,9 +479,8 @@ Status PipelineXLocalState<SharedStateArg>::init(RuntimeState* state, LocalState
_open_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "OpenTime", 1);
_close_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "CloseTime", 1);
_exec_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "ExecTime", 1);
_memory_used_counter = ADD_COUNTER_WITH_LEVEL(_runtime_profile, "MemoryUsage", TUnit::BYTES, 1);
_peak_memory_usage_counter =
_runtime_profile->AddHighWaterMarkCounter("MemoryUsagePeak", TUnit::BYTES, "", 1);
_memory_used_counter =
_runtime_profile->AddHighWaterMarkCounter("MemoryUsage", TUnit::BYTES, "", 1);
return Status::OK();
}

Expand Down Expand Up @@ -519,9 +513,6 @@ Status PipelineXLocalState<SharedStateArg>::close(RuntimeState* state) {
if constexpr (!std::is_same_v<SharedStateArg, FakeSharedState>) {
COUNTER_SET(_wait_for_dependency_timer, _dependency->watcher_elapse_time());
}
if (_peak_memory_usage_counter) {
_peak_memory_usage_counter->set(_memory_used_counter->value());
}
_closed = true;
// Some kinds of source operators has a 1-1 relationship with a sink operator (such as AnalyticOperator).
// We must ensure AnalyticSinkOperator will not be blocked if AnalyticSourceOperator already closed.
Expand Down Expand Up @@ -560,9 +551,7 @@ Status PipelineXSinkLocalState<SharedState>::init(RuntimeState* state, LocalSink
_close_timer = ADD_TIMER_WITH_LEVEL(_profile, "CloseTime", 1);
_exec_timer = ADD_TIMER_WITH_LEVEL(_profile, "ExecTime", 1);
info.parent_profile->add_child(_profile, true, nullptr);
_memory_used_counter = ADD_COUNTER_WITH_LEVEL(_profile, "MemoryUsage", TUnit::BYTES, 1);
_peak_memory_usage_counter =
_profile->AddHighWaterMarkCounter("MemoryUsagePeak", TUnit::BYTES, "", 1);
_memory_used_counter = _profile->AddHighWaterMarkCounter("MemoryUsage", TUnit::BYTES, "", 1);
return Status::OK();
}

Expand All @@ -574,9 +563,6 @@ Status PipelineXSinkLocalState<SharedState>::close(RuntimeState* state, Status e
if constexpr (!std::is_same_v<SharedState, FakeSharedState>) {
COUNTER_SET(_wait_for_dependency_timer, _dependency->watcher_elapse_time());
}
if (_peak_memory_usage_counter) {
_peak_memory_usage_counter->set(_memory_used_counter->value());
}
_closed = true;
return Status::OK();
}
Expand Down
14 changes: 3 additions & 11 deletions be/src/pipeline/exec/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,6 @@ class PipelineXLocalStateBase {

RuntimeProfile::Counter* exec_time_counter() { return _exec_timer; }
RuntimeProfile::Counter* memory_used_counter() { return _memory_used_counter; }
RuntimeProfile::HighWaterMarkCounter* peak_memory_usage_counter() {
return _peak_memory_usage_counter;
}
OperatorXBase* parent() { return _parent; }
RuntimeState* state() { return _state; }
vectorized::VExprContextSPtrs& conjuncts() { return _conjuncts; }
Expand Down Expand Up @@ -202,11 +199,10 @@ class PipelineXLocalStateBase {
RuntimeProfile::Counter* _rows_returned_counter = nullptr;
RuntimeProfile::Counter* _blocks_returned_counter = nullptr;
RuntimeProfile::Counter* _wait_for_dependency_timer = nullptr;
RuntimeProfile::Counter* _memory_used_counter = nullptr;
// Account for current memory and peak memory used by this node
RuntimeProfile::HighWaterMarkCounter* _memory_used_counter = nullptr;
RuntimeProfile::Counter* _projection_timer = nullptr;
RuntimeProfile::Counter* _exec_timer = nullptr;
// Account for peak memory used by this node
RuntimeProfile::HighWaterMarkCounter* _peak_memory_usage_counter = nullptr;
RuntimeProfile::Counter* _init_timer = nullptr;
RuntimeProfile::Counter* _open_timer = nullptr;
RuntimeProfile::Counter* _close_timer = nullptr;
Expand Down Expand Up @@ -348,9 +344,6 @@ class PipelineXSinkLocalStateBase {
RuntimeProfile::Counter* rows_input_counter() { return _rows_input_counter; }
RuntimeProfile::Counter* exec_time_counter() { return _exec_timer; }
RuntimeProfile::Counter* memory_used_counter() { return _memory_used_counter; }
RuntimeProfile::HighWaterMarkCounter* peak_memory_usage_counter() {
return _peak_memory_usage_counter;
}
virtual std::vector<Dependency*> dependencies() const { return {nullptr}; }

// override in exchange sink , AsyncWriterSink
Expand Down Expand Up @@ -380,8 +373,7 @@ class PipelineXSinkLocalStateBase {
RuntimeProfile::Counter* _wait_for_dependency_timer = nullptr;
RuntimeProfile::Counter* _wait_for_finish_dependency_timer = nullptr;
RuntimeProfile::Counter* _exec_timer = nullptr;
RuntimeProfile::Counter* _memory_used_counter = nullptr;
RuntimeProfile::HighWaterMarkCounter* _peak_memory_usage_counter = nullptr;
RuntimeProfile::HighWaterMarkCounter* _memory_used_counter = nullptr;

std::shared_ptr<QueryStatistics> _query_statistics = nullptr;
};
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1037,7 +1037,6 @@ Status ScanLocalState<Derived>::_init_profile() {
_total_throughput_counter =
profile()->add_rate_counter("TotalReadThroughput", _rows_read_counter);
_num_scanners = ADD_COUNTER(_runtime_profile, "NumScanners", TUnit::UNIT);
_scanner_peak_memory_usage = _peak_memory_usage_counter;
//_runtime_profile->AddHighWaterMarkCounter("PeakMemoryUsage", TUnit::BYTES);

// 2. counters for scanners
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ class ScanLocalStateBase : public PipelineXLocalState<>, public RuntimeFilterCon
// Max num of scanner thread
RuntimeProfile::Counter* _max_scanner_thread_num = nullptr;
RuntimeProfile::HighWaterMarkCounter* _peak_running_scanner = nullptr;
RuntimeProfile::HighWaterMarkCounter* _scanner_peak_memory_usage = nullptr;
// time of get block from scanner
RuntimeProfile::Counter* _scan_timer = nullptr;
RuntimeProfile::Counter* _scan_cpu_timer = nullptr;
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/sort_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ Status SortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in
int64_t data_size = local_state._shared_state->sorter->data_size();
COUNTER_SET(local_state._sort_blocks_memory_usage, data_size);
COUNTER_SET(local_state._memory_used_counter, data_size);
COUNTER_SET(local_state._peak_memory_usage_counter, data_size);

RETURN_IF_CANCELLED(state);

Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/spill_sort_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@ Status SpillSortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Bloc
int64_t data_size = local_state._shared_state->in_mem_shared_state->sorter->data_size();
COUNTER_SET(local_state._sort_blocks_memory_usage, data_size);
COUNTER_SET(local_state._memory_used_counter, data_size);
COUNTER_SET(local_state._peak_memory_usage_counter, data_size);

if (eos) {
if (local_state._shared_state->is_spilled) {
Expand Down
3 changes: 0 additions & 3 deletions be/src/pipeline/exec/streaming_aggregation_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,6 @@ Status StreamingAggLocalState::_merge_without_key(vectorized::Block* block) {
void StreamingAggLocalState::_update_memusage_without_key() {
int64_t arena_memory_usage = _agg_arena_pool->size();
COUNTER_SET(_memory_used_counter, arena_memory_usage);
COUNTER_SET(_peak_memory_usage_counter, arena_memory_usage);
COUNTER_SET(_serialize_key_arena_memory_usage, arena_memory_usage);
}

Expand All @@ -378,8 +377,6 @@ void StreamingAggLocalState::_update_memusage_with_serialized_key() {

COUNTER_SET(_memory_used_counter,
arena_memory_usage + hash_table_memory_usage);
COUNTER_SET(_peak_memory_usage_counter,
arena_memory_usage + hash_table_memory_usage);

COUNTER_SET(_serialize_key_arena_memory_usage, arena_memory_usage);
COUNTER_SET(_hash_table_memory_usage, hash_table_memory_usage);
Expand Down
1 change: 0 additions & 1 deletion be/src/runtime/load_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ void LoadChannel::_init_profile() {
_load_id.to_string(), _sender_ip, _backend_id),
true, true);
_add_batch_number_counter = ADD_COUNTER(_self_profile, "NumberBatchAdded", TUnit::UNIT);
_peak_memory_usage_counter = ADD_COUNTER(_self_profile, "PeakMemoryUsage", TUnit::BYTES);
_add_batch_timer = ADD_TIMER(_self_profile, "AddBatchTime");
_handle_eos_timer = ADD_CHILD_TIMER(_self_profile, "HandleEosTime", "AddBatchTime");
_add_batch_times = ADD_COUNTER(_self_profile, "AddBatchTimes", TUnit::UNIT);
Expand Down
1 change: 0 additions & 1 deletion be/src/runtime/load_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ class LoadChannel {
std::unique_ptr<RuntimeProfile> _profile;
RuntimeProfile* _self_profile = nullptr;
RuntimeProfile::Counter* _add_batch_number_counter = nullptr;
RuntimeProfile::Counter* _peak_memory_usage_counter = nullptr;
RuntimeProfile::Counter* _add_batch_timer = nullptr;
RuntimeProfile::Counter* _add_batch_times = nullptr;
RuntimeProfile::Counter* _mgr_add_batch_timer = nullptr;
Expand Down
Loading
Loading