Skip to content

Commit

Permalink
fix(s3): restore working test for DynamoDb log store repair log on re…
Browse files Browse the repository at this point in the history
…ad (#2120)

# Description

Make sure the read path for delta table commit entries passes through
the log store, enabling it to ensure the invariants and potentially
repair a broken commit in the context of S3 / DynamoDb log store
implementation.

This also adds another test in the context of S3 log store: repairing a
log store on load was not implemented previously.
  
Note that this a stopgap and not a complete solution: it comes with a
performance penalty as we're triggering a redundant object store list
operation just for the purpose of "triggering" the log store
functionality.


fixes #2109

---------

Co-authored-by: Ion Koutsouris <[email protected]>
Co-authored-by: R. Tyler Croy <[email protected]>
  • Loading branch information
3 people committed Jan 31, 2024
1 parent 893d631 commit d1b24f5
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 28 deletions.
14 changes: 14 additions & 0 deletions crates/aws/src/logstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,20 @@ impl LogStore for S3DynamoDbLogStore {
self.table_path.clone()
}

async fn refresh(&self) -> DeltaResult<()> {
let entry = self
.lock_client
.get_latest_entry(&self.table_path)
.await
.map_err(|err| DeltaTableError::GenericError {
source: Box::new(err),
})?;
if let Some(entry) = entry {
self.repair_entry(&entry).await?;
}
Ok(())
}

async fn read_commit_entry(&self, version: i64) -> DeltaResult<Option<Bytes>> {
let entry = self
.lock_client
Expand Down
14 changes: 13 additions & 1 deletion crates/aws/tests/integration_s3_dynamodb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ async fn test_repair_commit_entry() -> TestResult<()> {

#[tokio::test]
#[serial]
#[ignore = "https://github.com/delta-io/delta-rs/issues/2109"]
async fn test_repair_on_update() -> TestResult<()> {
let context = IntegrationContext::new(Box::new(S3Integration::default()))?;
let mut table = prepare_table(&context, "repair_on_update").await?;
Expand All @@ -168,6 +167,19 @@ async fn test_repair_on_update() -> TestResult<()> {
Ok(())
}

#[tokio::test]
#[serial]
async fn test_repair_on_load() -> TestResult<()> {
let context = IntegrationContext::new(Box::new(S3Integration::default()))?;
let mut table = prepare_table(&context, "repair_on_update").await?;
let _entry = create_incomplete_commit_entry(&table, 1, "unfinished_commit").await?;
table.load_version(1).await?;
// table should fix the broken entry while loading a specific version
assert_eq!(table.version(), 1);
validate_lock_table_state(&table, 1).await?;
Ok(())
}

const WORKERS: i64 = 3;
const COMMITS: i64 = 15;

Expand Down
13 changes: 10 additions & 3 deletions crates/core/src/kernel/snapshot/log_segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use tracing::debug;

use super::parse;
use crate::kernel::{arrow::json, Action, ActionType, Metadata, Protocol, Schema, StructType};
use crate::logstore::LogStore;
use crate::operations::transaction::get_commit_bytes;
use crate::protocol::DeltaOperation;
use crate::{DeltaResult, DeltaTableConfig, DeltaTableError};
Expand Down Expand Up @@ -148,15 +149,21 @@ impl LogSegment {
table_root: &Path,
start_version: i64,
end_version: Option<i64>,
store: &dyn ObjectStore,
log_store: &dyn LogStore,
) -> DeltaResult<Self> {
debug!(
"try_new_slice: start_version: {}, end_version: {:?}",
start_version, end_version
);
log_store.refresh().await?;
let log_url = table_root.child("_delta_log");
let (mut commit_files, checkpoint_files) =
list_log_files(store, &log_url, end_version, Some(start_version)).await?;
let (mut commit_files, checkpoint_files) = list_log_files(
log_store.object_store().as_ref(),
&log_url,
end_version,
Some(start_version),
)
.await?;
// remove all files above requested version
if let Some(version) = end_version {
commit_files.retain(|meta| meta.location.commit_version() <= Some(version));
Expand Down
23 changes: 11 additions & 12 deletions crates/core/src/kernel/snapshot/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use self::parse::{read_adds, read_removes};
use self::replay::{LogMapper, LogReplayScanner, ReplayStream};
use super::{Action, Add, CommitInfo, DataType, Metadata, Protocol, Remove, StructField};
use crate::kernel::StructType;
use crate::logstore::LogStore;
use crate::table::config::TableConfig;
use crate::{DeltaResult, DeltaTableConfig, DeltaTableError};

Expand Down Expand Up @@ -108,41 +109,39 @@ impl Snapshot {
/// Update the snapshot to the given version
pub async fn update(
&mut self,
store: Arc<dyn ObjectStore>,
log_store: Arc<dyn LogStore>,
target_version: Option<i64>,
) -> DeltaResult<()> {
self.update_inner(store, target_version).await?;
self.update_inner(log_store, target_version).await?;
Ok(())
}

async fn update_inner(
&mut self,
store: Arc<dyn ObjectStore>,
log_store: Arc<dyn LogStore>,
target_version: Option<i64>,
) -> DeltaResult<Option<LogSegment>> {
if let Some(version) = target_version {
if version == self.version() {
return Ok(None);
}
if version < self.version() {
return Err(DeltaTableError::Generic(
"Cannoit downgrade snapshot".into(),
));
return Err(DeltaTableError::Generic("Cannot downgrade snapshot".into()));
}
}
let log_segment = LogSegment::try_new_slice(
&Path::default(),
self.version() + 1,
target_version,
store.as_ref(),
log_store.as_ref(),
)
.await?;
if log_segment.commit_files.is_empty() && log_segment.checkpoint_files.is_empty() {
return Ok(None);
}

let (protocol, metadata) = log_segment
.read_metadata(store.clone(), &self.config)
.read_metadata(log_store.object_store().clone(), &self.config)
.await?;
if let Some(protocol) = protocol {
self.protocol = protocol;
Expand Down Expand Up @@ -376,20 +375,20 @@ impl EagerSnapshot {
/// Update the snapshot to the given version
pub async fn update(
&mut self,
store: Arc<dyn ObjectStore>,
log_store: Arc<dyn LogStore>,
target_version: Option<i64>,
) -> DeltaResult<()> {
if Some(self.version()) == target_version {
return Ok(());
}
let new_slice = self
.snapshot
.update_inner(store.clone(), target_version)
.update_inner(log_store.clone(), target_version)
.await?;
if let Some(new_slice) = new_slice {
let files = std::mem::take(&mut self.files);
let log_stream = new_slice.commit_stream(
store.clone(),
log_store.object_store().clone(),
&log_segment::COMMIT_SCHEMA,
&self.snapshot.config,
)?;
Expand All @@ -398,7 +397,7 @@ impl EagerSnapshot {
} else {
new_slice
.checkpoint_stream(
store,
log_store.object_store(),
&log_segment::CHECKPOINT_SCHEMA,
&self.snapshot.config,
)
Expand Down
5 changes: 5 additions & 0 deletions crates/core/src/logstore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,11 @@ pub trait LogStore: Sync + Send {
/// Return the name of this LogStore implementation
fn name(&self) -> String;

/// Trigger sync operation on log store to.
async fn refresh(&self) -> DeltaResult<()> {
Ok(())
}

/// Read data for commit entry with the given version.
async fn read_commit_entry(&self, version: i64) -> DeltaResult<Option<Bytes>>;

Expand Down
6 changes: 1 addition & 5 deletions crates/core/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,11 +345,7 @@ impl DeltaTable {
self.version(),
);
match self.state.as_mut() {
Some(state) => {
state
.update(self.log_store.object_store(), max_version)
.await
}
Some(state) => state.update(self.log_store.clone(), max_version).await,
_ => {
let state = DeltaTableState::try_new(
&Path::default(),
Expand Down
5 changes: 3 additions & 2 deletions crates/core/src/table/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::kernel::{
Action, Add, DataType, EagerSnapshot, LogDataHandler, LogicalFile, Metadata, Protocol, Remove,
StructType,
};
use crate::logstore::LogStore;
use crate::partitions::{DeltaTablePartition, PartitionFilter};
use crate::protocol::DeltaOperation;
use crate::{DeltaResult, DeltaTableError};
Expand Down Expand Up @@ -196,10 +197,10 @@ impl DeltaTableState {
/// Update the state of the table to the given version.
pub async fn update(
&mut self,
store: Arc<dyn ObjectStore>,
log_store: Arc<dyn LogStore>,
version: Option<i64>,
) -> Result<(), DeltaTableError> {
self.snapshot.update(store, version).await?;
self.snapshot.update(log_store, version).await?;
Ok(())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,11 +183,11 @@ def delete_dynamodb_table(table_name: str):

@pytest.fixture
def setup():
os.environ['AWS_ENDPOINT_URL'] = 'http://localhost:4566'
os.environ['AWS_REGION'] = 'us-east-1'
os.environ['AWS_ACCESS_KEY_ID'] = 'deltalake'
os.environ['AWS_SECRET_ACCESS_KEY'] = 'weloverust'
os.environ['AWS_ALLOW_HTTP'] = 'true'
os.environ["AWS_ENDPOINT_URL"] = "http://localhost:4566"
os.environ["AWS_REGION"] = "us-east-1"
os.environ["AWS_ACCESS_KEY_ID"] = "deltalake"
os.environ["AWS_SECRET_ACCESS_KEY"] = "weloverust"
os.environ["AWS_ALLOW_HTTP"] = "true"
id = uuid.uuid4()
bucket_name = f"delta-rs-integration-{id}"
bucket_url = f"s3://{bucket_name}"
Expand Down

0 comments on commit d1b24f5

Please sign in to comment.