rptest: Update decommission_and_add test for CloudV2 part 1 #14867

Merged (4 commits, Nov 28, 2023)
23 changes: 20 additions & 3 deletions tests/rptest/clients/kubectl.py
@@ -147,16 +147,33 @@ def _install(self):
         self._kubectl_installed = True
         return
 
+    def _run(self, cmd):
+        # Log the command, then run it and return the raw output
+        self._redpanda.logger.info(cmd)
+        res = subprocess.check_output(cmd)
+        return res
+
+    def run_kube_command(self, kcmd):
+        # Make sure kubectl is installed on the target node
+        self._install()
+        _ssh_prefix = self._ssh_prefix()
+        _kubectl = ["kubectl", '-n', self._namespace]
+
+        # Accept the command as either a str or a list
+        _kcmd = kcmd if isinstance(kcmd, list) else kcmd.split()
+        # Assemble the full command line
+        cmd = _ssh_prefix + _kubectl + _kcmd
+        # Log and run
+        return self._run(cmd)
+
     def exec(self, remote_cmd):
         self._install()
         ssh_prefix = self._ssh_prefix()
         cmd = ssh_prefix + [
             'kubectl', 'exec', '-n', self._namespace, '-c', 'redpanda',
             f'rp-{self._cluster_id}-0', '--', 'bash', '-c'
         ] + ['"' + remote_cmd + '"']
-        self._redpanda.logger.info(cmd)
-        res = subprocess.check_output(cmd)
-        return res
+        return self._run(cmd)
 
     def exists(self, remote_path):
         self._install()
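Note: the snippet below is an illustrative usage sketch, not part of the diff. It assumes an already-constructed `KubectlTool`-style object from `tests/rptest/clients/kubectl.py`; only `run_kube_command()` and the `_run()` funnel come from this PR.

```python
def dump_pods(kubectl):
    # `kubectl` is assumed to be an rptest kubectl tool object exposing
    # the run_kube_command() helper added in this PR.
    # String form: split on whitespace inside run_kube_command()
    raw = kubectl.run_kube_command("get pods -o json")
    # List form: passed through untouched, so pre-tokenized arguments survive
    kubectl.run_kube_command(["get", "pods", "-o", "wide"])
    # Both forms funnel into _run(), so every command is logged uniformly
    # and executed via subprocess.check_output, which returns bytes
    return raw.decode()
```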
34 changes: 20 additions & 14 deletions tests/rptest/redpanda_cloud_tests/high_throughput_test.py
@@ -444,6 +444,10 @@ def get_node(self):
         idx = random.randrange(len(self.cluster.nodes))
         return self.cluster.nodes[idx]
 
+    def get_broker_pod(self):
+        idx = random.randrange(len(self.redpanda.pods))
+        return self.redpanda.pods[idx]
+
     @cluster(num_nodes=2)
     def test_throughput_simple(self):
         # create default topics
@@ -822,15 +826,17 @@ def test_decommission_and_add(self):
         self.free_preallocated_nodes()
 
     def stage_decommission_and_add(self):
-        node, node_id, node_str = self.get_node()
+        pod = self.get_broker_pod()
+        pod_id = pod.slot_id
+        pod_str = pod.name
 
-        def topic_partitions_on_node():
+        def topic_partitions_on_pod():
             try:
                 parts = self.redpanda.partitions(self.topic)
             except StopIteration:
                 return 0
             n = sum([
-                1 if r.account == node.account else 0 for p in parts
+                1 if r.account == pod.account else 0 for p in parts
                 for r in p.replicas
             ])
             self.logger.debug(f"Partitions in the node-topic: {n}")
@@ -839,44 +845,44 @@ def topic_partitions_on_node():
         # create default topics
         self._create_default_topics()
 
-        nt_partitions_before = topic_partitions_on_node()
+        nt_partitions_before = topic_partitions_on_pod()
 
         self.logger.info(
-            f"Decommissioning node {node_str}, partitions: {nt_partitions_before}"
+            f"Decommissioning node {pod_str}, partitions: {nt_partitions_before}"
         )
         decomm_time = time.monotonic()
         admin = self.redpanda._admin
-        admin.decommission_broker(node_id)
+        admin.decommission_broker(pod_id)
         waiter = NodeDecommissionWaiter(self.redpanda,
-                                        node_id,
+                                        pod_id,
                                         self.logger,
                                         progress_timeout=120)
         waiter.wait_for_removal()
-        self.redpanda.stop_node(node)
-        assert topic_partitions_on_node() == 0
+        self.redpanda.stop_node(pod)
+        assert topic_partitions_on_pod() == 0
         decomm_time = time.monotonic() - decomm_time
 
         self.logger.info(f"Adding a node")
-        self.redpanda.clean_node(node,
+        self.redpanda.clean_node(pod,
                                  preserve_logs=True,
                                  preserve_current_install=True)
-        self.redpanda.start_node(node,
+        self.redpanda.start_node(pod,
                                  auto_assign_node_id=False,
                                  omit_seeds_on_idx_one=False)
         wait_until(self.redpanda.healthy, timeout_sec=600, backoff_sec=1)
-        new_node_id = self.redpanda.node_id(node, force_refresh=True)
+        new_node_id = self.redpanda.node_id(pod, force_refresh=True)
 
         self.logger.info(
            f"Node added, new node_id: {new_node_id}, waiting for {int(nt_partitions_before/2)} partitions to move there in {int(decomm_time*2)} s"
         )
         wait_until(
-            lambda: topic_partitions_on_node() > nt_partitions_before / 2,
+            lambda: topic_partitions_on_pod() > nt_partitions_before / 2,
             timeout_sec=max(120, decomm_time * 2),
             backoff_sec=2,
             err_msg=
             f"{int(nt_partitions_before/2)} partitions failed to move to node {new_node_id} in {max(60, decomm_time*2)} s"
         )
-        self.logger.info(f"{topic_partitions_on_node()} partitions moved")
+        self.logger.info(f"{topic_partitions_on_pod()} partitions moved")
 
     @cluster(num_nodes=3, log_allow_list=NOS3_LOG_ALLOW_LIST)
     def test_cloud_cache_thrash(self):
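Note: the replica-counting idea behind `topic_partitions_on_pod` can be seen in isolation below. This is a hedged, standalone sketch with made-up stand-in types; the real test iterates the partition metadata returned by `self.redpanda.partitions(self.topic)`.

```python
from collections import namedtuple

# Illustrative stand-ins for the partition metadata the test iterates over
Replica = namedtuple('Replica', ['account'])
Partition = namedtuple('Partition', ['replicas'])


def replicas_on_pod(partitions, pod_account):
    # Count one per replica whose hosting account matches the target pod
    return sum(1 for p in partitions for r in p.replicas
               if r.account == pod_account)


parts = [
    Partition(replicas=[Replica('pod-0'), Replica('pod-1')]),
    Partition(replicas=[Replica('pod-0')]),
]
assert replicas_on_pod(parts, 'pod-0') == 2
```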
27 changes: 27 additions & 0 deletions tests/rptest/services/cloud_broker.py
@@ -0,0 +1,27 @@
+class CloudBroker():
+    def __init__(self, pod, kubectl, logger) -> None:
+        self.logger = logger
+        # Validate
+        if not isinstance(pod, dict) or pod['kind'] != 'Pod':
+            self.logger.error("Invalid pod data provided")
+        # Metadata
+        self.operating_system = 'k8s'
+        self._meta = pod['metadata']
+        self.name = self._meta['name']
+        self.slot_id = int(
+            self._meta['labels']['operator.redpanda.com/node-id'])
+        self.uuid = self._meta['uid']
+
+        # Save other data
+        self._spec = pod['spec']
+        self._status = pod['status']
+
+        # Save client
+        self._kubeclient = kubectl
+
+        # Backward compatibility
+        self.account = self._meta
+
+    # Backward compatibility
+    def ssh_output(self, cmd):
+        return self._kubeclient.exec(cmd)
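Note: an illustrative sketch of the manifest shape `CloudBroker` consumes, i.e. one item of `kubectl get pods -o json`. All values below are fabricated, and passing `kubectl=None` works here only because the constructor merely stores the client without calling it.

```python
import logging

from rptest.services.cloud_broker import CloudBroker

pod_manifest = {
    'kind': 'Pod',
    'metadata': {
        'name': 'rp-abc123-0',                          # fabricated pod name
        'uid': '11111111-2222-3333-4444-555555555555',  # fabricated uid
        'labels': {'operator.redpanda.com/node-id': '0'},
    },
    'spec': {},
    'status': {},
}

broker = CloudBroker(pod_manifest, kubectl=None,
                     logger=logging.getLogger('demo'))
assert broker.slot_id == 0
assert broker.name == 'rp-abc123-0'
# Legacy code paths compare brokers via `account`, which aliases the metadata
assert broker.account is pod_manifest['metadata']
```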
79 changes: 25 additions & 54 deletions tests/rptest/services/redpanda.py
@@ -55,6 +55,7 @@
 from rptest.services.admin import Admin
 from rptest.services.redpanda_installer import RedpandaInstaller, VERSION_RE as RI_VERSION_RE, int_tuple as ri_int_tuple
 from rptest.services.redpanda_cloud import CloudCluster, CloudTierName, get_config_profile_name
+from rptest.services.cloud_broker import CloudBroker
 from rptest.services.rolling_restarter import RollingRestarter
 from rptest.services.storage import ClusterStorage, NodeStorage, NodeCacheStorage
 from rptest.services.storage_failure_injection import FailureInjectionConfig
@@ -1414,26 +1415,6 @@ class RedpandaServiceCloud(RedpandaServiceK8s):
     use `RedpandaServiceCloud`.
     """
 
-    # This parameter used in make_redpanda_service
-    # and multiple other services for detection
-    # TODO: Update it to GLOBAL_CLOUD_CLUSTER_CONFIG
-    GLOBAL_CLOUD_OAUTH_URL = 'cloud_oauth_url'
-    # Deprecated. Left for future reference
-    # GLOBAL_CLOUD_OAUTH_CLIENT_ID = 'cloud_oauth_client_id'
-    # GLOBAL_CLOUD_OAUTH_CLIENT_SECRET = 'cloud_oauth_client_secret'
-    # GLOBAL_CLOUD_OAUTH_AUDIENCE = 'cloud_oauth_audience'
-    # GLOBAL_CLOUD_API_URL = 'cloud_api_url'
-    # GLOBAL_CLOUD_CLUSTER_ID = 'cloud_cluster_id'
-    # GLOBAL_CLOUD_DELETE_CLUSTER = 'cloud_delete_cluster'
-    # GLOBAL_TELEPORT_AUTH_SERVER = 'cloud_teleport_auth_server'
-    # GLOBAL_TELEPORT_BOT_TOKEN = 'cloud_teleport_bot_token'
-    # GLOBAL_CLOUD_CLUSTER_REGION = 'cloud_cluster_region'
-    # GLOBAL_CLOUD_CLUSTER_PROVIDER = 'cloud_provider'
-    # GLOBAL_CLOUD_CLUSTER_TYPE = 'cloud_cluster_type'
-    # GLOBAL_CLOUD_CLUSTER_NETWORK = 'cloud_cluster_network'
-    # GLOBAL_CLOUD_PEER_VPC_ID = 'cloud_cluster_peer_vpc_id'
-    # GLOBAL_CLOUD_PEER_OWNER_ID = 'cloud_cluster_peer_owner_id'
-
     GLOBAL_CLOUD_CLUSTER_CONFIG = 'cloud_cluster'
 
     def __init__(self,
@@ -1479,47 +1460,19 @@ def __init__(self,
         # log cloud cluster id
         self.logger.debug(f"initial cluster_id: {self._cc_config['id']}")
 
-        # Removed in favor to serialization
-        # self._cloud_oauth_url = context.globals.get(
-        #     self.GLOBAL_CLOUD_OAUTH_URL, None)
-        # self._cloud_oauth_client_id = context.globals.get(
-        #     self.GLOBAL_CLOUD_OAUTH_CLIENT_ID, None)
-        # self._cloud_oauth_client_secret = context.globals.get(
-        #     self.GLOBAL_CLOUD_OAUTH_CLIENT_SECRET, None)
-        # self._cloud_oauth_audience = context.globals.get(
-        #     self.GLOBAL_CLOUD_OAUTH_AUDIENCE, None)
-        # self._cloud_teleport_proxy = context.globals.get(
-        #     self.GLOBAL_TELEPORT_AUTH_SERVER, None)
-        # self._cloud_teleport_bot_token = context.globals.get(
-        #     self.GLOBAL_TELEPORT_BOT_TOKEN, None)
-        # self._cloud_api_url = context.globals.get(self.GLOBAL_CLOUD_API_URL,
-        #                                           None)
-        # self._cloud_cluster_id = context.globals.get(
-        #     self.GLOBAL_CLOUD_CLUSTER_ID, '')
-        # self._cloud_delete_cluster = context.globals.get(
-        #     self.GLOBAL_CLOUD_DELETE_CLUSTER, True)
-        # self._cloud_cluster_provider = context.globals.get(
-        #     self.GLOBAL_CLOUD_CLUSTER_PROVIDER, "AWS").upper()
-        # self._cloud_cluster_region = context.globals.get(
-        #     self.GLOBAL_CLOUD_CLUSTER_REGION, "us-west-2")
-        # self._cloud_cluster_type = context.globals.get(
-        #     self.GLOBAL_CLOUD_CLUSTER_TYPE, "FMC").upper()
-
-        # self._cloud_cluster_network = context.globals.get(
-        #     self.GLOBAL_CLOUD_CLUSTER_NETWORK, "public").lower()
-        # self._cloud_peer_vpc_id = context.globals.get(
-        #     self.GLOBAL_CLOUD_PEER_VPC_ID, None)
-        # self._cloud_peer_owner_id = context.globals.get(
-        #     self.GLOBAL_CLOUD_PEER_OWNER_ID, None)
-
         # Create cluster class
         self._cloud_cluster = CloudCluster(
             context,
             self.logger,
             self._cc_config,
             provider_config=self._provider_config)
 
+        # Prepare kubectl
+        self._kubectl = None
+
+        # Backward compatibility with RedpandaService
+        # Fake out sasl_enabled callable
+        self.sasl_enabled = lambda: True
         # Always true for Cloud Cluster
         self._dedicated_nodes = True
         self.logger.info(
             'ResourceSettings: setting dedicated_nodes=True because serving from redpanda cloud'
@@ -1551,6 +1504,24 @@ def start(self, **kwargs):
             tp_proxy=self._cloud_cluster.config.teleport_auth_server,
             tp_token=self._cloud_cluster.config.teleport_bot_token)
 
+        # Get pods and form node list
+        self.pods = []
+        _r = self._kubectl.run_kube_command("get pods -o json")
+        _pods = json.loads(_r.decode())
+        for p in _pods['items']:
+            if not p['metadata']['name'].startswith(
+                    f"rp-{self._cloud_cluster.config.id}"):
+                continue
+            else:
+                _node = CloudBroker(p, self._kubectl, self.logger)
+                self.pods.append(_node)
+
+    def get_node_by_id(self, id):
+        for p in self.pods:
+            if p.slot_id == id:
+                return p
+        return None
+
     def stop_node(self, node, **kwargs):
         pass
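Note: to tie it together, a hedged sketch of how a test could consume the new `pods` list and `get_node_by_id()` once `start()` has populated them. The `redpanda` argument is assumed to be an already-started `RedpandaServiceCloud`.

```python
def pod_hostname(redpanda, node_id=0):
    # Look up the CloudBroker that currently owns the given slot id
    pod = redpanda.get_node_by_id(node_id)
    if pod is None:
        return None
    # ssh_output() is the backward-compatible path: under the hood it
    # tunnels the command through `kubectl exec` and returns bytes
    return pod.ssh_output('hostname').decode().strip()
```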