Skip to content

Commit

Permalink
Allow writing BYTE_ARRAY with converted type NONE
Browse files Browse the repository at this point in the history
This allows to store binary data of arbitrary length in a parquet file,
without having to wrongly declare it as UTF-8.

Fixes the writer part of #42971

The reader part has already been fixed in 4d82549
and this uses a similar implementation, but with a stricter set of
"exceptions" (only byte arrays with NONE type are allowed).
  • Loading branch information
pulkomandy committed Nov 15, 2024
1 parent df40f7a commit ae87ec3
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 7 deletions.
26 changes: 22 additions & 4 deletions cpp/src/parquet/stream_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,19 @@ constexpr int16_t StreamWriter::kDefLevelOne;
constexpr int16_t StreamWriter::kRepLevelZero;
constexpr int64_t StreamWriter::kBatchSizeOne;

// The converted type may be NONE to store raw data in a byte array
// The following is a list of converted types which are allowed instead of the
// expected converted type.
// Each pair given is:
// {<StreamReader expected type>, <Parquet file converted type>}
// So for example {ConvertedType::INT_32, ConvertedType::NONE} means
// that if the StreamWriter was expecting the converted type INT_32,
// then it will allow to write a Parquet file using the converted type
// NONE.
//
static const std::set<std::pair<ConvertedType::type, ConvertedType::type>>
converted_type_exceptions = {{ConvertedType::UTF8, ConvertedType::NONE}};

StreamWriter::FixedStringView::FixedStringView(const char* data_ptr)
: data{data_ptr}, size{std::strlen(data_ptr)} {}

Expand Down Expand Up @@ -198,10 +211,15 @@ void StreamWriter::CheckColumn(Type::type physical_type,
"' not '" + TypeToString(physical_type) + "'");
}
if (converted_type != node->converted_type()) {
throw ParquetException("Column converted type mismatch. Column '" + node->name() +
"' has converted type[" +
ConvertedTypeToString(node->converted_type()) + "] not '" +
ConvertedTypeToString(converted_type) + "'");
// The converted type does not always match with the value
// provided so check the set of exceptions.
if (converted_type_exceptions.find({converted_type, node->converted_type()}) ==
converted_type_exceptions.end()) {
throw ParquetException("Column converted type mismatch. Column '" + node->name() +
"' has converted type[" +
ConvertedTypeToString(node->converted_type()) + "] not '" +
ConvertedTypeToString(converted_type) + "'");
}
}
// Length must be exact.
// A shorter length fixed array is not acceptable as it would
Expand Down
23 changes: 20 additions & 3 deletions cpp/src/parquet/stream_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ class TestStreamWriter : public ::testing::Test {
fields.push_back(schema::PrimitiveNode::Make("double_field", Repetition::REQUIRED,
Type::DOUBLE, ConvertedType::NONE));

fields.push_back(schema::PrimitiveNode::Make("bytes_field", Repetition::REQUIRED,
Type::BYTE_ARRAY, ConvertedType::NONE));

return std::static_pointer_cast<schema::GroupNode>(
schema::GroupNode::Make("schema", Repetition::REQUIRED, fields));
}
Expand All @@ -99,7 +102,7 @@ TEST_F(TestStreamWriter, DefaultConstructed) {
EXPECT_EQ(0, os.current_column());
EXPECT_EQ(0, os.current_row());
EXPECT_EQ(0, os.num_columns());
EXPECT_EQ(0, os.SkipColumns(10));
EXPECT_EQ(0, os.SkipColumns(11));
}

TEST_F(TestStreamWriter, TypeChecking) {
Expand Down Expand Up @@ -162,6 +165,14 @@ TEST_F(TestStreamWriter, TypeChecking) {
EXPECT_THROW(writer_ << 5.4f, ParquetException);
EXPECT_NO_THROW(writer_ << 5.4);

// Required type: Variable length byte array.
EXPECT_EQ(10, writer_.current_column());
EXPECT_THROW(writer_ << 5, ParquetException);
EXPECT_THROW(writer_ << char3_array, ParquetException);
EXPECT_THROW(writer_ << char4_array, ParquetException);
EXPECT_THROW(writer_ << char5_array, ParquetException);
EXPECT_NO_THROW(writer_ << std::string_view("\xff\0ok", 4));

EXPECT_EQ(0, writer_.current_row());
EXPECT_NO_THROW(writer_ << EndRow);
EXPECT_EQ(1, writer_.current_row());
Expand Down Expand Up @@ -210,6 +221,10 @@ TEST_F(TestStreamWriter, RequiredFieldChecking) {
EXPECT_THROW(writer_ << optional<double>(), ParquetException);
EXPECT_NO_THROW(writer_ << optional<double>(5.4));

// Required field of type: Variable length byte array.
EXPECT_THROW(writer_ << optional<std::string>(), ParquetException);
EXPECT_NO_THROW(writer_ << optional<std::string>("ok"));

EXPECT_NO_THROW(writer_ << EndRow);
}

Expand All @@ -234,6 +249,7 @@ TEST_F(TestStreamWriter, EndRow) {
EXPECT_NO_THROW(writer_ << uint64_t((1ull << 60) + 123));
EXPECT_NO_THROW(writer_ << 25.4f);
EXPECT_NO_THROW(writer_ << 3.3424);
EXPECT_NO_THROW(writer_ << "ok");
// Correct use of end row after all fields have been output.
EXPECT_NO_THROW(writer_ << EndRow);
EXPECT_EQ(1, writer_.current_row());
Expand Down Expand Up @@ -272,6 +288,7 @@ TEST_F(TestStreamWriter, EndRowGroup) {
EXPECT_NO_THROW(writer_ << uint64_t((1ull << 60) - i * i)) << "index: " << i;
EXPECT_NO_THROW(writer_ << 42325.4f / float(i + 1)) << "index: " << i;
EXPECT_NO_THROW(writer_ << 3.2342e5 / double(i + 1)) << "index: " << i;
EXPECT_NO_THROW(writer_ << std::to_string(i)) << "index: " << i;
EXPECT_NO_THROW(writer_ << EndRow) << "index: " << i;

if (i % 1000 == 0) {
Expand All @@ -293,7 +310,7 @@ TEST_F(TestStreamWriter, SkipColumns) {
writer_ << true << std::string("Cannot skip mandatory columns");
EXPECT_THROW(writer_.SkipColumns(1), ParquetException);
writer_ << 'x' << std::array<char, 4>{'A', 'B', 'C', 'D'} << int8_t(2) << uint16_t(3)
<< int32_t(4) << uint64_t(5) << 6.0f << 7.0;
<< int32_t(4) << uint64_t(5) << 6.0f << 7.0 << "ok";
writer_ << EndRow;
}

Expand All @@ -304,7 +321,7 @@ TEST_F(TestStreamWriter, AppendNotImplemented) {
writer_ = StreamWriter{ParquetFileWriter::Open(outfile, GetSchema())};
writer_ << false << std::string("Just one row") << 'x'
<< std::array<char, 4>{'A', 'B', 'C', 'D'} << int8_t(2) << uint16_t(3)
<< int32_t(4) << uint64_t(5) << 6.0f << 7.0;
<< int32_t(4) << uint64_t(5) << 6.0f << 7.0 << "ok";
writer_ << EndRow;
writer_ = StreamWriter{};

Expand Down

0 comments on commit ae87ec3

Please sign in to comment.