Skip to content

Commit

Permalink
[improvemment](profile) use one counter to represent MemoryUsage and …
Browse files Browse the repository at this point in the history
…MemoryUsagePeak
  • Loading branch information
jacktengg committed Nov 26, 2024
1 parent 71b7d3f commit 2e75559
Show file tree
Hide file tree
Showing 22 changed files with 79 additions and 115 deletions.
2 changes: 0 additions & 2 deletions be/src/pipeline/exec/aggregation_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,6 @@ void AggSinkLocalState::_update_memusage_with_serialized_key() {

COUNTER_SET(_memory_used_counter,
arena_memory_usage + hash_table_memory_usage);
COUNTER_SET(_peak_memory_usage_counter, _memory_used_counter->value());

COUNTER_SET(_serialize_key_arena_memory_usage, arena_memory_usage);
COUNTER_SET(_hash_table_memory_usage, hash_table_memory_usage);
Expand Down Expand Up @@ -415,7 +414,6 @@ Status AggSinkLocalState::_merge_without_key(vectorized::Block* block) {
void AggSinkLocalState::_update_memusage_without_key() {
int64_t arena_memory_usage = _agg_arena_pool->size();
COUNTER_SET(_memory_used_counter, arena_memory_usage);
COUNTER_SET(_peak_memory_usage_counter, arena_memory_usage);
COUNTER_SET(_serialize_key_arena_memory_usage, arena_memory_usage);
}

Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/analytic_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,6 @@ Status AnalyticSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block
}

COUNTER_UPDATE(local_state._memory_used_counter, input_block->allocated_bytes());
COUNTER_SET(local_state._peak_memory_usage_counter, local_state._memory_used_counter->value());

//TODO: if need improvement, the is a tips to maintain a free queue,
//so the memory could reuse, no need to new/delete again;
Expand Down
2 changes: 0 additions & 2 deletions be/src/pipeline/exec/exchange_sink_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,6 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) {
RETURN_IF_ERROR(
BeExecVersionManager::check_be_exec_version(request.block->be_exec_version()));
COUNTER_UPDATE(_parent->memory_used_counter(), request.block->ByteSizeLong());
COUNTER_SET(_parent->peak_memory_usage_counter(),
_parent->memory_used_counter()->value());
}
_instance_to_package_queue[ins_id].emplace(std::move(request));
_total_queue_size++;
Expand Down
10 changes: 0 additions & 10 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -385,10 +385,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
if (all_receiver_eof) {
return Status::EndOfFile("all data stream channels EOF");
}
Defer defer([&]() {
COUNTER_SET(local_state._peak_memory_usage_counter,
local_state._memory_used_counter->value());
});

