-
Notifications
You must be signed in to change notification settings - Fork 0
/
maven.py
1296 lines (1060 loc) · 46.8 KB
/
maven.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
A Python implementation of Maven-related functionality, including parsing of Maven POMs
and Maven metadata files, download of remote artifacts, and calculation of dependencies.
(copied from https://github.com/ctrueden/db-xml-maven by Curtis Rueden)
"""
import logging
import sys
from abc import ABC, abstractmethod
from datetime import datetime
from hashlib import md5, sha1
from itertools import combinations
from os import environ
from pathlib import Path
from re import findall, match
from subprocess import run
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union
from xml.etree import ElementTree
import requests
# -- Constants --

# Local repository storage directories used by Environment
# when its local_repos argument is None (none by default).
DEFAULT_LOCAL_REPOS = []
# Remote repository name:URL pairs used by Environment
# when its remote_repos argument is None.
DEFAULT_REMOTE_REPOS = {"central": "https://repo.maven.apache.org/maven2"}
# Classifier assumed when none is specified (empty string means "no classifier").
DEFAULT_CLASSIFIER = ""
# Packaging/type assumed when none is specified.
DEFAULT_PACKAGING = "jar"

# -- Logging --

# Module-level logger, named after this module.
_log = logging.getLogger(__name__)
# -- Functions --
def ts2dt(ts: str) -> datetime:
    """
    Parse a Maven-style timestamp string into a Python datetime object.

    Accepted forms:
    * 20210702144918 (seen in <lastUpdated> in maven-metadata.xml)
    * 20210702.144917 (seen in deployed SNAPSHOT filenames and <snapshotVersion><value>)

    :param ts: The timestamp string. It is matched from its beginning only.
    :return: The corresponding (naive) datetime.
    :raises ValueError: If the string does not begin with a timestamp of either form.
    """
    parsed = match(r"(\d{4})(\d\d)(\d\d)\.?(\d\d)(\d\d)(\d\d)", ts)
    if parsed is None:
        raise ValueError(f"Invalid timestamp: {ts}")
    year, month, day, hour, minute, second = (int(group) for group in parsed.groups())
    return datetime(year, month, day, hour, minute, second)
def coord2str(
    groupId: str,
    artifactId: str,
    version: Optional[str] = None,
    classifier: Optional[str] = None,
    packaging: Optional[str] = None,
    scope: Optional[str] = None,
    optional: bool = False
):
    """
    Build a string representation of the given Maven coordinates.

    For an overview of Maven coordinates, see:
    * https://maven.apache.org/pom.html#Maven_Coordinates
    * https://maven.apache.org/pom.html#dependencies

    :param groupId: The groupId of the Maven coordinate.
    :param artifactId: The artifactId of the Maven coordinate.
    :param version: The version of the Maven coordinate.
    :param classifier: The classifier of the Maven coordinate.
    :param packaging: The packaging/type of the Maven coordinate.
    :param scope: The scope of the Maven dependency.
    :param optional: Whether the Maven dependency is optional.
    :return:
        A string encompassing the given fields, matching the
        G:A:P:C:V:S order used by mvn's dependency:list goal.
        Empty or None fields are left out, and " (optional)"
        is appended when the optional flag is set.
    """
    result = f"{groupId}:{artifactId}"
    # NB: The tuple order below defines the P:C:V:S portion of the output.
    for field in (packaging, classifier, version, scope):
        if field:
            result += f":{field}"
    if optional:
        result += " (optional)"
    return result
def read(p: Path, mode: str) -> Union[str, bytes]:
    """
    Read the entire contents of a file.

    :param p: Path to the file to read.
    :param mode: Mode string handed to open(), e.g. "r" for text or "rb" for binary.
    :return: The file contents; str or bytes depending on the mode.
    """
    with open(p, mode) as stream:
        contents = stream.read()
    return contents
def text(p: Path) -> str:
    """
    Read the entire contents of a file as text.

    :param p: Path to the file to read.
    :return: The file contents as a string.
    """
    with open(p, "r") as stream:
        return stream.read()
def binary(p: Path) -> bytes:
    """
    Read the entire contents of a file as raw bytes.

    :param p: Path to the file to read.
    :return: The file contents as bytes.
    """
    with open(p, "rb") as stream:
        return stream.read()
# -- Classes --
class Resolver(ABC):
    """
    Strategy interface for the non-trivial Maven-related operations:
    * fetching (and caching) an artifact from a remote repository; and
    * working out the dependencies of a particular Maven component.
    """

    @abstractmethod
    def download(self, artifact: "Artifact") -> Optional[Path]:
        """
        Fetch an artifact file from a remote repository.

        :param artifact: The artifact whose local path should be resolved.
        :return: Local path to the saved artifact, or None if it cannot be resolved.
        """
        ...

    @abstractmethod
    def dependencies(self, component: "Component") -> List["Dependency"]:
        """
        Compute the dependencies of the given Maven component.

        :param component: The component whose dependencies are wanted.
        :return: The list of dependencies.
        """
        ...
class SimpleResolver(Resolver):
    """
    A resolver that works by pure Python code.
    Low overhead, but less feature complete than mvn.
    """

    # Seconds to wait on each remote repository request before giving up,
    # so that an unresponsive server cannot hang a download indefinitely.
    timeout: float = 60.0

    def download(self, artifact: "Artifact") -> Optional[Path]:
        """
        Download an artifact file into the environment's local repository cache.

        The remote repositories of the artifact's environment are tried in
        order; the first one answering with HTTP status 200 is used.

        :param artifact: The artifact to download.
        :return: Local path to the saved artifact.
        :raises RuntimeError:
            If the artifact version is a SNAPSHOT (not yet supported),
            or if none of the remote repositories provides the artifact.
        """
        if artifact.version.endswith("-SNAPSHOT"):
            raise RuntimeError("Downloading of snapshots is not yet implemented.")

        # NB: path_prefix is a Path, whose plain string form uses backslashes
        # on Windows; URLs need forward slashes regardless of platform.
        url_path = f"{artifact.component.path_prefix.as_posix()}/{artifact.filename}"

        for remote_repo in artifact.env.remote_repos.values():
            url = f"{remote_repo}/{url_path}"
            response: requests.Response = requests.get(url, timeout=self.timeout)
            if response.status_code != 200:
                # Not available from this repository; try the next one.
                continue

            # Artifact downloaded successfully.
            # TODO: Also get MD5 and SHA1 files if available.
            # And for each, if it *is* available and successfully gotten,
            # check the actual hash of the downloaded file contents against the expected one.
            cached_file = artifact.cached_path
            assert not cached_file.exists()
            cached_file.parent.mkdir(parents=True, exist_ok=True)
            with open(cached_file, "wb") as f:
                f.write(response.content)
            _log.debug(f"Downloaded {url} to {cached_file}")
            return cached_file

        raise RuntimeError(f"Artifact {artifact} not found in remote repositories {artifact.env.remote_repos}")

    def dependencies(self, component: "Component") -> List["Dependency"]:
        """
        Determine dependencies for the given Maven component
        by building a Model from the component's POM.

        :param component: The component for which to determine the dependencies.
        :return: The list of dependencies held by the model.
        """
        model = Model(component.pom())
        return list(model.deps.values())
class SysCallResolver(Resolver):
    """
    A resolver that delegates the work to the mvn command line tool.
    Maven must be installed for this resolver to function.
    """

    def __init__(self, mvn_command: Path):
        """
        :param mvn_command: The mvn executable to invoke.
        """
        self.mvn_command = mvn_command
        # Flags passed along with every mvn invocation.
        self.mvn_flags = ["-B", "-T8"]

    def download(self, artifact: "Artifact") -> Optional[Path]:
        """
        Download an artifact by invoking mvn's dependency:get goal.

        :param artifact: The artifact to download.
        :return: Path of the artifact inside the local repository cache.
        :raises RuntimeError: If the mvn invocation exits with a non-zero code.
        """
        _log.info(f"Downloading artifact: {artifact}")

        # All of these are needed to build the command line below.
        assert artifact.env.repo_cache
        assert artifact.groupId
        assert artifact.artifactId
        assert artifact.version
        assert artifact.packaging

        goal_args = [
            f"-Dmaven.repo.local={artifact.env.repo_cache}",
            f"-DgroupId={artifact.groupId}",
            f"-DartifactId={artifact.artifactId}",
            f"-Dversion={artifact.version}",
            f"-Dpackaging={artifact.packaging}",
        ]
        if artifact.classifier:
            goal_args.append(f"-Dclassifier={artifact.classifier}")
        if artifact.env.remote_repos:
            repo_specs = [f"{name}::::{url}" for name, url in artifact.env.remote_repos.items()]
            remote_repos = ",".join(repo_specs)
            goal_args.append(f"-DremoteRepositories={remote_repos}")

        self._mvn("dependency:get", *goal_args)

        # The file should now exist in the local repo cache.
        cached = artifact.cached_path
        assert cached and cached.exists()
        return cached

    def dependencies(self, component: "Component") -> List["Dependency"]:
        """
        Determine the direct dependencies of a component
        by invoking mvn's dependency:list goal on its POM.

        :param component: The component for which to determine the dependencies.
        :return: The list of dependencies.
        :raises RuntimeError: If the mvn invocation exits with a non-zero code.
        """
        # Invoke the dependency:list goal, direct dependencies only.
        pom_artifact = component.artifact(packaging="pom")
        assert pom_artifact.env.repo_cache
        output = self._mvn(
            "dependency:list",
            "-f", pom_artifact.resolve(),
            "-DexcludeTransitive=true",
            f"-Dmaven.repo.local={pom_artifact.env.repo_cache}"
        )

        # FIXME: Fix the following logic to parse dependency:list output.
        # Filter to include only the actual lines of XML.
        lines = output.splitlines()
        first = None
        last = None
        for index, line in enumerate(lines):
            if first is None and line.startswith("<?xml"):
                first = index
                continue
            if line == "</project>":
                last = index
                break
        assert first is not None and last is not None
        xml_text = "\n".join(lines[first:last + 1])
        pom = POM(xml_text, pom_artifact.env)

        # Extract the flattened dependencies.
        return pom.dependencies()

    def _mvn(self, *args) -> str:
        """
        Run mvn with this resolver's standard flags followed by the given arguments.

        :return: The standard output of the mvn process.
        """
        # TODO: Windows.
        return SysCallResolver._run(self.mvn_command, *self.mvn_flags, *args)

    @staticmethod
    def _run(command, *args) -> str:
        """
        Execute a command, capturing its output.

        :param command: The executable to run.
        :param args: Arguments to pass to the executable.
        :return: The decoded standard output of the process.
        :raises RuntimeError:
            If the process exits with a non-zero code; the message
            includes whatever stdout/stderr content was captured.
        """
        command_and_args = (command, *args)
        _log.debug(f"Executing: {command_and_args}")
        result = run(command_and_args, capture_output=True)

        if result.returncode != 0:
            error_message = (
                f"Command failed with exit code {result.returncode}:\n"
                f"{command_and_args}"
            )
            if result.stdout:
                error_message += f"\n\n[stdout]\n{result.stdout.decode()}"
            if result.stderr:
                error_message += f"\n\n[stderr]\n{result.stderr.decode()}"
            raise RuntimeError(error_message)

        return result.stdout.decode()
class Environment:
    """
    Maven environment.
    * Local repo cache folder.
    * Local repository storage folders.
    * Remote repository name:URL pairs.
    * Artifact resolution mechanism.
    """

    def __init__(
        self,
        repo_cache: Optional[Path] = None,
        local_repos: Optional[List[Path]] = None,
        remote_repos: Optional[Dict[str, str]] = None,
        resolver: Optional[Resolver] = None,
    ):
        """
        Create a Maven environment.

        :param repo_cache:
            Optional path to Maven local repository cache directory, i.e. destination
            of `mvn install`. Maven typically uses ~/.m2/repository by default.
            This directory is treated as *read-write* by this library, e.g.
            the download() function will store downloaded artifacts there.
            If no local repository cache path is given, Maven defaults will be used
            (M2_REPO environment variable, or ~/.m2/repository by default).
        :param local_repos:
            Optional list of Maven repository storage local paths to check for artifacts.
            These are real Maven repositories, such as those managed by a Sonatype Nexus v2 instance,
            i.e. ultimate destinations of `mvn deploy`, *not* local repository caches!
            These directories are treated as *read-only* by this library.
            If no local repository paths are given, none will be inferred.
        :param remote_repos:
            Optional dict of remote name:URL pairs, with each URL corresponding
            to a remote Maven repository accessible via HTTP/HTTPS.
            If no remote repositories are given, only Maven Central will be used.
        :param resolver:
            Optional mechanism to use for resolving local paths to artifacts.
            By default, the SimpleResolver will be used.
        """
        # NB: Always coerce to Path: the M2_REPO environment variable (and
        # possibly the caller) supplies a plain string, but the rest of the
        # library treats repo_cache as a Path.
        self.repo_cache: Path = Path(
            repo_cache
            or environ.get("M2_REPO")
            or Path("~").expanduser() / ".m2" / "repository"
        )
        # Copy the collections so that later mutation of this environment
        # affects neither the caller's objects nor the module-level defaults.
        self.local_repos: List[Path] = [
            Path(p) for p in (DEFAULT_LOCAL_REPOS if local_repos is None else local_repos)
        ]
        self.remote_repos: Dict[str, str] = dict(
            DEFAULT_REMOTE_REPOS if remote_repos is None else remote_repos
        )
        self.resolver: Resolver = resolver or SimpleResolver()

    def project(self, groupId: str, artifactId: str) -> "Project":
        """
        Get a project (G:A) with the given groupId and artifactId.

        :param groupId: The groupId of the project.
        :param artifactId: The artifactId of the project.
        :return: The Project object, linked to this environment.
        """
        return Project(self, groupId, artifactId)

    def dependency(self, el: ElementTree.Element) -> "Dependency":
        """
        Create a Dependency object from the given XML element.

        :param el: The <dependency> XML element from which to create the dependency.
        :return: The Dependency object.
        """
        groupId = el.findtext("groupId")
        artifactId = el.findtext("artifactId")
        assert groupId and artifactId
        version = el.findtext("version")  # NB: Might be None, which means managed.
        classifier = el.findtext("classifier") or DEFAULT_CLASSIFIER
        packaging = el.findtext("type") or DEFAULT_PACKAGING
        # NB: The default scope keys on the "tests" *classifier*,
        # the same rule Dependency applies when no scope is given.
        scope = el.findtext("scope") or ("test" if classifier == "tests" else "compile")
        optional = el.findtext("optional") == "true"
        exclusions = [
            self.project(ex.findtext("groupId"), ex.findtext("artifactId"))
            for ex in el.findall("exclusions/exclusion")
        ]
        project = self.project(groupId, artifactId)
        artifact = project.at_version(version).artifact(classifier, packaging)
        return Dependency(artifact, scope, optional, exclusions)
class Project:
    """
    This is a Maven project: i.e. a groupId+artifact (G:A) pair.
    """

    def __init__(self, env: Environment, groupId: str, artifactId: str):
        """
        :param env: The Maven environment this project is linked to.
        :param groupId: The groupId of the project.
        :param artifactId: The artifactId of the project.
        """
        self.env = env
        self.groupId = groupId
        self.artifactId = artifactId
        # Lazily populated by the metadata property.
        self._metadata: Optional[Metadata] = None

    def __eq__(self, other):
        # NB: Defer to the other operand for foreign types, rather than
        # failing with AttributeError on objects lacking groupId/artifactId.
        if not isinstance(other, Project):
            return NotImplemented
        return (
            self.groupId == other.groupId
            and self.artifactId == other.artifactId
        )

    def __hash__(self):
        return hash((self.groupId, self.artifactId))

    def __str__(self):
        return coord2str(self.groupId, self.artifactId)

    @property
    def path_prefix(self) -> Path:
        """
        Relative directory where artifacts of this project are organized.
        E.g. org.jruby:jruby-core -> org/jruby/jruby-core
        """
        return Path(*self.groupId.split("."), self.artifactId)

    def at_version(self, version: str) -> "Component":
        """
        Fix this project (G:A) at a particular version (G:A:V).

        :param version: The version of the project.
        :return: Component at the given version.
        """
        return Component(self, version)

    @property
    def metadata(self) -> "Metadata":
        """
        Maven metadata about this project, encompassing all known sources.

        Computed on first access from the maven-metadata*.xml files found in
        the local repository cache and in the local repository storage
        directories, then remembered for subsequent accesses.
        """
        if self._metadata is None:
            # Aggregate all locally available project maven-metadata.xml sources.
            repo_cache_dir = self.env.repo_cache / self.path_prefix
            paths = list(repo_cache_dir.glob("maven-metadata*.xml"))
            paths += [
                r / self.path_prefix / "maven-metadata.xml"
                for r in self.env.local_repos
            ]
            self._metadata = Metadatas([MetadataXML(p) for p in paths if p.exists()])
        return self._metadata

    def update(self) -> None:
        """
        Update metadata from remote sources.

        :raises NotImplementedError: Always; this operation is not implemented yet.
        """
        # NB: NotImplementedError is a RuntimeError subclass,
        # so existing handlers for RuntimeError keep working.
        raise NotImplementedError("Unimplemented")

    @property
    def release(self) -> str:
        """
        The newest release version of this project.
        This is the equivalent of Maven's RELEASE version.
        """
        return self.metadata.release

    @property
    def latest(self) -> str:
        """
        The latest SNAPSHOT version of this project.
        This is the equivalent of Maven's LATEST version.
        """
        return self.metadata.latest

    def versions(self, releases: bool = True, snapshots: bool = False, locked: bool = False) -> List["Component"]:
        """
        Get a list of all known versions of this project.

        :param releases:
            If True, include release versions (those not ending in -SNAPSHOT) in the results.
        :param snapshots:
            If True, include snapshot versions (those ending in -SNAPSHOT) in the results.
        :param locked:
            If True, returned snapshot versions will include the timestamp or "lock" flavor of the version strings;
            For example: 2.94.3-20230706.150124-1 rather than 2.94.3-SNAPSHOT.
            As such, there may be more entries returned than when this flag is False.
            Currently unimplemented.
        :return: List of Component objects, each of which represents a known version.
        :raises NotImplementedError: If locked is True.
        """
        # TODO: Think about whether multiple timestamped snapshots at the same snapshot version should be
        # one Component, or multiple Components. because we could just have a list of timestamps in the Component
        # as a field... but then we probably violate existing 1-to-many vs 1-to-1 type assumptions regarding how Components and Artifacts relate.
        # You can only "sort of" have an artifact for a SNAPSHOT without a timestamp lock... it's always timestamped on the remote side,
        # but on the local side only implicitly unless Maven's snapshot locking feature is used... confusing.
        if locked:
            # NB: A RuntimeError subclass, for backward compatibility.
            raise NotImplementedError("Locked snapshot reporting is unimplemented")
        return [
            self.at_version(v)
            for v in self.metadata.versions
            if (
                (snapshots and v.endswith("-SNAPSHOT")) or
                (releases and not v.endswith("-SNAPSHOT"))
            )
        ]
class Component:
    """
    This is a Project at a particular version -- i.e. a G:A:V.
    One POM per component.
    """

    def __init__(self, project: Project, version: str):
        """
        :param project: The project (G:A) this component belongs to.
        :param version: The version (V) of the component.
        """
        self.project = project
        self.version = version

    def __eq__(self, other):
        # NB: Defer to the other operand for foreign types, rather than
        # failing with AttributeError on objects lacking project/version.
        if not isinstance(other, Component):
            return NotImplemented
        return (
            self.project == other.project
            and self.version == other.version
        )

    def __hash__(self):
        return hash((self.project, self.version))

    def __str__(self):
        return coord2str(self.groupId, self.artifactId, self.version)

    @property
    def env(self) -> Environment:
        """The component's Maven environment."""
        return self.project.env

    @property
    def groupId(self) -> str:
        """The component's groupId."""
        return self.project.groupId

    @property
    def artifactId(self) -> str:
        """The component's artifactId."""
        return self.project.artifactId

    @property
    def path_prefix(self) -> Path:
        """
        Relative directory where artifacts of this component are organized.
        E.g. org.jruby:jruby-core:9.3.3.0 -> org/jruby/jruby-core/9.3.3.0
        """
        return self.project.path_prefix / self.version

    def artifact(self, classifier: str = DEFAULT_CLASSIFIER, packaging: str = DEFAULT_PACKAGING) -> "Artifact":
        """
        Get an artifact (G:A:P:C:V) associated with this component.

        :param classifier: Classifier of the artifact.
        :param packaging: Packaging/type of the artifact.
        :return:
            The Artifact object representing this component
            with particular classifier and packaging.
        """
        return Artifact(self, classifier, packaging)

    def pom(self) -> "POM":
        """
        Get a data structure with the contents of the POM.

        The POM file is located via the component's "pom" artifact,
        which is resolved (and thus possibly downloaded) as needed.

        :return: The POM content.
        """
        pom_artifact = self.artifact(packaging="pom")
        return POM(pom_artifact.resolve(), self.env)
class Artifact:
    """
    This is a Component plus classifier and packaging (G:A:P:C:V).
    One file per artifact.
    """

    def __init__(self, component: Component, classifier: str = DEFAULT_CLASSIFIER, packaging: str = DEFAULT_PACKAGING):
        """
        :param component: The component (G:A:V) this artifact belongs to.
        :param classifier: Classifier of the artifact.
        :param packaging: Packaging/type of the artifact.
        """
        self.component = component
        self.classifier = classifier
        self.packaging = packaging

    def __eq__(self, other):
        # NB: Defer to the other operand for foreign types, rather than
        # failing with AttributeError on objects lacking the compared fields.
        if not isinstance(other, Artifact):
            return NotImplemented
        return (
            self.component == other.component
            and self.classifier == other.classifier
            and self.packaging == other.packaging
        )

    def __hash__(self):
        return hash((self.component, self.classifier, self.packaging))

    def __str__(self):
        return coord2str(self.groupId, self.artifactId, self.version, self.classifier, self.packaging)

    @property
    def env(self) -> Environment:
        """The artifact's Maven environment."""
        return self.component.env

    @property
    def groupId(self) -> str:
        """The artifact's groupId."""
        return self.component.groupId

    @property
    def artifactId(self) -> str:
        """The artifact's artifactId."""
        return self.component.artifactId

    @property
    def version(self) -> str:
        """The artifact's version."""
        return self.component.version

    @property
    def filename(self) -> str:
        """
        Filename portion of the artifact path. E.g.:
        - g=org.python a=jython v=2.7.0 -> jython-2.7.0.jar
        - g=org.lwjgl a=lwjgl v=3.3.1 c=natives-linux -> lwjgl-3.3.1-natives-linux.jar
        - g=org.scijava a=scijava-common v=2.94.2 p=pom -> scijava-common-2.94.2.pom
        """
        classifier_suffix = f"-{self.classifier}" if self.classifier else ""
        return f"{self.artifactId}-{self.version}{classifier_suffix}.{self.packaging}"

    @property
    def cached_path(self) -> Optional[Path]:
        """
        Path to the artifact in the linked environment's local repository cache.
        Might not actually exist! This just returns where it *would be* if present.
        None if the environment has no local repository cache.
        """
        return (
            self.env.repo_cache / self.component.path_prefix / self.filename
            if self.env.repo_cache
            else None
        )

    def resolve(self) -> Path:
        """
        Resolve a local path to the artifact, downloading it as needed:
        1. If present in the linked local repository cache, use that path.
        2. Else if present in a linked locally available repository storage directory, use that path.
        3. Otherwise, invoke the environment's resolver to download it.

        :return: Local path to the artifact file.
        """
        # Check Maven local repository cache first if available.
        cached_file = self.cached_path
        if cached_file and cached_file.exists():
            return cached_file

        # Check any locally available Maven repository storage directories.
        for base in self.env.local_repos:
            # TODO: Be smarter than this when version is a SNAPSHOT,
            # because local repo storage has timestamped SNAPSHOT filenames.
            p = base / self.component.path_prefix / self.filename
            if p.exists():
                return p

        # Artifact was not found locally; need to download it.
        return self.env.resolver.download(self)

    def md5(self) -> str:
        """Compute the MD5 hash of the artifact."""
        return self._checksum("md5", md5)

    def sha1(self) -> str:
        """Compute the SHA1 hash of the artifact."""
        return self._checksum("sha1", sha1)

    def _checksum(self, suffix, func):
        """
        Obtain a checksum of the resolved artifact file.

        The checksum recorded in the sidecar file next to the artifact
        (<filename>.<suffix>) is preferred; when that file is absent or
        holds no checksum, the hash is computed from the artifact contents.

        :param suffix: Extension of the checksum sidecar file, e.g. "sha1".
        :param func: Hash constructor (e.g. hashlib.sha1) used for the fallback.
        :return: The checksum as a hex string.
        """
        p = self.resolve()
        checksum_path = p.parent / f"{p.name}.{suffix}"
        # NB: Check for existence first; reading a missing sidecar file would
        # raise FileNotFoundError and the fallback would never be reached.
        if checksum_path.exists():
            recorded = Artifact._parse_checksum(text(checksum_path))
            if recorded:
                return recorded
        return func(binary(p)).hexdigest()

    @staticmethod
    def _parse_checksum(content: str) -> str:
        """
        Extract the bare checksum from the content of a checksum sidecar file.

        Handles the plain "<hash>" form (with optional trailing newline),
        the "<hash>  <filename>" form, and the "ALGO(<filename>)= <hash>" form.

        :param content: Raw text content of the checksum file.
        :return: The checksum, or an empty string if the content is blank.
        """
        for line in content.splitlines():
            line = line.strip()
            if not line:
                continue
            if "= " in line:
                # "SHA1(file)= <hash>" style: the hash follows the last "= ".
                return line.rsplit("= ", 1)[1].strip()
            # "<hash>" or "<hash>  <filename>" style: the hash is the first token.
            return line.split()[0]
        return ""
class Dependency:
    """
    This is an Artifact with scope, optional flag, and exclusions list.
    """

    def __init__(
        self,
        artifact: Artifact,
        scope: Optional[str] = None,
        optional: bool = False,
        exclusions: Optional[Iterable[Project]] = None
    ):
        """
        :param artifact: The artifact being depended upon.
        :param scope:
            The scope of the dependency. If None, the scope defaults to "test"
            when the artifact's classifier is "tests", and to "compile" otherwise.
        :param optional: Whether the dependency is optional.
        :param exclusions:
            Projects (G:A) to exclude; stored as a tuple. None means no exclusions.
        """
        if scope is None:
            scope = "test" if artifact.classifier == "tests" else "compile"
        self.artifact = artifact
        self.scope = scope
        self.optional = optional
        self.exclusions: Tuple[Project, ...] = tuple() if exclusions is None else tuple(exclusions)

    def __str__(self):
        return coord2str(self.groupId, self.artifactId, self.version, self.classifier, self.type, self.scope, self.optional)

    @property
    def env(self) -> Environment:
        """The dependency's Maven environment."""
        return self.artifact.env

    @property
    def groupId(self) -> str:
        """The dependency's groupId."""
        return self.artifact.groupId

    @property
    def artifactId(self) -> str:
        """The dependency's artifactId."""
        return self.artifact.artifactId

    @property
    def version(self) -> str:
        """The dependency's version."""
        return self.artifact.version

    @property
    def classifier(self) -> str:
        """The dependency's classifier."""
        return self.artifact.classifier

    @property
    def type(self) -> str:
        """The dependency's packaging/type."""
        return self.artifact.packaging

    def set_version(self, version: str) -> None:
        """
        Alter the dependency's version.

        NB: The version is stored on the artifact's component, so the change
        is visible through every object sharing that component.

        :param version: The new version to use.
        :raises TypeError: If the given version is not a string.
        """
        # NB: Raise explicitly rather than assert, so that the
        # validation still happens when Python runs with -O.
        if not isinstance(version, str):
            raise TypeError(f"version must be a str, not {type(version).__name__}")
        self.artifact.component.version = version
class XML:
    """
    Thin wrapper around an XML document, parsed with ElementTree
    and with all namespace prefixes stripped after parsing.
    """

    def __init__(self, source: Union[str, Path], env: Optional[Environment] = None):
        """
        :param source:
            Either a string holding the XML content itself,
            or (for any non-string value) a file to parse.
        :param env: The Maven environment to link; a fresh Environment if not given.
        """
        self.source = source
        self.env: Environment = env or Environment()
        if isinstance(source, str):
            tree = ElementTree.ElementTree(ElementTree.fromstring(source))
        else:
            tree = ElementTree.parse(source)
        self.tree: ElementTree.ElementTree = tree
        XML._strip_ns(self.tree.getroot())

    def dump(self, el: ElementTree.Element = None) -> str:
        """
        Get a string representation of the given XML element.

        :param el: Element to stringify, or None to stringify the root node.
        :return: The XML as a string.
        """
        # NB: Be careful: childless ElementTree.Element objects are falsy!
        target = self.tree.getroot() if el is None else el
        return ElementTree.tostring(target).decode()

    def elements(self, path: str) -> List[ElementTree.Element]:
        """Find all elements matching the given path."""
        return self.tree.findall(path)

    def element(self, path: str) -> Optional[ElementTree.Element]:
        """Find the single element matching the given path, or None if there is none."""
        found = self.elements(path)
        assert len(found) <= 1
        if not found:
            return None
        return found[0]

    def values(self, path: str) -> List[str]:
        """Get the text of every element matching the given path."""
        return [node.text for node in self.elements(path)]

    def value(self, path: str) -> Optional[str]:
        """Get the text of the single element matching the given path, or None if there is none."""
        node = self.element(path)
        # NB: Be careful: childless ElementTree.Element objects are falsy!
        if node is None:
            return None
        return node.text

    @staticmethod
    def _strip_ns(el: ElementTree.Element) -> None:
        """
        Remove namespace prefixes from elements and attributes,
        for the given element and all of its descendants.
        Credit: https://stackoverflow.com/a/32552776/1207769
        """
        # Element.iter() yields the element itself, then every descendant.
        for node in el.iter():
            if node.tag.startswith("{"):
                node.tag = node.tag.split("}", 1)[-1]
            qualified = [key for key in node.attrib.keys() if key.startswith("{")]
            for key in qualified:
                bare = key.split("}", 1)[-1]
                node.attrib[bare] = node.attrib[key]
                del node.attrib[key]
class POM(XML):
    """
    Convenience wrapper around a Maven POM XML document.
    """

    def artifact(self) -> Artifact:
        """
        Get an Artifact object representing this POM.
        """
        project = self.env.project(self.groupId, self.artifactId)
        return project.at_version(self.version).artifact(packaging="pom")

    def parent(self) -> Optional["POM"]:
        """
        Get POM data for this POM's parent POM, or None if no parent is declared.

        When this POM was loaded from a file and declares a <relativePath>
        pointing at an existing POM whose coordinates match the declared
        parent, that local file is used. Otherwise the parent POM artifact
        is resolved through the environment.
        """
        # NB: Be careful: childless ElementTree.Element objects are falsy!
        if self.element("parent") is None:
            return None

        g = self.value("parent/groupId")
        a = self.value("parent/artifactId")
        v = self.value("parent/version")
        assert g and a and v

        relativePath = self.value("parent/relativePath")
        if isinstance(self.source, Path) and relativePath:
            # NB: The relative path is anchored at the directory containing
            # this POM file, not at the POM file itself. It may name either
            # the parent POM file or the directory holding its pom.xml.
            parent_path = self.source.parent / relativePath
            if parent_path.is_dir():
                parent_path = parent_path / "pom.xml"
            if parent_path.exists():
                # Use locally available parent POM file,
                # but only if it really is the declared parent.
                parent_pom = POM(parent_path, self.env)
                if (
                    g == parent_pom.groupId and
                    a == parent_pom.artifactId and
                    v == parent_pom.version
                ):
                    return parent_pom

        pom_artifact = self.env.project(g, a).at_version(v).artifact(packaging="pom")
        return POM(pom_artifact.resolve(), self.env)

    @property
    def groupId(self) -> Optional[str]:
        """The POM's <groupId> (or <parent><groupId>) value."""
        return self.value("groupId") or self.value("parent/groupId")

    @property
    def artifactId(self) -> Optional[str]:
        """The POM's <artifactId> value."""
        return self.value("artifactId")

    @property
    def version(self) -> Optional[str]:
        """The POM's <version> (or <parent><version>) value."""
        return self.value("version") or self.value("parent/version")

    @property
    def name(self) -> Optional[str]:
        """The POM's <name> value."""
        return self.value("name")

    @property
    def description(self) -> Optional[str]:
        """The POM's <description> value."""
        return self.value("description")

    @property
    def scmURL(self) -> Optional[str]:
        """The POM's <scm><url> value."""
        return self.value("scm/url")

    @property
    def issuesURL(self) -> Optional[str]:
        """The POM's <issueManagement><url> value."""
        return self.value("issueManagement/url")

    @property
    def ciURL(self) -> Optional[str]:
        """The POM's <ciManagement><url> value."""
        return self.value("ciManagement/url")

    @property
    def developers(self) -> List[Dict[str, Any]]:
        """List of dictionaries, one per <developer> entry of the POM."""
        return self._people("developers/developer")

    @property
    def contributors(self) -> List[Dict[str, Any]]:
        """List of dictionaries, one per <contributor> entry of the POM."""
        return self._people("contributors/contributor")

    def _people(self, path: str) -> List[Dict[str, Any]]:
        """
        Convert the person elements matching the given path into dictionaries.

        Childless child elements map tag -> text; the children of a
        <properties> element are flattened into the person's dictionary;
        any other nested element maps tag -> list of its children's texts.
        """
        people = []
        for el in self.elements(path):
            person: Dict[str, Any] = {}
            for child in el:
                if len(child) == 0:
                    person[child.tag] = child.text
                elif child.tag == "properties":
                    for grand in child:
                        person[grand.tag] = grand.text
                else:
                    person[child.tag] = [grand.text for grand in child]
            people.append(person)
        return people

    @property
    def properties(self) -> Dict[str, str]:
        """Dictionary of key/value pairs from the POM's <properties>."""
        return {el.tag: el.text for el in self.elements("properties/*")}

    def dependencies(self, managed: bool = False) -> List[Dependency]:
        """
        Gets a list of the POM's <dependency> entries,
        represented as Dependency objects.

        :param managed:
            If True, dependency entries will correspond to the POM's
            <dependencyManagement> instead of <dependencies>.
        :return: The list of Dependency objects.
        """
        xpath = "dependencies/dependency"
        if managed:
            xpath = f"dependencyManagement/{xpath}"
        return [
            self.env.dependency(el)
            for el in self.elements(xpath)
        ]
class Metadata(ABC):
    """
    Abstract view of the Maven metadata describing a project.
    """

    @property
    @abstractmethod
    def groupId(self) -> Optional[str]:
        """The groupId recorded in the metadata, if any."""

    @property
    @abstractmethod
    def artifactId(self) -> Optional[str]:
        """The artifactId recorded in the metadata, if any."""

    @property
    @abstractmethod
    def lastUpdated(self) -> Optional[datetime]:
        """When the metadata was last updated, if known."""

    @property
    @abstractmethod
    def latest(self) -> Optional[str]:
        """The version recorded as latest, if any."""

    @property
    @abstractmethod
    def versions(self) -> List[str]:
        """The versions listed in the metadata."""

    @property
    @abstractmethod
    def lastVersion(self) -> Optional[str]:
        """The final entry of the listed versions, if any."""

    @property
    @abstractmethod
    def release(self) -> Optional[str]:
        """The version recorded as release, if any."""
class MetadataXML(XML, Metadata):
    """
    Metadata backed by a single maven-metadata.xml document.
    """

    @property
    def groupId(self) -> Optional[str]:
        """The document's <groupId> value."""
        return self.value("groupId")

    @property
    def artifactId(self) -> Optional[str]:
        """The document's <artifactId> value."""
        return self.value("artifactId")

    @property
    def lastUpdated(self) -> Optional[datetime]:
        """The document's <versioning><lastUpdated> value as a datetime, or None if absent or empty."""
        stamp = self.value("versioning/lastUpdated")
        if not stamp:
            return None
        return ts2dt(stamp)

    @property
    def latest(self) -> Optional[str]:
        """The document's <versioning><latest> value."""
        # WARNING: The <latest> value is often wrong, for reasons I don't know.
        # However, the last <version> under <versions> has the correct value.
        # Consider using lastVersion instead of latest.
        return self.value("versioning/latest")

    @property
    def versions(self) -> List[str]:
        """The document's <versioning><versions><version> values, in document order."""
        return self.values("versioning/versions/version")

    @property
    def lastVersion(self) -> Optional[str]:
        """The last of the listed versions, or None if no versions are listed."""
        listed = self.versions
        if not listed:
            return None
        return listed[-1]

    @property
    def release(self) -> Optional[str]:
        """The document's <versioning><release> value."""
        return self.value("versioning/release")
class Metadatas(Metadata):
    """
    A unified Maven metadata combined over a collection of individual Maven metadata.
    The typical use case for this class is to aggregate multiple maven-metadata.xml files
    describing the same project, across multiple local repository cache and storage directories.
    """

    def __init__(self, metadatas: Iterable[Metadata]):
        """
        :param metadatas:
            The individual metadata to combine. They must all describe the
            same project, i.e. agree on groupId and artifactId.
        """
        # Order from least to most recently updated.
        # NB: lastUpdated may be None, which cannot be compared to a datetime;
        # such entries are treated as the oldest.
        self.metadatas: List[Metadata] = sorted(
            metadatas,
            key=lambda m: m.lastUpdated or datetime.min
        )
        # All entries agree pairwise exactly when each agrees with the first.
        for m in self.metadatas[1:]:
            first = self.metadatas[0]
            assert first.groupId == m.groupId and first.artifactId == m.artifactId

    @property
    def groupId(self) -> Optional[str]:
        """The groupId shared by the combined metadata, or None if there are none."""
        return self.metadatas[0].groupId if self.metadatas else None

    @property
    def artifactId(self) -> Optional[str]:
        """The artifactId shared by the combined metadata, or None if there are none."""
        return self.metadatas[0].artifactId if self.metadatas else None

    @property
    def lastUpdated(self) -> Optional[datetime]:
        """The lastUpdated value of the most recently updated metadata, or None if there are none."""
        return self.metadatas[-1].lastUpdated if self.metadatas else None

    @property
    def latest(self) -> Optional[str]:
        """The latest value of the most recently updated metadata that declares one."""
        return next((m.latest for m in reversed(self.metadatas) if m.latest), None)

    @property
    def versions(self) -> List[str]:
        """
        The versions of all combined metadata, concatenated from least to
        most recently updated. A version listed by several sources appears
        once per source.
        """
        return [v for m in self.metadatas for v in m.versions]

    @property
    def lastVersion(self) -> Optional[str]:
        """The last entry of the concatenated versions, or None if there are none."""
        return versions[-1] if (versions := self.versions) else None

    @property
    def release(self) -> Optional[str]:
        """The release value of the most recently updated metadata that declares one."""
        return next((m.release for m in reversed(self.metadatas) if m.release), None)
# Key type used by Model to index its dependency dictionaries:
# (groupId, artifactId, classifier, type)
GACT = Tuple[str, str, str, str]
class Model:
"""
A minimal Maven metadata model, tracking only dependencies and properties.
"""
def __init__(self, pom: "POM"):
"""
Build a Maven metadata model from the given POM.
:param pom: A source POM from which to extract metadata (e.g. dependencies).
"""
self.env = pom.env
self.gav = f"{pom.groupId}:{pom.artifactId}:{pom.version}"
_log.debug(f"{self.gav}: begin model initialization")
# Transfer raw metadata from POM source to target model.
# For now, we handle only dependencies, dependencyManagement, and properties.
self.deps: Dict[GACT, Dependency] = {}
self.dep_mgmt: Dict[GACT, Dependency] = {}
self.props: Dict[str, str] = {}
self._merge(pom)
# The following steps are adapted from the maven-model-builder:
# https://maven.apache.org/ref/3.3.9/maven-model-builder/
# -- profile activation and injection --
_log.debug(f"{self.gav}: profile activation and injection")
# Compute active profiles.
active_profiles = [
profile
for profile in pom.elements("profiles/profile")
if Model._is_active_profile(profile)
]
# Merge values from the active profiles into the model.
for profile in active_profiles:
profile_dep_els = profile.findall("dependencies/dependency")
profile_deps = [self.env.dependency(el) for el in profile_dep_els]
self._merge_deps(profile_deps)
profile_dep_mgmt_els = profile.findall("dependencyManagement/dependencies/dependency")