Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add: New NVD CPE Match Criteria API #1067

Merged
merged 3 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions pontos/nvd/cpe_match/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# SPDX-FileCopyrightText: 2024 Greenbone AG
#
# SPDX-License-Identifier: GPL-3.0-or-later

import asyncio
from argparse import Namespace
from typing import Callable

import httpx

from pontos.nvd.cpe_match.api import CPEMatchApi

from ._parser import cpe_match_parse, cpe_matches_parse

__all__ = ("CPEMatchApi",)


async def query_cpe_match(args: Namespace) -> None:
async with CPEMatchApi(token=args.token) as api:
cpe_match = await api.cpe_match(args.match_criteria_id)
print(cpe_match)


async def query_cpe_matches(args: Namespace) -> None:
async with CPEMatchApi(token=args.token) as api:
response = api.cpe_matches(
cve_id=args.cve_id,
request_results=args.number,
start_index=args.start,
)
async for cpe_match in response:
print(cpe_match)


def cpe_match_main() -> None:
main(cpe_match_parse(), query_cpe_match)


def cpe_matches_main() -> None:
main(cpe_matches_parse(), query_cpe_matches)


def main(args: Namespace, func: Callable) -> None:
try:
asyncio.run(func(args))
except KeyboardInterrupt:
pass
except httpx.HTTPStatusError as e:
print(f"HTTP Error {e.response.status_code}: {e.response.text}")
36 changes: 36 additions & 0 deletions pontos/nvd/cpe_match/_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# SPDX-FileCopyrightText: 2024 Greenbone AG
#
# SPDX-License-Identifier: GPL-3.0-or-later

from argparse import ArgumentParser, Namespace
from typing import Optional, Sequence

import shtab


def cpe_matches_parse(args: Optional[Sequence[str]] = None) -> Namespace:
parser = ArgumentParser()
shtab.add_argument_to(parser)
parser.add_argument("--token", help="API key to use for querying.")
parser.add_argument("--cve-id", help="Get matches for a specific CVE")
parser.add_argument(

Check warning on line 16 in pontos/nvd/cpe_match/_parser.py

View check run for this annotation

Codecov / codecov/patch

pontos/nvd/cpe_match/_parser.py#L12-L16

Added lines #L12 - L16 were not covered by tests
"--number", "-n", metavar="N", help="Request only N matches", type=int
)
parser.add_argument(

Check warning on line 19 in pontos/nvd/cpe_match/_parser.py

View check run for this annotation

Codecov / codecov/patch

pontos/nvd/cpe_match/_parser.py#L19

Added line #L19 was not covered by tests
"--start",
"-s",
help="Index of the first match to request.",
type=int,
)
return parser.parse_args(args)

Check warning on line 25 in pontos/nvd/cpe_match/_parser.py

View check run for this annotation

Codecov / codecov/patch

pontos/nvd/cpe_match/_parser.py#L25

Added line #L25 was not covered by tests


def cpe_match_parse(args: Optional[Sequence[str]] = None) -> Namespace:
parser = ArgumentParser()
shtab.add_argument_to(parser)
parser.add_argument("--token", help="API key to use for querying.")
parser.add_argument(

Check warning on line 32 in pontos/nvd/cpe_match/_parser.py

View check run for this annotation

Codecov / codecov/patch

pontos/nvd/cpe_match/_parser.py#L29-L32

Added lines #L29 - L32 were not covered by tests
"--match-criteria-id",
help="Get the match string with the given matchCriteriaId ",
)
return parser.parse_args(args)

Check warning on line 36 in pontos/nvd/cpe_match/_parser.py

View check run for this annotation

Codecov / codecov/patch

pontos/nvd/cpe_match/_parser.py#L36

Added line #L36 was not covered by tests
218 changes: 218 additions & 0 deletions pontos/nvd/cpe_match/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
# SPDX-FileCopyrightText: 2024 Greenbone AG
#
# SPDX-License-Identifier: GPL-3.0-or-later

