[HUDI-8543] Fixing Secondary Index Record generation to not rely on WriteStatus #12313
base: master
Conversation
Code style comments. Will do a second pass later.
}
  updateFunctionalIndexIfPresent(commitMetadata, instantTime, partitionToRecordMap);
- updateSecondaryIndexIfPresent(commitMetadata, partitionToRecordMap, writeStatus);
+ updateSecondaryIndexIfPresent(commitMetadata, partitionToRecordMap, instantTime);
How come we still have the `updateFromWriteStatuses` method?
Still rename `writeStatus` to `writeStatuses`. Plural.
Yes, I have punted the changes below to the next patch:
a. Remove HoodieWriteDelegate from WriteStatus.
b. Remove the update API in HoodieMetadataWriter entirely.
The current patch ensures that none of the MDT record generation uses the RDD<WriteStatus>. I can incorporate (b) in this patch if you prefer.
OK, we can follow up, but we should fix this for good.
@@ -159,6 +159,9 @@ public class HoodieWriteStat implements Serializable {
   @Nullable
   private Long maxEventTime;

+  @Nullable
+  private String prevBaseFile;
So you just need the base file, not the entire previous file slice?
HoodieDeltaWriteStat contains the log files. So, HoodieWriteStat only contains info about base files.
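For context, a minimal sketch of the split being described; the class and member names below are simplified illustrations, not the actual Hudi API:

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hedged sketch only: base-file info (including the new prevBaseFile field from
// the diff above) lives on the write stat, while the delta write stat subclass
// carries the log files. Simplified names, not the real Hudi classes.
class WriteStatSketch implements Serializable {
  private String prevBaseFile; // previous base file for the file group, may be null

  public String getPrevBaseFile() {
    return prevBaseFile;
  }

  public void setPrevBaseFile(String prevBaseFile) {
    this.prevBaseFile = prevBaseFile;
  }
}

class DeltaWriteStatSketch extends WriteStatSketch {
  // log files written by the delta commit live on the subclass, not the base class
  private final List<String> logFiles = new ArrayList<>();

  public List<String> getLogFiles() {
    return logFiles;
  }
}
```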
(Outdated review thread on hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java, resolved.)
 * @return Iterator of {@link HoodieRecord}s for RLI Metadata partition.
 * @throws IOException
 */
public static Iterator<HoodieRecord> generateRLIMetadataHoodieRecordsForBaseFile(String basePath,
UT? All methods in this file.
Yes, we already have it for the RLI methods. I will be updating the patch with more tests for the rest (the SI-related ones).
import static java.util.stream.Collectors.toList;

public class BaseFileRecordParsingUtils {
Do we need a new utils class? Adding lots of short utils classes makes the code harder to maintain.
I did not want to keep expanding HoodieTableMetadataUtil; it's already close to 3000 lines. So I kept all the record-level parsing methods in this new class.
HoodieCommitMetadata commitMetadata,
HoodieMetadataConfig metadataConfig,
HoodieTableMetaClient dataTableMetaClient,
int writesFileIdEncoding,
Should this be an enum?
Method is a bit long, IMO. We need small methods that can be easily tested.
Sure, will split it up into smaller ones.
Sorry, which one did you mean for the enum? For writesFileIdEncoding, we only have two possible values for now: 0 -> UUID-based fileID, 1 -> random string. We have not defined any enum for it.
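For reference, a hedged sketch of what such an enum could look like, based on the two values just described; the enum and constant names are assumptions, since no enum is defined yet:

```java
// Hypothetical enum for the two writesFileIdEncoding values described above.
// All names are illustrative assumptions; the patch currently uses a raw int.
public enum FileIdEncoding {
  UUID_BASED(0),     // 0 -> UUID-based fileID
  RANDOM_STRING(1);  // 1 -> random string

  private final int code;

  FileIdEncoding(int code) {
    this.code = code;
  }

  public int getCode() {
    return code;
  }

  // Map the raw int used today back to the enum, failing fast on unknown values.
  public static FileIdEncoding fromCode(int code) {
    for (FileIdEncoding encoding : values()) {
      if (encoding.code == code) {
        return encoding;
      }
    }
    throw new IllegalArgumentException("Unknown fileId encoding: " + code);
  }
}
```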
Let's file a code cleanup JIRA for 1.1 for these.
(Three outdated review threads on hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java, resolved.)
So the specific thing you are not addressing is custom record mergers and payloads? (Since that would need reading the previous image and merging fully using the merger/payload.) Can you make sure we throw an error for this scenario, if RLI is enabled with other mergers/payloads?
I am good with the approach overall. But please resolve testing/code cleanup/errors first
} else if (!isRecord1Deleted && isRecord2Deleted) {
  return record1;
} else {
  throw new HoodieIOException("Two HoodieRecord updates to RLI is seen for same record key " + record2.getRecordKey() + ", record 1 : "
I remember leaving a comment here that we should handle the true/true case by not throwing an error, for logs with redundant deletes, i.e. when we just encode a streaming delete for the same key without checking for its existence first.
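A hedged sketch of the reduction being suggested, tolerating the delete/delete case instead of throwing; `isDelete` here is a stand-in for however the real code detects an RLI delete record:

```java
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.exception.HoodieIOException;

// Hypothetical merge function for two RLI records with the same key. The only
// change from the patch's logic shown above: a delete/delete pair (redundant
// streaming deletes for the same key) keeps one record instead of throwing.
class RliMergeSketch {
  static HoodieRecord merge(HoodieRecord record1, HoodieRecord record2) {
    boolean isRecord1Deleted = isDelete(record1);
    boolean isRecord2Deleted = isDelete(record2);
    if (isRecord1Deleted && isRecord2Deleted) {
      return record1; // redundant deletes: keep one, do not fail
    } else if (isRecord1Deleted) {
      return record2;
    } else if (isRecord2Deleted) {
      return record1;
    } else {
      // two live updates to RLI for the same key is still an invalid state
      throw new HoodieIOException("Two HoodieRecord updates to RLI seen for record key "
          + record2.getRecordKey());
    }
  }

  // Placeholder: an assumption for illustration, not how Hudi actually marks deletes.
  static boolean isDelete(HoodieRecord record) {
    return record.getData() == null;
  }
}
```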
 * @param parallelism parallelism to use.
 * @return
 */
private static HoodieData<HoodieRecord> reduceByKeys(HoodieData<HoodieRecord> recordIndexRecords, int parallelism) {
Make this package-protected and add a UT.
}

static List<String> getRecordKeysDeletedOrUpdated(HoodieEngineContext engineContext,
                                                  HoodieCommitMetadata commitMetadata,
formatting
(Resolved review thread on hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java.)
@VisibleForTesting
public static Set<String> getRecordKeys(String filePath, HoodieTableMetaClient datasetMetaClient,
                                        Option<Schema> writerSchemaOpt, int maxBufferSize,
formatting
int parallelism = Math.max(Math.min(allWriteStats.size(), metadataConfig.getRecordIndexMaxParallelism()), 1);
String basePath = dataTableMetaClient.getBasePath().toString();
// we might need to set some additional variables if we need to process log files.
boolean anyLogFilesWithDeletes = allWriteStats.stream().anyMatch(writeStat -> {
avoid duplicating these code blocks?
There are some minor differences across the RLI and SI code paths: one of them generates HoodieData while the SI one just returns a List; one of them is interested only in logs with deletes; and one is interested in INSERTS and DELETES while the other is interested in DELETES and UPDATES. I have avoided code duplication in BaseFileRecordParsingUtils; here it becomes a little messy. Will try to take a stab at it, but it's not going to be an easy one.
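If the unification is attempted, one possible shape, purely a hedged sketch with illustrative names rather than code from the patch, is to parameterize a single shared scan over the change kinds each caller cares about:

```java
import java.util.EnumSet;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper: both index paths share one scan and just pass the change
// kinds they need; each call site adapts the output (HoodieData vs List) itself.
// All names here are illustrative assumptions.
class SharedKeyScanSketch {
  enum ChangeKind { INSERT, UPDATE, DELETE }

  static class ParsedRecord {
    final String recordKey;
    final ChangeKind kind;

    ParsedRecord(String recordKey, ChangeKind kind) {
      this.recordKey = recordKey;
      this.kind = kind;
    }
  }

  // Single shared scan: filter the parsed records down to the kinds requested.
  static List<String> recordKeysForKinds(List<ParsedRecord> parsed, EnumSet<ChangeKind> kinds) {
    return parsed.stream()
        .filter(r -> kinds.contains(r.kind))
        .map(r -> r.recordKey)
        .collect(Collectors.toList());
  }
}
```

The RLI path would then ask for `EnumSet.of(ChangeKind.INSERT, ChangeKind.DELETE)` and the SI path for `EnumSet.of(ChangeKind.DELETE, ChangeKind.UPDATE)`.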
(Two resolved review threads on hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java.)
Hey @vinothchandar, yes, we are throwing errors.
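For illustration, a hedged sketch of the kind of guard being discussed; the parameter names and the idea of passing in the default payload class are assumptions here, not the patch's actual validation code:

```java
// Hypothetical fail-fast check: custom mergers/payloads would require reading the
// previous image and merging fully, which the on-demand read path does not do,
// so refuse the combination up front. Names are assumptions for the sketch.
class RecordIndexGuardSketch {
  static void validate(boolean recordIndexEnabled, String payloadClassName, String defaultPayloadClassName) {
    if (recordIndexEnabled && !defaultPayloadClassName.equals(payloadClassName)) {
      throw new IllegalStateException(
          "Record-level/secondary index cannot be enabled with a custom payload/merger: " + payloadClassName);
    }
  }
}
```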
Change Logs

SI record generation has two steps:

a. Find the record keys that were updated or deleted, and add delete records to the SI. We look these keys up in the SI to find the secondary key and record key combos, and prepare delete records.

b. For the latest data (inserted or updated), we read the records to find the secondary key and record key combinations, and generate new insert records to ingest into the SI.

Among the above steps, (a) is the one that was relying on WriteStatus.

In this patch, we are only fixing (a): finding the list of record keys that got updated or deleted in the commit of interest will no longer rely on WriteStatus, but will instead do an on-demand read of the data files. (A hedged sketch of the overall flow follows below.)

Time permitting before the 1.x release, we may have a follow-up patch that unifies steps (a) and (b) into one step. This patch definitely has to go in to remove the dependency on WriteStatus; merging steps (a) and (b) is an optimization that does not impact correctness, and will be followed up based on available bandwidth and the timeframe before we wrap up 1.0. Since the Secondary Index itself is a new feature introduced in 1.x, we wanted to take care of correctness and reliability first.
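A high-level sketch of that two-step flow; every method here is a hypothetical stand-in rather than the actual Hudi API:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Illustrative outline only: step (a) now reads updated/deleted keys from the data
// files on demand (no WriteStatus); step (b) reads the latest data for new inserts.
class SecondaryIndexFlowSketch {
  List<Object> generateSecondaryIndexRecords(Object commitMetadata) {
    List<Object> siRecords = new ArrayList<>();
    // (a) keys updated/deleted in this commit, found by reading data files on demand,
    //     then looked up in the SI to produce delete records
    List<String> updatedOrDeletedKeys = readUpdatedOrDeletedKeysFromDataFiles(commitMetadata);
    siRecords.addAll(prepareSecondaryIndexDeletes(updatedOrDeletedKeys));
    // (b) latest inserted/updated data, read to pair secondary-key values with
    //     record keys and produce new SI insert records
    siRecords.addAll(prepareSecondaryIndexInserts(commitMetadata));
    return siRecords;
  }

  // Stubs: placeholders so the sketch compiles; the real logic lives in Hudi.
  List<String> readUpdatedOrDeletedKeysFromDataFiles(Object commitMetadata) {
    return Collections.emptyList();
  }

  List<Object> prepareSecondaryIndexDeletes(List<String> keys) {
    return Collections.emptyList();
  }

  List<Object> prepareSecondaryIndexInserts(Object commitMetadata) {
    return Collections.emptyList();
  }
}
```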
Impact
Risk level: low
Documentation Update
Describe any necessary documentation update if there is any new feature, config, or user-facing change. If not, put "none".
Contributor's checklist