Skip to content

Commit

Permalink
Support a system-installed org token
Browse files Browse the repository at this point in the history
  • Loading branch information
mcg1969 committed Nov 22, 2024
1 parent 7e984bb commit 896af76
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 16 deletions.
49 changes: 45 additions & 4 deletions anaconda_anon_usage/tokens.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@

import sys
from collections import namedtuple
from os.path import expanduser, join
from os.path import dirname, exists, expanduser, join

from conda.base import constants as c_constants

from . import __version__
from .utils import _debug, _random_token, _saved_token, cached

Tokens = namedtuple("Tokens", ("version", "client", "session", "environment"))
Tokens = namedtuple("Tokens", ("version", "client", "session", "environment", "system"))
CONFIG_DIR = expanduser("~/.conda")


Expand All @@ -24,6 +26,37 @@ def version_token():
return __version__


@cached
def system_token():
"""
The system token is a token installed into a read-only system
location, presumably by MDM software. If present, it is used
to add an additional "o/..." token to the user agent string
"""
# Do not import SEARCH_PATH directly since we need to
# temporarily patch it for testing
for path in c_constants.SEARCH_PATH:
# Terminate the search at the first encounter
# with a non-system directory, to ensure that
# we use only system directories.
if path.startswith("~"):
break
# Do not use the directories that involve
# environment variables, or .d/ directories
if path.startswith("$") or path.endswith("/"):
continue
path = join(dirname(path), "aau_token")
if exists(path):
try:
_debug("Reading system token: %s", path)
with open(path) as fp:
return fp.read()
except Exception:
_debug("Unable to read system token")
return
_debug("No system token found")


@cached
def client_token():
"""
Expand Down Expand Up @@ -65,7 +98,11 @@ def all_tokens(prefix=None):
Fields: version, client, session, environment
"""
return Tokens(
version_token(), client_token(), session_token(), environment_token(prefix)
version_token(),
client_token(),
session_token(),
environment_token(prefix),
system_token(),
)


Expand All @@ -76,14 +113,18 @@ def token_string(prefix=None, enabled=True):
appended to the conda user agent.
"""
parts = ["aau/" + __version__]
if enabled:
if enabled or system_token():
if not enabled:
_debug("anaconda_anon_usage enabled by system token")
values = all_tokens(prefix)
if values.client:
parts.append("c/" + values.client)
if values.session:
parts.append("s/" + values.session)
if values.environment:
parts.append("e/" + values.environment)
if values.system:
parts.append("o/" + values.system)
else:
_debug("anaconda_anon_usage disabled by config")
result = " ".join(parts)
Expand Down
19 changes: 14 additions & 5 deletions anaconda_anon_usage/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@
WRITE_DEFER = 1
WRITE_FAIL = 2

# Length of the randomly generated token. There is 6 bits of
# randomness in each character.
TOKEN_LENGTH = 22


def cached(func):
def call_if_needed(*args, **kwargs):
Expand Down Expand Up @@ -73,8 +77,10 @@ def _debug(s, *args, error=False):


def _random_token(what="random"):
data = os.urandom(16)
result = base64.urlsafe_b64encode(data).strip(b"=").decode("ascii")
# base64 encoding captures 6 bits per character.
# Generate enough random bytes to ensure all charaaters are random
data = os.urandom((TOKEN_LENGTH * 6 - 1) // 8 + 1)
result = base64.urlsafe_b64encode(data).decode("ascii")[:TOKEN_LENGTH]
_debug("Generated %s token: %s", what, result)
return result

Expand Down Expand Up @@ -145,7 +151,7 @@ def _deferred_exists(
return token


def _saved_token(fpath, what, must_exist=None):
def _saved_token(fpath, what, must_exist=None, read_only=False):
"""
Implements the saved token functionality. If the specified
file exists, and contains a token with the right format,
Expand All @@ -172,9 +178,12 @@ def _saved_token(fpath, what, must_exist=None):
_debug("Retrieved %s token: %s", what, client_token)
except Exception as exc:
_debug("Unexpected error reading: %s\n %s", fpath, exc, error=True)
if not client_token and read_only:
_debug("Read-only %s token does not exist", what)
return client_token
if len(client_token) < 22:
if len(client_token) > 0:
_debug("Generating longer token")
_debug("Generating longer %s token", what)
client_token = _random_token(what)
status = _write_attempt(must_exist, fpath, client_token, what[0] in WRITE_CHAOS)
if status == WRITE_FAIL:
Expand All @@ -183,6 +192,6 @@ def _saved_token(fpath, what, must_exist=None):
elif status == WRITE_DEFER:
# If the environment has not yet been created we need
# to defer the token write until later.
_debug("Deferring token write")
_debug("Deferring %s token write", what)
DEFERRED.append((must_exist, fpath, client_token, what))
return client_token
21 changes: 21 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import tempfile
from os import remove
from os.path import join