from datetime import datetime
from types import TracebackType
from typing import (
Any,
Iterator,
Optional,
Type,
)

from httpx import Timeout

from pontos.errors import PontosError
from pontos.nvd.api import (
DEFAULT_TIMEOUT_CONFIG,
JSON,
NVDApi,
NVDResults,
Params,
convert_camel_case,
format_date,
now,
)
from pontos.nvd.models.cpe_match_string import CPEMatchString

__all__ = ("CPEMatchApi",)

DEFAULT_NIST_NVD_CPE_MATCH_URL = (
"https://services.nvd.nist.gov/rest/json/cpematch/2.0"
)
MAX_CPE_MATCHES_PER_PAGE = 500


def _result_iterator(data: JSON) -> Iterator[CPEMatchString]:
results: list[dict[str, Any]] = data.get("match_strings", []) # type: ignore
return (

Check warning on line 39 in pontos/nvd/cpe_match/api.py

View check run for this annotation

Codecov / codecov/patch

pontos/nvd/cpe_match/api.py#L38-L39

Added lines #L38 - L39 were not covered by tests
CPEMatchString.from_dict(result["match_string"]) for result in results
)


class CPEMatchApi(NVDApi):
"""
API for querying the NIST NVD CPE match information.

Should be used as an async context manager.

Example:
.. code-block:: python

from pontos.nvd.cpe_match import CPEMatchApi

async with CPEMatchApi() as api:
cpe = await api.cpe_match_string(...)
"""

def __init__(
self,
*,
token: Optional[str] = None,
timeout: Optional[Timeout] = DEFAULT_TIMEOUT_CONFIG,
rate_limit: bool = True,
) -> None:
"""
Create a new instance of the CPE API.

Args:
token: The API key to use. Using an API key allows to run more
requests at the same time.
timeout: Timeout settings for the HTTP requests
rate_limit: Set to False to ignore rate limits. The public rate
limit (without an API key) is 5 requests in a rolling 30 second
window. The rate limit with an API key is 50 requests in a
rolling 30 second window.
See https://nvd.nist.gov/developers/start-here#divRateLimits
Default: True.
"""
super().__init__(

Check warning on line 80 in pontos/nvd/cpe_match/api.py

View check run for this annotation

Codecov / codecov/patch

pontos/nvd/cpe_match/api.py#L80

Added line #L80 was not covered by tests
DEFAULT_NIST_NVD_CPE_MATCH_URL,
token=token,
timeout=timeout,
rate_limit=rate_limit,
)

def cpe_matches(
self,
*,
last_modified_start_date: Optional[datetime] = None,
last_modified_end_date: Optional[datetime] = None,
cve_id: Optional[str] = None,
match_string_search: Optional[str] = None,
request_results: Optional[int] = None,
start_index: int = 0,
results_per_page: Optional[int] = None,
) -> NVDResults[CPEMatchString]:
"""
Get all CPE matches for the provided arguments

https://nvd.nist.gov/developers/products#divCpeMatch

Args:
last_modified_start_date: Return all CPE matches last modified after this date.
last_modified_end_date: Return all CPE matches last modified before this date.
cve_id: Return all CPE matches for this Common Vulnerabilities and Exposures identifier.
match_string_search: Return all CPE matches that conform to the given pattern
request_results: Number of CPE matches to download. Set to None
(default) to download all available matches.
start_index: Index of the first CPE match to be returned. Useful
only for paginated requests that should not start at the first
page.
results_per_page: Number of results in a single requests. Mostly
useful for paginated requests.

Returns:
A NVDResponse for CPE matches

Example:
.. code-block:: python

from pontos.nvd.cpe_match import CPEMatchApi

async with CPEMatchApi() as api:
async for match_string in api.matches(cve_id='CVE-2024-1234'):
print(match_string)

json = api.matches(cve_id='CVE-2024-1234').json()

async for match_strings in api.matches(
cve_id='CVE-2024-1234',
).chunks():
for match_string in match_strings:
print(match_string)
"""
params: Params = {}

