From 1f22df453808c0fab009e1db686e2f811ae35328 Mon Sep 17 00:00:00 2001 From: Yaroslav Kishchenko Date: Thu, 12 Nov 2020 13:48:07 +0300 Subject: [PATCH] [62] Make creation of extraction opportunities faster. --- .../semi/create_extraction_opportunities.py | 216 +++++++++++++----- 1 file changed, 162 insertions(+), 54 deletions(-) diff --git a/veniq/baselines/semi/create_extraction_opportunities.py b/veniq/baselines/semi/create_extraction_opportunities.py index d068e6b2..e2d8e238 100644 --- a/veniq/baselines/semi/create_extraction_opportunities.py +++ b/veniq/baselines/semi/create_extraction_opportunities.py @@ -1,4 +1,5 @@ -from typing import Dict, List, Optional +from collections import defaultdict +from typing import Dict, List, Optional, NamedTuple from veniq.ast_framework import AST from .extract_semantic import extract_method_statements_semantic @@ -9,66 +10,173 @@ def create_extraction_opportunities( statements_semantic: Dict[Statement, StatementSemantic] ) -> List[ExtractionOpportunity]: - extraction_opportunities: List[ExtractionOpportunity] = [] - for step in range(1, len(statements_semantic) + 1): - for extraction_opportunity in _ExtractionOpportunityIterator(statements_semantic, step): - if extraction_opportunity and extraction_opportunity not in extraction_opportunities: - extraction_opportunities.append(extraction_opportunity) - - return extraction_opportunities - - -class _ExtractionOpportunityIterator: - def __init__(self, statements_semantic: Dict[Statement, StatementSemantic], step: int): - self._statements_semantic = statements_semantic - self._statements = list(statements_semantic.keys()) - self._step = step - - self._statement_index = 0 - - def __iter__(self): - return self - - def __next__(self) -> ExtractionOpportunity: - if self._statement_index >= len(self._statements_semantic): - raise StopIteration + statements = list(statements_semantic.keys()) + semantics = list(statements_semantic.values()) + + statements_similarity_provider = 
_StatementsSimilarityProvider(semantics)
+    statements_ranges = _StatementsRanges(statements)
+
+    extraction_opportunities = statements_ranges.create_initial_ranges(statements_similarity_provider)
+
+    similarity_gaps = statements_similarity_provider.get_similarity_gaps()
+    for gap in sorted(similarity_gaps.keys()):
+        # Create a separate list for new extraction opportunities
+        # created during merge of statements ranges with **fixed** similarity gap
+        # due to the possible overwrites of those new extraction opportunities
+        new_extraction_opportunities: List[ExtractionOpportunity] = []
+        for statement_index in similarity_gaps[gap]:
+            new_opportunity = statements_ranges.merge_ranges(statement_index, statement_index + gap)
+
+            # If, for a fixed similarity gap, a new extraction opportunity starts with the same statements
+            # as the previous one, this means that the last range of the first similarity gap is the first range
+            # of the next similarity gap, i.e. those gaps are overlapping, and in the final result both
+            # must be in the same extraction opportunity. Notice that the resulting opportunity of the second merge
+            # simply extends the previous one, so we can take it instead of the previous opportunity.
+            if new_extraction_opportunities and new_extraction_opportunities[-1][0] == new_opportunity[0]:
+                new_extraction_opportunities[-1] = new_opportunity
+            else:
+                new_extraction_opportunities.append(new_opportunity)
+        extraction_opportunities.extend(new_extraction_opportunities)
 
-        fails_qty = 0
-        first_statement_index = self._statement_index
-        last_statement_index: Optional[int] = None
+    extraction_opportunities = [
+        tuple(filter(lambda node: not node.is_fake, extraction_opportunity))
+        for extraction_opportunity in extraction_opportunities
+        if any(not node.is_fake for node in extraction_opportunity)
+    ]
 
-        self._statement_index += 1
+    return extraction_opportunities
 
-        while self._statement_index < len(self._statements) and last_statement_index is None:
-            previous_statement_semantic = self._get_statement_semantic(self._statement_index - fails_qty - 1)
-            current_statement_semantic = self._get_statement_semantic(self._statement_index)
 
