diff --git a/src/common/arrow/src/arrow/array/binview/mutable.rs b/src/common/arrow/src/arrow/array/binview/mutable.rs index 64ef7880bcf8..38c088ec88b1 100644 --- a/src/common/arrow/src/arrow/array/binview/mutable.rs +++ b/src/common/arrow/src/arrow/array/binview/mutable.rs @@ -41,9 +41,9 @@ pub struct MutableBinaryViewArray { pub(super) validity: Option, pub(super) phantom: std::marker::PhantomData, /// Total bytes length if we would concatenate them all. - pub(super) total_bytes_len: usize, + pub total_bytes_len: usize, /// Total bytes in the buffer (excluding remaining capacity) - pub(super) total_buffer_len: usize, + pub total_buffer_len: usize, } impl Clone for MutableBinaryViewArray { diff --git a/src/query/expression/src/converts/arrow2/from.rs b/src/query/expression/src/converts/arrow2/from.rs index 0a264afce0d0..6989b95ed324 100644 --- a/src/query/expression/src/converts/arrow2/from.rs +++ b/src/query/expression/src/converts/arrow2/from.rs @@ -12,12 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use databend_common_arrow::arrow::array::BinaryArray; +use databend_common_arrow::arrow::array::FixedSizeBinaryArray; +use databend_common_arrow::arrow::array::Utf8Array; use databend_common_arrow::arrow::bitmap::Bitmap; use databend_common_arrow::arrow::buffer::Buffer; use databend_common_arrow::arrow::datatypes::DataType as ArrowDataType; use databend_common_arrow::arrow::datatypes::Field as ArrowField; use databend_common_arrow::arrow::datatypes::Schema as ArrowSchema; use databend_common_arrow::arrow::datatypes::TimeUnit; +use databend_common_arrow::arrow::types::Offset; use databend_common_exception::ErrorCode; use databend_common_exception::Result; @@ -28,6 +32,7 @@ use super::ARROW_EXT_TYPE_GEOMETRY; use super::ARROW_EXT_TYPE_VARIANT; use crate::types::array::ArrayColumn; use crate::types::binary::BinaryColumn; +use crate::types::binary::BinaryColumnBuilder; use crate::types::decimal::DecimalColumn; use crate::types::geography::GeographyColumn; use crate::types::nullable::NullableColumn; @@ -348,16 +353,7 @@ impl Column { .expect( "fail to read `Binary` from arrow: array should be `BinaryArray`", ); - let offsets = arrow_col - .offsets() - .buffer() - .iter() - .map(|x| *x as u64) - .collect::>(); - Column::Binary(BinaryColumn::new( - arrow_col.values().clone(), - offsets.into(), - )) + Column::Binary(binary_array_to_binary_column(arrow_col)) } (DataType::Binary, ArrowDataType::LargeBinary) => { let arrow_col = arrow_col @@ -366,25 +362,16 @@ impl Column { .expect( "fail to read `Binary` from arrow: array should be `BinaryArray`", ); - let offsets = arrow_col.offsets().clone().into_inner(); - let offsets = - unsafe { std::mem::transmute::, Buffer>(offsets) }; - Column::Binary(BinaryColumn::new(arrow_col.values().clone(), offsets)) + Column::Binary(binary_array_to_binary_column(arrow_col)) } - (DataType::Binary, ArrowDataType::FixedSizeBinary(size)) => { + (DataType::Binary, ArrowDataType::FixedSizeBinary(_)) => { let arrow_col = arrow_col .as_any() 
.downcast_ref::() .expect( "fail to read `Binary` from arrow: array should be `FixedSizeBinaryArray`", ); - let offsets = (0..arrow_col.len() as u64 + 1) - .map(|x| x * (*size) as u64) - .collect::>(); - Column::Binary(BinaryColumn::new( - arrow_col.values().clone(), - offsets.into(), - )) + Column::Binary(fixed_size_binary_array_to_binary_column(arrow_col)) } (DataType::Binary, ArrowDataType::Utf8) => { let arrow_col = arrow_col @@ -393,16 +380,7 @@ impl Column { .expect( "fail to read `Binary` from arrow: array should be `Utf8Array`", ); - let offsets = arrow_col - .offsets() - .buffer() - .iter() - .map(|x| *x as u64) - .collect::>(); - Column::Binary(BinaryColumn::new( - arrow_col.values().clone(), - offsets.into(), - )) + Column::Binary(utf8_array_to_binary_column(arrow_col)) } (DataType::Binary, ArrowDataType::LargeUtf8) => { let arrow_col = arrow_col @@ -411,10 +389,16 @@ impl Column { .expect( "fail to read `Binary` from arrow: array should be `Utf8Array`", ); - let offsets = arrow_col.offsets().clone().into_inner(); - let offsets = - unsafe { std::mem::transmute::, Buffer>(offsets) }; - Column::Binary(BinaryColumn::new(arrow_col.values().clone(), offsets)) + Column::Binary(utf8_array_to_binary_column(arrow_col)) + } + (DataType::Binary, ArrowDataType::BinaryView) => { + let arrow_col = arrow_col + .as_any() + .downcast_ref::() + .expect( + "fail to read `Binary` from arrow: array should be `BinaryViewArray`", + ); + Column::Binary(BinaryColumn::new(arrow_col.clone())) } (DataType::String, ArrowDataType::Binary) => { let arrow_col = arrow_col @@ -423,14 +407,8 @@ impl Column { .expect( "fail to read `String` from arrow: array should be `BinaryArray`", ); - let offsets = arrow_col - .offsets() - .buffer() - .iter() - .map(|x| *x as u64) - .collect::>(); - let column = StringColumn::new(arrow_col.values().clone(), offsets.into()); - Column::String(column) + let binary_col = binary_array_to_binary_column(arrow_col); + 
Column::String(StringColumn::try_from(binary_col)?) } (DataType::String, ArrowDataType::LargeBinary) => { let arrow_col = arrow_col @@ -439,24 +417,18 @@ impl Column { .expect( "fail to read `String` from arrow: array should be `BinaryArray`", ); - let offsets = arrow_col.offsets().clone().into_inner(); - let offsets = - unsafe { std::mem::transmute::, Buffer>(offsets) }; - let column = StringColumn::new(arrow_col.values().clone(), offsets); - Column::String(column) + let binary_col = binary_array_to_binary_column(arrow_col); + Column::String(StringColumn::try_from(binary_col)?) } - (DataType::String, ArrowDataType::FixedSizeBinary(size)) => { + (DataType::String, ArrowDataType::FixedSizeBinary(_)) => { let arrow_col = arrow_col .as_any() .downcast_ref::() .expect( "fail to read `String` from arrow: array should be `FixedSizeBinaryArray`", ); - let offsets = (0..arrow_col.len() as u64 + 1) - .map(|x| x * (*size) as u64) - .collect::>(); - let column = StringColumn::new(arrow_col.values().clone(), offsets.into()); - Column::String(column) + let binary_col = fixed_size_binary_array_to_binary_column(arrow_col); + Column::String(StringColumn::try_from(binary_col)?) } (DataType::String, ArrowDataType::Utf8) => { let arrow_col = arrow_col @@ -465,18 +437,8 @@ impl Column { .expect( "fail to read `String` from arrow: array should be `Utf8Array`", ); - let offsets = arrow_col - .offsets() - .buffer() - .iter() - .map(|x| *x as u64) - .collect::>(); - unsafe { - Column::String(StringColumn::new_unchecked( - arrow_col.values().clone(), - offsets.into(), - )) - } + let binary_col = utf8_array_to_binary_column(arrow_col); + Column::String(StringColumn::try_from(binary_col)?) 
} (DataType::String, ArrowDataType::LargeUtf8) => { let arrow_col = arrow_col @@ -485,15 +447,18 @@ impl Column { .expect( "fail to read `String` from arrow: array should be `Utf8Array`", ); - let offsets = arrow_col.offsets().clone().into_inner(); - let offsets = - unsafe { std::mem::transmute::, Buffer>(offsets) }; - unsafe { - Column::String(StringColumn::new_unchecked( - arrow_col.values().clone(), - offsets, - )) - } + let binary_col = utf8_array_to_binary_column(arrow_col); + Column::String(StringColumn::try_from(binary_col)?) + } + (DataType::String, ArrowDataType::Utf8View) => { + let arrow_col = arrow_col + .as_any() + .downcast_ref::() + .expect( + "fail to read `String` from arrow: array should be `Utf8ViewArray`", + ); + let binary_col = BinaryColumn::new(arrow_col.to_binview()); + Column::String(StringColumn::try_from(binary_col)?) } (DataType::Timestamp, ArrowDataType::Timestamp(uint, _)) => { let values = arrow_col @@ -534,32 +499,14 @@ impl Column { .as_any() .downcast_ref::>() .expect("fail to read from arrow: array should be `BinaryArray`"); - let offsets = arrow_col - .offsets() - .buffer() - .iter() - .map(|x| *x as u64) - .collect::>(); - Column::Variant(BinaryColumn::new( - arrow_col.values().clone(), - offsets.into(), - )) + Column::Variant(binary_array_to_binary_column(arrow_col)) } (DataType::Variant, ArrowDataType::Binary) => { let arrow_col = arrow_col .as_any() .downcast_ref::>() .expect("fail to read from arrow: array should be `BinaryArray`"); - let offsets = arrow_col - .offsets() - .buffer() - .iter() - .map(|x| *x as u64) - .collect::>(); - Column::Variant(BinaryColumn::new( - arrow_col.values().clone(), - offsets.into(), - )) + Column::Variant(binary_array_to_binary_column(arrow_col)) } ( DataType::Variant, @@ -571,10 +518,19 @@ impl Column { .expect( "fail to read `Variant` from arrow: array should be `BinaryArray`", ); - let offsets = arrow_col.offsets().clone().into_inner(); - let offsets = - unsafe { std::mem::transmute::, 
Buffer>(offsets) }; - Column::Variant(BinaryColumn::new(arrow_col.values().clone(), offsets)) + Column::Variant(binary_array_to_binary_column(arrow_col)) + } + ( + DataType::Variant, + ArrowDataType::Extension(name, box ArrowDataType::BinaryView, None), + ) if name == ARROW_EXT_TYPE_VARIANT => { + let arrow_col = arrow_col + .as_any() + .downcast_ref::() + .expect( + "fail to read `Variant` from arrow: array should be `BinaryViewArray`", + ); + Column::Variant(BinaryColumn::new(arrow_col.clone())) } (DataType::Variant, ArrowDataType::LargeBinary) => { let arrow_col = arrow_col @@ -583,10 +539,7 @@ impl Column { .expect( "fail to read `Variant` from arrow: array should be `BinaryArray`", ); - let offsets = arrow_col.offsets().clone().into_inner(); - let offsets = - unsafe { std::mem::transmute::, Buffer>(offsets) }; - Column::Variant(BinaryColumn::new(arrow_col.values().clone(), offsets)) + Column::Variant(binary_array_to_binary_column(arrow_col)) } (DataType::Array(ty), ArrowDataType::List(_)) => { let values_col = arrow_col @@ -660,16 +613,7 @@ impl Column { .expect( "fail to read `Bitmap` from arrow: array should be `BinaryArray`", ); - let offsets = arrow_col - .offsets() - .buffer() - .iter() - .map(|x| *x as u64) - .collect::>(); - Column::Bitmap(BinaryColumn::new( - arrow_col.values().clone(), - offsets.into(), - )) + Column::Bitmap(binary_array_to_binary_column(arrow_col)) } ( DataType::Bitmap, @@ -681,10 +625,7 @@ impl Column { .expect( "fail to read `Bitmap` from arrow: array should be `BinaryArray`", ); - let offsets = arrow_col.offsets().clone().into_inner(); - let offsets = - unsafe { std::mem::transmute::, Buffer>(offsets) }; - Column::Bitmap(BinaryColumn::new(arrow_col.values().clone(), offsets)) + Column::Bitmap(binary_array_to_binary_column(arrow_col)) } (DataType::Bitmap, ArrowDataType::Binary) => { let arrow_col = arrow_col @@ -693,16 +634,7 @@ impl Column { .expect( "fail to read `Bitmap` from arrow: array should be `BinaryArray`", ); - let 
offsets = arrow_col - .offsets() - .buffer() - .iter() - .map(|x| *x as u64) - .collect::>(); - Column::Bitmap(BinaryColumn::new( - arrow_col.values().clone(), - offsets.into(), - )) + Column::Bitmap(binary_array_to_binary_column(arrow_col)) } (DataType::Bitmap, ArrowDataType::LargeBinary) => { let arrow_col = arrow_col @@ -711,10 +643,16 @@ impl Column { .expect( "fail to read `Bitmap` from arrow: array should be `BinaryArray`", ); - let offsets = arrow_col.offsets().clone().into_inner(); - let offsets = - unsafe { std::mem::transmute::, Buffer>(offsets) }; - Column::Bitmap(BinaryColumn::new(arrow_col.values().clone(), offsets)) + Column::Bitmap(binary_array_to_binary_column(arrow_col)) + } + (DataType::Bitmap, ArrowDataType::BinaryView) => { + let arrow_col = arrow_col + .as_any() + .downcast_ref::() + .expect( + "fail to read `Bitmap` from arrow: array should be `BinaryViewArray`", + ); + Column::Bitmap(BinaryColumn::new(arrow_col.clone())) } ( DataType::Geometry, @@ -726,16 +664,7 @@ impl Column { .expect( "fail to read `Geometry` from arrow: array should be `BinaryArray`", ); - let offsets = arrow_col - .offsets() - .buffer() - .iter() - .map(|x| *x as u64) - .collect::>(); - Column::Geometry(BinaryColumn::new( - arrow_col.values().clone(), - offsets.into(), - )) + Column::Geometry(binary_array_to_binary_column(arrow_col)) } ( DataType::Geometry, @@ -747,10 +676,19 @@ impl Column { .expect( "fail to read `Geometry` from arrow: array should be `BinaryArray`", ); - let offsets = arrow_col.offsets().clone().into_inner(); - let offsets = - unsafe { std::mem::transmute::, Buffer>(offsets) }; - Column::Geometry(BinaryColumn::new(arrow_col.values().clone(), offsets)) + Column::Geometry(binary_array_to_binary_column(arrow_col)) + } + ( + DataType::Geometry, + ArrowDataType::Extension(name, box ArrowDataType::BinaryView, None), + ) if name == ARROW_EXT_TYPE_GEOMETRY => { + let arrow_col = arrow_col + .as_any() + .downcast_ref::() + .expect( + "fail to read `Geometry` 
from arrow: array should be `BinaryViewArray`", + ); + Column::Geometry(BinaryColumn::new(arrow_col.clone())) } (DataType::Geometry, ArrowDataType::Binary) => { let arrow_col = arrow_col @@ -759,16 +697,7 @@ impl Column { .expect( "fail to read `Geometry` from arrow: array should be `BinaryArray`", ); - let offsets = arrow_col - .offsets() - .buffer() - .iter() - .map(|x| *x as u64) - .collect::>(); - Column::Geometry(BinaryColumn::new( - arrow_col.values().clone(), - offsets.into(), - )) + Column::Geometry(binary_array_to_binary_column(arrow_col)) } (DataType::Geometry, ArrowDataType::LargeBinary) => { let arrow_col = arrow_col @@ -777,10 +706,7 @@ impl Column { .expect( "fail to read `Geometry` from arrow: array should be `BinaryArray`", ); - let offsets = arrow_col.offsets().clone().into_inner(); - let offsets = - unsafe { std::mem::transmute::, Buffer>(offsets) }; - Column::Geometry(BinaryColumn::new(arrow_col.values().clone(), offsets)) + Column::Geometry(binary_array_to_binary_column(arrow_col)) } (DataType::Geography, ArrowDataType::LargeBinary) => { let arrow_col = arrow_col @@ -789,13 +715,16 @@ impl Column { .expect( "fail to read `Geography` from arrow: array should be `BinaryArray`", ); - let offsets = arrow_col.offsets().clone().into_inner(); - let offsets = - unsafe { std::mem::transmute::, Buffer>(offsets) }; - Column::Geography(GeographyColumn(BinaryColumn::new( - arrow_col.values().clone(), - offsets, - ))) + Column::Geography(GeographyColumn(binary_array_to_binary_column(arrow_col))) + } + (DataType::Geography, ArrowDataType::BinaryView) => { + let arrow_col = arrow_col + .as_any() + .downcast_ref::() + .expect( + "fail to read `Geography` from arrow: array should be `BinaryViewArray`", + ); + Column::Geography(GeographyColumn(BinaryColumn::new(arrow_col.clone()))) } (data_type, ArrowDataType::Extension(_, arrow_type, _)) => { from_arrow_with_arrow_type(arrow_col, arrow_type, data_type)? 
@@ -820,3 +749,30 @@ impl Column { from_arrow_with_arrow_type(arrow_col, arrow_col.data_type(), data_type) } } + +fn binary_array_to_binary_column(array: &BinaryArray) -> BinaryColumn { + let mut builder = BinaryColumnBuilder::with_capacity(array.len(), array.values().len()); + for value in array.values_iter() { + builder.put_slice(value); + builder.commit_row(); + } + builder.build() +} + +fn utf8_array_to_binary_column(array: &Utf8Array) -> BinaryColumn { + let mut builder = BinaryColumnBuilder::with_capacity(array.len(), array.values().len()); + for value in array.values_iter() { + builder.put_str(value); + builder.commit_row(); + } + builder.build() +} + +fn fixed_size_binary_array_to_binary_column(array: &FixedSizeBinaryArray) -> BinaryColumn { + let mut builder = BinaryColumnBuilder::with_capacity(array.len(), array.values().len()); + for value in array.values_iter() { + builder.put_slice(value); + builder.commit_row(); + } + builder.build() +} diff --git a/src/query/expression/src/converts/arrow2/to.rs b/src/query/expression/src/converts/arrow2/to.rs index 5d01b76fe873..e154406aae57 100644 --- a/src/query/expression/src/converts/arrow2/to.rs +++ b/src/query/expression/src/converts/arrow2/to.rs @@ -92,8 +92,8 @@ fn table_type_to_arrow_type(ty: &TableDataType) -> ArrowDataType { None, ), TableDataType::Boolean => ArrowDataType::Boolean, - TableDataType::Binary => ArrowDataType::LargeBinary, - TableDataType::String => ArrowDataType::LargeUtf8, + TableDataType::Binary => ArrowDataType::BinaryView, + TableDataType::String => ArrowDataType::Utf8View, TableDataType::Number(ty) => with_number_type!(|TYPE| match ty { NumberDataType::TYPE => ArrowDataType::TYPE, }), @@ -135,7 +135,7 @@ fn table_type_to_arrow_type(ty: &TableDataType) -> ArrowDataType { } TableDataType::Bitmap => ArrowDataType::Extension( ARROW_EXT_TYPE_BITMAP.to_string(), - Box::new(ArrowDataType::LargeBinary), + Box::new(ArrowDataType::BinaryView), None, ), TableDataType::Tuple { @@ -157,17 +157,17 @@ 
fn table_type_to_arrow_type(ty: &TableDataType) -> ArrowDataType { } TableDataType::Variant => ArrowDataType::Extension( ARROW_EXT_TYPE_VARIANT.to_string(), - Box::new(ArrowDataType::LargeBinary), + Box::new(ArrowDataType::BinaryView), None, ), TableDataType::Geometry => ArrowDataType::Extension( ARROW_EXT_TYPE_GEOMETRY.to_string(), - Box::new(ArrowDataType::LargeBinary), + Box::new(ArrowDataType::BinaryView), None, ), TableDataType::Geography => ArrowDataType::Extension( ARROW_EXT_TYPE_GEOGRAPHY.to_string(), - Box::new(ArrowDataType::LargeBinary), + Box::new(ArrowDataType::BinaryView), None, ), } @@ -304,32 +304,10 @@ impl Column { ) .unwrap(), ), - Column::Binary(col) => { - let offsets: Buffer = - col.offsets().iter().map(|offset| *offset as i64).collect(); - Box::new( - databend_common_arrow::arrow::array::BinaryArray::::try_new( - arrow_type, - unsafe { OffsetsBuffer::new_unchecked(offsets) }, - col.data().clone(), - None, - ) - .unwrap(), - ) - } - Column::String(col) => { - let offsets: Buffer = - col.offsets().iter().map(|offset| *offset as i64).collect(); - Box::new( - databend_common_arrow::arrow::array::Utf8Array::::try_new( - arrow_type, - unsafe { OffsetsBuffer::new_unchecked(offsets) }, - col.data().clone(), - None, - ) - .unwrap(), - ) - } + Column::Binary(col) => Box::new(col.clone().into_inner()), + Column::String(col) => unsafe { + Box::new(col.clone().into_inner().to_utf8view_unchecked()) + }, Column::Timestamp(col) => Box::new( databend_common_arrow::arrow::array::PrimitiveArray::::try_new( arrow_type, @@ -401,19 +379,7 @@ impl Column { Column::Bitmap(col) | Column::Variant(col) | Column::Geometry(col) - | Column::Geography(GeographyColumn(col)) => { - let offsets: Buffer = - col.offsets().iter().map(|offset| *offset as i64).collect(); - Box::new( - databend_common_arrow::arrow::array::BinaryArray::::try_new( - arrow_type, - unsafe { OffsetsBuffer::new_unchecked(offsets) }, - col.data().clone(), - None, - ) - .unwrap(), - ) - } + | 
Column::Geography(GeographyColumn(col)) => Box::new(col.clone().into_inner()), } } } diff --git a/src/query/expression/src/filter/filter_executor.rs b/src/query/expression/src/filter/filter_executor.rs index 007f6d319e9c..495ff1f80cc1 100644 --- a/src/query/expression/src/filter/filter_executor.rs +++ b/src/query/expression/src/filter/filter_executor.rs @@ -125,7 +125,7 @@ impl FilterExecutor { if self.keep_order && self.has_or { self.true_selection[0..result_count].sort(); } - data_block.take(&self.true_selection[0..result_count], &mut None) + data_block.take(&self.true_selection[0..result_count]) } } diff --git a/src/query/expression/src/filter/select_value/select_column_scalar.rs b/src/query/expression/src/filter/select_value/select_column_scalar.rs index 3d36884cde64..8c2bad57e108 100644 --- a/src/query/expression/src/filter/select_value/select_column_scalar.rs +++ b/src/query/expression/src/filter/select_value/select_column_scalar.rs @@ -239,48 +239,19 @@ impl<'a> Selector<'a> { Some(validity) => { // search the whole string buffer if let LikePattern::SurroundByPercent(searcher) = like_pattern { - let needle = searcher.needle(); - let needle_byte_len = needle.len(); - let data = column.data().as_slice(); - let offsets = column.offsets().as_slice(); - let mut idx = 0; - let mut pos = (*offsets.first().unwrap()) as usize; - let end = (*offsets.last().unwrap()) as usize; - - while pos < end && idx < count { - if let Some(p) = searcher.search(&data[pos..end]) { - while offsets[idx + 1] as usize <= pos + p { - let ret = NOT && validity.get_bit_unchecked(idx); - update_index( - ret, - idx as u32, - true_selection, - false_selection, - ); - idx += 1; - } - - // check if the substring is in bound - let ret = - pos + p + needle_byte_len <= offsets[idx + 1] as usize; - - let ret = if NOT { - validity.get_bit_unchecked(idx) && !ret - } else { - validity.get_bit_unchecked(idx) && ret - }; - update_index(ret, idx as u32, true_selection, false_selection); - - pos = 
offsets[idx + 1] as usize; - idx += 1; + for idx in 0u32..count as u32 { + let ret = if NOT { + validity.get_bit_unchecked(idx as usize) + && searcher + .search(column.index_unchecked_bytes(idx as usize)) + .is_none() } else { - break; - } - } - while idx < count { - let ret = NOT && validity.get_bit_unchecked(idx); - update_index(ret, idx as u32, true_selection, false_selection); - idx += 1; + validity.get_bit_unchecked(idx as usize) + && searcher + .search(column.index_unchecked_bytes(idx as usize)) + .is_some() + }; + update_index(ret, idx, true_selection, false_selection); } } else { for idx in 0u32..count as u32 { @@ -300,40 +271,17 @@ impl<'a> Selector<'a> { None => { // search the whole string buffer if let LikePattern::SurroundByPercent(searcher) = like_pattern { - let needle = searcher.needle(); - let needle_byte_len = needle.len(); - let data = column.data().as_slice(); - let offsets = column.offsets().as_slice(); - let mut idx = 0; - let mut pos = (*offsets.first().unwrap()) as usize; - let end = (*offsets.last().unwrap()) as usize; - - while pos < end && idx < count { - if let Some(p) = searcher.search(&data[pos..end]) { - while offsets[idx + 1] as usize <= pos + p { - update_index( - NOT, - idx as u32, - true_selection, - false_selection, - ); - idx += 1; - } - // check if the substring is in bound - let ret = - pos + p + needle_byte_len <= offsets[idx + 1] as usize; - let ret = if NOT { !ret } else { ret }; - update_index(ret, idx as u32, true_selection, false_selection); - - pos = offsets[idx + 1] as usize; - idx += 1; + for idx in 0u32..count as u32 { + let ret = if NOT { + searcher + .search(column.index_unchecked_bytes(idx as usize)) + .is_none() } else { - break; - } - } - while idx < count { - update_index(NOT, idx as u32, true_selection, false_selection); - idx += 1; + searcher + .search(column.index_unchecked_bytes(idx as usize)) + .is_some() + }; + update_index(ret, idx, true_selection, false_selection); } } else { for idx in 0u32..count as 
u32 { diff --git a/src/query/expression/src/kernels/concat.rs b/src/query/expression/src/kernels/concat.rs index 957fb93e6180..11fd9f26abca 100644 --- a/src/query/expression/src/kernels/concat.rs +++ b/src/query/expression/src/kernels/concat.rs @@ -23,11 +23,11 @@ use itertools::Itertools; use crate::copy_continuous_bits; use crate::kernels::take::BIT_MASK; -use crate::kernels::utils::copy_advance_aligned; use crate::kernels::utils::set_vec_len_by_ptr; use crate::store_advance_aligned; use crate::types::array::ArrayColumnBuilder; use crate::types::binary::BinaryColumn; +use crate::types::binary::BinaryColumnBuilder; use crate::types::decimal::DecimalColumn; use crate::types::geography::GeographyColumn; use crate::types::geometry::GeometryType; @@ -387,37 +387,11 @@ impl Column { cols: impl Iterator + Clone, num_rows: usize, ) -> BinaryColumn { - // [`BinaryColumn`] consists of [`data`] and [`offset`], we build [`data`] and [`offset`] respectively, - // and then call `BinaryColumn::new(data.into(), offsets.into())` to create [`BinaryColumn`]. - let mut offsets: Vec = Vec::with_capacity(num_rows + 1); - let mut data_size = 0; - - // Build [`offset`] and calculate `data_size` required by [`data`]. - offsets.push(0); - for col in cols.clone() { - let mut start = col.offsets()[0]; - for end in col.offsets()[1..].iter() { - data_size += end - start; - start = *end; - offsets.push(data_size); - } - } - - // Build [`data`]. 
- let mut data: Vec = Vec::with_capacity(data_size as usize); - let mut data_ptr = data.as_mut_ptr(); - - unsafe { - for col in cols { - let offsets = col.offsets(); - let col_data = &(col.data().as_slice()) - [offsets[0] as usize..offsets[offsets.len() - 1] as usize]; - copy_advance_aligned(col_data.as_ptr(), &mut data_ptr, col_data.len()); - } - set_vec_len_by_ptr(&mut data, data_ptr); + let mut builder = BinaryColumnBuilder::with_capacity(num_rows, 0); + for col in cols { + builder.append_column(&col); } - - BinaryColumn::new(data.into(), offsets.into()) + builder.build() } pub fn concat_string_types( diff --git a/src/query/expression/src/kernels/filter.rs b/src/query/expression/src/kernels/filter.rs index 1b730323a24f..aee664480123 100644 --- a/src/query/expression/src/kernels/filter.rs +++ b/src/query/expression/src/kernels/filter.rs @@ -14,6 +14,7 @@ use std::sync::Arc; +use binary::BinaryColumnBuilder; use databend_common_arrow::arrow::bitmap::utils::BitChunkIterExact; use databend_common_arrow::arrow::bitmap::utils::BitChunksExact; use databend_common_arrow::arrow::bitmap::Bitmap; @@ -519,124 +520,15 @@ impl<'a> FilterVisitor<'a> { } fn filter_binary_types(&mut self, values: &BinaryColumn) -> BinaryColumn { - // Each element of `items` is (string pointer(u64), string length). - let mut items: Vec<(u64, usize)> = Vec::with_capacity(self.num_rows); - // [`BinaryColumn`] consists of [`data`] and [`offset`], we build [`data`] and [`offset`] respectively, - // and then call `BinaryColumn::new(data.into(), offsets.into())` to create [`BinaryColumn`]. - let values_offset = values.offsets().as_slice(); - let values_data_ptr = values.data().as_slice().as_ptr(); - let mut offsets: Vec = Vec::with_capacity(self.num_rows + 1); - let mut offsets_ptr = offsets.as_mut_ptr(); - let mut items_ptr = items.as_mut_ptr(); - let mut data_size = 0; - - // Build [`offset`] and calculate `data_size` required by [`data`]. 
- unsafe { - store_advance_aligned::(0, &mut offsets_ptr); - let mut idx = 0; - let (mut slice, offset, mut length) = self.filter.as_slice(); - if offset > 0 { - let mut mask = slice[0]; - while mask != 0 { - let n = mask.trailing_zeros() as usize; - // If `offset` > 0, the valid bits of this byte start at `offset`, we also - // need to ensure that we cannot iterate more than `length` bits. - if n >= offset && n < offset + length { - let start = *values_offset.get_unchecked(n - offset) as usize; - let len = *values_offset.get_unchecked(n - offset + 1) as usize - start; - data_size += len as u64; - store_advance_aligned(data_size, &mut offsets_ptr); - store_advance_aligned( - (values_data_ptr.add(start) as u64, len), - &mut items_ptr, - ); - } - mask = mask & (mask - 1); + let mut builder = BinaryColumnBuilder::with_capacity(self.num_rows, 0); + for i in 0..self.num_rows { + if self.filter.get_bit(i) { + unsafe { + builder.put_slice(values.index_unchecked(i)); + builder.commit_row(); } - let bits_to_align = 8 - offset; - length = if length >= bits_to_align { - length - bits_to_align - } else { - 0 - }; - slice = &slice[1..]; - idx += bits_to_align; } - - const CHUNK_SIZE: usize = 64; - let mut mask_chunks = BitChunksExact::::new(slice, length); - let mut continuous_selected = 0; - for mut mask in mask_chunks.by_ref() { - if mask == u64::MAX { - continuous_selected += CHUNK_SIZE; - } else { - if continuous_selected > 0 { - let start = *values_offset.get_unchecked(idx) as usize; - let len = *values_offset.get_unchecked(idx + continuous_selected) as usize - - start; - store_advance_aligned( - (values_data_ptr.add(start) as u64, len), - &mut items_ptr, - ); - for i in 0..continuous_selected { - data_size += *values_offset.get_unchecked(idx + i + 1) - - *values_offset.get_unchecked(idx + i); - store_advance_aligned(data_size, &mut offsets_ptr); - } - idx += continuous_selected; - continuous_selected = 0; - } - while mask != 0 { - let n = mask.trailing_zeros() as usize; - 
let start = *values_offset.get_unchecked(idx + n) as usize; - let len = *values_offset.get_unchecked(idx + n + 1) as usize - start; - data_size += len as u64; - store_advance_aligned( - (values_data_ptr.add(start) as u64, len), - &mut items_ptr, - ); - store_advance_aligned(data_size, &mut offsets_ptr); - mask = mask & (mask - 1); - } - idx += CHUNK_SIZE; - } - } - if continuous_selected > 0 { - let start = *values_offset.get_unchecked(idx) as usize; - let len = *values_offset.get_unchecked(idx + continuous_selected) as usize - start; - store_advance_aligned((values_data_ptr.add(start) as u64, len), &mut items_ptr); - for i in 0..continuous_selected { - data_size += *values_offset.get_unchecked(idx + i + 1) - - *values_offset.get_unchecked(idx + i); - store_advance_aligned(data_size, &mut offsets_ptr); - } - idx += continuous_selected; - } - - for (i, is_selected) in mask_chunks.remainder_iter().enumerate() { - if is_selected { - let start = *values_offset.get_unchecked(idx + i) as usize; - let len = *values_offset.get_unchecked(idx + i + 1) as usize - start; - data_size += len as u64; - store_advance_aligned((values_data_ptr.add(start) as u64, len), &mut items_ptr); - store_advance_aligned(data_size, &mut offsets_ptr); - } - } - set_vec_len_by_ptr(&mut items, items_ptr); - set_vec_len_by_ptr(&mut offsets, offsets_ptr); } - - // Build [`data`]. 
- let mut data: Vec = Vec::with_capacity(data_size as usize); - let mut data_ptr = data.as_mut_ptr(); - - unsafe { - for (str_ptr, len) in items.iter() { - copy_advance_aligned(*str_ptr as *const u8, &mut data_ptr, *len); - } - set_vec_len_by_ptr(&mut data, data_ptr); - } - - BinaryColumn::new(data.into(), offsets.into()) + builder.build() } } diff --git a/src/query/expression/src/kernels/group_by_hash/method_serializer.rs b/src/query/expression/src/kernels/group_by_hash/method_serializer.rs index 4cb277bf416c..d90b1e700aa9 100644 --- a/src/query/expression/src/kernels/group_by_hash/method_serializer.rs +++ b/src/query/expression/src/kernels/group_by_hash/method_serializer.rs @@ -62,8 +62,8 @@ impl HashMethod for HashMethodSerializer { ) -> Result>> { match keys_state { KeysState::Column(Column::Binary(col)) => { - let (data, offsets) = col.into_buffer(); - Ok(Box::new(BinaryKeyAccessor::new(data, offsets))) + let data = col.into_inner(); + Ok(Box::new(BinaryKeyAccessor::new(data))) } _ => unreachable!(), } diff --git a/src/query/expression/src/kernels/group_by_hash/method_single_string.rs b/src/query/expression/src/kernels/group_by_hash/method_single_string.rs index 3c77a7bd58af..ac3f5192f18c 100644 --- a/src/query/expression/src/kernels/group_by_hash/method_single_string.rs +++ b/src/query/expression/src/kernels/group_by_hash/method_single_string.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use databend_common_arrow::arrow::buffer::Buffer; +use databend_common_arrow::arrow::array::BinaryViewArray; use databend_common_exception::Result; use databend_common_hashtable::hash_join_fast_string_hash; @@ -57,12 +57,12 @@ impl HashMethod for HashMethodSingleBinary { KeysState::Column(Column::Binary(col)) | KeysState::Column(Column::Variant(col)) | KeysState::Column(Column::Bitmap(col)) => { - let (data, offsets) = col.into_buffer(); - Ok(Box::new(BinaryKeyAccessor::new(data, offsets))) + let data = col.into_inner(); + Ok(Box::new(BinaryKeyAccessor::new(data))) } KeysState::Column(Column::String(col)) => { - let (data, offsets) = col.into_buffer(); - Ok(Box::new(BinaryKeyAccessor::new(data, offsets))) + let data = col.into_inner(); + Ok(Box::new(BinaryKeyAccessor::new(data))) } _ => unreachable!(), } @@ -84,13 +84,12 @@ impl HashMethod for HashMethodSingleBinary { } pub struct BinaryKeyAccessor { - data: Buffer, - offsets: Buffer, + data: BinaryViewArray, } impl BinaryKeyAccessor { - pub fn new(data: Buffer, offsets: Buffer) -> Self { - Self { data, offsets } + pub fn new(data: BinaryViewArray) -> Self { + Self { data } } } @@ -100,9 +99,7 @@ impl KeyAccessor for BinaryKeyAccessor { /// # Safety /// Calling this method with an out-of-bounds index is *[undefined behavior]*. 
unsafe fn key_unchecked(&self, index: usize) -> &Self::Key { - debug_assert!(index + 1 < self.offsets.len()); - - &self.data[*self.offsets.get_unchecked(index) as usize - ..*self.offsets.get_unchecked(index + 1) as usize] + debug_assert!(index < self.data.len()); + self.data.value_unchecked(index) } } diff --git a/src/query/expression/src/kernels/group_by_hash/utils.rs b/src/query/expression/src/kernels/group_by_hash/utils.rs index 7bd7f6240614..9de74a45a73d 100644 --- a/src/query/expression/src/kernels/group_by_hash/utils.rs +++ b/src/query/expression/src/kernels/group_by_hash/utils.rs @@ -15,10 +15,9 @@ use ethnum::i256; use crate::kernels::utils::copy_advance_aligned; -use crate::kernels::utils::set_vec_len_by_ptr; use crate::kernels::utils::store_advance; -use crate::kernels::utils::store_advance_aligned; use crate::types::binary::BinaryColumn; +use crate::types::binary::BinaryColumnBuilder; use crate::types::decimal::DecimalColumn; use crate::types::NumberColumn; use crate::with_decimal_mapped_type; @@ -32,29 +31,15 @@ pub fn serialize_group_columns( num_rows: usize, serialize_size: usize, ) -> BinaryColumn { - // [`BinaryColumn`] consists of [`data`] and [`offset`], we build [`data`] and [`offset`] respectively, - // and then call `BinaryColumn::new(data.into(), offsets.into())` to create [`BinaryColumn`]. 
- let mut data: Vec = Vec::with_capacity(serialize_size); - let mut offsets: Vec = Vec::with_capacity(num_rows + 1); - let mut data_ptr = data.as_mut_ptr(); - let mut offsets_ptr = offsets.as_mut_ptr(); - let mut offset = 0; - - unsafe { - store_advance_aligned::(0, &mut offsets_ptr); - for i in 0..num_rows { - let old_ptr = data_ptr; - for col in columns.iter() { - serialize_column_binary(col, i, &mut data_ptr); + let mut builder = BinaryColumnBuilder::with_capacity(num_rows, serialize_size); + for i in 0..num_rows { + for col in columns.iter() { + unsafe { + serialize_column_binary(col, i, &mut builder.data.as_mut_ptr()); } - offset += data_ptr as u64 - old_ptr as u64; - store_advance_aligned::(offset, &mut offsets_ptr); } - set_vec_len_by_ptr(&mut data, data_ptr); - set_vec_len_by_ptr(&mut offsets, offsets_ptr); } - - BinaryColumn::new(data.into(), offsets.into()) + builder.build() } /// This function must be consistent with the `push_binary` function of `src/query/expression/src/values.rs`. 
diff --git a/src/query/expression/src/kernels/scatter.rs b/src/query/expression/src/kernels/scatter.rs index 11d4ffbea53f..2edb5353eb00 100644 --- a/src/query/expression/src/kernels/scatter.rs +++ b/src/query/expression/src/kernels/scatter.rs @@ -33,24 +33,9 @@ impl DataBlock { let scatter_indices = Self::divide_indices_by_scatter_size(indices, scatter_size); - let has_string_column = self - .columns() - .iter() - .any(|col| col.data_type.is_string_column()); - let mut string_items_buf = if has_string_column { - let max_num_rows = scatter_indices - .iter() - .map(|indices| indices.len()) - .max() - .unwrap(); - Some(vec![(0, 0); max_num_rows]) - } else { - None - }; - let mut results = Vec::with_capacity(scatter_size); for indices in scatter_indices.iter().take(scatter_size) { - let block = self.take(indices, &mut string_items_buf)?; + let block = self.take(indices)?; results.push(block); } diff --git a/src/query/expression/src/kernels/sort.rs b/src/query/expression/src/kernels/sort.rs index 9c4c478834f4..ef6c51fd6fdd 100644 --- a/src/query/expression/src/kernels/sort.rs +++ b/src/query/expression/src/kernels/sort.rs @@ -115,7 +115,7 @@ impl DataBlock { } let permutations = sort_compare.take_permutation(); - DataBlock::take(block, &permutations, &mut None) + DataBlock::take(block, &permutations) } } diff --git a/src/query/expression/src/kernels/take.rs b/src/query/expression/src/kernels/take.rs index 84d120b0390c..2021d8edb80a 100644 --- a/src/query/expression/src/kernels/take.rs +++ b/src/query/expression/src/kernels/take.rs @@ -14,12 +14,11 @@ use std::sync::Arc; +use binary::BinaryColumnBuilder; use databend_common_arrow::arrow::bitmap::Bitmap; use databend_common_arrow::arrow::buffer::Buffer; use databend_common_exception::Result; -use crate::kernels::utils::copy_advance_aligned; -use crate::kernels::utils::set_vec_len_by_ptr; use crate::types::binary::BinaryColumn; use crate::types::nullable::NullableColumn; use crate::types::string::StringColumn; @@ -33,19 
+32,13 @@ use crate::Value; pub const BIT_MASK: [u8; 8] = [1, 2, 4, 8, 16, 32, 64, 128]; impl DataBlock { - pub fn take( - &self, - indices: &[I], - string_items_buf: &mut Option>, - ) -> Result - where - I: databend_common_arrow::arrow::types::Index, - { + pub fn take(&self, indices: &[I]) -> Result + where I: databend_common_arrow::arrow::types::Index { if indices.is_empty() { return Ok(self.slice(0..0)); } - let mut taker = TakeVisitor::new(indices, string_items_buf); + let mut taker = TakeVisitor::new(indices); let after_columns = self .columns() @@ -72,17 +65,15 @@ struct TakeVisitor<'a, I> where I: databend_common_arrow::arrow::types::Index { indices: &'a [I], - string_items_buf: &'a mut Option>, result: Option>, } impl<'a, I> TakeVisitor<'a, I> where I: databend_common_arrow::arrow::types::Index { - fn new(indices: &'a [I], string_items_buf: &'a mut Option>) -> Self { + fn new(indices: &'a [I]) -> Self { Self { indices, - string_items_buf, result: None, } } @@ -243,50 +234,13 @@ where I: databend_common_arrow::arrow::types::Index fn take_binary_types(&mut self, col: &BinaryColumn) -> BinaryColumn { let num_rows = self.indices.len(); - - // Each element of `items` is (string pointer(u64), string length), if `string_items_buf` - // can be reused, we will not re-allocate memory. - let mut items: Option> = match &self.string_items_buf { - Some(string_items_buf) if string_items_buf.capacity() >= num_rows => None, - _ => Some(Vec::with_capacity(num_rows)), - }; - let items = match items.is_some() { - true => items.as_mut().unwrap(), - false => self.string_items_buf.as_mut().unwrap(), - }; - - // [`BinaryColumn`] consists of [`data`] and [`offset`], we build [`data`] and [`offset`] respectively, - // and then call `BinaryColumn::new(data.into(), offsets.into())` to create [`BinaryColumn`]. 
- let col_offset = col.offsets().as_slice(); - let col_data_ptr = col.data().as_slice().as_ptr(); - let mut offsets: Vec = Vec::with_capacity(num_rows + 1); - let mut data_size = 0; - - // Build [`offset`] and calculate `data_size` required by [`data`]. - unsafe { - items.set_len(num_rows); - offsets.set_len(num_rows + 1); - *offsets.get_unchecked_mut(0) = 0; - for (i, index) in self.indices.iter().enumerate() { - let start = *col_offset.get_unchecked(index.to_usize()) as usize; - let len = *col_offset.get_unchecked(index.to_usize() + 1) as usize - start; - data_size += len as u64; - *items.get_unchecked_mut(i) = (col_data_ptr.add(start) as u64, len); - *offsets.get_unchecked_mut(i + 1) = data_size; - } - } - - // Build [`data`]. - let mut data: Vec = Vec::with_capacity(data_size as usize); - let mut data_ptr = data.as_mut_ptr(); - - unsafe { - for (str_ptr, len) in items.iter() { - copy_advance_aligned(*str_ptr as *const u8, &mut data_ptr, *len); + let mut builder = BinaryColumnBuilder::with_capacity(num_rows, 0); + for index in self.indices.iter() { + unsafe { + builder.put_slice(col.index_unchecked(index.to_usize())); + builder.commit_row(); } - set_vec_len_by_ptr(&mut data, data_ptr); } - - BinaryColumn::new(data.into(), offsets.into()) + builder.build() } } diff --git a/src/query/expression/src/kernels/take_chunks.rs b/src/query/expression/src/kernels/take_chunks.rs index 34b0d2598fb9..343b577add02 100644 --- a/src/query/expression/src/kernels/take_chunks.rs +++ b/src/query/expression/src/kernels/take_chunks.rs @@ -14,6 +14,7 @@ use std::sync::Arc; +use binary::BinaryColumnBuilder; use databend_common_arrow::arrow::bitmap::Bitmap; use databend_common_arrow::arrow::buffer::Buffer; use databend_common_arrow::arrow::compute::merge_sort::MergeSlice; @@ -21,8 +22,6 @@ use databend_common_hashtable::RowPtr; use itertools::Itertools; use crate::kernels::take::BIT_MASK; -use crate::kernels::utils::copy_advance_aligned; -use crate::kernels::utils::set_vec_len_by_ptr; 
use crate::types::array::ArrayColumnBuilder; use crate::types::binary::BinaryColumn; use crate::types::bitmap::BitmapType; @@ -111,7 +110,6 @@ impl DataBlock { build_columns_data_type: &[DataType], indices: &[RowPtr], result_size: usize, - binary_items_buf: &mut Option>, ) -> Self { let num_columns = build_columns.len(); let result_columns = (0..num_columns) @@ -122,7 +120,6 @@ impl DataBlock { data_type.clone(), indices, result_size, - binary_items_buf, ); BlockEntry::new(data_type.clone(), Value::Column(column)) }) @@ -631,7 +628,6 @@ impl Column { data_type: DataType, indices: &[RowPtr], result_size: usize, - binary_items_buf: &mut Option>, ) -> Column { match &columns { ColumnVec::Null { .. } => Column::Null { len: result_size }, @@ -655,12 +651,12 @@ impl Column { ColumnVec::Boolean(columns) => { Column::Boolean(Self::take_block_vec_boolean_types(columns, indices)) } - ColumnVec::Binary(columns) => BinaryType::upcast_column( - Self::take_block_vec_binary_types(columns, indices, binary_items_buf.as_mut()), - ), - ColumnVec::String(columns) => StringType::upcast_column( - Self::take_block_vec_string_types(columns, indices, binary_items_buf.as_mut()), - ), + ColumnVec::Binary(columns) => { + BinaryType::upcast_column(Self::take_block_vec_binary_types(columns, indices)) + } + ColumnVec::String(columns) => { + StringType::upcast_column(Self::take_block_vec_string_types(columns, indices)) + } ColumnVec::Timestamp(columns) => { let builder = Self::take_block_vec_primitive_types(columns, indices); let ts = >::upcast_column(>::column_from_vec( @@ -713,9 +709,9 @@ impl Column { columns, builder, indices, ) } - ColumnVec::Bitmap(columns) => BitmapType::upcast_column( - Self::take_block_vec_binary_types(columns, indices, binary_items_buf.as_mut()), - ), + ColumnVec::Bitmap(columns) => { + BitmapType::upcast_column(Self::take_block_vec_binary_types(columns, indices)) + } ColumnVec::Nullable(columns) => { let inner_data_type = data_type.as_nullable().unwrap(); let 
inner_column = Self::take_column_vec_indices( @@ -723,7 +719,6 @@ impl Column { *inner_data_type.clone(), indices, result_size, - binary_items_buf, ); let inner_bitmap = Self::take_column_vec_indices( @@ -731,7 +726,6 @@ impl Column { DataType::Boolean, indices, result_size, - binary_items_buf, ); NullableColumn::new_column( @@ -750,25 +744,22 @@ impl Column { ty.clone(), indices, result_size, - binary_items_buf, ) }) .collect(); Column::Tuple(fields) } - ColumnVec::Variant(columns) => VariantType::upcast_column( - Self::take_block_vec_binary_types(columns, indices, binary_items_buf.as_mut()), - ), - ColumnVec::Geometry(columns) => GeometryType::upcast_column( - Self::take_block_vec_binary_types(columns, indices, binary_items_buf.as_mut()), - ), + ColumnVec::Variant(columns) => { + VariantType::upcast_column(Self::take_block_vec_binary_types(columns, indices)) + } + ColumnVec::Geometry(columns) => { + GeometryType::upcast_column(Self::take_block_vec_binary_types(columns, indices)) + } ColumnVec::Geography(columns) => { let columns = columns.iter().map(|x| x.0.clone()).collect::>(); GeographyType::upcast_column(GeographyColumn(Self::take_block_vec_binary_types( - &columns, - indices, - binary_items_buf.as_mut(), + &columns, indices, ))) } } @@ -784,62 +775,20 @@ impl Column { builder } - pub fn take_block_vec_binary_types( - col: &[BinaryColumn], - indices: &[RowPtr], - binary_items_buf: Option<&mut Vec<(u64, usize)>>, - ) -> BinaryColumn { - let num_rows = indices.len(); - - // Each element of `items` is (string pointer(u64), string length), if `binary_items_buf` - // can be reused, we will not re-allocate memory. 
- let mut items: Option> = match &binary_items_buf { - Some(binary_items_buf) if binary_items_buf.capacity() >= num_rows => None, - _ => Some(Vec::with_capacity(num_rows)), - }; - let items = match items.is_some() { - true => items.as_mut().unwrap(), - false => binary_items_buf.unwrap(), - }; - - // [`BinaryColumn`] consists of [`data`] and [`offset`], we build [`data`] and [`offset`] respectively, - // and then call `BinaryColumn::new(data.into(), offsets.into())` to create [`BinaryColumn`]. - let mut offsets: Vec = Vec::with_capacity(num_rows + 1); - let mut data_size = 0; - - // Build [`offset`] and calculate `data_size` required by [`data`]. - unsafe { - items.set_len(num_rows); - offsets.set_len(num_rows + 1); - *offsets.get_unchecked_mut(0) = 0; - for (i, row_ptr) in indices.iter().enumerate() { - let item = - col[row_ptr.chunk_index as usize].index_unchecked(row_ptr.row_index as usize); - data_size += item.len() as u64; - *items.get_unchecked_mut(i) = (item.as_ptr() as u64, item.len()); - *offsets.get_unchecked_mut(i + 1) = data_size; - } - } - - // Build [`data`]. 
- let mut data: Vec = Vec::with_capacity(data_size as usize); - let mut data_ptr = data.as_mut_ptr(); - - unsafe { - for (str_ptr, len) in items.iter() { - copy_advance_aligned(*str_ptr as *const u8, &mut data_ptr, *len); + pub fn take_block_vec_binary_types(col: &[BinaryColumn], indices: &[RowPtr]) -> BinaryColumn { + let mut builder = BinaryColumnBuilder::with_capacity(indices.len(), 0); + for row_ptr in indices { + unsafe { + builder.put_slice( + col[row_ptr.chunk_index as usize].index_unchecked(row_ptr.row_index as usize), + ); + builder.commit_row(); } - set_vec_len_by_ptr(&mut data, data_ptr); } - - BinaryColumn::new(data.into(), offsets.into()) + builder.build() } - pub fn take_block_vec_string_types( - cols: &[StringColumn], - indices: &[RowPtr], - binary_items_buf: Option<&mut Vec<(u64, usize)>>, - ) -> StringColumn { + pub fn take_block_vec_string_types(cols: &[StringColumn], indices: &[RowPtr]) -> StringColumn { let binary_cols = cols .iter() .map(|col| col.clone().into()) @@ -848,7 +797,6 @@ impl Column { StringColumn::from_binary_unchecked(Self::take_block_vec_binary_types( &binary_cols, indices, - binary_items_buf, )) } } diff --git a/src/query/expression/src/kernels/take_compact.rs b/src/query/expression/src/kernels/take_compact.rs index a2f97b894956..1ed81cd76a28 100644 --- a/src/query/expression/src/kernels/take_compact.rs +++ b/src/query/expression/src/kernels/take_compact.rs @@ -12,12 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use binary::BinaryColumnBuilder; use databend_common_arrow::arrow::buffer::Buffer; use databend_common_exception::Result; use crate::kernels::utils::copy_advance_aligned; use crate::kernels::utils::set_vec_len_by_ptr; -use crate::kernels::utils::store_advance_aligned; use crate::types::binary::BinaryColumn; use crate::types::nullable::NullableColumn; use crate::types::string::StringColumn; @@ -218,68 +218,16 @@ impl<'a> TakeCompactVisitor<'a> { } fn take_binary_types(&mut self, col: &BinaryColumn) -> BinaryColumn { - // Each element of `items` is (string(&[u8]), repeat times). - let mut items = Vec::with_capacity(self.indices.len()); - let mut items_ptr = items.as_mut_ptr(); - - // [`BinaryColumn`] consists of [`data`] and [`offset`], we build [`data`] and [`offset`] respectively, - // and then call `BinaryColumn::new(data.into(), offsets.into())` to create [`BinaryColumn`]. - let mut offsets = Vec::with_capacity(self.num_rows + 1); - let mut offsets_ptr = offsets.as_mut_ptr(); - let mut data_size = 0; - - // Build [`offset`] and calculate `data_size` required by [`data`]. - unsafe { - store_advance_aligned::(0, &mut offsets_ptr); - for (index, cnt) in self.indices.iter() { - let item = col.index_unchecked(*index as usize); - store_advance_aligned((item, *cnt), &mut items_ptr); - for _ in 0..*cnt { - data_size += item.len() as u64; - store_advance_aligned(data_size, &mut offsets_ptr); - } - } - set_vec_len_by_ptr(&mut offsets, offsets_ptr); - set_vec_len_by_ptr(&mut items, items_ptr); - } - - // Build [`data`]. - let mut data: Vec = Vec::with_capacity(data_size as usize); - let mut data_ptr = data.as_mut_ptr(); - let mut remain; - - unsafe { - for (item, cnt) in items { - let len = item.len(); - if cnt == 1 { - copy_advance_aligned(item.as_ptr(), &mut data_ptr, len); - continue; - } - - // Using the doubling method to copy the max segment memory. 
- // [___________] => [x__________] => [xx_________] => [xxxx_______] => [xxxxxxxx___] - // Since cnt > 0, then 31 - cnt.leading_zeros() >= 0. - let max_bit_num = 1 << (31 - cnt.leading_zeros()); - let max_segment = max_bit_num * len; - let base_data_ptr = data_ptr; - copy_advance_aligned(item.as_ptr(), &mut data_ptr, len); - let mut cur_segment = len; - while cur_segment < max_segment { - copy_advance_aligned(base_data_ptr, &mut data_ptr, cur_segment); - cur_segment <<= 1; - } - - // Copy the remaining memory directly. - // [xxxxxxxxxx____] => [xxxxxxxxxxxxxx] - // ^^^^ ---> ^^^^ - remain = cnt as usize - max_bit_num; - if remain > 0 { - copy_advance_aligned(base_data_ptr, &mut data_ptr, remain * len); + let num_rows = self.num_rows; + let mut builder = BinaryColumnBuilder::with_capacity(num_rows, 0); + for (index, cnt) in self.indices.iter() { + for _ in 0..*cnt { + unsafe { + builder.put_slice(col.index_unchecked(*index as usize)); + builder.commit_row(); } } - set_vec_len_by_ptr(&mut data, data_ptr); } - - BinaryColumn::new(data.into(), offsets.into()) + builder.build() } } diff --git a/src/query/expression/src/kernels/take_ranges.rs b/src/query/expression/src/kernels/take_ranges.rs index 71fbd0acc1af..7a5c769fb954 100644 --- a/src/query/expression/src/kernels/take_ranges.rs +++ b/src/query/expression/src/kernels/take_ranges.rs @@ -15,13 +15,13 @@ use core::ops::Range; use std::sync::Arc; +use binary::BinaryColumnBuilder; use databend_common_arrow::arrow::bitmap::Bitmap; use databend_common_arrow::arrow::buffer::Buffer; use databend_common_exception::Result; use crate::copy_continuous_bits; use crate::kernels::take::BIT_MASK; -use crate::kernels::utils::copy_advance_aligned; use crate::kernels::utils::set_vec_len_by_ptr; use crate::kernels::utils::store_advance_aligned; use crate::types::binary::BinaryColumn; @@ -249,36 +249,14 @@ impl<'a> TakeRangeVisitor<'a> { } fn take_binary_types(&mut self, values: &BinaryColumn) -> BinaryColumn { - let mut offsets: Vec = 
Vec::with_capacity(self.num_rows + 1); - let mut data_size = 0; - - let value_data = values.data().as_slice(); - let values_offset = values.offsets().as_slice(); - // Build [`offset`] and calculate `data_size` required by [`data`]. - offsets.push(0); + let mut builder = BinaryColumnBuilder::with_capacity(self.num_rows, 0); for range in self.ranges { - let mut offset_start = values_offset[range.start as usize]; - for offset_end in values_offset[range.start as usize + 1..range.end as usize + 1].iter() - { - data_size += offset_end - offset_start; - offset_start = *offset_end; - offsets.push(data_size); - } - } - - // Build [`data`]. - let mut data: Vec = Vec::with_capacity(data_size as usize); - let mut data_ptr = data.as_mut_ptr(); - - unsafe { - for range in self.ranges { - let col_data = &value_data[values_offset[range.start as usize] as usize - ..values_offset[range.end as usize] as usize]; - copy_advance_aligned(col_data.as_ptr(), &mut data_ptr, col_data.len()); + for index in range.start as usize..range.end as usize { + let value = unsafe { values.index_unchecked(index) }; + builder.put_slice(value); + builder.commit_row(); } - set_vec_len_by_ptr(&mut data, data_ptr); } - - BinaryColumn::new(data.into(), offsets.into()) + builder.build() } } diff --git a/src/query/expression/src/row/row_converter.rs b/src/query/expression/src/row/row_converter.rs index 3c4d967ae5c0..f259df073192 100644 --- a/src/query/expression/src/row/row_converter.rs +++ b/src/query/expression/src/row/row_converter.rs @@ -84,10 +84,7 @@ impl RowConverter { encode_column(&mut builder, column, field.asc, field.nulls_first); } - let rows = builder.build(); - debug_assert_eq!(*rows.offsets().last().unwrap(), rows.data().len() as u64); - debug_assert!(rows.offsets().windows(2).all(|w| w[0] <= w[1])); - rows + builder.build() } fn new_empty_rows(&self, cols: &[Column], num_rows: usize) -> BinaryColumnBuilder { diff --git a/src/query/expression/src/types/binary.rs 
b/src/query/expression/src/types/binary.rs index cf2708bfc1d7..3676ed76b8c9 100644 --- a/src/query/expression/src/types/binary.rs +++ b/src/query/expression/src/types/binary.rs @@ -14,13 +14,11 @@ use std::cmp::Ordering; use std::iter::once; -use std::marker::PhantomData; use std::ops::Range; -use databend_common_arrow::arrow::buffer::Buffer; +use databend_common_arrow::arrow::array::BinaryViewArray; +use databend_common_arrow::arrow::array::MutableBinaryViewArray; use databend_common_arrow::arrow::trusted_len::TrustedLen; -use databend_common_exception::ErrorCode; -use databend_common_exception::Result; use serde::Deserialize; use serde::Serialize; @@ -30,7 +28,6 @@ use crate::types::DataType; use crate::types::DecimalSize; use crate::types::GenericMap; use crate::types::ValueType; -use crate::utils::arrow::buffer_into_mut; use crate::values::Column; use crate::values::Scalar; use crate::ColumnBuilder; @@ -167,7 +164,7 @@ impl ValueType for BinaryType { } fn column_memory_size(col: &Self::Column) -> usize { - col.data().len() + col.offsets().len() * 8 + col.memory_size() } #[inline(always)] @@ -190,45 +187,32 @@ impl ArgType for BinaryType { #[derive(Clone, PartialEq)] pub struct BinaryColumn { - pub(crate) data: Buffer, - pub(crate) offsets: Buffer, + pub(crate) data: BinaryViewArray, } impl BinaryColumn { - pub fn new(data: Buffer, offsets: Buffer) -> Self { - debug_assert!({ offsets.windows(2).all(|w| w[0] <= w[1]) }); - - BinaryColumn { data, offsets } + pub fn new(data: BinaryViewArray) -> Self { + BinaryColumn { data } } pub fn len(&self) -> usize { - self.offsets.len() - 1 + self.data.len() } pub fn current_buffer_len(&self) -> usize { - (*self.offsets().last().unwrap() - *self.offsets().first().unwrap()) as _ - } - - pub fn data(&self) -> &Buffer { - &self.data - } - - pub fn offsets(&self) -> &Buffer { - &self.offsets + self.data.total_bytes_len() } pub fn memory_size(&self) -> usize { - let offsets = self.offsets.as_slice(); - let len = offsets.len(); - 
len * 8 + (offsets[len - 1] - offsets[0]) as usize + self.data.total_buffer_len() } pub fn index(&self, index: usize) -> Option<&[u8]> { - if index + 1 < self.offsets.len() { - Some(&self.data[(self.offsets[index] as usize)..(self.offsets[index + 1] as usize)]) - } else { - None + if index >= self.len() { + return None; } + + Some(unsafe { self.index_unchecked(index) }) } /// # Safety @@ -236,93 +220,53 @@ impl BinaryColumn { /// Calling this method with an out-of-bounds index is *[undefined behavior]* #[inline] pub unsafe fn index_unchecked(&self, index: usize) -> &[u8] { - let start = *self.offsets.get_unchecked(index) as usize; - let end = *self.offsets.get_unchecked(index + 1) as usize; - self.data.get_unchecked(start..end) + debug_assert!(index < self.data.len()); + self.data.value_unchecked(index) } pub fn slice(&self, range: Range) -> Self { - let offsets = self - .offsets + let data = self + .data .clone() - .sliced(range.start, range.end - range.start + 1); - BinaryColumn { - data: self.data.clone(), - offsets, - } + .sliced(range.start, range.end - range.start); + BinaryColumn { data } } pub fn iter(&self) -> BinaryIterator { BinaryIterator { - data: &self.data, - offsets: self.offsets.windows(2), - _t: PhantomData, + col: &self.data, + index: 0, } } - pub fn into_buffer(self) -> (Buffer, Buffer) { - (self.data, self.offsets) - } - - pub fn check_valid(&self) -> Result<()> { - let offsets = self.offsets.as_slice(); - let len = offsets.len(); - if len < 1 { - return Err(ErrorCode::Internal(format!( - "BinaryColumn offsets length must be equal or greater than 1, but got {}", - len - ))); - } - - for i in 1..len { - if offsets[i] < offsets[i - 1] { - return Err(ErrorCode::Internal(format!( - "BinaryColumn offsets value must be equal or greater than previous value, but got {}", - offsets[i] - ))); - } - } - Ok(()) - } -} - -pub type BinaryIterator<'a> = BinaryLikeIterator<'a, &'a [u8]>; - -pub trait BinaryLike<'a> { - fn from(value: &'a [u8]) -> Self; -} - 
-impl<'a> BinaryLike<'a> for &'a [u8] { - fn from(value: &'a [u8]) -> Self { - value + pub fn into_inner(self) -> BinaryViewArray { + self.data } } -pub struct BinaryLikeIterator<'a, T> -where T: BinaryLike<'a> -{ - pub(crate) data: &'a [u8], - pub(crate) offsets: std::slice::Windows<'a, u64>, - pub(crate) _t: PhantomData, +pub struct BinaryIterator<'a> { + pub(crate) col: &'a BinaryViewArray, + pub(crate) index: usize, } -impl<'a, T: BinaryLike<'a>> Iterator for BinaryLikeIterator<'a, T> { - type Item = T; +impl<'a> Iterator for BinaryIterator<'a> { + type Item = &'a [u8]; fn next(&mut self) -> Option { - self.offsets - .next() - .map(|range| T::from(&self.data[(range[0] as usize)..(range[1] as usize)])) + let value = self.col.value(self.index); + self.index += 1; + Some(value) } fn size_hint(&self) -> (usize, Option) { - self.offsets.size_hint() + let remaining = self.col.len() - self.index; + (remaining, Some(remaining)) } } -unsafe impl<'a, T: BinaryLike<'a>> TrustedLen for BinaryLikeIterator<'a, T> {} +unsafe impl<'a> TrustedLen for BinaryIterator<'a> {} -unsafe impl<'a, T: BinaryLike<'a>> std::iter::TrustedLen for BinaryLikeIterator<'a, T> {} +unsafe impl<'a> std::iter::TrustedLen for BinaryIterator<'a> {} #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct BinaryColumnBuilder { @@ -344,11 +288,12 @@ impl BinaryColumnBuilder { } pub fn from_column(col: BinaryColumn) -> Self { - BinaryColumnBuilder { - need_estimated: col.data.is_empty(), - data: buffer_into_mut(col.data), - offsets: col.offsets.to_vec(), + let mut builder = BinaryColumnBuilder::with_capacity(col.len(), col.current_buffer_len()); + for item in col.iter() { + builder.put_slice(item); + builder.commit_row(); } + builder } pub fn from_data(data: Vec, offsets: Vec) -> Self { @@ -443,23 +388,25 @@ impl BinaryColumnBuilder { } pub fn append_column(&mut self, other: &BinaryColumn) { - // the first offset of other column may not be zero - let other_start = 
*other.offsets.first().unwrap(); - let other_last = *other.offsets.last().unwrap(); - let start = self.offsets.last().cloned().unwrap(); - self.data - .extend_from_slice(&other.data[(other_start as usize)..(other_last as usize)]); - self.offsets.extend( - other - .offsets - .iter() - .skip(1) - .map(|offset| start + offset - other_start), - ); + self.data.reserve(other.current_buffer_len()); + for item in other.iter() { + self.put_slice(item); + self.commit_row(); + } } pub fn build(self) -> BinaryColumn { - BinaryColumn::new(self.data.into(), self.offsets.into()) + let mut bulider = MutableBinaryViewArray::with_capacity(self.len()); + for (start, end) in self + .offsets + .windows(2) + .map(|w| (w[0] as usize, w[1] as usize)) + { + bulider.push_value(&self.data[start..end]); + } + BinaryColumn { + data: bulider.into(), + } } pub fn build_scalar(self) -> Vec { diff --git a/src/query/expression/src/types/bitmap.rs b/src/query/expression/src/types/bitmap.rs index 1823941ba2b7..ab411346980a 100644 --- a/src/query/expression/src/types/bitmap.rs +++ b/src/query/expression/src/types/bitmap.rs @@ -161,7 +161,7 @@ impl ValueType for BitmapType { } fn column_memory_size(col: &Self::Column) -> usize { - col.data().len() + col.offsets().len() * 8 + col.memory_size() } #[inline(always)] diff --git a/src/query/expression/src/types/geography.rs b/src/query/expression/src/types/geography.rs index 8c0d95e92c4e..921dd7de0c06 100644 --- a/src/query/expression/src/types/geography.rs +++ b/src/query/expression/src/types/geography.rs @@ -19,6 +19,7 @@ use std::ops::Range; use borsh::BorshDeserialize; use borsh::BorshSerialize; +use databend_common_arrow::arrow::trusted_len::TrustedLen; use databend_common_exception::Result; use databend_common_io::geography::*; use databend_common_io::wkb::make_point; @@ -29,8 +30,7 @@ use geozero::ToWkt; use serde::Deserialize; use serde::Serialize; -use super::binary::BinaryLike; -use super::binary::BinaryLikeIterator; +use 
super::binary::BinaryIterator; use crate::property::Domain; use crate::types::binary::BinaryColumn; use crate::types::binary::BinaryColumnBuilder; @@ -83,12 +83,6 @@ impl<'a> GeographyRef<'a> { } } -impl<'a> BinaryLike<'a> for GeographyRef<'a> { - fn from(value: &'a [u8]) -> Self { - GeographyRef(value) - } -} - #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct GeographyType; @@ -281,16 +275,24 @@ impl GeographyColumn { } pub fn iter(&self) -> GeographyIterator<'_> { - BinaryLikeIterator { - data: &self.0.data, - offsets: self.0.offsets.windows(2), - _t: std::marker::PhantomData, + GeographyIterator { + inner: self.0.iter(), } } +} - pub fn check_valid(&self) -> Result<()> { - self.0.check_valid() +pub struct GeographyIterator<'a> { + inner: BinaryIterator<'a>, +} + +impl<'a> Iterator for GeographyIterator<'a> { + type Item = GeographyRef<'a>; + + fn next(&mut self) -> Option { + self.inner.next().map(GeographyRef) } } -pub type GeographyIterator<'a> = BinaryLikeIterator<'a, GeographyRef<'a>>; +unsafe impl<'a> TrustedLen for GeographyIterator<'a> {} + +unsafe impl<'a> std::iter::TrustedLen for GeographyIterator<'a> {} diff --git a/src/query/expression/src/types/geometry.rs b/src/query/expression/src/types/geometry.rs index 67f1afc95ec9..d1dfe01911cc 100644 --- a/src/query/expression/src/types/geometry.rs +++ b/src/query/expression/src/types/geometry.rs @@ -165,7 +165,7 @@ impl ValueType for GeometryType { } fn column_memory_size(col: &Self::Column) -> usize { - col.data().len() + col.offsets().len() * 8 + col.memory_size() } #[inline(always)] diff --git a/src/query/expression/src/types/string.rs b/src/query/expression/src/types/string.rs index c4fe32534c96..cf26dff46ada 100644 --- a/src/query/expression/src/types/string.rs +++ b/src/query/expression/src/types/string.rs @@ -13,10 +13,9 @@ // limitations under the License. 
use std::cmp::Ordering; -use std::iter::once; use std::ops::Range; -use databend_common_arrow::arrow::buffer::Buffer; +use databend_common_arrow::arrow::array::BinaryViewArray; use databend_common_arrow::arrow::trusted_len::TrustedLen; use databend_common_exception::ErrorCode; use databend_common_exception::Result; @@ -32,7 +31,6 @@ use crate::types::DataType; use crate::types::DecimalSize; use crate::types::GenericMap; use crate::types::ValueType; -use crate::utils::arrow::buffer_into_mut; use crate::values::Column; use crate::values::Scalar; use crate::ColumnBuilder; @@ -166,7 +164,7 @@ impl ValueType for StringType { } fn column_memory_size(col: &Self::Column) -> usize { - col.data().len() + col.offsets().len() * 8 + col.memory_size() } #[inline(always)] @@ -224,13 +222,12 @@ impl ArgType for StringType { #[derive(Clone, PartialEq)] pub struct StringColumn { - data: Buffer, - offsets: Buffer, + pub(crate) data: BinaryViewArray, } impl StringColumn { - pub fn new(data: Buffer, offsets: Buffer) -> Self { - let col = BinaryColumn::new(data, offsets); + pub fn new(data: BinaryViewArray) -> Self { + let col = BinaryColumn::new(data); col.check_utf8().unwrap(); @@ -241,8 +238,8 @@ impl StringColumn { /// This function is unsound iff: /// * the offsets are not monotonically increasing /// * The `data` between two consecutive `offsets` are not valid utf8 - pub unsafe fn new_unchecked(data: Buffer, offsets: Buffer) -> Self { - let col = BinaryColumn::new(data, offsets); + pub unsafe fn new_unchecked(data: BinaryViewArray) -> Self { + let col = BinaryColumn::new(data); #[cfg(debug_assertions)] col.check_utf8().unwrap(); @@ -259,44 +256,28 @@ impl StringColumn { col.check_utf8().unwrap(); StringColumn { - data: col.data, - offsets: col.offsets, + data: col.into_inner(), } } pub fn len(&self) -> usize { - self.offsets.len() - 1 + self.data.len() } pub fn current_buffer_len(&self) -> usize { - (*self.offsets().last().unwrap() - *self.offsets().first().unwrap()) as _ - } - - 
pub fn data(&self) -> &Buffer { - &self.data - } - - pub fn offsets(&self) -> &Buffer { - &self.offsets + self.data.total_bytes_len() } pub fn memory_size(&self) -> usize { - let offsets = self.offsets.as_slice(); - let len = offsets.len(); - len * 8 + (offsets[len - 1] - offsets[0]) as usize + self.data.total_buffer_len() } pub fn index(&self, index: usize) -> Option<&str> { - if index + 1 >= self.offsets.len() { + if index >= self.len() { return None; } - let bytes = &self.data[(self.offsets[index] as usize)..(self.offsets[index + 1] as usize)]; - - #[cfg(debug_assertions)] - bytes.check_utf8().unwrap(); - - unsafe { Some(std::str::from_utf8_unchecked(bytes)) } + Some(unsafe { self.index_unchecked(index) }) } /// # Safety @@ -304,11 +285,9 @@ impl StringColumn { /// Calling this method with an out-of-bounds index is *[undefined behavior]* #[inline] pub unsafe fn index_unchecked(&self, index: usize) -> &str { - debug_assert!(index + 1 < self.offsets.len()); + debug_assert!(index < self.data.len()); - let start = *self.offsets.get_unchecked(index) as usize; - let end = *self.offsets.get_unchecked(index + 1) as usize; - let bytes = &self.data.get_unchecked(start..end); + let bytes = self.data.value_unchecked(index); #[cfg(debug_assertions)] bytes.check_utf8().unwrap(); @@ -321,71 +300,41 @@ impl StringColumn { /// Calling this method with an out-of-bounds index is *[undefined behavior]* #[inline] pub unsafe fn index_unchecked_bytes(&self, index: usize) -> &[u8] { - debug_assert!(index + 1 < self.offsets.len()); + debug_assert!(index < self.data.len()); - let start = *self.offsets.get_unchecked(index) as usize; - let end = *self.offsets.get_unchecked(index + 1) as usize; - self.data.get_unchecked(start..end) + self.data.value_unchecked(index) } pub fn slice(&self, range: Range) -> Self { - let offsets = self - .offsets + let data = self + .data .clone() - .sliced(range.start, range.end - range.start + 1); - StringColumn { - data: self.data.clone(), - offsets, - } + 
.sliced(range.start, range.end - range.start); + unsafe { Self::from_binary_unchecked(BinaryColumn::new(data)) } } pub fn iter(&self) -> StringIterator { StringIterator { - data: &self.data, - offsets: self.offsets.windows(2), + col: self, + index: 0, } } pub fn iter_binary(&self) -> BinaryIterator { BinaryIterator { - data: &self.data, - offsets: self.offsets.windows(2), - _t: std::marker::PhantomData, + col: &self.data, + index: 0, } } - pub fn into_buffer(self) -> (Buffer, Buffer) { - (self.data, self.offsets) - } - - pub fn check_valid(&self) -> Result<()> { - let offsets = self.offsets.as_slice(); - let len = offsets.len(); - if len < 1 { - return Err(ErrorCode::Internal(format!( - "StringColumn offsets length must be equal or greater than 1, but got {}", - len - ))); - } - - for i in 1..len { - if offsets[i] < offsets[i - 1] { - return Err(ErrorCode::Internal(format!( - "StringColumn offsets value must be equal or greater than previous value, but got {}", - offsets[i] - ))); - } - } - Ok(()) + pub fn into_inner(self) -> BinaryViewArray { + self.data } } impl From for BinaryColumn { fn from(col: StringColumn) -> BinaryColumn { - BinaryColumn { - data: col.data, - offsets: col.offsets, - } + BinaryColumn::new(col.into_inner()) } } @@ -395,34 +344,28 @@ impl TryFrom for StringColumn { fn try_from(col: BinaryColumn) -> Result { col.check_utf8()?; Ok(StringColumn { - data: col.data, - offsets: col.offsets, + data: col.into_inner(), }) } } pub struct StringIterator<'a> { - data: &'a [u8], - offsets: std::slice::Windows<'a, u64>, + col: &'a StringColumn, + index: usize, } impl<'a> Iterator for StringIterator<'a> { type Item = &'a str; fn next(&mut self) -> Option { - let bytes = self - .offsets - .next() - .map(|range| &self.data[(range[0] as usize)..(range[1] as usize)])?; - - #[cfg(debug_assertions)] - bytes.check_utf8().unwrap(); - - unsafe { Some(std::str::from_utf8_unchecked(bytes)) } + let value = self.col.index(self.index)?; + self.index += 1; + Some(value) } 
fn size_hint(&self) -> (usize, Option) { - self.offsets.size_hint() + let remaining = self.col.len() - self.index; + (remaining, Some(remaining)) } } @@ -432,29 +375,19 @@ unsafe impl<'a> std::iter::TrustedLen for StringIterator<'a> {} #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct StringColumnBuilder { - // if the StringColumnBuilder is created with `data_capacity`, need_estimated is false - pub need_estimated: bool, - pub data: Vec, - pub offsets: Vec, + inner: BinaryColumnBuilder, } impl StringColumnBuilder { pub fn with_capacity(len: usize, data_capacity: usize) -> Self { - let mut offsets = Vec::with_capacity(len + 1); - offsets.push(0); StringColumnBuilder { - need_estimated: data_capacity == 0 && len > 0, - data: Vec::with_capacity(data_capacity), - offsets, + inner: BinaryColumnBuilder::with_capacity(len, data_capacity), } } pub fn from_column(col: StringColumn) -> Self { - StringColumnBuilder { - need_estimated: col.data.is_empty(), - data: buffer_into_mut(col.data), - offsets: col.offsets.to_vec(), - } + let builder = BinaryColumnBuilder::from_column(col.into()); + unsafe { StringColumnBuilder::from_binary_unchecked(builder) } } pub fn from_data(data: Vec, offsets: Vec) -> Self { @@ -471,114 +404,69 @@ impl StringColumnBuilder { #[cfg(debug_assertions)] col.check_utf8().unwrap(); - StringColumnBuilder { - need_estimated: col.need_estimated, - data: col.data, - offsets: col.offsets, - } + StringColumnBuilder { inner: col } } pub fn repeat(scalar: &str, n: usize) -> Self { - let len = scalar.len(); - let data = scalar.as_bytes().repeat(n); - let offsets = once(0) - .chain((0..n).map(|i| (len * (i + 1)) as u64)) - .collect(); - StringColumnBuilder { - data, - offsets, - need_estimated: false, - } + let builder = BinaryColumnBuilder::repeat(scalar.as_bytes(), n); + unsafe { StringColumnBuilder::from_binary_unchecked(builder) } } pub fn repeat_default(n: usize) -> Self { - StringColumnBuilder { - data: vec![], - offsets: vec![0; n + 
1], - need_estimated: false, - } + let builder = BinaryColumnBuilder::repeat_default(n); + unsafe { StringColumnBuilder::from_binary_unchecked(builder) } } pub fn len(&self) -> usize { - self.offsets.len() - 1 + self.inner.len() } pub fn memory_size(&self) -> usize { - self.offsets.len() * 8 + self.data.len() + self.inner.memory_size() } pub fn put_char(&mut self, item: char) { - self.data - .extend_from_slice(item.encode_utf8(&mut [0; 4]).as_bytes()); + self.inner.put_char(item); } #[inline] - #[deprecated] pub fn put_slice(&mut self, item: &[u8]) { #[cfg(debug_assertions)] item.check_utf8().unwrap(); - self.data.extend_from_slice(item); + self.inner.put_slice(item); } #[inline] pub fn put_str(&mut self, item: &str) { - self.data.extend_from_slice(item.as_bytes()); + self.inner.put_str(item); } pub fn put_char_iter(&mut self, iter: impl Iterator) { - for c in iter { - let mut buf = [0; 4]; - let result = c.encode_utf8(&mut buf); - self.data.extend_from_slice(result.as_bytes()); - } + self.inner.put_char_iter(iter); } #[inline] pub fn commit_row(&mut self) { - self.offsets.push(self.data.len() as u64); - - if self.need_estimated - && self.offsets.len() - 1 == 64 - && self.offsets.len() < self.offsets.capacity() - { - let bytes_per_row = self.data.len() / 64 + 1; - let bytes_estimate = bytes_per_row * self.offsets.capacity(); - - const MAX_HINT_SIZE: usize = 1_000_000_000; - // if we are more than 10% over the capacity, we reserve more - if bytes_estimate < MAX_HINT_SIZE - && bytes_estimate as f64 > self.data.capacity() as f64 * 1.10f64 - { - self.data.reserve(bytes_estimate - self.data.capacity()); - } - } + self.inner.commit_row(); } pub fn append_column(&mut self, other: &StringColumn) { - // the first offset of other column may not be zero - let other_start = *other.offsets.first().unwrap(); - let other_last = *other.offsets.last().unwrap(); - let start = self.offsets.last().cloned().unwrap(); - self.data - .extend_from_slice(&other.data[(other_start as 
usize)..(other_last as usize)]); - self.offsets.extend( - other - .offsets - .iter() - .skip(1) - .map(|offset| start + offset - other_start), - ); + let other = BinaryColumn::from(other.clone()); + self.inner.append_column(&other); } pub fn build(self) -> StringColumn { - unsafe { StringColumn::new_unchecked(self.data.into(), self.offsets.into()) } + let col = self.inner.build(); + + #[cfg(debug_assertions)] + col.check_utf8().unwrap(); + + unsafe { StringColumn::from_binary_unchecked(col) } } pub fn build_scalar(self) -> String { - assert_eq!(self.offsets.len(), 2); - - let bytes = self.data[(self.offsets[0] as usize)..(self.offsets[1] as usize)].to_vec(); + let bytes = self.inner.build_scalar(); #[cfg(debug_assertions)] bytes.check_utf8().unwrap(); @@ -588,18 +476,14 @@ impl StringColumnBuilder { #[inline] pub fn may_resize(&self, add_size: usize) -> bool { - self.data.len() + add_size > self.data.capacity() + self.inner.may_resize(add_size) } /// # Safety /// /// Calling this method with an out-of-bounds index is *[undefined behavior]* pub unsafe fn index_unchecked(&self, row: usize) -> &str { - debug_assert!(row + 1 < self.offsets.len()); - - let start = *self.offsets.get_unchecked(row) as usize; - let end = *self.offsets.get_unchecked(row + 1) as usize; - let bytes = self.data.get_unchecked(start..end); + let bytes = self.inner.index_unchecked(row); #[cfg(debug_assertions)] bytes.check_utf8().unwrap(); @@ -608,37 +492,20 @@ impl StringColumnBuilder { } pub fn push_repeat(&mut self, item: &str, n: usize) { - self.data.reserve(item.len() * n); - if self.need_estimated && self.offsets.len() - 1 < 64 { - for _ in 0..n { - self.data.extend_from_slice(item.as_bytes()); - self.commit_row(); - } - } else { - let start = self.data.len(); - let len = item.len(); - for _ in 0..n { - self.data.extend_from_slice(item.as_bytes()); - } - self.offsets - .extend((1..=n).map(|i| (start + len * i) as u64)); - } + self.inner.push_repeat(item.as_bytes(), n); } pub fn pop(&mut 
self) -> Option { - if self.len() > 0 { - let index = self.len() - 1; - let start = unsafe { *self.offsets.get_unchecked(index) as usize }; - self.offsets.pop(); - let val = self.data.split_off(start); - + self.inner.pop().map(|bytes| unsafe { #[cfg(debug_assertions)] - val.check_utf8().unwrap(); + bytes.check_utf8().unwrap(); - Some(unsafe { String::from_utf8_unchecked(val) }) - } else { - None - } + String::from_utf8_unchecked(bytes) + }) + } + + pub fn as_inner_mut(&mut self) -> &mut BinaryColumnBuilder { + &mut self.inner } } @@ -656,11 +523,7 @@ impl<'a> FromIterator<&'a str> for StringColumnBuilder { impl From for BinaryColumnBuilder { fn from(builder: StringColumnBuilder) -> BinaryColumnBuilder { - BinaryColumnBuilder { - need_estimated: builder.need_estimated, - data: builder.data, - offsets: builder.offsets, - } + builder.inner } } @@ -669,11 +532,7 @@ impl TryFrom for StringColumnBuilder { fn try_from(builder: BinaryColumnBuilder) -> Result { builder.check_utf8()?; - Ok(StringColumnBuilder { - need_estimated: builder.need_estimated, - data: builder.data, - offsets: builder.offsets, - }) + Ok(StringColumnBuilder { inner: builder }) } } @@ -712,7 +571,10 @@ impl CheckUTF8 for Vec { impl CheckUTF8 for BinaryColumn { fn check_utf8(&self) -> Result<()> { - check_utf8_column(&self.offsets, &self.data) + for bytes in self.iter() { + bytes.check_utf8()?; + } + Ok(()) } } diff --git a/src/query/expression/src/types/variant.rs b/src/query/expression/src/types/variant.rs index 6d7ab89a3c8d..262d2f36aa72 100644 --- a/src/query/expression/src/types/variant.rs +++ b/src/query/expression/src/types/variant.rs @@ -176,7 +176,7 @@ impl ValueType for VariantType { } fn column_memory_size(col: &Self::Column) -> usize { - col.data().len() + col.offsets().len() * 8 + col.memory_size() } #[inline(always)] diff --git a/src/query/expression/src/utils/display.rs b/src/query/expression/src/utils/display.rs index c0023fc7b8a9..2932f8e5d7cc 100755 --- 
a/src/query/expression/src/utils/display.rs +++ b/src/query/expression/src/utils/display.rs @@ -417,11 +417,7 @@ impl Debug for DecimalColumn { impl Debug for BinaryColumn { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { f.debug_struct("BinaryColumn") - .field( - "data", - &format_args!("0x{}", &hex::encode(self.data().as_slice())), - ) - .field("offsets", &self.offsets()) + .field("data", &format_args!("{:?}", self.data)) .finish() } } @@ -429,11 +425,7 @@ impl Debug for BinaryColumn { impl Debug for StringColumn { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { f.debug_struct("StringColumn") - .field( - "data", - &format_args!("0x{}", &hex::encode(self.data().as_slice())), - ) - .field("offsets", &self.offsets()) + .field("data", &format_args!("{:?}", self.data)) .finish() } } diff --git a/src/query/expression/src/values.rs b/src/query/expression/src/values.rs index 685d948684a8..0666d1603e67 100755 --- a/src/query/expression/src/values.rs +++ b/src/query/expression/src/values.rs @@ -1171,12 +1171,6 @@ impl Column { pub fn check_valid(&self) -> Result<()> { match self { - Column::Binary(x) => x.check_valid(), - Column::String(x) => x.check_valid(), - Column::Variant(x) => x.check_valid(), - Column::Geometry(x) => x.check_valid(), - Column::Geography(x) => x.check_valid(), - Column::Bitmap(x) => x.check_valid(), Column::Map(x) => { for y in x.iter() { y.check_valid()?; @@ -1665,8 +1659,8 @@ impl ColumnBuilder { builder.len() * 32 } ColumnBuilder::Boolean(c) => c.as_slice().len(), - ColumnBuilder::Binary(col) => col.data.len() + col.offsets.len() * 8, - ColumnBuilder::String(col) => col.data.len() + col.offsets.len() * 8, + ColumnBuilder::Binary(col) => col.memory_size(), + ColumnBuilder::String(col) => col.memory_size(), ColumnBuilder::Timestamp(col) => col.len() * 8, ColumnBuilder::Date(col) => col.len() * 4, ColumnBuilder::Array(col) => col.builder.memory_size() + col.offsets.len() * 8, @@ -2045,6 +2039,7 @@ impl ColumnBuilder { 
builder.commit_row(); } ColumnBuilder::String(builder) => { + let builder = builder.as_inner_mut(); let offset = reader.read_scalar::()? as usize; builder.data.resize(offset + builder.data.len(), 0); let last = *builder.offsets.last().unwrap() as usize; diff --git a/src/query/expression/tests/it/common.rs b/src/query/expression/tests/it/common.rs index c2dd01b0f952..38a3bdaf16ad 100644 --- a/src/query/expression/tests/it/common.rs +++ b/src/query/expression/tests/it/common.rs @@ -127,7 +127,7 @@ pub fn run_scatter(file: &mut impl Write, block: &DataBlock, indices: &[u32], sc } pub fn run_take(file: &mut impl Write, indices: &[u32], block: &DataBlock) { - let result = DataBlock::take(block, indices, &mut None); + let result = DataBlock::take(block, indices); match result { Ok(result_block) => { diff --git a/src/query/expression/tests/it/kernel.rs b/src/query/expression/tests/it/kernel.rs index 1a9e078d8d91..a37de74e342d 100644 --- a/src/query/expression/tests/it/kernel.rs +++ b/src/query/expression/tests/it/kernel.rs @@ -310,15 +310,9 @@ pub fn test_take_and_filter_and_concat() -> databend_common_exception::Result<() .collect_vec(); let concated_blocks = DataBlock::concat(&blocks)?; - let block_1 = concated_blocks.take(&take_indices, &mut None)?; + let block_1 = concated_blocks.take(&take_indices)?; let block_2 = concated_blocks.take_compacted_indices(&take_compact_indices, count)?; - let block_3 = DataBlock::take_column_vec( - &column_vec, - &data_types, - &take_chunks_indices, - count, - &mut None, - ); + let block_3 = DataBlock::take_column_vec(&column_vec, &data_types, &take_chunks_indices, count); let block_4 = DataBlock::concat(&filtered_blocks)?; let block_5 = concated_blocks.take_ranges( &build_range_selection(&take_indices, take_indices.len()), @@ -417,7 +411,7 @@ pub fn test_take_compact() -> databend_common_exception::Result<()> { take_indices.extend(std::iter::repeat(batch_index as u32).take(batch_size)); take_compact_indices.push((batch_index as u32, 
batch_size as u32)); } - let block_1 = block.take(&take_indices, &mut None)?; + let block_1 = block.take(&take_indices)?; let block_2 = block.take_compacted_indices(&take_compact_indices, count)?; assert_eq!(block_1.num_columns(), block_2.num_columns()); @@ -485,8 +479,8 @@ pub fn test_filters() -> databend_common_exception::Result<()> { .map(|(i, _)| i as u32) .collect::>(); - let t_b = bb.take(&indices, &mut None)?; - let t_c = cc.take(&indices, &mut None)?; + let t_b = bb.take(&indices)?; + let t_c = cc.take(&indices)?; let f_b = bb.filter_with_bitmap(&f)?; let f_c = cc.filter_with_bitmap(&f)?; @@ -574,7 +568,7 @@ pub fn test_scatter() -> databend_common_exception::Result<()> { } } - let block_1 = random_block.take(&take_indices, &mut None)?; + let block_1 = random_block.take(&take_indices)?; let block_2 = DataBlock::concat(&scattered_blocks)?; assert_eq!(block_1.num_columns(), block_2.num_columns()); diff --git a/src/query/expression/tests/it/row.rs b/src/query/expression/tests/it/row.rs index f7ba9b75dd9c..07b9c2e196c9 100644 --- a/src/query/expression/tests/it/row.rs +++ b/src/query/expression/tests/it/row.rs @@ -18,7 +18,6 @@ use arrow_ord::sort::LexicographicalComparator; use arrow_ord::sort::SortColumn; use arrow_schema::SortOptions; use databend_common_arrow::arrow::bitmap::MutableBitmap; -use databend_common_arrow::arrow::offset::OffsetsBuffer; use databend_common_base::base::OrderedFloat; use databend_common_expression::converts::arrow2::set_validities; use databend_common_expression::types::binary::BinaryColumnBuilder; @@ -70,31 +69,6 @@ fn test_fixed_width() { let rows = converter.convert_columns(&cols, cols[0].len()); - assert_eq!( - rows.offsets().clone(), - vec![0, 8, 16, 24, 32, 40, 48, 56].into() - ); - assert_eq!( - rows.data().clone(), - vec![ - 1, 128, 1, // - 1, 191, 166, 102, 102, // - 1, 128, 2, // - 1, 192, 32, 0, 0, // - 0, 0, 0, // - 0, 0, 0, 0, 0, // - 1, 127, 251, // - 1, 192, 128, 0, 0, // - 1, 128, 2, // - 1, 189, 204, 204, 205, // - 
1, 128, 2, // - 1, 63, 127, 255, 255, // - 1, 128, 0, // - 1, 127, 255, 255, 255 // - ] - .into() - ); - unsafe { assert!(rows.index_unchecked(3) < rows.index_unchecked(6)); assert!(rows.index_unchecked(0) < rows.index_unchecked(1)); @@ -549,17 +523,7 @@ fn fuzz_test() { // arrow_ord does not support LargeBinary converted from Databend String Column::Nullable(c) => match &c.column { Column::String(sc) => { - let offsets = - sc.offsets().iter().map(|offset| *offset as i64).collect(); - let array = Box::new( - databend_common_arrow::arrow::array::Utf8Array::::try_new( - databend_common_arrow::arrow::datatypes::DataType::LargeUtf8, - unsafe { OffsetsBuffer::new_unchecked(offsets) }, - sc.data().clone(), - None, - ) - .unwrap(), - ); + let array = Box::new(sc.clone().into_inner()); set_validities(array, &c.validity) } _ => col.as_arrow(), diff --git a/src/query/formats/src/field_decoder/fast_values.rs b/src/query/formats/src/field_decoder/fast_values.rs index df66a4eef374..407fa6549503 100644 --- a/src/query/formats/src/field_decoder/fast_values.rs +++ b/src/query/formats/src/field_decoder/fast_values.rs @@ -270,7 +270,7 @@ impl FastFieldDecoderValues { reader: &mut Cursor, positions: &mut VecDeque, ) -> Result<()> { - self.read_string_inner(reader, &mut column.data, positions)?; + self.read_string_inner(reader, &mut column.as_inner_mut().data, positions)?; column.commit_row(); Ok(()) } diff --git a/src/query/formats/src/field_decoder/nested.rs b/src/query/formats/src/field_decoder/nested.rs index 6b1f7323ef53..f75b8c0bba39 100644 --- a/src/query/formats/src/field_decoder/nested.rs +++ b/src/query/formats/src/field_decoder/nested.rs @@ -196,7 +196,7 @@ impl NestedValues { column: &mut StringColumnBuilder, reader: &mut Cursor, ) -> Result<()> { - reader.read_quoted_text(&mut column.data, b'\'')?; + reader.read_quoted_text(&mut column.as_inner_mut().data, b'\'')?; column.commit_row(); Ok(()) } diff --git a/src/query/functions/src/scalars/arithmetic.rs 
b/src/query/functions/src/scalars/arithmetic.rs index 7d58f6978208..bee6f0616cdc 100644 --- a/src/query/functions/src/scalars/arithmetic.rs +++ b/src/query/functions/src/scalars/arithmetic.rs @@ -22,6 +22,7 @@ use std::sync::Arc; use databend_common_arrow::arrow::bitmap::Bitmap; use databend_common_expression::serialize::read_decimal_with_size; +use databend_common_expression::types::binary::BinaryColumnBuilder; use databend_common_expression::types::decimal::DecimalDomain; use databend_common_expression::types::decimal::DecimalType; use databend_common_expression::types::nullable::NullableColumn; @@ -37,6 +38,7 @@ use databend_common_expression::types::NullableType; use databend_common_expression::types::NumberClass; use databend_common_expression::types::NumberDataType; use databend_common_expression::types::SimpleDomain; +use databend_common_expression::types::StringColumn; use databend_common_expression::types::StringType; use databend_common_expression::types::ALL_FLOAT_TYPES; use databend_common_expression::types::ALL_INTEGER_TYPES; @@ -972,29 +974,26 @@ pub fn register_number_to_string(registry: &mut FunctionRegistry) { let options = NUM_TYPE::lexical_options(); const FORMAT: u128 = lexical_core::format::STANDARD; + type Native = ::Native; + let mut builder = StringColumnBuilder::with_capacity(from.len(), from.len() + 1); - let values = &mut builder.data; + let mut buffer = Vec::with_capacity( + ::Native::FORMATTED_SIZE_DECIMAL, + ); - type Native = ::Native; - let mut offset: usize = 0; unsafe { for x in from.iter() { - values.reserve(offset + Native::FORMATTED_SIZE_DECIMAL); - values.set_len(offset + Native::FORMATTED_SIZE_DECIMAL); - let bytes = &mut values[offset..]; - let len = lexical_core::write_with_options_unchecked::< _, FORMAT, >( - Native::from(*x), bytes, &options + Native::from(*x), &mut buffer, &options ) .len(); - offset += len; - builder.offsets.push(offset as u64); + builder.put_slice(&buffer[..len]); + builder.commit_row(); } - 
values.set_len(offset); } Value::Column(builder.build()) } @@ -1009,7 +1008,7 @@ pub fn register_number_to_string(registry: &mut FunctionRegistry) { let options = NUM_TYPE::lexical_options(); const FORMAT: u128 = lexical_core::format::STANDARD; let mut builder = - StringColumnBuilder::with_capacity(from.len(), from.len() + 1); + BinaryColumnBuilder::with_capacity(from.len(), from.len() + 1); let values = &mut builder.data; type Native = ::Native; @@ -1031,7 +1030,7 @@ pub fn register_number_to_string(registry: &mut FunctionRegistry) { } values.set_len(offset); } - let result = builder.build(); + let result = StringColumn::try_from(builder.build()).unwrap(); Value::Column(NullableColumn::new( result, Bitmap::new_constant(true, from.len()), diff --git a/src/query/functions/src/scalars/binary.rs b/src/query/functions/src/scalars/binary.rs index 6c76598768f6..fad04ace15ca 100644 --- a/src/query/functions/src/scalars/binary.rs +++ b/src/query/functions/src/scalars/binary.rs @@ -30,7 +30,7 @@ use databend_common_expression::types::NumberDataType; use databend_common_expression::types::NumberType; use databend_common_expression::types::StringType; use databend_common_expression::types::UInt8Type; -use databend_common_expression::types::ValueType; +use databend_common_expression::vectorize_1_arg; use databend_common_expression::Column; use databend_common_expression::EvalContext; use databend_common_expression::Function; @@ -49,19 +49,7 @@ pub fn register(registry: &mut FunctionRegistry) { registry.register_passthrough_nullable_1_arg::, _, _>( "length", |_, _| FunctionDomain::Full, - |val, _| match val { - ValueRef::Scalar(s) => Value::Scalar(s.len() as u64), - ValueRef::Column(c) => { - let diffs = c - .offsets() - .iter() - .zip(c.offsets().iter().skip(1)) - .map(|(a, b)| b - a) - .collect::>(); - - Value::Column(diffs.into()) - } - }, + vectorize_1_arg::>(|val, _| val.len() as u64), ); registry.register_passthrough_nullable_1_arg::( @@ -101,12 +89,12 @@ pub fn 
register(registry: &mut FunctionRegistry) { "to_hex", |_, _| FunctionDomain::Full, vectorize_binary_to_string( - |col| col.data().len() * 2, + |col| col.current_buffer_len() * 2, |val, output, _| { - let old_len = output.data.len(); + let old_len = output.as_inner_mut().data.len(); let extra_len = val.len() * 2; - output.data.resize(old_len + extra_len, 0); - hex::encode_to_slice(val, &mut output.data[old_len..]).unwrap(); + output.as_inner_mut().data.resize(old_len + extra_len, 0); + hex::encode_to_slice(val, &mut output.as_inner_mut().data[old_len..]).unwrap(); output.commit_row(); }, ), @@ -128,10 +116,10 @@ pub fn register(registry: &mut FunctionRegistry) { "to_base64", |_, _| FunctionDomain::Full, vectorize_binary_to_string( - |col| col.data().len() * 4 / 3 + col.len() * 4, + |col| col.current_buffer_len() * 4 / 3 + col.len() * 4, |val, output, _| { base64::write::EncoderWriter::new( - &mut output.data, + &mut output.as_inner_mut().data, &base64::engine::general_purpose::STANDARD, ) .write_all(val) @@ -203,7 +191,7 @@ pub fn register(registry: &mut FunctionRegistry) { fn eval_binary_to_string(val: ValueRef, ctx: &mut EvalContext) -> Value { vectorize_binary_to_string( - |col| col.data().len(), + |col| col.current_buffer_len(), |val, output, ctx| { if let Ok(val) = simdutf8::basic::from_utf8(val) { output.put_str(val); @@ -217,7 +205,7 @@ fn eval_binary_to_string(val: ValueRef, ctx: &mut EvalContext) -> Va fn eval_unhex(val: ValueRef, ctx: &mut EvalContext) -> Value { vectorize_string_to_binary( - |col| col.data().len() / 2, + |col| col.current_buffer_len() / 2, |val, output, ctx| { let old_len = output.data.len(); let extra_len = val.len() / 2; @@ -232,7 +220,7 @@ fn eval_unhex(val: ValueRef, ctx: &mut EvalContext) -> Value, ctx: &mut EvalContext) -> Value { vectorize_string_to_binary( - |col| col.data().len() * 4 / 3 + col.len() * 4, + |col| col.current_buffer_len() * 4 / 3 + col.len() * 4, |val, output, ctx| { if let Err(err) = base64::Engine::decode_vec( 
&base64::engine::general_purpose::STANDARD, @@ -304,34 +292,18 @@ fn char_fn(args: &[ValueRef], _: &mut EvalContext) -> Value { }); let input_rows = len.unwrap_or(1); - let mut values: Vec = vec![0; input_rows * args.len()]; - let values_ptr = values.as_mut_ptr(); + let mut builder = BinaryColumnBuilder::with_capacity(input_rows, 0); - for (i, arg) in args.iter().enumerate() { - match arg { - ValueRef::Scalar(v) => { - for j in 0..input_rows { - unsafe { - *values_ptr.add(args.len() * j + i) = *v; - } - } - } - ValueRef::Column(c) => { - for (j, ch) in UInt8Type::iter_column(c).enumerate() { - unsafe { - *values_ptr.add(args.len() * j + i) = ch; - } - } - } + for _ in 0..input_rows { + for arg in &args { + let val = arg.index(0).unwrap(); + builder.put_u8(val); } + builder.commit_row(); } - let offsets = (0..(input_rows + 1) as u64 * args.len() as u64) - .step_by(args.len()) - .collect::>(); - let result = BinaryColumn::new(values.into(), offsets.into()); match len { - Some(_) => Value::Column(Column::Binary(result)), - _ => Value::Scalar(Scalar::Binary(result.index(0).unwrap().to_vec())), + Some(_) => Value::Column(Column::Binary(builder.build())), + _ => Value::Scalar(Scalar::Binary(builder.build_scalar())), } } diff --git a/src/query/functions/src/scalars/comparison.rs b/src/query/functions/src/scalars/comparison.rs index 71f667de8bd5..d1e0c3d46ea0 100644 --- a/src/query/functions/src/scalars/comparison.rs +++ b/src/query/functions/src/scalars/comparison.rs @@ -531,35 +531,8 @@ fn vectorize_like( let mut builder = MutableBitmap::with_capacity(arg1.len()); let pattern_type = generate_like_pattern(arg2.as_bytes(), arg1.current_buffer_len()); if let LikePattern::SurroundByPercent(searcher) = pattern_type { - let needle_byte_len = searcher.needle().len(); - let data = arg1.data().as_slice(); - let offsets = arg1.offsets().as_slice(); - let mut idx = 0; - let mut pos = (*offsets.first().unwrap()) as usize; - let end = (*offsets.last().unwrap()) as usize; - - while 
pos < end { - if let Some(p) = searcher.search(&data[pos..end]) { - // data: {3x}googlex|{3x}googlex|{3x}googlex - // needle_size: 6 - // offsets: 0, 10, 20, 30 - // (pos, p): (0, 3) , (10, 3), (20, 3), () - while offsets[idx + 1] as usize <= pos + p { - builder.push(false); - idx += 1; - } - // check if the substring is in bound - builder.push(pos + p + needle_byte_len <= offsets[idx + 1] as usize); - pos = offsets[idx + 1] as usize; - idx += 1; - } else { - break; - } - } - - while idx < arg1.len() { - builder.push(false); - idx += 1; + for arg1 in arg1_iter { + builder.push(searcher.search(arg1.as_bytes()).is_some()); } } else { for arg1 in arg1_iter { diff --git a/src/query/functions/src/scalars/datetime.rs b/src/query/functions/src/scalars/datetime.rs index a8c54dc8c0fb..70fddc78254e 100644 --- a/src/query/functions/src/scalars/datetime.rs +++ b/src/query/functions/src/scalars/datetime.rs @@ -660,7 +660,12 @@ fn register_to_string(registry: &mut FunctionRegistry) { "to_string", |_, _| FunctionDomain::Full, vectorize_with_builder_1_arg::(|val, output, ctx| { - write!(output.data, "{}", date_to_string(val, ctx.func_ctx.tz.tz)).unwrap(); + write!( + output.as_inner_mut().data, + "{}", + date_to_string(val, ctx.func_ctx.tz.tz) + ) + .unwrap(); output.commit_row(); }), ); @@ -670,7 +675,7 @@ fn register_to_string(registry: &mut FunctionRegistry) { |_, _| FunctionDomain::Full, vectorize_with_builder_1_arg::(|val, output, ctx| { write!( - output.data, + output.as_inner_mut().data, "{}", timestamp_to_string(val, ctx.func_ctx.tz.tz) ) @@ -692,7 +697,7 @@ fn register_to_string(registry: &mut FunctionRegistry) { }, vectorize_with_builder_1_arg::>(|val, output, ctx| { write!( - output.builder.data, + output.builder.as_inner_mut().data, "{}", date_to_string(val, ctx.func_ctx.tz.tz) ) @@ -716,7 +721,7 @@ fn register_to_string(registry: &mut FunctionRegistry) { vectorize_with_builder_1_arg::>( |val, output, ctx| { write!( - output.builder.data, + 
output.builder.as_inner_mut().data, "{}", timestamp_to_string(val, ctx.func_ctx.tz.tz) ) diff --git a/src/query/functions/src/scalars/hash.rs b/src/query/functions/src/scalars/hash.rs index 4e00a7f4aeab..7eaa02a3ef0d 100644 --- a/src/query/functions/src/scalars/hash.rs +++ b/src/query/functions/src/scalars/hash.rs @@ -80,14 +80,14 @@ pub fn register(registry: &mut FunctionRegistry) { "md5", |_, _| FunctionDomain::MayThrow, vectorize_string_to_string( - |col| col.data().len() * 32, + |col| col.current_buffer_len() * 32, |val, output, ctx| { // TODO md5 lib doesn't allow encode into buffer... - let old_len = output.data.len(); - output.data.resize(old_len + 32, 0); + let old_len = output.as_inner_mut().data.len(); + output.as_inner_mut().data.resize(old_len + 32, 0); if let Err(err) = hex::encode_to_slice( Md5Hasher::digest(val).as_slice(), - &mut output.data[old_len..], + &mut output.as_inner_mut().data[old_len..], ) { ctx.set_error(output.len(), err.to_string()); } @@ -100,17 +100,18 @@ pub fn register(registry: &mut FunctionRegistry) { "sha", |_, _| FunctionDomain::MayThrow, vectorize_string_to_string( - |col| col.data().len() * 40, + |col| col.current_buffer_len() * 40, |val, output, ctx| { - let old_len = output.data.len(); - output.data.resize(old_len + 40, 0); + let old_len = output.as_inner_mut().data.len(); + output.as_inner_mut().data.resize(old_len + 40, 0); // TODO sha1 lib doesn't allow encode into buffer... 
let mut m = ::sha1::Sha1::new(); sha1::digest::Update::update(&mut m, val.as_bytes()); - if let Err(err) = - hex::encode_to_slice(m.finalize().as_slice(), &mut output.data[old_len..]) - { + if let Err(err) = hex::encode_to_slice( + m.finalize().as_slice(), + &mut output.as_inner_mut().data[old_len..], + ) { ctx.set_error(output.len(), err.to_string()); } output.commit_row(); @@ -122,13 +123,13 @@ pub fn register(registry: &mut FunctionRegistry) { "blake3", |_, _| FunctionDomain::MayThrow, vectorize_string_to_string( - |col| col.data().len() * 64, + |col| col.current_buffer_len() * 64, |val, output, ctx| { - let old_len = output.data.len(); - output.data.resize(old_len + 64, 0); + let old_len = output.as_inner_mut().data.len(); + output.as_inner_mut().data.resize(old_len + 64, 0); if let Err(err) = hex::encode_to_slice( blake3::hash(val.as_bytes()).as_bytes(), - &mut output.data[old_len..], + &mut output.as_inner_mut().data[old_len..], ) { ctx.set_error(output.len(), err.to_string()); } diff --git a/src/query/functions/src/scalars/other.rs b/src/query/functions/src/scalars/other.rs index 2ed8f5eeb456..257fd03885cd 100644 --- a/src/query/functions/src/scalars/other.rs +++ b/src/query/functions/src/scalars/other.rs @@ -22,6 +22,7 @@ use databend_common_base::base::convert_number_size; use databend_common_base::base::uuid::Uuid; use databend_common_base::base::OrderedFloat; use databend_common_expression::error_to_null; +use databend_common_expression::types::binary::BinaryColumnBuilder; use databend_common_expression::types::boolean::BooleanDomain; use databend_common_expression::types::nullable::NullableColumn; use databend_common_expression::types::number::Float32Type; @@ -232,16 +233,14 @@ pub fn register(registry: &mut FunctionRegistry) { "gen_random_uuid", |_| FunctionDomain::Full, |ctx| { - let mut values: Vec = Vec::with_capacity(ctx.num_rows * 36); - let mut offsets: Vec = Vec::with_capacity(ctx.num_rows); - offsets.push(0); + let mut builder = 
BinaryColumnBuilder::with_capacity(ctx.num_rows, 0); for _ in 0..ctx.num_rows { let value = Uuid::new_v4(); - offsets.push(offsets.last().unwrap() + 36u64); - write!(&mut values, "{:x}", value).unwrap(); + write!(&mut builder.data, "{}", value).unwrap(); } - let col = StringColumn::new(values.into(), offsets.into()); + + let col = StringColumn::try_from(builder.build()).unwrap(); Value::Column(col) }, ); diff --git a/src/query/functions/src/scalars/string.rs b/src/query/functions/src/scalars/string.rs index 25f10b82ccd2..2b16a0a72a92 100644 --- a/src/query/functions/src/scalars/string.rs +++ b/src/query/functions/src/scalars/string.rs @@ -26,6 +26,7 @@ use databend_common_expression::types::ArrayType; use databend_common_expression::types::NumberType; use databend_common_expression::types::StringType; use databend_common_expression::unify_string; +use databend_common_expression::vectorize_1_arg; use databend_common_expression::vectorize_with_builder_1_arg; use databend_common_expression::vectorize_with_builder_2_arg; use databend_common_expression::vectorize_with_builder_3_arg; @@ -103,7 +104,7 @@ pub fn register(registry: &mut FunctionRegistry) { "upper", |_, _| FunctionDomain::Full, vectorize_string_to_string( - |col| col.data().len(), + |col| col.current_buffer_len(), |val, output, _| { for ch in val.chars() { if ch.is_ascii() { @@ -123,7 +124,7 @@ pub fn register(registry: &mut FunctionRegistry) { "lower", |_, _| FunctionDomain::Full, vectorize_string_to_string( - |col| col.data().len(), + |col| col.current_buffer_len(), |val, output, _| { for ch in val.chars() { if ch.is_ascii() { @@ -148,19 +149,7 @@ pub fn register(registry: &mut FunctionRegistry) { registry.register_passthrough_nullable_1_arg::, _, _>( "octet_length", |_, _| FunctionDomain::Full, - |val, _| match val { - ValueRef::Scalar(s) => Value::Scalar(s.len() as u64), - ValueRef::Column(c) => { - let diffs = c - .offsets() - .iter() - .zip(c.offsets().iter().skip(1)) - .map(|(a, b)| b - a) - 
.collect::>(); - - Value::Column(diffs.into()) - } - }, + vectorize_1_arg::>(|val, _| val.len() as u64), ); registry.register_1_arg::, _, _>( @@ -383,7 +372,7 @@ pub fn register(registry: &mut FunctionRegistry) { "quote", |_, _| FunctionDomain::Full, vectorize_string_to_string( - |col| col.data().len() * 2, + |col| col.current_buffer_len() * 2, |val, output, _| { for ch in val.chars() { match ch { @@ -407,7 +396,7 @@ pub fn register(registry: &mut FunctionRegistry) { "reverse", |_, _| FunctionDomain::Full, vectorize_string_to_string( - |col| col.data().len(), + |col| col.current_buffer_len(), |val, output, _| { for char in val.chars().rev() { output.put_char(char); @@ -437,7 +426,7 @@ pub fn register(registry: &mut FunctionRegistry) { "ltrim", |_, _| FunctionDomain::Full, vectorize_string_to_string( - |col| col.data().len(), + |col| col.current_buffer_len(), |val, output, _| { output.put_str(val.trim_start()); output.commit_row(); @@ -449,7 +438,7 @@ pub fn register(registry: &mut FunctionRegistry) { "rtrim", |_, _| FunctionDomain::Full, vectorize_string_to_string( - |col| col.data().len(), + |col| col.current_buffer_len(), |val, output, _| { output.put_str(val.trim_end()); output.commit_row(); @@ -461,7 +450,7 @@ pub fn register(registry: &mut FunctionRegistry) { "trim", |_, _| FunctionDomain::Full, vectorize_string_to_string( - |col| col.data().len(), + |col| col.current_buffer_len(), |val, output, _| { output.put_str(val.trim()); output.commit_row(); @@ -473,7 +462,7 @@ pub fn register(registry: &mut FunctionRegistry) { "trim_leading", |_, _, _| FunctionDomain::Full, vectorize_string_to_string_2_arg( - |col, _| col.data().len(), + |col, _| col.current_buffer_len(), |val, trim_str, _, output| { if trim_str.is_empty() { output.put_str(val); @@ -491,7 +480,7 @@ pub fn register(registry: &mut FunctionRegistry) { "trim_trailing", |_, _, _| FunctionDomain::Full, vectorize_string_to_string_2_arg( - |col, _| col.data().len(), + |col, _| col.current_buffer_len(), |val, 
trim_str, _, output| { if trim_str.is_empty() { output.put_str(val); @@ -509,7 +498,7 @@ pub fn register(registry: &mut FunctionRegistry) { "trim_both", |_, _, _| FunctionDomain::Full, vectorize_string_to_string_2_arg( - |col, _| col.data().len(), + |col, _| col.current_buffer_len(), |val, trim_str, _, output| { if trim_str.is_empty() { output.put_str(val); @@ -536,12 +525,12 @@ pub fn register(registry: &mut FunctionRegistry) { "to_hex", |_, _| FunctionDomain::Full, vectorize_string_to_string( - |col| col.data().len() * 2, + |col| col.current_buffer_len() * 2, |val, output, _| { - let old_len = output.data.len(); + let old_len = output.as_inner_mut().data.len(); let extra_len = val.len() * 2; - output.data.resize(old_len + extra_len, 0); - hex::encode_to_slice(val, &mut output.data[old_len..]).unwrap(); + output.as_inner_mut().data.resize(old_len + extra_len, 0); + hex::encode_to_slice(val, &mut output.as_inner_mut().data[old_len..]).unwrap(); output.commit_row(); }, ), @@ -553,7 +542,7 @@ pub fn register(registry: &mut FunctionRegistry) { "bin", |_, _| FunctionDomain::Full, vectorize_with_builder_1_arg::, StringType>(|val, output, _| { - write!(output.data, "{val:b}").unwrap(); + write!(output.as_inner_mut().data, "{val:b}").unwrap(); output.commit_row(); }), ); @@ -561,7 +550,7 @@ pub fn register(registry: &mut FunctionRegistry) { "oct", |_, _| FunctionDomain::Full, vectorize_with_builder_1_arg::, StringType>(|val, output, _| { - write!(output.data, "{val:o}").unwrap(); + write!(output.as_inner_mut().data, "{val:o}").unwrap(); output.commit_row(); }), ); @@ -569,7 +558,7 @@ pub fn register(registry: &mut FunctionRegistry) { "to_hex", |_, _| FunctionDomain::Full, vectorize_with_builder_1_arg::, StringType>(|val, output, _| { - write!(output.data, "{val:x}").unwrap(); + write!(output.as_inner_mut().data, "{val:x}").unwrap(); output.commit_row(); }), ); @@ -625,7 +614,7 @@ pub fn register(registry: &mut FunctionRegistry) { "soundex", |_, _| FunctionDomain::Full, 
vectorize_string_to_string( - |col| usize::max(col.data().len(), 4 * col.len()), + |col| usize::max(col.current_buffer_len(), 4 * col.len()), soundex::soundex, ), ); @@ -634,46 +623,19 @@ pub fn register(registry: &mut FunctionRegistry) { registry.register_passthrough_nullable_1_arg::, StringType, _, _>( "space", |_, _| FunctionDomain::MayThrow, - |times, ctx| match times { - ValueRef::Scalar(times) => { - if times > MAX_SPACE_LENGTH { - ctx.set_error( - 0, - format!("space length is too big, max is: {}", MAX_SPACE_LENGTH), - ); - Value::Scalar("".to_string()) - } else { - Value::Scalar(" ".repeat(times as usize)) - } - } - ValueRef::Column(col) => { - let mut total_space: u64 = 0; - let mut offsets: Vec = Vec::with_capacity(col.len() + 1); - offsets.push(0); - for (i, times) in col.iter().enumerate() { - if times > &MAX_SPACE_LENGTH { - ctx.set_error( - i, - format!("space length is too big, max is: {}", MAX_SPACE_LENGTH), - ); - break; - } - total_space += times; - offsets.push(total_space); - } - if ctx.errors.is_some() { - offsets.truncate(1); - total_space = 0; - } - let col = StringColumnBuilder { - data: " ".repeat(total_space as usize).into_bytes(), - offsets, - need_estimated: false, + vectorize_with_builder_1_arg::, StringType>(|times, output, ctx| { + if times > MAX_SPACE_LENGTH { + ctx.set_error( + output.len(), + format!("space length is too big, max is: {}", MAX_SPACE_LENGTH), + ); + } else { + for _ in 0..times { + output.put_char(' '); } - .build(); - Value::Column(col) } - }, + output.commit_row(); + }), ); registry.register_passthrough_nullable_2_arg::, StringType, _, _>( diff --git a/src/query/pipeline/transforms/src/processors/transforms/sort/rows/common.rs b/src/query/pipeline/transforms/src/processors/transforms/sort/rows/common.rs index 6db4b9188601..f554b6157f38 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/sort/rows/common.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/sort/rows/common.rs @@ -98,8 
+98,10 @@ impl RowConverter for CommonRowConverter { let (_, validity) = c.validity(); let col = c.remove_nullable(); let col = col.as_variant().unwrap(); - let mut builder = - BinaryColumnBuilder::with_capacity(col.len(), col.data().len()); + let mut builder = BinaryColumnBuilder::with_capacity( + col.len(), + col.current_buffer_len(), + ); for (i, val) in col.iter().enumerate() { if let Some(validity) = validity { if unsafe { !validity.get_bit_unchecked(i) } { diff --git a/src/query/service/src/pipelines/builders/builder_join.rs b/src/query/service/src/pipelines/builders/builder_join.rs index b8833274a1e9..fd5ef8c2f906 100644 --- a/src/query/service/src/pipelines/builders/builder_join.rs +++ b/src/query/service/src/pipelines/builders/builder_join.rs @@ -197,10 +197,6 @@ impl PipelineBuilder { self.main_pipeline.output_len(), barrier, )?); - let mut has_string_column = false; - for field in join.output_schema()?.fields() { - has_string_column |= field.data_type().is_string_column(); - } self.main_pipeline.add_transform(|input, output| { Ok(ProcessorPtr::create(TransformHashJoinProbe::create( @@ -212,7 +208,6 @@ impl PipelineBuilder { self.func_ctx.clone(), &join.join_type, !join.non_equi_conditions.is_empty(), - has_string_column, )?)) })?; diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs index f6694abadd46..b5a24f7a1b34 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs @@ -515,8 +515,8 @@ impl HashJoinBuildState { let space_size = match &keys_state { // safe to unwrap(): offset.len() >= 1. 
- KeysState::Column(Column::Binary(col) | Column::Variant(col) | Column::Bitmap(col)) => col.offsets().last().unwrap(), - KeysState::Column(Column::String(col) ) => col.offsets().last().unwrap(), + KeysState::Column(Column::Binary(col) | Column::Variant(col) | Column::Bitmap(col)) => col.current_buffer_len(), + KeysState::Column(Column::String(col) ) => col.current_buffer_len(), // The function `build_keys_state` of both HashMethodSerializer and HashMethodSingleString // must return `Column::Binary` | `Column::String` | `Column::Variant` | `Column::Bitmap`. _ => unreachable!(), @@ -528,7 +528,7 @@ impl HashJoinBuildState { let mut entry_local_space: Vec = Vec::with_capacity(valid_num * entry_size); let mut string_local_space: Vec = - Vec::with_capacity(*space_size as usize); + Vec::with_capacity(space_size as usize); let mut raw_entry_ptr = unsafe { std::mem::transmute::<*mut u8, *mut StringRawEntry>(entry_local_space.as_mut_ptr()) }; let mut string_local_space_ptr = string_local_space.as_mut_ptr(); diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_probe_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_probe_state.rs index 0c38cad7d69a..5e27be5eb94a 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_probe_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_probe_state.rs @@ -508,7 +508,6 @@ impl HashJoinProbeState { &generation_state.build_columns, &generation_state.build_columns_data_type, &generation_state.build_num_rows, - &mut probe_state.generation_state.string_items_buf, )?; if self.hash_join_state.hash_join_desc.join_type == JoinType::Full { @@ -597,7 +596,6 @@ impl HashJoinProbeState { &generation_state.build_columns, &generation_state.build_columns_data_type, &generation_state.build_num_rows, - &mut probe_state.generation_state.string_items_buf, )?); build_indexes_idx = 0; } @@ -659,7 +657,6 @@ impl 
HashJoinProbeState { &generation_state.build_columns, &generation_state.build_columns_data_type, &generation_state.build_num_rows, - &mut probe_state.generation_state.string_items_buf, )?); build_indexes_idx = 0; } @@ -747,7 +744,6 @@ impl HashJoinProbeState { &generation_state.build_columns, &generation_state.build_columns_data_type, &generation_state.build_num_rows, - &mut probe_state.generation_state.string_items_buf, )?; result_blocks.push(self.merge_eq_block( Some(build_block), diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/merge_into_hash_join_optimization.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/merge_into_hash_join_optimization.rs index 26af688afad6..3d5cdbaf0719 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/merge_into_hash_join_optimization.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/merge_into_hash_join_optimization.rs @@ -344,10 +344,7 @@ impl TransformHashJoinProbe { { let end = (interval.1 - chunk_start).min(start + self.max_block_size as u32 - 1); let range = (start..=end).collect::>(); - let data_block = chunk_block.take( - &range, - &mut self.probe_state.generation_state.string_items_buf, - )?; + let data_block = chunk_block.take(&range)?; assert!(!data_block.is_empty()); let (segment_idx, block_idx) = split_prefix(prefix); info!( diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/inner_join.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/inner_join.rs index 32294afc57b7..51bcbbbb4cc6 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/inner_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/inner_join.rs @@ -210,11 +210,7 @@ impl HashJoinProbeState { } let probe_block = if probe_state.is_probe_projected { - Some(DataBlock::take( - input, - &probe_indexes[0..matched_idx], - &mut 
probe_state.string_items_buf, - )?) + Some(DataBlock::take(input, &probe_indexes[0..matched_idx])?) } else { None }; @@ -224,7 +220,6 @@ impl HashJoinProbeState { &build_state.build_columns, &build_state.build_columns_data_type, &build_state.build_num_rows, - &mut probe_state.string_items_buf, )?) } else { None diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_anti_join.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_anti_join.rs index 813e5b168c1c..6e42f3c92fb2 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_anti_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_anti_join.rs @@ -79,11 +79,7 @@ impl HashJoinProbeState { )); } - let result_block = DataBlock::take( - &process_state.input, - &probe_indexes[0..count], - &mut probe_state.generation_state.string_items_buf, - )?; + let result_block = DataBlock::take(&process_state.input, &probe_indexes[0..count])?; probe_state.process_state = None; @@ -237,7 +233,6 @@ impl HashJoinProbeState { result_blocks.push(DataBlock::take( &process_state.input, &probe_indexes[0..unmatched_idx], - &mut probe_state.generation_state.string_items_buf, )?); } @@ -266,11 +261,7 @@ impl HashJoinProbeState { } let probe_block = if probe_state.is_probe_projected { - Some(DataBlock::take( - input, - &probe_indexes[0..matched_idx], - &mut probe_state.string_items_buf, - )?) + Some(DataBlock::take(input, &probe_indexes[0..matched_idx])?) } else { None }; @@ -280,7 +271,6 @@ impl HashJoinProbeState { &build_state.build_columns, &build_state.build_columns_data_type, &build_state.build_num_rows, - &mut probe_state.string_items_buf, )?) 
} else { None diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_join.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_join.rs index f8dc4a100396..6afb2d80ba34 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_join.rs @@ -372,11 +372,7 @@ impl HashJoinProbeState { } let probe_block = if probe_state.is_probe_projected { - let mut probe_block = DataBlock::take( - input, - &probe_indexes[0..unmatched_idx], - &mut probe_state.string_items_buf, - )?; + let mut probe_block = DataBlock::take(input, &probe_indexes[0..unmatched_idx])?; // For full join, wrap nullable for probe block if self.hash_join_state.hash_join_desc.join_type == JoinType::Full { let nullable_probe_columns = probe_block @@ -435,11 +431,7 @@ impl HashJoinProbeState { } let probe_block = if probe_state.is_probe_projected { - let mut probe_block = DataBlock::take( - input, - &probe_indexes[0..matched_idx], - &mut probe_state.string_items_buf, - )?; + let mut probe_block = DataBlock::take(input, &probe_indexes[0..matched_idx])?; // For full join, wrap nullable for probe block if self.hash_join_state.hash_join_desc.join_type == JoinType::Full { let nullable_probe_columns = probe_block @@ -459,7 +451,6 @@ impl HashJoinProbeState { &build_state.build_columns, &build_state.build_columns_data_type, &build_state.build_num_rows, - &mut probe_state.string_items_buf, )?; // For left or full join, wrap nullable for build block. 
let nullable_columns = if build_state.build_num_rows == 0 { diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_mark_join.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_mark_join.rs index 23f47e26512f..0292ec9eb554 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_mark_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_mark_join.rs @@ -341,11 +341,7 @@ impl HashJoinProbeState { } let probe_block = if probe_state.is_probe_projected { - Some(DataBlock::take( - input, - &probe_indexes[0..matched_idx], - &mut probe_state.string_items_buf, - )?) + Some(DataBlock::take(input, &probe_indexes[0..matched_idx])?) } else { None }; @@ -355,7 +351,6 @@ impl HashJoinProbeState { &build_state.build_columns, &build_state.build_columns_data_type, &build_state.build_num_rows, - &mut probe_state.string_items_buf, )?) } else { None diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_semi_join.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_semi_join.rs index 1f98cc837261..bae30ec7ccd4 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_semi_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_semi_join.rs @@ -82,7 +82,6 @@ impl HashJoinProbeState { result_blocks.push(DataBlock::take( &process_state.input, &probe_indexes[0..matched_idx], - &mut probe_state.generation_state.string_items_buf, )?); } @@ -232,7 +231,6 @@ impl HashJoinProbeState { result_blocks.push(DataBlock::take( &process_state.input, &probe_indexes[0..matched_idx], - &mut probe_state.generation_state.string_items_buf, )?); } @@ -261,11 +259,7 @@ impl HashJoinProbeState { } let probe_block = if probe_state.is_probe_projected { - Some(DataBlock::take( - input, - &probe_indexes[0..matched_idx], - &mut 
probe_state.string_items_buf, - )?) + Some(DataBlock::take(input, &probe_indexes[0..matched_idx])?) } else { None }; @@ -275,7 +269,6 @@ impl HashJoinProbeState { &build_state.build_columns, &build_state.build_columns_data_type, &build_state.build_num_rows, - &mut probe_state.string_items_buf, )?) } else { None diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_join.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_join.rs index f5730d477934..cac83b169073 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_join.rs @@ -242,11 +242,7 @@ impl HashJoinProbeState { } let probe_block = if probe_state.is_probe_projected { - let probe_block = DataBlock::take( - input, - &probe_indexes[0..matched_idx], - &mut probe_state.string_items_buf, - )?; + let probe_block = DataBlock::take(input, &probe_indexes[0..matched_idx])?; // The join type is right join, we need to wrap nullable for probe side. let nullable_columns = probe_block @@ -264,7 +260,6 @@ impl HashJoinProbeState { &build_state.build_columns, &build_state.build_columns_data_type, &build_state.build_num_rows, - &mut probe_state.string_items_buf, )?) } else { None diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_mark_join.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_mark_join.rs index 270a67cfe9cb..372e46aa0b03 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_mark_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_mark_join.rs @@ -276,11 +276,7 @@ impl HashJoinProbeState { } let probe_block = if probe_state.is_probe_projected { - Some(DataBlock::take( - input, - &probe_indexes[0..matched_idx], - &mut probe_state.string_items_buf, - )?) 
+ Some(DataBlock::take(input, &probe_indexes[0..matched_idx])?) } else { None }; @@ -290,7 +286,6 @@ impl HashJoinProbeState { &build_state.build_columns, &build_state.build_columns_data_type, &build_state.build_num_rows, - &mut probe_state.string_items_buf, )?) } else { None diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_semi_anti_join.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_semi_anti_join.rs index 95098d589c17..d9d345d5fa49 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_semi_anti_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_semi_anti_join.rs @@ -282,11 +282,7 @@ impl HashJoinProbeState { } let probe_block = if probe_state.is_probe_projected { - Some(DataBlock::take( - input, - &probe_indexes[0..matched_idx], - &mut probe_state.string_items_buf, - )?) + Some(DataBlock::take(input, &probe_indexes[0..matched_idx])?) } else { None }; @@ -296,7 +292,6 @@ impl HashJoinProbeState { &build_state.build_columns, &build_state.build_columns_data_type, &build_state.build_num_rows, - &mut probe_state.string_items_buf, )?) 
} else { None diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_state.rs index 87d7c063b1df..87ac376e47da 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_state.rs @@ -85,7 +85,6 @@ impl ProbeState { max_block_size: usize, join_type: &JoinType, with_conjunction: bool, - has_string_column: bool, func_ctx: FunctionContext, other_predicate: Option, ) -> Self { @@ -134,7 +133,7 @@ impl ProbeState { process_state: None, max_block_size, mutable_indexes: MutableIndexes::new(max_block_size), - generation_state: ProbeBlockGenerationState::new(max_block_size, has_string_column), + generation_state: ProbeBlockGenerationState::new(max_block_size), selection: vec![0; max_block_size], hashes: vec![0; max_block_size], selection_count: 0, @@ -185,20 +184,13 @@ pub struct ProbeBlockGenerationState { pub(crate) is_probe_projected: bool, // When we need a bitmap that is all true, we can directly slice it to reduce memory usage. pub(crate) true_validity: Bitmap, - // The string_items_buf is used as a buffer to reduce memory allocation when taking [u8] Columns. 
- pub(crate) string_items_buf: Option>, } impl ProbeBlockGenerationState { - fn new(size: usize, has_string_column: bool) -> Self { + fn new(size: usize) -> Self { Self { is_probe_projected: false, true_validity: Bitmap::new_constant(true, size), - string_items_buf: if has_string_column { - Some(vec![(0, 0); size]) - } else { - None - }, } } } diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/row.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/row.rs index 776c88dfeb8b..059e4ed2483a 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/row.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/row.rs @@ -65,7 +65,6 @@ impl RowSpace { build_columns: &[ColumnVec], build_columns_data_type: &[DataType], num_rows: &usize, - string_items_buf: &mut Option>, ) -> Result { if *num_rows != 0 { let data_block = DataBlock::take_column_vec( @@ -73,7 +72,6 @@ impl RowSpace { build_columns_data_type, row_ptrs, row_ptrs.len(), - string_items_buf, ); Ok(data_block) } else { diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_probe.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_probe.rs index c478a19f5ebf..cd113272c8b3 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_probe.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_probe.rs @@ -126,7 +126,6 @@ impl TransformHashJoinProbe { func_ctx: FunctionContext, join_type: &JoinType, with_conjunct: bool, - has_string_column: bool, ) -> Result> { join_probe_state.probe_attach(); // Create a hash join spiller. 
@@ -153,7 +152,6 @@ impl TransformHashJoinProbe { max_block_size, join_type, with_conjunct, - has_string_column, func_ctx, other_predicate, ); diff --git a/src/query/storages/common/index/src/bloom_index.rs b/src/query/storages/common/index/src/bloom_index.rs index a5614b8b2a73..8763f35ae9a5 100644 --- a/src/query/storages/common/index/src/bloom_index.rs +++ b/src/query/storages/common/index/src/bloom_index.rs @@ -545,7 +545,7 @@ impl BloomIndex { /// If it does, the bloom index for the column will not be established. fn check_large_string(column: &Column) -> bool { if let Column::String(v) = &column { - let bytes_per_row = v.data().len() / v.len().max(1); + let bytes_per_row = v.current_buffer_len() / v.len().max(1); if bytes_per_row > 256 { return true; } diff --git a/src/query/storages/fuse/src/operations/read/runtime_filter_prunner.rs b/src/query/storages/fuse/src/operations/read/runtime_filter_prunner.rs index 4d4674b50d4a..066a418b626d 100644 --- a/src/query/storages/fuse/src/operations/read/runtime_filter_prunner.rs +++ b/src/query/storages/fuse/src/operations/read/runtime_filter_prunner.rs @@ -150,8 +150,8 @@ pub(crate) fn update_bitmap_with_bloom_filter( } idx += 1; }), - KeysState::Column(Column::String(col)) => col.iter_binary().for_each(|key| { - let hash = key.fast_hash(); + KeysState::Column(Column::String(col)) => col.iter().for_each(|key| { + let hash = key.as_bytes().fast_hash(); if filter.contains(&hash) { bitmap.set(idx, true); } diff --git a/src/query/storages/fuse/src/statistics/cluster_statistics.rs b/src/query/storages/fuse/src/statistics/cluster_statistics.rs index 01b2316a70e0..0b72a4794aee 100644 --- a/src/query/storages/fuse/src/statistics/cluster_statistics.rs +++ b/src/query/storages/fuse/src/statistics/cluster_statistics.rs @@ -106,7 +106,7 @@ impl ClusterStatsGenerator { if !self.cluster_key_index.is_empty() { let indices = vec![0u32, block.num_rows() as u32 - 1]; - block = block.take(&indices, &mut None)?; + block = 
block.take(&indices)?; } block = self diff --git a/src/query/storages/stage/src/read/row_based/formats/csv/block_builder.rs b/src/query/storages/stage/src/read/row_based/formats/csv/block_builder.rs index 199900f4d283..8a1c15e1d329 100644 --- a/src/query/storages/stage/src/read/row_based/formats/csv/block_builder.rs +++ b/src/query/storages/stage/src/read/row_based/formats/csv/block_builder.rs @@ -186,16 +186,9 @@ impl RowDecoder for CsvDecoder { Ok(vec![]) } - fn flush(&self, columns: Vec, num_rows: usize) -> Vec { + fn flush(&self, columns: Vec, _num_rows: usize) -> Vec { if let Some(projection) = &self.load_context.pos_projection { - let empty_strings = Column::String( - StringColumnBuilder { - need_estimated: false, - data: vec![], - offsets: vec![0; num_rows + 1], - } - .build(), - ); + let empty_strings = Column::String(StringColumnBuilder::with_capacity(0, 0).build()); columns .into_iter() .enumerate()