Skip to content

Commit

Permalink
Merge branch 'main' into spark-integration-test
Browse files Browse the repository at this point in the history
  • Loading branch information
rtyler authored Jan 30, 2024
2 parents c5bf076 + 467afc5 commit 1f9bcd8
Show file tree
Hide file tree
Showing 548 changed files with 319 additions and 368 deletions.
39 changes: 19 additions & 20 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,27 @@ debug = "line-tables-only"

[workspace.dependencies]
# arrow
arrow = { version = "49" }
arrow-arith = { version = "49" }
arrow-array = { version = "49" }
arrow-buffer = { version = "49" }
arrow-cast = { version = "49" }
arrow-ipc = { version = "49" }
arrow-json = { version = "49" }
arrow-ord = { version = "49" }
arrow-row = { version = "49" }
arrow-schema = { version = "49" }
arrow-select = { version = "49" }
object_store = { version = "0.8" }
parquet = { version = "49" }
arrow = { version = "50" }
arrow-arith = { version = "50" }
arrow-array = { version = "50" }
arrow-buffer = { version = "50" }
arrow-cast = { version = "50" }
arrow-ipc = { version = "50" }
arrow-json = { version = "50" }
arrow-ord = { version = "50" }
arrow-row = { version = "50" }
arrow-schema = { version = "50" }
arrow-select = { version = "50" }
object_store = { version = "0.9" }
parquet = { version = "50" }

# datafusion
datafusion = { version = "34" }
datafusion-expr = { version = "34" }
datafusion-common = { version = "34" }
datafusion-proto = { version = "34" }
datafusion-sql = { version = "34" }
datafusion-physical-expr = { version = "34" }

datafusion = { version = "35" }
datafusion-expr = { version = "35" }
datafusion-common = { version = "35" }
datafusion-proto = { version = "35" }
datafusion-sql = { version = "35" }
datafusion-physical-expr = { version = "35" }

# serde
serde = { version = "1.0.194", features = ["derive"] }
Expand Down
6 changes: 3 additions & 3 deletions crates/deltalake-aws/Cargo.toml → crates/aws/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ version = "0.1.0"
edition = "2021"

[dependencies]
deltalake-core = { path = "../deltalake-core" }
deltalake-core = { version = "0.17.0", path = "../core" }
rusoto_core = { version = "0.47", default-features = false, optional = true }
rusoto_credential = { version = "0.47" }
rusoto_sts = { version = "0.47", default-features = false, optional = true }
Expand All @@ -26,10 +26,10 @@ url = { workspace = true }
backoff = { version = "0.4", features = [ "tokio" ] }

[dev-dependencies]
deltalake-core = { path = "../deltalake-core", features = ["datafusion"] }
deltalake-core = { path = "../core", features = ["datafusion"] }
chrono = { workspace = true }
serial_test = "3"
deltalake-test = { path = "../deltalake-test" }
deltalake-test = { path = "../test" }
pretty_env_logger = "*"
rand = "0.8"
serde_json = { workspace = true }
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,20 @@ impl LogStore for S3DynamoDbLogStore {
self.table_path.clone()
}

/// Bring the log store up to date before reads: look up the most recent
/// commit entry in the DynamoDB lock table and, if one exists, repair it
/// (completing or cleaning up any unfinished commit).
async fn refresh(&self) -> DeltaResult<()> {
    let latest = self
        .lock_client
        .get_latest_entry(&self.table_path)
        .await
        // Surface lock-client failures as a generic table error so callers
        // only deal with DeltaTableError.
        .map_err(|err| DeltaTableError::GenericError {
            source: Box::new(err),
        })?;
    match latest {
        Some(ref entry) => {
            self.repair_entry(entry).await?;
        }
        None => {}
    }
    Ok(())
}

