Skip to content

Commit

Permalink
Fixing up unmanaged hosts mesh
Browse files Browse the repository at this point in the history
  • Loading branch information
dkmstr committed Oct 3, 2024
1 parent 9e0266d commit c21652f
Show file tree
Hide file tree
Showing 11 changed files with 260 additions and 111 deletions.
146 changes: 74 additions & 72 deletions server/src/tests/REST/actor/test_initialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,138 +194,140 @@ def test_initialize_unmanaged_by_mac(self) -> None:
"""
Test actor initialize v3 for unmanaged actor
"""
user_service = self.user_service_unmanaged
userservice = self.userservice_unmanaged
actor_token: str = (
user_service.deployed_service.service.token if user_service.deployed_service.service else None
userservice.deployed_service.service.token if userservice.deployed_service.service else None
) or ''

unique_id = user_service.get_unique_id()

if actor_token == '':
self.fail('Service token not found')

success = functools.partial(self.invoke_success, 'unmanaged')
failure = functools.partial(self.invoke_failure, 'unmanaged')

TEST_MAC: typing.Final[str] = '00:00:00:00:00:00'
NONEXISTING_MAC: typing.Final[str] = '00:00:00:00:00:00'
USERSERVICE_MAC: typing.Final[str] = userservice.get_unique_id()

# This will succeed, but only alias token is returned because MAC is not registered by UDS
result = success(
actor_token,
mac=TEST_MAC,
mac=NONEXISTING_MAC,
)

# Unmanaged host is the response for initialization of unmanaged actor ALWAYS
self.assertIsInstance(result['token'], str)
self.assertEqual(result['token'], result['own_token'])
self.assertIsInstance(result['master_token'], str)
self.assertIsNone(result['unique_id'])
self.assertIsNone(result['os'])
self.assertIsNone(result['own_token'])
self.assertIsNone(result['token'])

# Store alias token for later tests
alias_token = result['token']
alias_token = result['master_token']
# Ensure that the alias returned is on alias db, and it points to the same service as the one we belong to
alias = models.ServiceTokenAlias.objects.get(alias=alias_token)
self.assertEqual(alias.service, userservice.deployed_service.service)
self.assertEqual(alias.unique_id, NONEXISTING_MAC.lower())

# If repeated, same token is returned
result = success(
actor_token,
mac=TEST_MAC,
mac=NONEXISTING_MAC,
)
self.assertEqual(result['token'], alias_token)

# Now, invoke a "nice" initialize
self.assertEqual(result['master_token'], alias_token)
# Now, invoke with a correct mac (Exists os user services)
result = success(
actor_token,
mac=unique_id,
mac=USERSERVICE_MAC,
)

token = result['token']

self.assertIsInstance(token, str)
self.assertEqual(token, user_service.uuid)
self.assertEqual(token, result['own_token'])
self.assertEqual(result['unique_id'], unique_id)

# Ensure that the alias returned is on alias db, and it points to the same service as the one we belong to

# Note that due the change of mac, a new alias is created
alias_token = result['master_token']
alias = models.ServiceTokenAlias.objects.get(alias=alias_token)
self.assertEqual(alias.service, user_service.deployed_service.service)

# Now, we should be able to "initialize" with valid mac and with original and alias tokens
# If we call initialize and we get "own-token" means that we have already logged in with this data
result = success(alias_token, mac=unique_id)

self.assertEqual(result['token'], user_service.uuid)
self.assertEqual(result['token'], result['own_token'])
self.assertEqual(result['unique_id'], unique_id)
self.assertEqual(alias.service, userservice.deployed_service.service)
self.assertEqual(alias.unique_id, USERSERVICE_MAC.lower())

self.assertEqual(USERSERVICE_MAC, result['unique_id'])
self.assertEqual(result['own_token'], result['token'])
self.assertEqual(result['token'], userservice.uuid)

# Now, invoke with alias, result shouls be the same
result2 = success(
alias_token,
mac=USERSERVICE_MAC,
)

# master_token should be the same as the alias token
self.assertEqual(result, result2)
#
failure('invalid token', mac=unique_id, expect_forbidden=True)
failure('invalid token', mac=USERSERVICE_MAC, expect_forbidden=True)

