Skip to content

Commit

Permalink
Add basic segmentation capabilities to zffstreamer;
Browse files Browse the repository at this point in the history
  • Loading branch information
ph0llux committed Oct 28, 2024
1 parent b286746 commit 199e52c
Show file tree
Hide file tree
Showing 2 changed files with 155 additions and 10 deletions.
3 changes: 3 additions & 0 deletions src/lib/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ pub enum ZffErrorKind {
MissmatchIdentifier,
/// Error will be returned if the binary search fails (e.g. if the set is empty or malformed).
BinarySearchError,
/// Error will be returned if the segmentation has not finished.
SegmentNotFinished,
}

impl fmt::Display for ZffErrorKind {
Expand Down Expand Up @@ -224,6 +226,7 @@ impl fmt::Display for ZffErrorKind {
ZffErrorKind::UnknownMetadataExtendedType => "UnknownMetadataExtendedType",
ZffErrorKind::MissmatchIdentifier => "MissmatchIdentifier",
ZffErrorKind::BinarySearchError => "BinarySearchError",
ZffErrorKind::SegmentNotFinished => "SegmentNotFinished",
};
write!(f, "{}", err_msg)
}
Expand Down
162 changes: 152 additions & 10 deletions src/lib/io/zffstreamer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ use super::*;

// - internal
use crate::{
footer::SegmentFooter, header::{ChunkMapType, ChunkMaps, SegmentHeader},
HeaderCoding,
footer::SegmentFooter, header::{ChunkMapType, ChunkMaps, SegmentHeader}, HeaderCoding
};

#[derive(Debug, Clone, Default)]
Expand Down Expand Up @@ -50,6 +49,21 @@ enum PreparedDataQueueState {
None,
}

#[derive(Debug, Clone)]
enum SegmentationState {
Full(u64), // current segment number
Partial(u64), // current segment number
Finished(u64), // current segment number
FullLastSegment, // the last segment is full
FinishedLastSegment, // the last segment is finished
}

impl Default for SegmentationState {
fn default() -> Self {
SegmentationState::Partial(INITIAL_SEGMENT_NUMBER)
}
}

// - external
#[cfg(feature = "log")]
use log::trace;
Expand Down Expand Up @@ -112,6 +126,7 @@ pub struct ZffStreamer<R: Read> {
optional_parameters: ZffCreationParameters,
in_progress_data: ZffStreamerInProgressData,
read_state: ReadState,
segmentation_state: SegmentationState,
}

impl<R: Read> ZffStreamer<R> {
Expand Down Expand Up @@ -158,6 +173,7 @@ impl<R: Read> ZffStreamer<R> {
object_encoder,
current_object_encoder,
in_progress_data: build_in_progress_data(&params),
segmentation_state: SegmentationState::default(),
optional_parameters: params,
})
}
Expand Down Expand Up @@ -194,6 +210,27 @@ impl<R: Read> ZffStreamer<R> {
total_files
}

/// sets the next segment.
pub fn next_segment(&mut self) -> Result<()> {
// check if the current segment is already finished
match self.segmentation_state {
SegmentationState::Partial(_) => return Err(ZffError::new(ZffErrorKind::SegmentNotFinished, "")),
SegmentationState::Full(_) => return Err(ZffError::new(ZffErrorKind::SegmentNotFinished, "")),
SegmentationState::Finished(segment_number) => {
self.segmentation_state = SegmentationState::Partial(segment_number+1);
self.read_state = ReadState::SegmentHeader;
self.in_progress_data.encoded_segment_header_read_bytes = ReadBytes::NotRead;
self.in_progress_data.encoded_segment_header = SegmentHeader::new(
self.optional_parameters.unique_identifier,
segment_number+1,
self.optional_parameters.chunkmap_size.unwrap_or(DEFAULT_CHUNKMAP_SIZE)
).encode_directly();
},
SegmentationState::FullLastSegment => return Err(ZffError::new(ZffErrorKind::SegmentNotFinished, "")),
SegmentationState::FinishedLastSegment => return Err(ZffError::new(ZffErrorKind::NoObjectsLeft, "")),
}
Ok(())
}

