diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp index 27400fba474eef..44e58535b75b71 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp @@ -231,7 +231,6 @@ void AggSinkLocalState::_update_memusage_with_serialized_key() { COUNTER_SET(_memory_used_counter, arena_memory_usage + hash_table_memory_usage); - COUNTER_SET(_peak_memory_usage_counter, _memory_used_counter->value()); COUNTER_SET(_serialize_key_arena_memory_usage, arena_memory_usage); COUNTER_SET(_hash_table_memory_usage, hash_table_memory_usage); @@ -415,7 +414,6 @@ Status AggSinkLocalState::_merge_without_key(vectorized::Block* block) { void AggSinkLocalState::_update_memusage_without_key() { int64_t arena_memory_usage = _agg_arena_pool->size(); COUNTER_SET(_memory_used_counter, arena_memory_usage); - COUNTER_SET(_peak_memory_usage_counter, arena_memory_usage); COUNTER_SET(_serialize_key_arena_memory_usage, arena_memory_usage); } diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/analytic_sink_operator.cpp index 377aeb6fa12be4..7cc25eef9446d6 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.cpp +++ b/be/src/pipeline/exec/analytic_sink_operator.cpp @@ -323,7 +323,6 @@ Status AnalyticSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block } COUNTER_UPDATE(local_state._memory_used_counter, input_block->allocated_bytes()); - COUNTER_SET(local_state._peak_memory_usage_counter, local_state._memory_used_counter->value()); //TODO: if need improvement, the is a tips to maintain a free queue, //so the memory could reuse, no need to new/delete again; diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index 0f02ffc2b9a4b1..6e6108d13a919f 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -159,8 +159,6 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) { RETURN_IF_ERROR( BeExecVersionManager::check_be_exec_version(request.block->be_exec_version())); COUNTER_UPDATE(_parent->memory_used_counter(), request.block->ByteSizeLong()); - COUNTER_SET(_parent->peak_memory_usage_counter(), - _parent->memory_used_counter()->value()); } _instance_to_package_queue[ins_id].emplace(std::move(request)); _total_queue_size++; diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 85f58417197d52..dfa6df392b74ba 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -385,10 +385,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block if (all_receiver_eof) { return Status::EndOfFile("all data stream channels EOF"); } - Defer defer([&]() { - COUNTER_SET(local_state._peak_memory_usage_counter, - local_state._memory_used_counter->value()); - }); if (_part_type == TPartitionType::UNPARTITIONED || local_state.channels.size() == 1) { // 1. serialize depends on it is not local exchange @@ -505,8 +501,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block } COUNTER_UPDATE(local_state.memory_used_counter(), new_channel_mem_usage - old_channel_mem_usage); - COUNTER_SET(local_state.peak_memory_usage_counter(), - local_state.memory_used_counter()->value()); } else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) { int64_t old_channel_mem_usage = 0; for (const auto& channel : local_state.channels) { @@ -555,8 +549,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block } COUNTER_UPDATE(local_state.memory_used_counter(), new_channel_mem_usage - old_channel_mem_usage); - COUNTER_SET(local_state.peak_memory_usage_counter(), - local_state.memory_used_counter()->value()); } else if (_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) { int64_t old_channel_mem_usage = 0; for (const auto& channel : local_state.channels) { @@ -581,8 +573,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block } COUNTER_UPDATE(local_state.memory_used_counter(), new_channel_mem_usage - old_channel_mem_usage); - COUNTER_SET(local_state.peak_memory_usage_counter(), - local_state.memory_used_counter()->value()); } else if (_part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) { // Control the number of channels according to the flow, thereby controlling the number of table sink writers. // 1. select channel diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index 7d3f4da935099e..c538a9f181c015 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -304,7 +304,6 @@ Status HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state, _build_blocks_memory_usage->value() + (int64_t)(arg.hash_table->get_byte_size() + arg.serialized_keys_size(true))); - COUNTER_SET(_peak_memory_usage_counter, _memory_used_counter->value()); return st; }}, _shared_state->hash_table_variants->method_variant, _shared_state->join_op_variants, @@ -486,7 +485,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* std::move(*in_block))); int64_t blocks_mem_usage = local_state._build_side_mutable_block.allocated_bytes(); COUNTER_SET(local_state._memory_used_counter, blocks_mem_usage); - COUNTER_SET(local_state._peak_memory_usage_counter, blocks_mem_usage); COUNTER_SET(local_state._build_blocks_memory_usage, blocks_mem_usage); } } diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp b/be/src/pipeline/exec/hashjoin_probe_operator.cpp index 426bfcb219dc04..7c663b256832ed 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp +++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp @@ -483,8 +483,6 @@ Status HashJoinProbeOperatorX::push(RuntimeState* state, vectorized::Block* inpu input_block->swap(local_state._probe_block); COUNTER_SET(local_state._memory_used_counter, (int64_t)local_state._probe_block.allocated_bytes()); - COUNTER_SET(local_state._peak_memory_usage_counter, - local_state._memory_used_counter->value()); } } return Status::OK(); diff --git a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h index 05cd3d7d9e0590..c6b726d0d78e98 100644 --- a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h +++ b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h @@ -180,7 +180,6 @@ typename HashTableType::State ProcessHashTableProbe::_init_probe_sid int64_t arena_memory_usage = hash_table_ctx.serialized_keys_size(false); COUNTER_SET(_parent->_probe_arena_memory_usage, arena_memory_usage); COUNTER_UPDATE(_parent->_memory_used_counter, arena_memory_usage); - COUNTER_SET(_parent->_peak_memory_usage_counter, _parent->_memory_used_counter->value()); } return typename HashTableType::State(_parent->_probe_columns); diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp index 3b5174d87c0f7f..09d0eef0f04340 100644 --- a/be/src/pipeline/exec/operator.cpp +++ b/be/src/pipeline/exec/operator.cpp @@ -342,7 +342,6 @@ Status OperatorXBase::get_block_after_projects(RuntimeState* state, vectorized:: return status; } status = get_block(state, block, eos); - local_state->_peak_memory_usage_counter->set(local_state->_memory_used_counter->value()); return status; } @@ -441,11 +440,7 @@ PipelineXSinkLocalStateBase::PipelineXSinkLocalStateBase(DataSinkOperatorXBase* } PipelineXLocalStateBase::PipelineXLocalStateBase(RuntimeState* state, OperatorXBase* parent) - : _num_rows_returned(0), - _rows_returned_counter(nullptr), - _peak_memory_usage_counter(nullptr), - _parent(parent), - _state(state) { + : _num_rows_returned(0), _rows_returned_counter(nullptr), _parent(parent), _state(state) { _query_statistics = std::make_shared(); } @@ -484,9 +479,8 @@ Status PipelineXLocalState::init(RuntimeState* state, LocalState _open_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "OpenTime", 1); _close_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "CloseTime", 1); _exec_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "ExecTime", 1); - _memory_used_counter = ADD_COUNTER_WITH_LEVEL(_runtime_profile, "MemoryUsage", TUnit::BYTES, 1); - _peak_memory_usage_counter = - _runtime_profile->AddHighWaterMarkCounter("MemoryUsagePeak", TUnit::BYTES, "", 1); + _memory_used_counter = + _runtime_profile->AddHighWaterMarkCounter("MemoryUsage", TUnit::BYTES, "", 1); return Status::OK(); } @@ -519,9 +513,6 @@ Status PipelineXLocalState::close(RuntimeState* state) { if constexpr (!std::is_same_v) { COUNTER_SET(_wait_for_dependency_timer, _dependency->watcher_elapse_time()); } - if (_peak_memory_usage_counter) { - _peak_memory_usage_counter->set(_memory_used_counter->value()); - } _closed = true; // Some kinds of source operators has a 1-1 relationship with a sink operator (such as AnalyticOperator). // We must ensure AnalyticSinkOperator will not be blocked if AnalyticSourceOperator already closed. @@ -560,9 +551,7 @@ Status PipelineXSinkLocalState::init(RuntimeState* state, LocalSink _close_timer = ADD_TIMER_WITH_LEVEL(_profile, "CloseTime", 1); _exec_timer = ADD_TIMER_WITH_LEVEL(_profile, "ExecTime", 1); info.parent_profile->add_child(_profile, true, nullptr); - _memory_used_counter = ADD_COUNTER_WITH_LEVEL(_profile, "MemoryUsage", TUnit::BYTES, 1); - _peak_memory_usage_counter = - _profile->AddHighWaterMarkCounter("MemoryUsagePeak", TUnit::BYTES, "", 1); + _memory_used_counter = _profile->AddHighWaterMarkCounter("MemoryUsage", TUnit::BYTES, "", 1); return Status::OK(); } @@ -574,9 +563,6 @@ Status PipelineXSinkLocalState::close(RuntimeState* state, Status e if constexpr (!std::is_same_v) { COUNTER_SET(_wait_for_dependency_timer, _dependency->watcher_elapse_time()); } - if (_peak_memory_usage_counter) { - _peak_memory_usage_counter->set(_memory_used_counter->value()); - } _closed = true; return Status::OK(); } diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 6053b1a2f48e87..c84c4e7b43f981 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -165,9 +165,6 @@ class PipelineXLocalStateBase { RuntimeProfile::Counter* exec_time_counter() { return _exec_timer; } RuntimeProfile::Counter* memory_used_counter() { return _memory_used_counter; } - RuntimeProfile::HighWaterMarkCounter* peak_memory_usage_counter() { - return _peak_memory_usage_counter; - } OperatorXBase* parent() { return _parent; } RuntimeState* state() { return _state; } vectorized::VExprContextSPtrs& conjuncts() { return _conjuncts; } @@ -202,11 +199,10 @@ class PipelineXLocalStateBase { RuntimeProfile::Counter* _rows_returned_counter = nullptr; RuntimeProfile::Counter* _blocks_returned_counter = nullptr; RuntimeProfile::Counter* _wait_for_dependency_timer = nullptr; - RuntimeProfile::Counter* _memory_used_counter = nullptr; + // Account for current memory and peak memory used by this node + RuntimeProfile::HighWaterMarkCounter* _memory_used_counter = nullptr; RuntimeProfile::Counter* _projection_timer = nullptr; RuntimeProfile::Counter* _exec_timer = nullptr; - // Account for peak memory used by this node - RuntimeProfile::HighWaterMarkCounter* _peak_memory_usage_counter = nullptr; RuntimeProfile::Counter* _init_timer = nullptr; RuntimeProfile::Counter* _open_timer = nullptr; RuntimeProfile::Counter* _close_timer = nullptr; @@ -348,9 +344,6 @@ class PipelineXSinkLocalStateBase { RuntimeProfile::Counter* rows_input_counter() { return _rows_input_counter; } RuntimeProfile::Counter* exec_time_counter() { return _exec_timer; } RuntimeProfile::Counter* memory_used_counter() { return _memory_used_counter; } - RuntimeProfile::HighWaterMarkCounter* peak_memory_usage_counter() { - return _peak_memory_usage_counter; - } virtual std::vector dependencies() const { return {nullptr}; } // override in exchange sink , AsyncWriterSink @@ -380,8 +373,7 @@ class PipelineXSinkLocalStateBase { RuntimeProfile::Counter* _wait_for_dependency_timer = nullptr; RuntimeProfile::Counter* _wait_for_finish_dependency_timer = nullptr; RuntimeProfile::Counter* _exec_timer = nullptr; - RuntimeProfile::Counter* _memory_used_counter = nullptr; - RuntimeProfile::HighWaterMarkCounter* _peak_memory_usage_counter = nullptr; + RuntimeProfile::HighWaterMarkCounter* _memory_used_counter = nullptr; std::shared_ptr _query_statistics = nullptr; }; diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index 21c3103fe5a708..ae4396b22c7eec 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -1037,7 +1037,6 @@ Status ScanLocalState::_init_profile() { _total_throughput_counter = profile()->add_rate_counter("TotalReadThroughput", _rows_read_counter); _num_scanners = ADD_COUNTER(_runtime_profile, "NumScanners", TUnit::UNIT); - _scanner_peak_memory_usage = _peak_memory_usage_counter; //_runtime_profile->AddHighWaterMarkCounter("PeakMemoryUsage", TUnit::BYTES); // 2. counters for scanners diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index 5d41c800383bd0..4519a3ca283f6f 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -108,7 +108,6 @@ class ScanLocalStateBase : public PipelineXLocalState<>, public RuntimeFilterCon // Max num of scanner thread RuntimeProfile::Counter* _max_scanner_thread_num = nullptr; RuntimeProfile::HighWaterMarkCounter* _peak_running_scanner = nullptr; - RuntimeProfile::HighWaterMarkCounter* _scanner_peak_memory_usage = nullptr; // time of get block from scanner RuntimeProfile::Counter* _scan_timer = nullptr; RuntimeProfile::Counter* _scan_cpu_timer = nullptr; diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp b/be/src/pipeline/exec/sort_sink_operator.cpp index faec4961af93b7..072f28723a36ea 100644 --- a/be/src/pipeline/exec/sort_sink_operator.cpp +++ b/be/src/pipeline/exec/sort_sink_operator.cpp @@ -128,7 +128,6 @@ Status SortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in int64_t data_size = local_state._shared_state->sorter->data_size(); COUNTER_SET(local_state._sort_blocks_memory_usage, data_size); COUNTER_SET(local_state._memory_used_counter, data_size); - COUNTER_SET(local_state._peak_memory_usage_counter, data_size); RETURN_IF_CANCELLED(state); diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp index 267bcc83aad92c..6e6689d4134deb 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp @@ -160,7 +160,6 @@ Status SpillSortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Bloc int64_t data_size = local_state._shared_state->in_mem_shared_state->sorter->data_size(); COUNTER_SET(local_state._sort_blocks_memory_usage, data_size); COUNTER_SET(local_state._memory_used_counter, data_size); - COUNTER_SET(local_state._peak_memory_usage_counter, data_size); if (eos) { if (local_state._shared_state->is_spilled) { diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_operator.cpp index cf5071d62e4737..1c8d2c47bc698a 100644 --- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp +++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp @@ -353,7 +353,6 @@ Status StreamingAggLocalState::_merge_without_key(vectorized::Block* block) { void StreamingAggLocalState::_update_memusage_without_key() { int64_t arena_memory_usage = _agg_arena_pool->size(); COUNTER_SET(_memory_used_counter, arena_memory_usage); - COUNTER_SET(_peak_memory_usage_counter, arena_memory_usage); COUNTER_SET(_serialize_key_arena_memory_usage, arena_memory_usage); } @@ -378,8 +377,6 @@ void StreamingAggLocalState::_update_memusage_with_serialized_key() { COUNTER_SET(_memory_used_counter, arena_memory_usage + hash_table_memory_usage); - COUNTER_SET(_peak_memory_usage_counter, - arena_memory_usage + hash_table_memory_usage); COUNTER_SET(_serialize_key_arena_memory_usage, arena_memory_usage); COUNTER_SET(_hash_table_memory_usage, hash_table_memory_usage); diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp index 9369c0c833c53c..0cb313747b0373 100644 --- a/be/src/runtime/load_channel.cpp +++ b/be/src/runtime/load_channel.cpp @@ -98,7 +98,6 @@ void LoadChannel::_init_profile() { _load_id.to_string(), _sender_ip, _backend_id), true, true); _add_batch_number_counter = ADD_COUNTER(_self_profile, "NumberBatchAdded", TUnit::UNIT); - _peak_memory_usage_counter = ADD_COUNTER(_self_profile, "PeakMemoryUsage", TUnit::BYTES); _add_batch_timer = ADD_TIMER(_self_profile, "AddBatchTime"); _handle_eos_timer = ADD_CHILD_TIMER(_self_profile, "HandleEosTime", "AddBatchTime"); _add_batch_times = ADD_COUNTER(_self_profile, "AddBatchTimes", TUnit::UNIT); diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h index 36a8f363ba9bac..2889bcf256515b 100644 --- a/be/src/runtime/load_channel.h +++ b/be/src/runtime/load_channel.h @@ -91,7 +91,6 @@ class LoadChannel { std::unique_ptr _profile; RuntimeProfile* _self_profile = nullptr; RuntimeProfile::Counter* _add_batch_number_counter = nullptr; - RuntimeProfile::Counter* _peak_memory_usage_counter = nullptr; RuntimeProfile::Counter* _add_batch_timer = nullptr; RuntimeProfile::Counter* _add_batch_times = nullptr; RuntimeProfile::Counter* _mgr_add_batch_timer = nullptr; diff --git a/be/src/util/runtime_profile.cpp b/be/src/util/runtime_profile.cpp index e87301880d2479..45db607a342743 100644 --- a/be/src/util/runtime_profile.cpp +++ b/be/src/util/runtime_profile.cpp @@ -363,28 +363,24 @@ const std::string* RuntimeProfile::get_info_string(const std::string& key) { return &it->second; } -#define ADD_COUNTER_IMPL(NAME, T) \ - RuntimeProfile::T* RuntimeProfile::NAME(const std::string& name, TUnit::type unit, \ - const std::string& parent_counter_name, \ - int64_t level) { \ - DCHECK_EQ(_is_averaged_profile, false); \ - std::lock_guard l(_counter_map_lock); \ - if (_counter_map.find(name) != _counter_map.end()) { \ - return reinterpret_cast(_counter_map[name]); \ - } \ - DCHECK(parent_counter_name == ROOT_COUNTER || \ - _counter_map.find(parent_counter_name) != _counter_map.end()); \ - T* counter = _pool->add(new T(unit, level)); \ - _counter_map[name] = counter; \ - std::set* child_counters = \ - find_or_insert(&_child_counter_map, parent_counter_name, std::set()); \ - child_counters->insert(name); \ - return counter; \ - } - -//ADD_COUNTER_IMPL(AddCounter, Counter); -ADD_COUNTER_IMPL(AddHighWaterMarkCounter, HighWaterMarkCounter); -//ADD_COUNTER_IMPL(AddConcurrentTimerCounter, ConcurrentTimerCounter); +RuntimeProfile::HighWaterMarkCounter* RuntimeProfile::AddHighWaterMarkCounter( + const std::string& name, TUnit::type unit, const std::string& parent_counter_name, + int64_t level) { + DCHECK_EQ(_is_averaged_profile, false); + std::lock_guard l(_counter_map_lock); + if (_counter_map.find(name) != _counter_map.end()) { + return reinterpret_cast(_counter_map[name]); + } + DCHECK(parent_counter_name == ROOT_COUNTER || + _counter_map.find(parent_counter_name) != _counter_map.end()); + RuntimeProfile::HighWaterMarkCounter* counter = + _pool->add(new RuntimeProfile::HighWaterMarkCounter(unit, level, parent_counter_name)); + _counter_map[name] = counter; + std::set* child_counters = + find_or_insert(&_child_counter_map, parent_counter_name, std::set()); + child_counters->insert(name); + return counter; +} std::shared_ptr RuntimeProfile::AddSharedHighWaterMarkCounter( const std::string& name, TUnit::type unit, const std::string& parent_counter_name) { @@ -395,7 +391,8 @@ std::shared_ptr RuntimeProfile::AddSharedH } DCHECK(parent_counter_name == ROOT_COUNTER || _counter_map.find(parent_counter_name) != _counter_map.end()); - std::shared_ptr counter = std::make_shared(unit); + std::shared_ptr counter = + std::make_shared(unit, 2, parent_counter_name); _shared_counter_pool[name] = counter; DCHECK(_counter_map.find(name) == _counter_map.end()) @@ -697,17 +694,14 @@ void RuntimeProfile::print_child_counters(const std::string& prefix, const CounterMap& counter_map, const ChildCounterMap& child_counter_map, std::ostream* s) { - std::ostream& stream = *s; - ChildCounterMap::const_iterator itr = child_counter_map.find(counter_name); + auto itr = child_counter_map.find(counter_name); if (itr != child_counter_map.end()) { const std::set& child_counters = itr->second; for (const std::string& child_counter : child_counters) { - CounterMap::const_iterator iter = counter_map.find(child_counter); + auto iter = counter_map.find(child_counter); DCHECK(iter != counter_map.end()); - stream << prefix << " - " << iter->first << ": " - << PrettyPrinter::print(iter->second->value(), iter->second->type()) - << std::endl; + iter->second->pretty_print(s, prefix, iter->first); RuntimeProfile::print_child_counters(prefix + " ", child_counter, counter_map, child_counter_map, s); } diff --git a/be/src/util/runtime_profile.h b/be/src/util/runtime_profile.h index 955d77b72aa51c..6e393ac673a628 100644 --- a/be/src/util/runtime_profile.h +++ b/be/src/util/runtime_profile.h @@ -39,6 +39,7 @@ #include "common/compiler_util.h" // IWYU pragma: keep #include "util/binary_cast.hpp" +#include "util/container_util.hpp" #include "util/pretty_printer.h" #include "util/stopwatch.hpp" @@ -126,6 +127,14 @@ class RuntimeProfile { tcounters.push_back(std::move(counter)); } + virtual void pretty_print(std::ostream* s, const std::string& prefix, + const std::string& name) const { + std::ostream& stream = *s; + stream << prefix << " - " << name << ": " + << PrettyPrinter::print(_value.load(std::memory_order_relaxed), type()) + << std::endl; + } + TUnit::type type() const { return _type; } virtual int64_t level() { return _level; } @@ -142,15 +151,49 @@ class RuntimeProfile { /// as value()) and the current value. class HighWaterMarkCounter : public Counter { public: - HighWaterMarkCounter(TUnit::type unit, int64_t level = 2) - : Counter(unit, 0, level), current_value_(0) {} + HighWaterMarkCounter(TUnit::type unit, int64_t level, const std::string& parent_name) + : Counter(unit, 0, level), current_value_(0), _parent_name(parent_name) {} - virtual void add(int64_t delta) { + void add(int64_t delta) { current_value_.fetch_add(delta, std::memory_order_relaxed); if (delta > 0) { UpdateMax(current_value_); } } + virtual void update(int64_t delta) override { add(delta); } + + virtual void to_thrift( + const std::string& name, std::vector& tcounters, + std::map>& child_counters_map) override { + { + TCounter counter; + counter.name = name; + counter.value = this->current_value(); + counter.type = this->type(); + counter.__set_level(this->level()); + tcounters.push_back(std::move(counter)); + } + { + TCounter counter; + std::string peak_name = name + "Peak"; + counter.name = peak_name; + counter.value = this->value(); + counter.type = this->type(); + counter.__set_level(this->level()); + tcounters.push_back(std::move(counter)); + child_counters_map[_parent_name].insert(peak_name); + } + } + + virtual void pretty_print(std::ostream* s, const std::string& prefix, + const std::string& name) const override { + std::ostream& stream = *s; + stream << prefix << " - " << name << ": " + << PrettyPrinter::print(current_value(), type()) << std::endl; + stream << prefix << " - " << name << "Peak: " + << PrettyPrinter::print(_value.load(std::memory_order_relaxed), type()) + << std::endl; + } /// Tries to increase the current value by delta. If current_value() + delta /// exceeds max, return false and current_value is not changed. @@ -194,6 +237,8 @@ class RuntimeProfile { /// The current value of the counter. _value in the super class represents /// the high water mark. std::atomic current_value_; + + const std::string _parent_name; }; using DerivedCounterFunction = std::function; @@ -561,10 +606,6 @@ class RuntimeProfile { static void print_child_counters(const std::string& prefix, const std::string& counter_name, const CounterMap& counter_map, const ChildCounterMap& child_counter_map, std::ostream* s); - - static std::string print_counter(Counter* counter) { - return PrettyPrinter::print(counter->value(), counter->type()); - } }; // Utility class to update the counter at object construction and destruction. diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index d37d26b09f7815..65812cb428a8f8 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -236,7 +236,6 @@ vectorized::BlockUPtr ScannerContext::get_free_block(bool force) { _scanner_memory_used_counter->set(_block_memory_usage); // A free block is reused, so the memory usage should be decreased // The caller of get_free_block will increase the memory usage - update_peak_memory_usage(-block->allocated_bytes()); } else if (_block_memory_usage < _max_bytes_in_queue || force) { _newly_create_free_blocks_num->update(1); block = vectorized::Block::create_unique(_output_tuple_desc->slots(), 0, @@ -251,9 +250,7 @@ void ScannerContext::return_free_block(vectorized::BlockUPtr block) { _block_memory_usage += block_size_to_reuse; _scanner_memory_used_counter->set(_block_memory_usage); block->clear_column_data(); - if (_free_blocks.enqueue(std::move(block))) { - update_peak_memory_usage(block_size_to_reuse); - } + _free_blocks.enqueue(std::move(block)); } } @@ -324,7 +321,6 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo _estimated_block_size = block_size; } _block_memory_usage -= block_size; - update_peak_memory_usage(-current_block->allocated_bytes()); // consume current block block->swap(*current_block); return_free_block(std::move(current_block)); @@ -540,8 +536,4 @@ void ScannerContext::update_peak_running_scanner(int num) { _local_state->_peak_running_scanner->add(num); } -void ScannerContext::update_peak_memory_usage(int64_t usage) { - _local_state->_scanner_peak_memory_usage->add(usage); -} - } // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h index 82e0a06799940b..d1cf06d56686ac 100644 --- a/be/src/vec/exec/scan/scanner_context.h +++ b/be/src/vec/exec/scan/scanner_context.h @@ -127,8 +127,6 @@ class ScannerContext : public std::enable_shared_from_this, // Caller should make sure the pipeline task is still running when calling this function void update_peak_running_scanner(int num); - // Caller should make sure the pipeline task is still running when calling this function - void update_peak_memory_usage(int64_t usage); // Get next block from blocks queue. Called by ScanNode/ScanOperator // Set eos to true if there is no more data to read. diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index 385b581d2a5725..7c5aa8db0a77af 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -283,8 +283,6 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr ctx, break; } // We got a new created block or a reused block. - ctx->update_peak_memory_usage(free_block->allocated_bytes()); - ctx->update_peak_memory_usage(-free_block->allocated_bytes()); status = scanner->get_block_after_projects(state, free_block.get(), &eos); first_read = false; if (!status.ok()) { @@ -293,7 +291,6 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr ctx, } // Projection will truncate useless columns, makes block size change. auto free_block_bytes = free_block->allocated_bytes(); - ctx->update_peak_memory_usage(free_block_bytes); raw_bytes_read += free_block_bytes; if (!scan_task->cached_blocks.empty() && scan_task->cached_blocks.back().first->rows() + free_block->rows() <= @@ -301,9 +298,7 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr ctx, size_t block_size = scan_task->cached_blocks.back().first->allocated_bytes(); vectorized::MutableBlock mutable_block( scan_task->cached_blocks.back().first.get()); - ctx->update_peak_memory_usage(-mutable_block.allocated_bytes()); status = mutable_block.merge(*free_block); - ctx->update_peak_memory_usage(mutable_block.allocated_bytes()); if (!status.ok()) { LOG(WARNING) << "Block merge failed: " << status.to_string(); break; @@ -313,7 +308,6 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr ctx, std::move(mutable_block.mutable_columns())); // Return block succeed or not, this free_block is not used by this scan task any more. - ctx->update_peak_memory_usage(-free_block_bytes); // If block can be reused, its memory usage will be added back. ctx->return_free_block(std::move(free_block)); ctx->inc_block_usage(scan_task->cached_blocks.back().first->allocated_bytes() - diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index b48b9f780b8754..ddaf0e45cb8909 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -210,8 +210,6 @@ Status VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_num *done = nullptr; } _recvr->_parent->memory_used_counter()->update(block_byte_size); - _recvr->_parent->peak_memory_usage_counter()->set( - _recvr->_parent->memory_used_counter()->value()); add_blocks_memory_usage(block_byte_size); return Status::OK(); } @@ -251,8 +249,6 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) { try_set_dep_ready_without_lock(); COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_mem_size); _recvr->_parent->memory_used_counter()->update(block_mem_size); - _recvr->_parent->peak_memory_usage_counter()->set( - _recvr->_parent->memory_used_counter()->value()); add_blocks_memory_usage(block_mem_size); } }