def test_initialize_unmanaged_by_ip(self) -> None:
"""
Test actor initialize v3 for unmanaged actor
"""
user_service = services_fixtures.create_db_one_assigned_userservice(
userservice = services_fixtures.create_db_one_assigned_userservice(
self.provider,
self.admins[0],
self.groups,
'unmanaged',
)
# Set an IP as unique_id
unique_id = '1.2.3.4'
user_service.unique_id = unique_id
user_service.save()
USERSERVICE_IP: typing.Final[str] = '1.2.3.4'
userservice.unique_id = USERSERVICE_IP
userservice.save()

actor_token: str = (
user_service.deployed_service.service.token if user_service.deployed_service.service else None
userservice.deployed_service.service.token if userservice.deployed_service.service else None
) or ''

success = functools.partial(self.invoke_success, 'unmanaged', mac='00:00:00:00:00:00')
failure = functools.partial(self.invoke_failure, 'unmanaged', mac='00:00:00:00:00:00')

TEST_IP: typing.Final[str] = '00:00:00:00:00:00'
NONEXISTING_IP: typing.Final[str] = '00:00:00:00:00:00'

# This will succeed, but only alias token is returned because MAC is not registered by UDS
result = success(
actor_token,
ip=TEST_IP,
ip=NONEXISTING_IP,
)

# Unmanaged host is the response for initialization of unmanaged actor ALWAYS
self.assertIsInstance(result['token'], str)
self.assertEqual(result['token'], result['own_token'])
self.assertIsInstance(result['master_token'], str)
self.assertIsNone(result['unique_id'])
self.assertIsNone(result['os'])
self.assertIsNone(result['own_token'])
self.assertIsNone(result['token'])

# Store alias token for later tests
alias_token = result['token']

# If repeated, same token is returned
result = success(
actor_token,
ip=TEST_IP,
)
self.assertEqual(result['token'], alias_token)
alias_token = result['master_token']
# Ensure that the alias returned is on alias db, and it points to the same service as the one we belong to
alias = models.ServiceTokenAlias.objects.get(alias=alias_token)
self.assertEqual(alias.service, userservice.deployed_service.service)
self.assertEqual(alias.unique_id, NONEXISTING_IP.lower())

# Now, invoke a "nice" initialize
# Now, invoke with a correct mac (Exists os user services)
result = success(
actor_token,
ip=unique_id,
mac=USERSERVICE_IP,
)

token = result['token']

self.assertIsInstance(token, str)
self.assertEqual(token, user_service.uuid)
self.assertEqual(token, result['own_token'])
self.assertEqual(result['unique_id'], unique_id)

# Ensure that the alias returned is on alias db, and it points to the same service as the one we belong to

# Note that due the change of mac, a new alias is created
alias_token = result['master_token']
alias = models.ServiceTokenAlias.objects.get(alias=alias_token)
self.assertEqual(alias.service, user_service.deployed_service.service)

# Now, we should be able to "initialize" with valid mac and with original and alias tokens
# If we call initialize and we get "own-token" means that we have already logged in with this data
result = success(alias_token, ip=unique_id)

self.assertEqual(result['token'], user_service.uuid)
self.assertEqual(result['token'], result['own_token'])
self.assertEqual(result['unique_id'], unique_id)
self.assertEqual(alias.service, userservice.deployed_service.service)
self.assertEqual(alias.unique_id, USERSERVICE_IP.lower())

self.assertEqual(USERSERVICE_IP, result['unique_id'])
self.assertEqual(result['own_token'], result['token'])
self.assertEqual(result['token'], userservice.uuid)

# Now, invoke with alias, result shouls be the same
result2 = success(
alias_token,
mac=USERSERVICE_IP,
)

# master_token should be the same as the alias token
self.assertEqual(result, result2)
#
failure('invalid token', ip=unique_id, expect_forbidden=True)
failure('invalid token', mac=USERSERVICE_IP, expect_forbidden=True)
149 changes: 149 additions & 0 deletions server/src/tests/REST/actor/test_unmanaged.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2022 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import typing
import logging


from uds import models
from uds.core.managers.crypto import CryptoManager
from ...utils import rest


logger = logging.getLogger(__name__)


class ActorUnmanagedTest(rest.test.RESTActorTestCase):
"""
Test actor functionality
"""

def invoke_success(
self,
token: str,
*,
mac: typing.Optional[str] = None,
ip: typing.Optional[str] = None,
) -> dict[str, typing.Any]:
response = self.client.post(
'/uds/rest/actor/v3/unmanaged',
data={
'id': [{'mac': mac or '42:AC:11:22:33', 'ip': ip or '1.2.3.4'}],
'token': token,
'seecret': 'test_secret',
'port': 1234,
},
content_type='application/json',
)
self.assertEqual(response.status_code, 200)
data = response.json()
self.assertIsInstance(data['result'], dict)
return data['result']

def invoke_failure(
self,
token: str,
*,
mac: typing.Optional[str] = None,
ip: typing.Optional[str] = None,
expect_forbidden: bool = False,
) -> dict[str, typing.Any]:
response = self.client.post(
'/uds/rest/actor/v3/unmanaged',
data={
'id': [{'mac': mac or '42:AC:11:22:33', 'ip': ip or '1.2.3.4'}],
'token': token,
'seecret': 'test_secret',
'port': 1234,
},
content_type='application/json',
)
self.assertEqual(response.status_code, 200 if not expect_forbidden else 403)
if expect_forbidden:
return {}

data = response.json()
self.assertIsInstance(data['result'], dict)
return data['result']

def test_unmanaged(self) -> None:
"""
Test actor initialize v3 for unmanaged actor
"""
userservice = self.userservice_unmanaged
actor_token: str = (
userservice.deployed_service.service.token if userservice.deployed_service.service else None
) or ''

if actor_token == '':
self.fail('Service token not found')

TEST_MAC: typing.Final[str] = '00:00:00:00:00:00'

# This will succeed, but only alias token is returned because MAC is not registered by UDS
result = self.invoke_success(
actor_token,
mac=TEST_MAC,
)

# 'private_key': key, # To be removed on 5.0
# 'key': key,
# 'server_certificate': certificate, # To be removed on 5.0
# 'certificate': certificate,
# 'password': password,
# 'ciphers': getattr(settings, 'SECURE_CIPHERS', ''),

self.assertIn('private_key', result)
self.assertIn('key', result)
self.assertIn('server_certificate', result)
self.assertIn('certificate', result)
self.assertIn('password', result)
self.assertIn('ciphers', result)

# Create a token_alias assiciated with the service
token_alias = CryptoManager.manager().random_string(40)
models.ServiceTokenAlias.objects.create(
alias=token_alias,
unique_id=TEST_MAC,
service=userservice.service_pool.service,
)

result2 = self.invoke_success(
actor_token,
mac=TEST_MAC,
)

# Keys showld be different
self.assertIn('private_key', result2)
self.assertIn('key', result2)
self.assertIn('server_certificate', result2)
self.assertIn('certificate', result2)
self.assertIn('password', result2)
self.assertEqual(result['ciphers'], result2['ciphers'])
2 changes: 1 addition & 1 deletion server/src/tests/REST/test_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def test_overview(self) -> None:
def test_chart_pool(self) -> None:
# First, create fixtures for the pool
DAYS = 30
for pool in [self.user_service_managed, self.user_service_unmanaged]:
for pool in [self.user_service_managed, self.userservice_unmanaged]:
stats_counters.create_stats_interval_total(
id=pool.deployed_service.id,
counter_type=[counters.types.stats.CounterType.ASSIGNED, counters.types.stats.CounterType.INUSE, counters.types.stats.CounterType.CACHED],
Expand Down
2 changes: 1 addition & 1 deletion server/src/tests/services/proxmox/test_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def test_get_machines(self) -> None:
self.assertGreaterEqual(len(choices), 1)
for choice in choices:
self.assertIsInstance(choice, dict)
self.assertIsInstance(choice['id'], int)
self.assertIsInstance(choice['id'], str)
self.assertIsInstance(choice['text'], str)

api.get_pool_info.assert_called_once()
Loading

0 comments on commit c21652f

Please sign in to comment.