async fn read_commit_entry(&self, version: i64) -> DeltaResult<Option<Bytes>> {
let entry = self
.lock_client
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ async fn test_repair_commit_entry() -> TestResult<()> {

#[tokio::test]
#[serial]
#[ignore = "https://github.com/delta-io/delta-rs/issues/2109"]
async fn test_repair_on_update() -> TestResult<()> {
let context = IntegrationContext::new(Box::new(S3Integration::default()))?;
let mut table = prepare_table(&context, "repair_on_update").await?;
Expand All @@ -168,6 +167,19 @@ async fn test_repair_on_update() -> TestResult<()> {
Ok(())
}

#[tokio::test]
#[serial]
// Verifies that loading a specific table version repairs an incomplete
// commit entry left in the DynamoDB lock table.
async fn test_repair_on_load() -> TestResult<()> {
    let context = IntegrationContext::new(Box::new(S3Integration::default()))?;
    // Use a test-specific table name: the previous copy-pasted
    // "repair_on_update" collides with test_repair_on_update's table.
    let mut table = prepare_table(&context, "repair_on_load").await?;
    let _entry = create_incomplete_commit_entry(&table, 1, "unfinished_commit").await?;
    table.load_version(1).await?;
    // table should fix the broken entry while loading a specific version
    assert_eq!(table.version(), 1);
    validate_lock_table_state(&table, 1).await?;
    Ok(())
}

const WORKERS: i64 = 3;
const COMMITS: i64 = 15;

Expand Down
File renamed without changes.
4 changes: 2 additions & 2 deletions crates/deltalake-azure/Cargo.toml → crates/azure/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ version = "0.1.0"
edition = "2021"

[dependencies]
deltalake-core = { path = "../deltalake-core" }
deltalake-core = { version = "0.17.0", path = "../core" }
lazy_static = "1"

# workspace dependencies
Expand All @@ -21,7 +21,7 @@ url = { workspace = true }
[dev-dependencies]
chrono = { workspace = true }
serial_test = "3"
deltalake-test = { path = "../deltalake-test" }
deltalake-test = { path = "../test" }
pretty_env_logger = "*"
rand = "0.8"
serde_json = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ enum AzureCredential {
/// Authorizing with secret
ClientSecret,
/// Authorizing with a managed identity
#[allow(dead_code)]
ManagedIdentity,
/// Using a shared access signature
SasKey,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pub(crate) type Result<T, E = Error> = std::result::Result<T, E>;

#[derive(thiserror::Error, Debug)]
pub(crate) enum Error {
#[allow(dead_code)]
#[error("failed to parse config: {0}")]
Parse(String),

Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
2 changes: 1 addition & 1 deletion crates/benchmarks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,6 @@ datafusion-sql = { workspace = true }
datafusion-physical-expr = { workspace = true }

[dependencies.deltalake-core]
path = "../deltalake-core"
path = "../core"
version = "0"
features = ["datafusion"]
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ edition = "2021"
async-trait = { workspace = true }
aws-config = "1"
aws-sdk-glue = "1"
deltalake-core = { path = "../deltalake-core" }
deltalake-core = { version = "0.17.0", path = "../core" }
# This can depend on a lowest common denominator of core once that's released
# deltalake_core = { version = "0.17.0" }
thiserror = { workspace = true }
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
4 changes: 2 additions & 2 deletions crates/deltalake-core/Cargo.toml → crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,12 @@ reqwest = { version = "0.11.18", default-features = false, features = [
"rustls-tls",
"json",
], optional = true }
sqlparser = { version = "0.40", optional = true }
sqlparser = { version = "0.41", optional = true }

[dev-dependencies]
criterion = "0.5"
ctor = "0"
deltalake-test = { path = "../deltalake-test", features = ["datafusion"] }
deltalake-test = { path = "../test", features = ["datafusion"] }
dotenvy = "0"
hyper = { version = "0.14", features = ["server"] }
maplit = "1"
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ mod tests {

#[tokio::test]
async fn test_table_names() {
let fs = ListingSchemaProvider::try_new("../deltalake-test/tests/data/", None).unwrap();
let fs = ListingSchemaProvider::try_new("../test/tests/data/", None).unwrap();
fs.refresh().await.unwrap();
let table_names = fs.table_names();
assert!(table_names.len() > 20);
Expand All @@ -172,9 +172,7 @@ mod tests {

#[tokio::test]
async fn test_query_table() {
let schema = Arc::new(
ListingSchemaProvider::try_new("../deltalake-test/tests/data/", None).unwrap(),
);
let schema = Arc::new(ListingSchemaProvider::try_new("../test/tests/data/", None).unwrap());
schema.refresh().await.unwrap();

let ctx = SessionContext::new();
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ pub mod physical;
impl From<DeltaTableError> for DataFusionError {
fn from(err: DeltaTableError) -> Self {
match err {
DeltaTableError::Arrow { source } => DataFusionError::ArrowError(source),
DeltaTableError::Arrow { source } => DataFusionError::ArrowError(source, None),
DeltaTableError::Io { source } => DataFusionError::IoError(source),
DeltaTableError::ObjectStore { source } => DataFusionError::ObjectStore(source),
DeltaTableError::Parquet { source } => DataFusionError::ParquetError(source),
Expand All @@ -102,7 +102,7 @@ impl From<DeltaTableError> for DataFusionError {
impl From<DataFusionError> for DeltaTableError {
fn from(err: DataFusionError) -> Self {
match err {
DataFusionError::ArrowError(source) => DeltaTableError::Arrow { source },
DataFusionError::ArrowError(source, _) => DeltaTableError::Arrow { source },
DataFusionError::IoError(source) => DeltaTableError::Io { source },
DataFusionError::ObjectStore(source) => DeltaTableError::ObjectStore { source },
DataFusionError::ParquetError(source) => DeltaTableError::Parquet { source },
Expand Down Expand Up @@ -430,7 +430,6 @@ impl<'a> DeltaScanBuilder<'a> {
limit: self.limit,
table_partition_cols,
output_ordering: vec![],
infinite_source: false,
},
logical_filter.as_ref(),
)
Expand Down Expand Up @@ -808,7 +807,7 @@ pub(crate) fn logical_expr_to_physical_expr(
) -> Arc<dyn PhysicalExpr> {
let df_schema = schema.clone().to_dfschema().unwrap();
let execution_props = ExecutionProps::new();
create_physical_expr(expr, &df_schema, schema, &execution_props).unwrap()
create_physical_expr(expr, &df_schema, &execution_props).unwrap()
}

pub(crate) async fn execute_plan_to_batch(
Expand Down Expand Up @@ -1238,7 +1237,6 @@ pub(crate) async fn find_files_scan<'a>(
let predicate_expr = create_physical_expr(
&Expr::IsTrue(Box::new(expression.clone())),
&input_dfschema,
&input_schema,
state.execution_props(),
)?;

Expand Down Expand Up @@ -1689,7 +1687,7 @@ mod tests {

#[tokio::test]
async fn delta_table_provider_with_config() {
let table = crate::open_table("../deltalake-test/tests/data/delta-2.2.0-partitioned-types")
let table = crate::open_table("../test/tests/data/delta-2.2.0-partitioned-types")
.await
.unwrap();
let config = DeltaScanConfigBuilder::new()
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -955,10 +955,8 @@ mod tests {
let inline = dv_inline();
assert_eq!(None, inline.absolute_path(&parent).unwrap());

let path = std::fs::canonicalize(PathBuf::from(
"../deltalake-test/tests/data/table-with-dv-small/",
))
.unwrap();
let path = std::fs::canonicalize(PathBuf::from("../test/tests/data/table-with-dv-small/"))
.unwrap();
let parent = url::Url::from_directory_path(path).unwrap();
let dv_url = parent
.join("deletion_vector_61d16c75-6994-46b7-a15b-8b538852e50e.bin")
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ impl LogicalFile<'_> {
self.deletion_vector.as_ref().and_then(|arr| {
arr.storage_type
.is_valid(self.index)
.then(|| DeletionVectorView {
.then_some(DeletionVectorView {
data: arr,
index: self.index,
})
Expand Down Expand Up @@ -682,7 +682,7 @@ mod tests {

#[tokio::test]
async fn read_delta_1_2_1_struct_stats_table() {
let table_uri = "../deltalake-test/tests/data/delta-1.2.1-only-struct-stats";
let table_uri = "../test/tests/data/delta-1.2.1-only-struct-stats";
let table_from_struct_stats = crate::open_table(table_uri).await.unwrap();
let table_from_json_stats = crate::open_table_with_version(table_uri, 1).await.unwrap();

Expand Down Expand Up @@ -727,7 +727,7 @@ mod tests {

#[tokio::test]
async fn df_stats_delta_1_2_1_struct_stats_table() {
let table_uri = "../deltalake-test/tests/data/delta-1.2.1-only-struct-stats";
let table_uri = "../test/tests/data/delta-1.2.1-only-struct-stats";
let table_from_struct_stats = crate::open_table(table_uri).await.unwrap();

let file_stats = table_from_struct_stats
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use tracing::debug;

use super::parse;
use crate::kernel::{arrow::json, Action, ActionType, Metadata, Protocol, Schema, StructType};
use crate::logstore::LogStore;
use crate::operations::transaction::get_commit_bytes;
use crate::protocol::DeltaOperation;
use crate::{DeltaResult, DeltaTableConfig, DeltaTableError};
Expand Down Expand Up @@ -148,15 +149,21 @@ impl LogSegment {
table_root: &Path,
start_version: i64,
end_version: Option<i64>,
store: &dyn ObjectStore,
log_store: &dyn LogStore,
) -> DeltaResult<Self> {
debug!(
"try_new_slice: start_version: {}, end_version: {:?}",
start_version, end_version
);
log_store.refresh().await?;
let log_url = table_root.child("_delta_log");
let (mut commit_files, checkpoint_files) =
list_log_files(store, &log_url, end_version, Some(start_version)).await?;
let (mut commit_files, checkpoint_files) = list_log_files(
log_store.object_store().as_ref(),
&log_url,
end_version,
Some(start_version),
)
.await?;
// remove all files above requested version
if let Some(version) = end_version {
commit_files.retain(|meta| meta.location.commit_version() <= Some(version));
Expand Down
Loading

0 comments on commit 1f9bcd8

Please sign in to comment.