Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Preparing errata bulk releases for UI #1023

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 49 additions & 17 deletions alws/crud/errata.py
Original file line number Diff line number Diff line change
Expand Up @@ -1452,24 +1452,16 @@ async def release_errata_record(record_id: str, platform_id: int, force: bool):
logging.info("Record %s successfully released", record_id)


async def bulk_errata_records_release(records_ids: List[str]):
async def bulk_errata_records_release(
records_ids: List[str], force: bool = False
):
pulp = PulpClient(
settings.pulp_host,
settings.pulp_user,
settings.pulp_password,
)
release_tasks = []
repos_to_publish = []
async with open_async_session(key=get_async_db_key()) as session:
await session.execute(
update(models.NewErrataRecord)
.where(models.NewErrataRecord.id.in_(records_ids))
.values(
release_status=ErrataReleaseStatus.IN_PROGRESS,
last_release_log=None,
)
)

async with open_async_session(key=get_async_db_key()) as session:
session: AsyncSession
db_records = await session.execute(
Expand Down Expand Up @@ -1500,7 +1492,9 @@ async def bulk_errata_records_release(records_ids: List[str]):
(
repo_mapping,
missing_pkg_names,
) = get_albs_packages_from_record(db_record, pulp_packages)
) = get_albs_packages_from_record(
db_record, pulp_packages, force
)
except Exception as exc:
db_record.release_status = ErrataReleaseStatus.FAILED
db_record.last_release_log = str(exc)
Expand Down Expand Up @@ -1668,7 +1662,9 @@ async def get_updateinfo_xml_from_pulp(
return cr_upd.xml_dump() if cr_upd.updates else None


async def prepare_resetting(items_to_insert: List, record: models.NewErrataRecord, session: AsyncSession):
async def prepare_resetting(
items_to_insert: List, record: models.NewErrataRecord, session: AsyncSession
):
search_params = prepare_search_params(record)
prod_repos_cache = await load_platform_packages(
platform=record.platform,
Expand Down Expand Up @@ -1721,7 +1717,9 @@ async def reset_matched_errata_packages(record_id: str, session: AsyncSession):
await session.flush()


async def get_errata_records_threshold(issued_date_str: str, session: AsyncSession):
async def get_errata_records_threshold(
issued_date_str: str, session: AsyncSession
):
issued_date = datetime.datetime.strptime(
issued_date_str, '%Y-%m-%d %H:%M:%S'
)
Expand All @@ -1748,7 +1746,7 @@ async def get_errata_records_threshold(issued_date_str: str, session: AsyncSessi


async def reset_matched_erratas_packages_threshold(
issued_date: str,
issued_date: str,
):
async with open_async_session(key=get_async_db_key()) as session:
records = await get_errata_records_threshold(issued_date, session)
Expand All @@ -1757,5 +1755,39 @@ async def reset_matched_erratas_packages_threshold(
await prepare_resetting(items_to_insert, record, session)
session.add_all(items_to_insert)
await session.flush()
logging.info(f'Packages for records {[record.id for record in records]}'
f' have been matched if their date is later than {issued_date}')
logging.info(
f'Packages for records {[record.id for record in records]}'
f' have been matched if their date is later than {issued_date}'
)


async def set_errata_packages_in_progress(
records_ids: List[str],
session: AsyncSession,
):
db_records = await session.execute(
select(models.NewErrataRecord).where(
models.NewErrataRecord.id.in_(records_ids)
)
)
records = db_records.scalars().all()
records_to_update = [
record.id
for record in records
if record.release_status != ErrataReleaseStatus.IN_PROGRESS
]
skipped_records = [
record.id
for record in records
if record.release_status == ErrataReleaseStatus.IN_PROGRESS
]
if records_to_update:
await session.execute(
update(models.NewErrataRecord)
.where(models.NewErrataRecord.id.in_(records_to_update))
.values(
release_status=ErrataReleaseStatus.IN_PROGRESS,
last_release_log=None,
)
)
return records_to_update, skipped_records
16 changes: 11 additions & 5 deletions alws/dramatiq/errata.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ async def _release_errata_record(record_id: str, platform_id: int, force: bool):
)


async def _bulk_errata_records_release(records_ids: typing.List[str]):
await bulk_errata_records_release(records_ids)
async def _bulk_errata_records_release(
records_ids: typing.List[str], force: bool = False
):
await bulk_errata_records_release(records_ids, force)


async def _reset_matched_erratas_packages_threshold(issued_date: str):
Expand Down Expand Up @@ -53,9 +55,11 @@ def release_errata(record_id: str, platform_id: int, force: bool):
queue_name="errata",
time_limit=DRAMATIQ_TASK_TIMEOUT,
)
def bulk_errata_release(records_ids: typing.List[str]):
def bulk_errata_release(records_ids: typing.List[str], force: bool = False):
event_loop.run_until_complete(setup_all())
event_loop.run_until_complete(_bulk_errata_records_release(records_ids))
event_loop.run_until_complete(
_bulk_errata_records_release(records_ids, force)
)


@dramatiq.actor(
Expand All @@ -66,4 +70,6 @@ def bulk_errata_release(records_ids: typing.List[str]):
)
def reset_records_threshold(issued_date: str):
event_loop.run_until_complete(setup_all())
event_loop.run_until_complete(_reset_matched_erratas_packages_threshold(issued_date))
event_loop.run_until_complete(
_reset_matched_erratas_packages_threshold(issued_date)
)
55 changes: 38 additions & 17 deletions alws/routers/errata.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,16 @@
from alws.auth import get_current_user
from alws.constants import ErrataReleaseStatus
from alws.crud import errata as errata_crud
from alws.crud.errata import get_errata_records_threshold
from alws.crud.errata import (
get_errata_records_threshold,
set_errata_packages_in_progress,
)
from alws.dependencies import get_async_db_key
from alws.dramatiq import bulk_errata_release, release_errata, reset_records_threshold
from alws.dramatiq import (
bulk_errata_release,
release_errata,
reset_records_threshold,
)
from alws.schemas import errata_schema

router = APIRouter(
Expand Down Expand Up @@ -193,14 +200,26 @@ async def release_errata_record(


@router.post("/bulk_release_records/")
async def bulk_release_errata_records(records_ids: List[str]):
bulk_errata_release.send(records_ids)
return {
"message": (
"Following records scheduled for release:"
f" {', '.join(records_ids)}"
)
}
async def bulk_release_errata_records(
records_ids: List[str],
force: bool = False,
session: AsyncSession = Depends(
AsyncSessionDependency(key=get_async_db_key())
),
):
records_to_update, skipped_records = await set_errata_packages_in_progress(
records_ids, session
)
if not records_to_update:
return {"message": "No records to update, all in progress"}
bulk_errata_release.send(records_ids, force)
message = (
"Following records scheduled for release:"
f" {', '.join(records_to_update)}"
)
if skipped_records:
message = f"{message}. Skipped: {', '.join(skipped_records)}"
return {"message": message}


@router.post('/reset-matched-packages')
Expand All @@ -216,17 +235,19 @@ async def reset_matched_packages(

@router.post('/reset-matched-packages-multiple')
async def reset_matched_erratas_packages_threshold(
issued_date,
session: AsyncSession = Depends(
AsyncSessionDependency(key=get_async_db_key())
),
issued_date,
session: AsyncSession = Depends(
AsyncSessionDependency(key=get_async_db_key())
),
):
records = await get_errata_records_threshold(issued_date, session)
if records:
reset_records_threshold.send(issued_date)

message = (f"Records issued after {issued_date} are scheduled for package resetting"
if records else
f"No unreleased records found after {issued_date} including")
message = (
f"Records issued after {issued_date} are scheduled for package resetting"
if records
else f"No unreleased records found after {issued_date} including"
)

return {'message': message}
Loading