if (_part_type == TPartitionType::UNPARTITIONED || local_state.channels.size() == 1) {
// 1. serialize depends on it is not local exchange
Expand Down Expand Up @@ -505,8 +501,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
}
COUNTER_UPDATE(local_state.memory_used_counter(),
new_channel_mem_usage - old_channel_mem_usage);
COUNTER_SET(local_state.peak_memory_usage_counter(),
local_state.memory_used_counter()->value());
} else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
int64_t old_channel_mem_usage = 0;
for (const auto& channel : local_state.channels) {
Expand Down Expand Up @@ -555,8 +549,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
}
COUNTER_UPDATE(local_state.memory_used_counter(),
new_channel_mem_usage - old_channel_mem_usage);
COUNTER_SET(local_state.peak_memory_usage_counter(),
local_state.memory_used_counter()->value());
} else if (_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) {
int64_t old_channel_mem_usage = 0;
for (const auto& channel : local_state.channels) {
Expand All @@ -581,8 +573,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
}
COUNTER_UPDATE(local_state.memory_used_counter(),
new_channel_mem_usage - old_channel_mem_usage);
COUNTER_SET(local_state.peak_memory_usage_counter(),
local_state.memory_used_counter()->value());
} else if (_part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) {
// Control the number of channels according to the flow, thereby controlling the number of table sink writers.
// 1. select channel
Expand Down
2 changes: 0 additions & 2 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,6 @@ Status HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state,
_build_blocks_memory_usage->value() +
(int64_t)(arg.hash_table->get_byte_size() +
arg.serialized_keys_size(true)));
COUNTER_SET(_peak_memory_usage_counter, _memory_used_counter->value());
return st;
}},
_shared_state->hash_table_variants->method_variant, _shared_state->join_op_variants,
Expand Down Expand Up @@ -486,7 +485,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
std::move(*in_block)));
int64_t blocks_mem_usage = local_state._build_side_mutable_block.allocated_bytes();
COUNTER_SET(local_state._memory_used_counter, blocks_mem_usage);
COUNTER_SET(local_state._peak_memory_usage_counter, blocks_mem_usage);
COUNTER_SET(local_state._build_blocks_memory_usage, blocks_mem_usage);
}
}
Expand Down
2 changes: 0 additions & 2 deletions be/src/pipeline/exec/hashjoin_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -483,8 +483,6 @@ Status HashJoinProbeOperatorX::push(RuntimeState* state, vectorized::Block* inpu
input_block->swap(local_state._probe_block);
COUNTER_SET(local_state._memory_used_counter,
(int64_t)local_state._probe_block.allocated_bytes());
COUNTER_SET(local_state._peak_memory_usage_counter,
local_state._memory_used_counter->value());
}
}
return Status::OK();
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/join/process_hash_table_probe_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,6 @@ typename HashTableType::State ProcessHashTableProbe<JoinOpType>::_init_probe_sid
int64_t arena_memory_usage = hash_table_ctx.serialized_keys_size(false);
COUNTER_SET(_parent->_probe_arena_memory_usage, arena_memory_usage);
COUNTER_UPDATE(_parent->_memory_used_counter, arena_memory_usage);
COUNTER_SET(_parent->_peak_memory_usage_counter, _parent->_memory_used_counter->value());
}

return typename HashTableType::State(_parent->_probe_columns);
Expand Down
22 changes: 4 additions & 18 deletions be/src/pipeline/exec/operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,6 @@ Status OperatorXBase::get_block_after_projects(RuntimeState* state, vectorized::
return status;
}
status = get_block(state, block, eos);
local_state->_peak_memory_usage_counter->set(local_state->_memory_used_counter->value());
return status;
}

Expand Down Expand Up @@ -441,11 +440,7 @@ PipelineXSinkLocalStateBase::PipelineXSinkLocalStateBase(DataSinkOperatorXBase*
}

PipelineXLocalStateBase::PipelineXLocalStateBase(RuntimeState* state, OperatorXBase* parent)
: _num_rows_returned(0),
_rows_returned_counter(nullptr),
_peak_memory_usage_counter(nullptr),
_parent(parent),
_state(state) {
: _num_rows_returned(0), _rows_returned_counter(nullptr), _parent(parent), _state(state) {
_query_statistics = std::make_shared<QueryStatistics>();
}

Expand Down Expand Up @@ -484,9 +479,8 @@ Status PipelineXLocalState<SharedStateArg>::init(RuntimeState* state, LocalState
_open_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "OpenTime", 1);
_close_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "CloseTime", 1);
_exec_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "ExecTime", 1);
_memory_used_counter = ADD_COUNTER_WITH_LEVEL(_runtime_profile, "MemoryUsage", TUnit::BYTES, 1);
_peak_memory_usage_counter =
_runtime_profile->AddHighWaterMarkCounter("MemoryUsagePeak", TUnit::BYTES, "", 1);
_memory_used_counter =
_runtime_profile->AddHighWaterMarkCounter("MemoryUsage", TUnit::BYTES, "", 1);
return Status::OK();
}