import pytest
from conda.base import constants as c_constants
from conda.base.context import Context, context

from anaconda_anon_usage import tokens, utils
Expand All @@ -12,6 +14,25 @@ def aau_token_path():
return join(tokens.CONFIG_DIR, "aau_token")


@pytest.fixture
def system_token():
with tempfile.TemporaryDirectory() as tname:
tname = tname.replace("\\", "/")
o_path = c_constants.SEARCH_PATH
n_path = (
"/tmp/fake/condarc.d/",
tname + "/.condarc",
tname + "/condarc",
tname + "/condarc.d/",
)
c_constants.SEARCH_PATH = n_path + o_path
rtoken = utils._random_token()
with open(tname + "/aau_token", "w") as fp:
fp.write(rtoken)
yield rtoken
c_constants.SEARCH_PATH = o_path


@pytest.fixture(autouse=True)
def token_cleanup(request, aau_token_path):
def _remove():
Expand Down
31 changes: 24 additions & 7 deletions tests/unit/test_tokens.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def test_token_string():
assert "c/" in token_string
assert "s/" in token_string
assert "e/" in token_string
assert "o/" not in token_string


def test_token_string_disabled():
Expand All @@ -33,43 +34,59 @@ def test_token_string_disabled():
assert "c/" not in token_string
assert "s/" not in token_string
assert "e/" not in token_string
assert "o/" not in token_string


def test_token_string_no_client_token(monkeypatch):
def test_token_string_with_system(system_token):
token_string = tokens.token_string()
assert "o/" + system_token in token_string


def test_token_string_disabled_override_system(system_token):
token_string = tokens.token_string(enabled=False)
assert "o/" + system_token in token_string


def test_token_string_no_client_token(monkeypatch, system_token):
def _mock_saved_token(*args, **kwargs):
return ""

monkeypatch.setattr(tokens, "environment_token", lambda prefix: "env_token")
monkeypatch.setattr(tokens, "_saved_token", lambda fpath, what: "")
monkeypatch.setattr(tokens, "_saved_token", _mock_saved_token)

token_string = tokens.token_string()
assert "c/" not in token_string
assert "s/" in token_string
assert "e/env_token" in token_string
assert "o/" + system_token in token_string


def test_token_string_no_environment_token(
monkeypatch,
):
def test_token_string_no_environment_token(monkeypatch, system_token):
monkeypatch.setattr(tokens, "environment_token", lambda prefix: "")

token_string = tokens.token_string()
assert "c/" in token_string
assert "s/" in token_string
assert "e/" not in token_string
assert "o/" + system_token in token_string


def test_token_string_full_readonly(monkeypatch):
def test_token_string_full_readonly(monkeypatch, system_token):
monkeypatch.setattr(utils, "READ_CHAOS", "ce")
monkeypatch.setattr(utils, "WRITE_CHAOS", "ce")
token_string = tokens.token_string()
assert "c/" not in token_string
assert "s/" in token_string
assert "e/" not in token_string
assert "o/" + system_token in token_string


def test_token_string_env_readonly(monkeypatch):
def test_token_string_env_readonly(monkeypatch, system_token):
monkeypatch.setattr(utils, "READ_CHAOS", "e")
monkeypatch.setattr(utils, "WRITE_CHAOS", "e")

token_string = tokens.token_string()
assert "c/" in token_string
assert "s/" in token_string
assert "e/" not in token_string
assert "o/" + system_token in token_string

0 comments on commit 896af76

Please sign in to comment.