Check warning on line 136 in pontos/nvd/cpe_match/api.py

View check run for this annotation

Codecov / codecov/patch

pontos/nvd/cpe_match/api.py#L136

Added line #L136 was not covered by tests

if last_modified_start_date:
params["lastModStartDate"] = format_date(last_modified_start_date)

Check warning on line 139 in pontos/nvd/cpe_match/api.py

View check run for this annotation

Codecov / codecov/patch

pontos/nvd/cpe_match/api.py#L139

Added line #L139 was not covered by tests
if not last_modified_end_date:
params["lastModEndDate"] = format_date(now())

Check warning on line 141 in pontos/nvd/cpe_match/api.py

View check run for this annotation

Codecov / codecov/patch

pontos/nvd/cpe_match/api.py#L141

Added line #L141 was not covered by tests
if last_modified_end_date:
params["lastModEndDate"] = format_date(last_modified_end_date)

Check warning on line 143 in pontos/nvd/cpe_match/api.py

View check run for this annotation

Codecov / codecov/patch

pontos/nvd/cpe_match/api.py#L143

Added line #L143 was not covered by tests

if cve_id:
params["cveId"] = cve_id

Check warning on line 146 in pontos/nvd/cpe_match/api.py

View check run for this annotation

Codecov / codecov/patch

pontos/nvd/cpe_match/api.py#L146

Added line #L146 was not covered by tests
if match_string_search:
params["matchStringSearch"] = match_string_search

Check warning on line 148 in pontos/nvd/cpe_match/api.py

View check run for this annotation

Codecov / codecov/patch

pontos/nvd/cpe_match/api.py#L148

Added line #L148 was not covered by tests

results_per_page = min(

Check warning on line 150 in pontos/nvd/cpe_match/api.py

View check run for this annotation

Codecov / codecov/patch

pontos/nvd/cpe_match/api.py#L150

Added line #L150 was not covered by tests
results_per_page or MAX_CPE_MATCHES_PER_PAGE,
request_results or MAX_CPE_MATCHES_PER_PAGE,
)
if start_index is None:
start_index = 0

Check warning on line 155 in pontos/nvd/cpe_match/api.py

View check run for this annotation

Codecov / codecov/patch

pontos/nvd/cpe_match/api.py#L155

Added line #L155 was not covered by tests

return NVDResults(

Check warning on line 157 in pontos/nvd/cpe_match/api.py

View check run for this annotation

Codecov / codecov/patch

pontos/nvd/cpe_match/api.py#L157

Added line #L157 was not covered by tests
self,
params,
_result_iterator,
request_results=request_results,
results_per_page=results_per_page,
start_index=start_index,
)

async def cpe_match(self, match_criteria_id: str) -> CPEMatchString:
"""
Returns a single CPE match for the given match criteria id.

Args:
match_criteria_id: Match criteria identifier

Returns:
A CPE match for the given identifier

Raises:
PontosError: If match criteria ID is empty or if no match with the given ID is
found.

Example:
.. code-block:: python

from pontos.nvd.cpe_match import CVEApi

async with CVEApi() as api:
match = await api.cpe_match("36FBCF0F-8CEE-474C-8A04-5075AF53FAF4")
print(match)
"""
if not match_criteria_id:
raise PontosError("Missing Match Criteria ID.")

Check warning on line 190 in pontos/nvd/cpe_match/api.py

View check run for this annotation

Codecov / codecov/patch

pontos/nvd/cpe_match/api.py#L190

Added line #L190 was not covered by tests

response = await self._get(

Check warning on line 192 in pontos/nvd/cpe_match/api.py

View check run for this annotation

Codecov / codecov/patch

pontos/nvd/cpe_match/api.py#L192

Added line #L192 was not covered by tests
params={"matchCriteriaId": match_criteria_id}
)
response.raise_for_status()
data = response.json(object_hook=convert_camel_case)
match_strings = data["match_strings"]