Expand Down Expand Up @@ -519,9 +513,6 @@ Status PipelineXLocalState<SharedStateArg>::close(RuntimeState* state) {
if constexpr (!std::is_same_v<SharedStateArg, FakeSharedState>) {
COUNTER_SET(_wait_for_dependency_timer, _dependency->watcher_elapse_time());
}
if (_peak_memory_usage_counter) {
_peak_memory_usage_counter->set(_memory_used_counter->value());
}
_closed = true;
// Some kinds of source operators has a 1-1 relationship with a sink operator (such as AnalyticOperator).
// We must ensure AnalyticSinkOperator will not be blocked if AnalyticSourceOperator already closed.
Expand Down Expand Up @@ -560,9 +551,7 @@ Status PipelineXSinkLocalState<SharedState>::init(RuntimeState* state, LocalSink
_close_timer = ADD_TIMER_WITH_LEVEL(_profile, "CloseTime", 1);
_exec_timer = ADD_TIMER_WITH_LEVEL(_profile, "ExecTime", 1);
info.parent_profile->add_child(_profile, true, nullptr);
_memory_used_counter = ADD_COUNTER_WITH_LEVEL(_profile, "MemoryUsage", TUnit::BYTES, 1);
_peak_memory_usage_counter =
_profile->AddHighWaterMarkCounter("MemoryUsagePeak", TUnit::BYTES, "", 1);
_memory_used_counter = _profile->AddHighWaterMarkCounter("MemoryUsage", TUnit::BYTES, "", 1);
return Status::OK();
}

Expand All @@ -574,9 +563,6 @@ Status PipelineXSinkLocalState<SharedState>::close(RuntimeState* state, Status e
if constexpr (!std::is_same_v<SharedState, FakeSharedState>) {
COUNTER_SET(_wait_for_dependency_timer, _dependency->watcher_elapse_time());
}
if (_peak_memory_usage_counter) {
_peak_memory_usage_counter->set(_memory_used_counter->value());
}
_closed = true;
return Status::OK();
}
Expand Down
14 changes: 3 additions & 11 deletions be/src/pipeline/exec/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,6 @@ class PipelineXLocalStateBase {

RuntimeProfile::Counter* exec_time_counter() { return _exec_timer; }
RuntimeProfile::Counter* memory_used_counter() { return _memory_used_counter; }
RuntimeProfile::HighWaterMarkCounter* peak_memory_usage_counter() {
return _peak_memory_usage_counter;
}
OperatorXBase* parent() { return _parent; }
RuntimeState* state() { return _state; }
vectorized::VExprContextSPtrs& conjuncts() { return _conjuncts; }
Expand Down Expand Up @@ -202,11 +199,10 @@ class PipelineXLocalStateBase {
RuntimeProfile::Counter* _rows_returned_counter = nullptr;
RuntimeProfile::Counter* _blocks_returned_counter = nullptr;
RuntimeProfile::Counter* _wait_for_dependency_timer = nullptr;
RuntimeProfile::Counter* _memory_used_counter = nullptr;
// Account for current memory and peak memory used by this node
RuntimeProfile::HighWaterMarkCounter* _memory_used_counter = nullptr;
RuntimeProfile::Counter* _projection_timer = nullptr;
RuntimeProfile::Counter* _exec_timer = nullptr;
// Account for peak memory used by this node
RuntimeProfile::HighWaterMarkCounter* _peak_memory_usage_counter = nullptr;
RuntimeProfile::Counter* _init_timer = nullptr;
RuntimeProfile::Counter* _open_timer = nullptr;
RuntimeProfile::Counter* _close_timer = nullptr;
Expand Down Expand Up @@ -348,9 +344,6 @@ class PipelineXSinkLocalStateBase {
RuntimeProfile::Counter* rows_input_counter() { return _rows_input_counter; }
RuntimeProfile::Counter* exec_time_counter() { return _exec_timer; }
RuntimeProfile::Counter* memory_used_counter() { return _memory_used_counter; }
RuntimeProfile::HighWaterMarkCounter* peak_memory_usage_counter() {
return _peak_memory_usage_counter;
}
virtual std::vector<Dependency*> dependencies() const { return {nullptr}; }

// override in exchange sink , AsyncWriterSink
Expand Down Expand Up @@ -380,8 +373,7 @@ class PipelineXSinkLocalStateBase {
RuntimeProfile::Counter* _wait_for_dependency_timer = nullptr;
RuntimeProfile::Counter* _wait_for_finish_dependency_timer = nullptr;
RuntimeProfile::Counter* _exec_timer = nullptr;
RuntimeProfile::Counter* _memory_used_counter = nullptr;
RuntimeProfile::HighWaterMarkCounter* _peak_memory_usage_counter = nullptr;
RuntimeProfile::HighWaterMarkCounter* _memory_used_counter = nullptr;

std::shared_ptr<QueryStatistics> _query_statistics = nullptr;
};
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1037,7 +1037,6 @@ Status ScanLocalState<Derived>::_init_profile() {
_total_throughput_counter =
profile()->add_rate_counter("TotalReadThroughput", _rows_read_counter);
_num_scanners = ADD_COUNTER(_runtime_profile, "NumScanners", TUnit::UNIT);
_scanner_peak_memory_usage = _peak_memory_usage_counter;
//_runtime_profile->AddHighWaterMarkCounter("PeakMemoryUsage", TUnit::BYTES);

// 2. counters for scanners
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ class ScanLocalStateBase : public PipelineXLocalState<>, public RuntimeFilterCon
// Max num of scanner thread
RuntimeProfile::Counter* _max_scanner_thread_num = nullptr;
RuntimeProfile::HighWaterMarkCounter* _peak_running_scanner = nullptr;
RuntimeProfile::HighWaterMarkCounter* _scanner_peak_memory_usage = nullptr;
// time of get block from scanner
RuntimeProfile::Counter* _scan_timer = nullptr;
RuntimeProfile::Counter* _scan_cpu_timer = nullptr;
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/sort_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ Status SortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in
int64_t data_size = local_state._shared_state->sorter->data_size();
COUNTER_SET(local_state._sort_blocks_memory_usage, data_size);
COUNTER_SET(local_state._memory_used_counter, data_size);
COUNTER_SET(local_state._peak_memory_usage_counter, data_size);

RETURN_IF_CANCELLED(state);

Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/spill_sort_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@ Status SpillSortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Bloc
int64_t data_size = local_state._shared_state->in_mem_shared_state->sorter->data_size();
COUNTER_SET(local_state._sort_blocks_memory_usage, data_size);
COUNTER_SET(local_state._memory_used_counter, data_size);
COUNTER_SET(local_state._peak_memory_usage_counter, data_size);

if (eos) {
if (local_state._shared_state->is_spilled) {
Expand Down
3 changes: 0 additions & 3 deletions be/src/pipeline/exec/streaming_aggregation_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,6 @@ Status StreamingAggLocalState::_merge_without_key(vectorized::Block* block) {
void StreamingAggLocalState::_update_memusage_without_key() {
int64_t arena_memory_usage = _agg_arena_pool->size();
COUNTER_SET(_memory_used_counter, arena_memory_usage);
COUNTER_SET(_peak_memory_usage_counter, arena_memory_usage);
COUNTER_SET(_serialize_key_arena_memory_usage, arena_memory_usage);
}

Expand All @@ -378,8 +377,6 @@ void StreamingAggLocalState::_update_memusage_with_serialized_key() {

COUNTER_SET(_memory_used_counter,
arena_memory_usage + hash_table_memory_usage);
COUNTER_SET(_peak_memory_usage_counter,
arena_memory_usage + hash_table_memory_usage);

COUNTER_SET(_serialize_key_arena_memory_usage, arena_memory_usage);
COUNTER_SET(_hash_table_memory_usage, hash_table_memory_usage);
Expand Down
1 change: 0 additions & 1 deletion be/src/runtime/load_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ void LoadChannel::_init_profile() {
_load_id.to_string(), _sender_ip, _backend_id),
true, true);
_add_batch_number_counter = ADD_COUNTER(_self_profile, "NumberBatchAdded", TUnit::UNIT);
_peak_memory_usage_counter = ADD_COUNTER(_self_profile, "PeakMemoryUsage", TUnit::BYTES);
_add_batch_timer = ADD_TIMER(_self_profile, "AddBatchTime");
_handle_eos_timer = ADD_CHILD_TIMER(_self_profile, "HandleEosTime", "AddBatchTime");
_add_batch_times = ADD_COUNTER(_self_profile, "AddBatchTimes", TUnit::UNIT);
Expand Down
1 change: 0 additions & 1 deletion be/src/runtime/load_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ class LoadChannel {
std::unique_ptr<RuntimeProfile> _profile;
RuntimeProfile* _self_profile = nullptr;
RuntimeProfile::Counter* _add_batch_number_counter = nullptr;
RuntimeProfile::Counter* _peak_memory_usage_counter = nullptr;
RuntimeProfile::Counter* _add_batch_timer = nullptr;
RuntimeProfile::Counter* _add_batch_times = nullptr;
RuntimeProfile::Counter* _mgr_add_batch_timer = nullptr;
Expand Down
Loading

0 comments on commit 2e75559

Please sign in to comment.