From ad2571209eef8cdb5e7aa318ae3211caad353e71 Mon Sep 17 00:00:00 2001 From: bklvsky Date: Tue, 15 Oct 2024 20:31:56 +0000 Subject: [PATCH] Preparing errata bulk releases for UI (AlmaLinux/build-system#325) - Adding force parameter to bulk releases errata - Setting IN_PROGRESS status in router - black formatting --- alws/crud/errata.py | 66 ++++++++++++++++++++++++++++++----------- alws/dramatiq/errata.py | 16 ++++++---- alws/routers/errata.py | 55 +++++++++++++++++++++++----------- 3 files changed, 98 insertions(+), 39 deletions(-) diff --git a/alws/crud/errata.py b/alws/crud/errata.py index be6fa9ba..de00b5b1 100644 --- a/alws/crud/errata.py +++ b/alws/crud/errata.py @@ -1452,7 +1452,9 @@ async def release_errata_record(record_id: str, platform_id: int, force: bool): logging.info("Record %s successfully released", record_id) -async def bulk_errata_records_release(records_ids: List[str]): +async def bulk_errata_records_release( + records_ids: List[str], force: bool = False +): pulp = PulpClient( settings.pulp_host, settings.pulp_user, @@ -1460,16 +1462,6 @@ async def bulk_errata_records_release(records_ids: List[str]): ) release_tasks = [] repos_to_publish = [] - async with open_async_session(key=get_async_db_key()) as session: - await session.execute( - update(models.NewErrataRecord) - .where(models.NewErrataRecord.id.in_(records_ids)) - .values( - release_status=ErrataReleaseStatus.IN_PROGRESS, - last_release_log=None, - ) - ) - async with open_async_session(key=get_async_db_key()) as session: session: AsyncSession db_records = await session.execute( @@ -1500,7 +1492,9 @@ async def bulk_errata_records_release(records_ids: List[str]): ( repo_mapping, missing_pkg_names, - ) = get_albs_packages_from_record(db_record, pulp_packages) + ) = get_albs_packages_from_record( + db_record, pulp_packages, force + ) except Exception as exc: db_record.release_status = ErrataReleaseStatus.FAILED db_record.last_release_log = str(exc) @@ -1668,7 +1662,9 @@ async def get_updateinfo_xml_from_pulp( return cr_upd.xml_dump() if cr_upd.updates else None -async def prepare_resetting(items_to_insert: List, record: models.NewErrataRecord, session: AsyncSession): +async def prepare_resetting( + items_to_insert: List, record: models.NewErrataRecord, session: AsyncSession +): search_params = prepare_search_params(record) prod_repos_cache = await load_platform_packages( platform=record.platform, @@ -1721,7 +1717,9 @@ async def reset_matched_errata_packages(record_id: str, session: AsyncSession): await session.flush() -async def get_errata_records_threshold(issued_date_str: str, session: AsyncSession): +async def get_errata_records_threshold( + issued_date_str: str, session: AsyncSession +): issued_date = datetime.datetime.strptime( issued_date_str, '%Y-%m-%d %H:%M:%S' ) @@ -1748,7 +1746,7 @@ async def get_errata_records_threshold(issued_date_str: str, session: AsyncSessi async def reset_matched_erratas_packages_threshold( - issued_date: str, + issued_date: str, ): async with open_async_session(key=get_async_db_key()) as session: records = await get_errata_records_threshold(issued_date, session) @@ -1757,5 +1755,39 @@ async def reset_matched_erratas_packages_threshold( await prepare_resetting(items_to_insert, record, session) session.add_all(items_to_insert) await session.flush() - logging.info(f'Packages for records {[record.id for record in records]}' - f' have been matched if their date is later than {issued_date}') + logging.info( + f'Packages for records {[record.id for record in records]}' + f' have been matched if their date is later than {issued_date}' + ) + + +async def set_errata_packages_in_progress( + records_ids: List[str], + session: AsyncSession, +): + db_records = await session.execute( + select(models.NewErrataRecord).where( + models.NewErrataRecord.id.in_(records_ids) + ) + ) + records = db_records.scalars().all() + records_to_update = [ + record.id + for record in records + if record.release_status != ErrataReleaseStatus.IN_PROGRESS + ] + skipped_records = [ + record.id + for record in records + if record.release_status == ErrataReleaseStatus.IN_PROGRESS + ] + if records_to_update: + await session.execute( + update(models.NewErrataRecord) + .where(models.NewErrataRecord.id.in_(records_to_update)) + .values( + release_status=ErrataReleaseStatus.IN_PROGRESS, + last_release_log=None, + ) + ) + return records_to_update, skipped_records diff --git a/alws/dramatiq/errata.py b/alws/dramatiq/errata.py index 3179d8a8..2996dfae 100644 --- a/alws/dramatiq/errata.py +++ b/alws/dramatiq/errata.py @@ -22,8 +22,10 @@ async def _release_errata_record(record_id: str, platform_id: int, force: bool): ) -async def _bulk_errata_records_release(records_ids: typing.List[str]): - await bulk_errata_records_release(records_ids) +async def _bulk_errata_records_release( + records_ids: typing.List[str], force: bool = False +): + await bulk_errata_records_release(records_ids, force) async def _reset_matched_erratas_packages_threshold(issued_date: str): @@ -53,9 +55,11 @@ def release_errata(record_id: str, platform_id: int, force: bool): queue_name="errata", time_limit=DRAMATIQ_TASK_TIMEOUT, ) -def bulk_errata_release(records_ids: typing.List[str]): +def bulk_errata_release(records_ids: typing.List[str], force: bool = False): event_loop.run_until_complete(setup_all()) - event_loop.run_until_complete(_bulk_errata_records_release(records_ids)) + event_loop.run_until_complete( + _bulk_errata_records_release(records_ids, force) + ) @dramatiq.actor( @@ -66,4 +70,6 @@ def bulk_errata_release(records_ids: typing.List[str]): ) def reset_records_threshold(issued_date: str): event_loop.run_until_complete(setup_all()) - event_loop.run_until_complete(_reset_matched_erratas_packages_threshold(issued_date)) + event_loop.run_until_complete( + _reset_matched_erratas_packages_threshold(issued_date) + ) diff --git a/alws/routers/errata.py b/alws/routers/errata.py index 68770524..50462d43 100644 --- a/alws/routers/errata.py +++ b/alws/routers/errata.py @@ -8,9 +8,16 @@ from alws.auth import get_current_user from alws.constants import ErrataReleaseStatus from alws.crud import errata as errata_crud -from alws.crud.errata import get_errata_records_threshold +from alws.crud.errata import ( + get_errata_records_threshold, + set_errata_packages_in_progress, +) from alws.dependencies import get_async_db_key -from alws.dramatiq import bulk_errata_release, release_errata, reset_records_threshold +from alws.dramatiq import ( + bulk_errata_release, + release_errata, + reset_records_threshold, +) from alws.schemas import errata_schema router = APIRouter( @@ -193,14 +200,26 @@ async def release_errata_record( @router.post("/bulk_release_records/") -async def bulk_release_errata_records(records_ids: List[str]): - bulk_errata_release.send(records_ids) - return { - "message": ( - "Following records scheduled for release:" - f" {', '.join(records_ids)}" - ) - } +async def bulk_release_errata_records( + records_ids: List[str], + force: bool = False, + session: AsyncSession = Depends( + AsyncSessionDependency(key=get_async_db_key()) + ), +): + records_to_update, skipped_records = await set_errata_packages_in_progress( + records_ids, session + ) + if not records_to_update: + return {"message": "No records to update, all in progress"} + bulk_errata_release.send(records_ids, force) + message = ( + "Following records scheduled for release:" + f" {', '.join(records_to_update)}" + ) + if skipped_records: + message = f"{message}. Skipped: {', '.join(skipped_records)}" + return {"message": message} @router.post('/reset-matched-packages') @@ -216,17 +235,19 @@ async def reset_matched_packages( @router.post('/reset-matched-packages-multiple') async def reset_matched_erratas_packages_threshold( - issued_date, - session: AsyncSession = Depends( - AsyncSessionDependency(key=get_async_db_key()) - ), + issued_date, + session: AsyncSession = Depends( + AsyncSessionDependency(key=get_async_db_key()) + ), ): records = await get_errata_records_threshold(issued_date, session) if records: reset_records_threshold.send(issued_date) - message = (f"Records issued after {issued_date} are scheduled for package resetting" - if records else - f"No unreleased records found after {issued_date} including") + message = ( + f"Records issued after {issued_date} are scheduled for package resetting" + if records + else f"No unreleased records found after {issued_date} including" + ) return {'message': message}