-            if current_statement_semantic.is_similar(previous_statement_semantic):
-                fails_qty = 0
-                self._statement_index += 1
+class _StatementsSimilarityProvider:
+    def __init__(self, statements_semantic: List[StatementSemantic]):
+        self._steps_to_next_similar: List[Optional[int]] = [
+            self._calculate_steps_to_next_similar(statements_semantic, statement_index)
+            for statement_index in range(len(statements_semantic))
+        ]
+
+    def has_next_similar_statement(self, statement_index: int) -> bool:
+        return self._steps_to_next_similar[statement_index] is not None
+
+    def get_steps_to_next_similar_statement(self, statement_index: int) -> int:
+        step = self._steps_to_next_similar[statement_index]
+        if step is None:
+            raise ValueError(f"All statements after {statement_index}th are not similar to it.")
+
+        return step
+
+    def get_similarity_gaps(self) -> Dict[int, List[int]]:
+        """
+        Finds all statements whose next similar statement does not directly follow them.
+        Returns a dict with steps as keys and lists of corresponding statement indexes as values.
+        For example, if the next similar statement to 1st is 3rd, to 2nd is 4th and to 5th is 8th,
+        then the output will be: {2: [1, 2], 3: [5]}.
+        NOTICE: all statement index lists are sorted.
+        """
+        similarity_gaps_by_size: Dict[int, List[int]] = defaultdict(list)
+        for statement_index, step in enumerate(self._steps_to_next_similar):
+            if step and step > 1:
+                similarity_gaps_by_size[step].append(statement_index)
+
+        return similarity_gaps_by_size
+
+    @staticmethod
+    def _calculate_steps_to_next_similar(
+        statements_semantic: List[StatementSemantic], statement_index: int
+    ) -> Optional[int]:
+        step = 1
+        current_statement = statements_semantic[statement_index]
+        while statement_index + step < len(statements_semantic):
+            if current_statement.is_similar(statements_semantic[statement_index + step]):
+                return step
+            step += 1
+
+        return None
+
+
+class _StatementsRanges:
+    """
+    Represents a division of a sequence of statements by non-overlapping sorted ranges.
+    """
+
+    class _Range(NamedTuple):
+        begin: int
+        end: int  # ! NOTICE: Index past the last element in a range.
+
+    def __init__(self, statements: List[Statement]):
+        self._statements = statements
+        self._ranges: List[_StatementsRanges._Range] = []
+
+    def create_initial_ranges(
+        self, statements_similarity: _StatementsSimilarityProvider
+    ) -> List[ExtractionOpportunity]:
+        """
+        An initial statements range is a continuous range of statements, where each statement,
+        except the first one, is similar to the previous one.
+        """
+
+        extraction_opportunities: List[ExtractionOpportunity] = []
+
+        range_begin = 0
+        range_end = 1  # ! NOTICE: Index past the last element in a range.
+
+        for index, statement in enumerate(self._statements):
+            if (
+                statements_similarity.has_next_similar_statement(index)
+                and statements_similarity.get_steps_to_next_similar_statement(index) == 1
+            ):
+                range_end += 1
             else:
-                fails_qty += 1
-                if fails_qty == self._step:
-                    self._statement_index -= self._step - 1
-                    last_statement_index = self._statement_index - 1
-                else:
-                    self._statement_index += 1
-
-        # self._statement_index has passed over self._statements
-        # put last_statement_index to the last statement before sequence of failures
-        if last_statement_index is None:
-            last_statement_index = len(self._statements) - fails_qty - 1
-
-        return tuple(
-            self._statements[i]
-            for i in range(first_statement_index, last_statement_index + 1)
-            if not self._statements[i].is_fake
+                self._ranges.append(self._Range(range_begin, range_end))
+                extraction_opportunities.append(tuple(self._statements[range_begin:range_end]))
+                range_begin = range_end
+                range_end += 1
+
+        if range_begin < len(self._statements):
+            self._ranges.append(self._Range(range_begin, len(self._statements)))
+            extraction_opportunities.append(tuple(self._statements[range_begin:]))
+
+        return extraction_opportunities
+
+    def merge_ranges(
+        self, first_range_statement_index: int, last_range_statement_index: int
+    ) -> ExtractionOpportunity:
+        """
+        Identifies the first and last ranges by the given statement indexes and
+        merges the two and all other ranges between them.
+        Returns statements from the newly created range.
+ """ + first_range_index = self._get_range_index(first_range_statement_index) + last_range_index = self._get_range_index(last_range_statement_index) + + first_range = self._ranges[first_range_index] + last_range = self._ranges[last_range_index] + + new_range = self._Range(first_range.begin, last_range.end) + self._ranges[first_range_index:last_range_index + 1] = [new_range] + + return tuple(self._statements[new_range.begin:new_range.end]) + + def _get_range_index(self, statement_index: int) -> int: + if not self._ranges: + raise ValueError("No ranges was created.") + + smallest_index_in_ranges = self._ranges[0].begin + if statement_index < smallest_index_in_ranges: + raise ValueError( + f"Element is before all the ranges. Element index = {statement_index}, " + f"smallest index among elements in ranges = {smallest_index_in_ranges}." + ) + + for range_index, range in enumerate(self._ranges): + if statement_index < range.end: + return range_index + + largets_index_in_ranges = self._ranges[-1].end - 1 + raise ValueError( + f"Element is past all the ranges. Element index = {statement_index}, " + f"greatest index among elements in ranges = {largets_index_in_ranges}." ) - def _get_statement_semantic(self, statement_index: int) -> StatementSemantic: - current_statement = self._statements[statement_index] - return self._statements_semantic[current_statement] - def _print_extraction_opportunities(method_ast: AST, filepath: str, class_name: str, method_name: str): statements_semantic = extract_method_statements_semantic(method_ast)