/// Returns true if the chunkmap was full and flushed.
fn check_chunkmap_is_full_and_flush(&mut self, chunk_map_type: ChunkMapType) -> bool {
Expand Down Expand Up @@ -313,13 +350,24 @@ impl<R: Read> ZffStreamer<R> {

impl<R: Read> Read for ZffStreamer<R> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
match self.segmentation_state {
SegmentationState::Finished(_) => {
return Ok(0);
},
SegmentationState::FinishedLastSegment => {
return Ok(0);
},
_ => {},
}

let mut bytes_written_to_buffer = 0; // the number of bytes which are written to the current buffer,

// may improve performance in different cases.
let buf_len = buf.len();
'read_loop: loop {
match self.read_state {
ReadState::SegmentHeader => {
#[cfg(feature = "log")]
trace!("ReadState::SegmentHeader");
// reads the segment header if not already read
// this is the initial state.
Expand All @@ -334,14 +382,22 @@ impl<R: Read> Read for ZffStreamer<R> {
};

// switch to the next state
self.read_state = ReadState::ObjectHeader;
match self.in_progress_data.current_encoded_object_header_read_bytes {
ReadBytes::Finished => {
self.read_state = ReadState::Chunking;
},
_ => {
self.read_state = ReadState::ObjectHeader;
},
}
// prepare the current object header
self.in_progress_data.current_encoded_object_header = self.current_object_encoder.get_encoded_header();
self.in_progress_data.segment_footer.add_object_header_offset(
self.current_object_encoder.obj_number(),
self.in_progress_data.bytes_read);
},
ReadState::ObjectHeader => {
#[cfg(feature = "log")]
trace!("ReadState::ObjectHeader");
// reads the current object header if not already read
let read_bytes = fill_buffer(
Expand All @@ -358,6 +414,7 @@ impl<R: Read> Read for ZffStreamer<R> {
self.read_state = ReadState::ChunkOffsetMap;
},
ReadState::ChunkOffsetMap => {
#[cfg(feature = "log")]
trace!("ReadState::ChunkOffsetMap");
// reads the chunkmap if not already read
let read_bytes = fill_buffer(
Expand All @@ -371,10 +428,17 @@ impl<R: Read> Read for ZffStreamer<R> {
};

// switch to the next state
self.read_state = ReadState::Chunking;
match self.segmentation_state {
SegmentationState::Partial(_) => self.read_state = ReadState::Chunking,
SegmentationState::Full(_) => self.read_state = ReadState::ChunkSizeMap,
SegmentationState::Finished(_) => unreachable!(),
SegmentationState::FullLastSegment => unreachable!(),
SegmentationState::FinishedLastSegment => unreachable!(),
};
},

ReadState::ChunkSizeMap => {
#[cfg(feature = "log")]
trace!("ReadState::ChunkSizeMap");
// reads the chunkmap if not already read
let read_bytes = fill_buffer(
Expand All @@ -388,10 +452,17 @@ impl<R: Read> Read for ZffStreamer<R> {
};

// switch to the next state
self.read_state = ReadState::Chunking;
match self.segmentation_state {
SegmentationState::Partial(_) => self.read_state = ReadState::Chunking,
SegmentationState::Full(_) => self.read_state = ReadState::ChunkFlagsMap,
SegmentationState::Finished(_) => unreachable!(),
SegmentationState::FullLastSegment => unreachable!(),
SegmentationState::FinishedLastSegment => unreachable!(),
};
},

ReadState::ChunkFlagsMap => {
#[cfg(feature = "log")]
trace!("ReadState::ChunkFlagsMap");
// reads the chunkmap if not already read
let read_bytes = fill_buffer(
Expand All @@ -405,10 +476,17 @@ impl<R: Read> Read for ZffStreamer<R> {
};

// switch to the next state
self.read_state = ReadState::Chunking;
match self.segmentation_state {
SegmentationState::Partial(_) => self.read_state = ReadState::Chunking,
SegmentationState::Full(_) => self.read_state = ReadState::ChunkCrcMap,
SegmentationState::Finished(_) => unreachable!(),
SegmentationState::FullLastSegment => unreachable!(),
SegmentationState::FinishedLastSegment => unreachable!(),
};
},

ReadState::ChunkCrcMap => {
#[cfg(feature = "log")]
trace!("ReadState::ChunkCrcMap");
// reads the chunkmap if not already read
let read_bytes = fill_buffer(
Expand All @@ -422,10 +500,17 @@ impl<R: Read> Read for ZffStreamer<R> {
};

// switch to the next state
self.read_state = ReadState::Chunking;
match self.segmentation_state {
SegmentationState::Partial(_) => self.read_state = ReadState::Chunking,
SegmentationState::Full(_) => self.read_state = ReadState::ChunkSamebytesMap,
SegmentationState::Finished(_) => unreachable!(),
SegmentationState::FullLastSegment => unreachable!(),
SegmentationState::FinishedLastSegment => unreachable!(),
};
},

ReadState::ChunkSamebytesMap => {
#[cfg(feature = "log")]
trace!("ReadState::ChunkSamebytesMap");
// reads the chunkmap if not already read
let read_bytes = fill_buffer(
Expand All @@ -438,10 +523,17 @@ impl<R: Read> Read for ZffStreamer<R> {
return Ok(bytes_written_to_buffer);
};
// switch to the next state
self.read_state = ReadState::Chunking;
match self.segmentation_state {
SegmentationState::Partial(_) => self.read_state = ReadState::Chunking,
SegmentationState::Full(_) => self.read_state = ReadState::ChunkDeduplicationMap,
SegmentationState::Finished(_) => unreachable!(),
SegmentationState::FullLastSegment => unreachable!(),
SegmentationState::FinishedLastSegment => unreachable!(),
};
},

ReadState::ChunkDeduplicationMap => {
#[cfg(feature = "log")]
trace!("ReadState::ChunkDeduplicationMap");
// reads the chunkmap if not already read
let read_bytes = fill_buffer(
Expand All @@ -455,10 +547,17 @@ impl<R: Read> Read for ZffStreamer<R> {
};

// switch to the next state
self.read_state = ReadState::Chunking;
match self.segmentation_state {
SegmentationState::Partial(_) => self.read_state = ReadState::Chunking,
SegmentationState::Full(_) => self.read_state = ReadState::SegmentFooter,
SegmentationState::Finished(_) => unreachable!(),
SegmentationState::FullLastSegment => unreachable!(),
SegmentationState::FinishedLastSegment => unreachable!(),
};
},

ReadState::LastChunkOffsetMapOfObject => {
#[cfg(feature = "log")]
trace!("ReadState::LastChunkOffsetMapOfObject");
// reads the chunkmap if not already read
let read_bytes = fill_buffer(
Expand All @@ -479,6 +578,7 @@ impl<R: Read> Read for ZffStreamer<R> {
},

ReadState::LastChunkSizeMapOfObject => {
#[cfg(feature = "log")]
trace!("ReadState::LastChunkSizeMapOfObject");
// reads the chunkmap if not already read
let read_bytes = fill_buffer(
Expand All @@ -499,6 +599,7 @@ impl<R: Read> Read for ZffStreamer<R> {
},

ReadState::LastChunkFlagsMapOfObject => {
#[cfg(feature = "log")]
trace!("ReadState::LastChunkFlagsMapOfObject");
// reads the chunkmap if not already read
let read_bytes = fill_buffer(
Expand All @@ -519,6 +620,7 @@ impl<R: Read> Read for ZffStreamer<R> {
},

ReadState::LastChunkCrcMapOfObject => {
#[cfg(feature = "log")]
trace!("ReadState::LastChunkCrcMapOfObject");
// reads the chunkmap if not already read
let read_bytes = fill_buffer(
Expand All @@ -539,6 +641,7 @@ impl<R: Read> Read for ZffStreamer<R> {
},

ReadState::LastChunkSamebytesMapOfObject => {
#[cfg(feature = "log")]
trace!("ReadState::LastChunkSamebytesMapOfObject");
// reads the chunkmap if not already read
let read_bytes = fill_buffer(
Expand All @@ -559,6 +662,7 @@ impl<R: Read> Read for ZffStreamer<R> {
},

ReadState::LastChunkDeduplicationMapOfObject => {
#[cfg(feature = "log")]
trace!("ReadState::LastChunkDeduplicationMapOfObject");
// reads the chunkmap if not already read
let read_bytes = fill_buffer(
Expand Down Expand Up @@ -586,9 +690,33 @@ impl<R: Read> Read for ZffStreamer<R> {
},

ReadState::Chunking => {
#[cfg(feature = "log")]
trace!("ReadState::Chunking");
let current_chunk_number = self.current_object_encoder.current_chunk_number();

// check the read bytes (for segment length)
// TODO: calculate also the sizes of the current chunkmaps
let read_bytes_segment = match self.segmentation_state {
SegmentationState::Partial(segment_number) => self.in_progress_data.bytes_read - self.in_progress_data.bytes_read*(segment_number-1),
_ => unreachable!(),
};
if read_bytes_segment >= self.optional_parameters.target_segment_size.unwrap_or(u64::MAX) {
// set the appropriate segmentation state to full (with the next segment number)
self.segmentation_state = match self.segmentation_state {
SegmentationState::Partial(segment_number) => SegmentationState::Full(segment_number+1),
_ => unreachable!(),
};
// flush all chunkmaps
self.flush_chunkmap(ChunkMapType::OffsetMap);
self.flush_chunkmap(ChunkMapType::SizeMap);
self.flush_chunkmap(ChunkMapType::FlagsMap);
self.flush_chunkmap(ChunkMapType::CRCMap);
self.flush_chunkmap(ChunkMapType::SamebytesMap);
self.flush_chunkmap(ChunkMapType::DeduplicationMap);
self.read_state = ReadState::ChunkOffsetMap;
continue;
}

// reads the chunking data
let read_bytes = fill_buffer(
&self.in_progress_data.current_encoded_chunked_data,
Expand Down Expand Up @@ -778,6 +906,7 @@ impl<R: Read> Read for ZffStreamer<R> {
}
},
ReadState::ObjectFooter => {
#[cfg(feature = "log")]
trace!("ReadState::ObjectFooter");
// reads the current object footer if not already read
let read_bytes = fill_buffer(
Expand All @@ -804,6 +933,7 @@ impl<R: Read> Read for ZffStreamer<R> {
self.in_progress_data.main_footer.encode_directly().len() as u64);
self.in_progress_data.current_encoded_segment_footer = self.in_progress_data.segment_footer.encode_directly();
self.in_progress_data.current_encoded_segment_footer_read_bytes = ReadBytes::NotRead;
self.segmentation_state = SegmentationState::FullLastSegment;
self.read_state = ReadState::SegmentFooter;
continue;
}
Expand All @@ -814,6 +944,7 @@ impl<R: Read> Read for ZffStreamer<R> {
self.in_progress_data.bytes_read);
},
ReadState::SegmentFooter => {
#[cfg(feature = "log")]
trace!("ReadState::SegmentFooter");
// reads the current segment footer if not already read
let read_bytes = fill_buffer(
Expand All @@ -832,9 +963,16 @@ impl<R: Read> Read for ZffStreamer<R> {
self.in_progress_data.encoded_main_footer = self.in_progress_data.main_footer.encode_directly();
self.in_progress_data.encoded_main_footer_read_bytes = ReadBytes::NotRead;

self.read_state = ReadState::MainFooter;
match self.segmentation_state {
SegmentationState::Full(segment_number) => self.segmentation_state = SegmentationState::Finished(segment_number),
SegmentationState::Partial(_) => unreachable!(),
SegmentationState::Finished(_) => unreachable!(),
SegmentationState::FullLastSegment => self.read_state = ReadState::MainFooter,
SegmentationState::FinishedLastSegment => unreachable!(),
}
},
ReadState::MainFooter => {
#[cfg(feature = "log")]
trace!("ReadState::MainFooter");
// reads the main footer if not already read
let read_bytes = fill_buffer(
Expand All @@ -843,6 +981,9 @@ impl<R: Read> Read for ZffStreamer<R> {
buf,
&mut bytes_written_to_buffer)?;
self.in_progress_data.bytes_read += read_bytes as u64;
if bytes_written_to_buffer < buf_len {
self.segmentation_state = SegmentationState::FinishedLastSegment;
}
return Ok(bytes_written_to_buffer);
}
}
Expand All @@ -852,6 +993,7 @@ impl<R: Read> Read for ZffStreamer<R> {

fn build_in_progress_data(params: &ZffCreationParameters) -> ZffStreamerInProgressData {
let mut in_progress_data = ZffStreamerInProgressData::new();
in_progress_data.main_footer.description_notes = params.description_notes.clone();

// setup default chunkmap_size if not set in parameters
let chunkmap_size = params.chunkmap_size.unwrap_or(DEFAULT_CHUNKMAP_SIZE);
Expand Down

0 comments on commit 199e52c

Please sign in to comment.