
[HUDI-8543] Fixing Secondary Index Record generation to not rely on WriteStatus #12313

Open · wants to merge 6 commits into base: master
Conversation

@nsivabalan (Contributor) commented Nov 21, 2024

Change Logs

SI record generation consists of two steps:
a. Find the record keys that were updated or deleted, and add delete records to the SI. We look those keys up in the SI to find the (secondary key, record key) combinations and prepare delete records.
b. For the latest data (inserted or updated), we read the records to find the (secondary key, record key) combinations and generate new insert records to ingest into the SI.

Of the above steps, (a) is the one that relied on WriteStatus.

In this patch we are only fixing (a), i.e. finding the list of record keys that got updated or deleted in the commit of interest will no longer rely on WriteStatus; instead we do an on-demand read from the data files.
Time permitting for the 1.x release, we may have a follow-up patch that unifies steps (a) and (b) so both are done in one pass.

This patch definitely has to go in to remove the dependency on WriteStatus. The optimization of merging steps (a) and (b) will be followed up based on available bandwidth and the timeframe before we wrap up 1.0; it is an optimization and does not impact correctness. Since the Secondary Index itself is a new feature introduced in 1.x, we wanted to take care of correctness and reliability first.
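
To make the on-demand read concrete, below is a minimal, hedged sketch using plain Java collections (not Hudi's actual types or method names) of how deleted and updated keys could be derived by diffing record keys read from the previous and new base files of a touched file group. Treating every carried-over key as an update candidate is a simplifying assumption of the sketch, not necessarily what the patch does.

import java.util.HashSet;
import java.util.Set;

public class KeyDiffSketch {
  // Assumption: prevKeys/newKeys are record keys read on demand from the
  // previous and newly written base files of a single file group.
  static Set<String> recordKeysDeletedOrUpdated(Set<String> prevKeys, Set<String> newKeys) {
    Set<String> deleted = new HashSet<>(prevKeys);
    deleted.removeAll(newKeys);   // present before, absent now -> deleted
    Set<String> updated = new HashSet<>(prevKeys);
    updated.retainAll(newKeys);   // carried over into the rewritten file -> update candidates
    Set<String> result = new HashSet<>(deleted);
    result.addAll(updated);
    return result;
  }
}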

Impact

  • Fixing Secondary Index Record generation to not rely on WriteStatus.

Risk level (write none, low, medium or high below)

low

Documentation Update

Describe any necessary documentation update if there is any new feature, config, or user-facing change. If not, put "none".

  • The config description must be updated if new configs are added or the default values of the configs are changed
  • Any new feature or user-facing change requires updating the Hudi website. Please create a Jira ticket, attach the ticket number here, and follow the instructions to make changes to the website.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@github-actions bot added the size:L label (PR with lines of changes in (300, 1000]) on Nov 21, 2024
@vinothchandar (Member) left a comment:

Code style comments. Will do second pass again

  }
  updateFunctionalIndexIfPresent(commitMetadata, instantTime, partitionToRecordMap);
- updateSecondaryIndexIfPresent(commitMetadata, partitionToRecordMap, writeStatus);
+ updateSecondaryIndexIfPresent(commitMetadata, partitionToRecordMap, instantTime);
Member:

how come we still have the updateFromWriteStatuses method..

Member:

still rename writeStatus to writeStatuses. plural

@nsivabalan (Contributor, Author) commented Nov 21, 2024:

yes, I have deferred the changes below to the next patch:
a. Remove HoodieWriteDelegate from WriteStatus.
b. Remove the update API in HoodieMetadataWriter entirely.

The current patch ensures that none of the MDT record generation uses the RDD<WriteStatus>.

I can incorporate (b) in this patch if you prefer.

Member:

ok. we can follow up. but we should fix this for good.

@@ -159,6 +159,9 @@ public class HoodieWriteStat implements Serializable {
   @Nullable
   private Long maxEventTime;
+
+  @Nullable
+  private String prevBaseFile;
Member:

so you just need the basefile? not the entire previous file slice?

@nsivabalan (Contributor, Author):

HoodieDeltaWriteStat contains the log files; HoodieWriteStat only contains info about base files.
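
For illustration, a hedged, simplified view of the relationship described above (field names abbreviated; only prevBaseFile corresponds to the field added in this patch):

// Simplified sketch, not the actual Hudi classes verbatim.
class HoodieWriteStat {
  String path;          // base file written for this stat
  String prevBaseFile;  // nullable; the previous base file, added in this patch
}

class HoodieDeltaWriteStat extends HoodieWriteStat {
  java.util.List<String> logFiles; // log files written for MOR file groups
}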

* @return Iterator of {@link HoodieRecord}s for RLI Metadata partition.
* @throws IOException
*/
public static Iterator<HoodieRecord> generateRLIMetadataHoodieRecordsForBaseFile(String basePath,
Member:

UT? all methods in file.

@nsivabalan (Contributor, Author):

yes, we already have it for the RLI methods. I will be updating the patch w/ more tests for the rest (the SI-related ones).


import static java.util.stream.Collectors.toList;

public class BaseFileRecordParsingUtils {
Member:

do we need a new Utils class? adding lots of short utils classes makes the code harder to maintain

@nsivabalan (Contributor, Author):

I did not want to keep expanding HoodieTableMetadataUtil; it is already close to 3,000 lines.
So I kept all the record-level parsing methods in this new class.

HoodieCommitMetadata commitMetadata,
HoodieMetadataConfig metadataConfig,
HoodieTableMetaClient dataTableMetaClient,
int writesFileIdEncoding,
Member:

should this be an enum?

Member:

the method is a bit long, IMO. we need small methods that can be easily tested

@nsivabalan (Contributor, Author):

sure. will split it up into smaller ones

@nsivabalan (Contributor, Author) commented Nov 22, 2024:

sorry, which one did you mean for the enum?
For writesFileIdEncoding, we only have two possible values for now: 0 -> UUID based fileID, 1 -> random string.
We have not defined any enum for it.
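
For reference, a hypothetical enum along the lines the review suggests; Hudi does not define this today, per the comment above:

// Hypothetical sketch only; the code currently passes writesFileIdEncoding as a raw int.
public enum WritesFileIdEncoding {
  UUID_BASED(0),      // 0 -> UUID based fileID
  RANDOM_STRING(1);   // 1 -> random string

  private final int code;

  WritesFileIdEncoding(int code) {
    this.code = code;
  }

  public int getCode() {
    return code;
  }

  public static WritesFileIdEncoding fromCode(int code) {
    return code == 0 ? UUID_BASED : RANDOM_STRING;
  }
}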

Member:

let's file a code cleanup JIRA for 1.1 for these.

@hudi-bot commented:

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure : re-run the last Azure build

@vinothchandar (Member) commented:

the specific thing you are not addressing is custom record mergers and payloads (since those would need reading the previous image and merging fully using the merger/payload)? Can you make sure we throw an error for this scenario, if RLI is enabled with other mergers/payloads?
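
A hedged sketch of the kind of guard being requested; the method name and wiring are hypothetical, though the two payload class names are real Hudi classes:

// Illustrative only: fail fast when RLI is enabled with a custom payload/merger,
// since key diffing without WriteStatus cannot replay custom merge semantics.
class RecordIndexValidation {
  static void validatePayloadForRecordIndex(String payloadClassName, boolean recordIndexEnabled) {
    boolean knownPayload =
        "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload".equals(payloadClassName)
            || "org.apache.hudi.common.model.DefaultHoodieRecordPayload".equals(payloadClassName);
    if (recordIndexEnabled && !knownPayload) {
      throw new UnsupportedOperationException(
          "Record level index is not supported with custom payload class: " + payloadClassName);
    }
  }
}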

@vinothchandar (Member) left a comment:

I am good with the approach overall. But please resolve testing/code cleanup/errors first


} else if (!isRecord1Deleted && isRecord2Deleted) {
  return record1;
} else {
  throw new HoodieIOException("Two HoodieRecord updates to RLI is seen for same record key " + record2.getRecordKey() + ", record 1 : "
Member:

I remember leaving a comment here that we should handle the true/true case by not throwing an error, for logs with redundant deletes, i.e. where we just encode a streaming delete for the same key without checking for its existence first.
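
For clarity, a minimal sketch of the merge rule under discussion, with types simplified and the true/true branch resolved by keeping one delete rather than throwing, as the comment suggests:

// Illustrative only, not Hudi's exact reduceByKeys implementation.
class RliMergeSketch {
  static <R> R mergeRliRecordsForKey(R record1, R record2,
                                     boolean isRecord1Deleted, boolean isRecord2Deleted) {
    if (isRecord1Deleted && isRecord2Deleted) {
      // Redundant streaming deletes for the same key: keep either one.
      return record1;
    } else if (isRecord1Deleted) {
      return record2; // keep the live record
    } else if (isRecord2Deleted) {
      return record1;
    } else {
      // Two live updates to the same key in one commit is still treated as an error.
      throw new IllegalStateException("Two HoodieRecord updates to RLI seen for the same record key");
    }
  }
}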

* @param parallelism parallelism to use.
* @return
*/
private static HoodieData<HoodieRecord> reduceByKeys(HoodieData<HoodieRecord> recordIndexRecords, int parallelism) {
Member:

make this package-protected and add a UT

}

static List<String> getRecordKeysDeletedOrUpdated(HoodieEngineContext engineContext,
                                                  HoodieCommitMetadata commitMetadata,
Member:

formatting


@VisibleForTesting
public static Set<String> getRecordKeys(String filePath, HoodieTableMetaClient datasetMetaClient,
                                        Option<Schema> writerSchemaOpt, int maxBufferSize,
Member:

formatting

int parallelism = Math.max(Math.min(allWriteStats.size(), metadataConfig.getRecordIndexMaxParallelism()), 1);
String basePath = dataTableMetaClient.getBasePath().toString();
// we might need to set some additional variables if we need to process log files.
boolean anyLogFilesWithDeletes = allWriteStats.stream().anyMatch(writeStat -> {
Member:

avoid duplicating these code blocks?

@nsivabalan (Contributor, Author):

there are some minor differences between the RLI and SI code paths:

  • one of them generates HoodieData, while the SI one just returns a List;
  • one of them is interested only in logs w/ deletes;
  • one of them is interested in INSERTS and DELETES, while the other is interested in DELETES and UPDATES.

I have avoided code duplication in BaseFileParsingUtils. Here it becomes a little messy; will try to take a stab at it, but it's not going to be an easy one. One possible shape is sketched below.
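
A hedged sketch (names hypothetical) of one way the near-duplicate blocks could share scaffolding, with the per-index differences passed in rather than copied:

import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Illustrative only; Hudi's actual types and call sites differ.
class IndexRecordPrep {
  // Both the RLI and SI paths first select the write stats they care about;
  // which stats matter (e.g. logs with deletes only) is supplied per index.
  static <S> List<S> selectWriteStats(List<S> allWriteStats, Predicate<S> perIndexFilter) {
    return allWriteStats.stream().filter(perIndexFilter).collect(Collectors.toList());
  }
}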

@nsivabalan (Contributor, Author) commented:

hey @vinothchandar

> the specific thing you are not addressing is custom record mergers and payloads (since those would need reading the previous image and merging fully using the merger/payload)? Can you make sure we throw an error for this scenario, if RLI is enabled with other mergers/payloads?

yes, we are throwing errors: #12337