Check warning on line 197 in pontos/nvd/cpe_match/api.py

View check run for this annotation

Codecov / codecov/patch

pontos/nvd/cpe_match/api.py#L195-L197

Added lines #L195 - L197 were not covered by tests
if not match_strings:
raise PontosError(

Check warning on line 199 in pontos/nvd/cpe_match/api.py

View check run for this annotation

Codecov / codecov/patch

pontos/nvd/cpe_match/api.py#L199

Added line #L199 was not covered by tests
f"No match with Match Criteria ID '{match_criteria_id}' found."
)

match_string = match_strings[0]
return CPEMatchString.from_dict(match_string["match_string"])

Check warning on line 204 in pontos/nvd/cpe_match/api.py

View check run for this annotation

Codecov / codecov/patch

pontos/nvd/cpe_match/api.py#L203-L204

Added lines #L203 - L204 were not covered by tests

async def __aenter__(self) -> "CPEMatchApi":
await super().__aenter__()
return self

Check warning on line 208 in pontos/nvd/cpe_match/api.py

View check run for this annotation

Codecov / codecov/patch

pontos/nvd/cpe_match/api.py#L207-L208

Added lines #L207 - L208 were not covered by tests

async def __aexit__(
self,
exc_type: Optional[Type[BaseException]],
exc_value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> Optional[bool]:
return await super().__aexit__( # type: ignore

Check warning on line 216 in pontos/nvd/cpe_match/api.py

View check run for this annotation

Codecov / codecov/patch

pontos/nvd/cpe_match/api.py#L216

Added line #L216 was not covered by tests
exc_type, exc_value, traceback
)
57 changes: 57 additions & 0 deletions pontos/nvd/models/cpe_match_string.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# SPDX-FileCopyrightText: 2024 Greenbone AG
#
# SPDX-License-Identifier: GPL-3.0-or-later
#

from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Optional
from uuid import UUID

from pontos.models import Model


@dataclass
class CPEMatch(Model):
"""
Represents a single CPE match.
Attributes:
cpe_name: Name of the matching CPE
cpe_name_id: Name ID of the matching CPE
"""

cpe_name: str
cpe_name_id: UUID


@dataclass
class CPEMatchString(Model):
"""
Represents a CPE match string, matching criteria to one or more CPEs
Attributes:
match_criteria_id: The identifier of the CPE match
criteria: The CPE formatted match criteria
version_start_including: Optional start of the matching version range, including the given version
version_start_excluding: Optional start of the matching version range, excluding the given version
version_end_including: Optional end of the matching version range, including the given version
version_end_excluding: Optional end of the matching version range, excluding the given version
status: Status of the CPE match
cpe_last_modified: The date the CPEs list of the match was last modified
created: Creation date of the CPE
last_modified: Last modification date of the CPE
matches: List of CPEs matching the criteria string and the optional range limits
"""

match_criteria_id: UUID
criteria: str
status: str
cpe_last_modified: datetime
created: datetime
last_modified: datetime
matches: List[CPEMatch] = field(default_factory=list)
version_start_including: Optional[str] = None
version_start_excluding: Optional[str] = None
version_end_including: Optional[str] = None
version_end_excluding: Optional[str] = None
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ pontos-nvd-cves = 'pontos.nvd.cve:cves_main'
pontos-nvd-cve-changes = 'pontos.nvd.cve_changes:main'
pontos-nvd-cpe = 'pontos.nvd.cpe:cpe_main'
pontos-nvd-cpes = 'pontos.nvd.cpe:cpes_main'
pontos-nvd-cpe-match = 'pontos.nvd.cpe_matches:cpe_match_main'
pontos-nvd-cpe-matches = 'pontos.nvd.cpe_matches:cpe_matches_main'

[build-system]
requires = ["poetry-core>=1.0.0"]
Expand Down
Loading