Merge pull request #14867 from redpanda-data/cloudv2-14766-support-pod-scaling

rptest: Update decommission_and_add test for CloudV2 part 1
savex authored Nov 28, 2023
2 parents 58c96b0 + 2df9119 commit fe2d537
Showing 4 changed files with 92 additions and 71 deletions.
23 changes: 20 additions & 3 deletions tests/rptest/clients/kubectl.py
@@ -147,16 +147,33 @@ def _install(self):
self._kubectl_installed = True
return

+    def _run(self, cmd):
+        # Log the full command line, then run it and return its raw output
+        self._redpanda.logger.info(cmd)
+        res = subprocess.check_output(cmd)
+        return res
+
+    def run_kube_command(self, kcmd):
+        # Prepare: make sure kubectl is installed
+        self._install()
+        _ssh_prefix = self._ssh_prefix()
+        _kubectl = ["kubectl", '-n', self._namespace]
+
+        # Accept the kubectl arguments as either a string or a list
+        _kcmd = kcmd if isinstance(kcmd, list) else kcmd.split()
+        # Format command
+        cmd = _ssh_prefix + _kubectl + _kcmd
+        # Log and run
+        return self._run(cmd)
+
def exec(self, remote_cmd):
self._install()
ssh_prefix = self._ssh_prefix()
cmd = ssh_prefix + [
'kubectl', 'exec', '-n', self._namespace, '-c', 'redpanda',
f'rp-{self._cluster_id}-0', '--', 'bash', '-c'
] + ['"' + remote_cmd + '"']
-        self._redpanda.logger.info(cmd)
-        res = subprocess.check_output(cmd)
-        return res
+        return self._run(cmd)

def exists(self, remote_path):
self._install()
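For illustration, a minimal sketch of how the new helper might be called; the client class name KubectlTool and the pre-built kubectl instance are assumptions for the sketch, not shown in this diff:

    # run_kube_command accepts either a string (split on whitespace) or a
    # ready-made argument list; both return raw bytes from check_output.
    pods_json = kubectl.run_kube_command('get pods -o json')
    names = kubectl.run_kube_command(['get', 'pods', '-o', 'name'])
    print(pods_json.decode())

The rewritten exec() now funnels through the same _run() helper, so both entry points log their full command line before executing it.
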
34 changes: 20 additions & 14 deletions tests/rptest/redpanda_cloud_tests/high_throughput_test.py
@@ -444,6 +444,10 @@ def get_node(self):
idx = random.randrange(len(self.cluster.nodes))
return self.cluster.nodes[idx]

+    def get_broker_pod(self):
+        idx = random.randrange(len(self.redpanda.pods))
+        return self.redpanda.pods[idx]
+
@cluster(num_nodes=2)
def test_throughput_simple(self):
# create default topics
@@ -899,15 +903,17 @@ def test_decommission_and_add(self):
self.free_preallocated_nodes()

def stage_decommission_and_add(self):
-        node, node_id, node_str = self.get_node()
+        pod = self.get_broker_pod()
+        pod_id = pod.slot_id
+        pod_str = pod.name

-        def topic_partitions_on_node():
+        def topic_partitions_on_pod():
try:
parts = self.redpanda.partitions(self.topic)
except StopIteration:
return 0
n = sum([
-                1 if r.account == node.account else 0 for p in parts
+                1 if r.account == pod.account else 0 for p in parts
for r in p.replicas
])
self.logger.debug(f"Partitions in the node-topic: {n}")
@@ -916,44 +922,44 @@ def topic_partitions_on_node():
# create default topics
self._create_default_topics()

-        nt_partitions_before = topic_partitions_on_node()
+        nt_partitions_before = topic_partitions_on_pod()

self.logger.info(
f"Decommissioning node {node_str}, partitions: {nt_partitions_before}"
f"Decommissioning node {pod_str}, partitions: {nt_partitions_before}"
)
decomm_time = time.monotonic()
admin = self.redpanda._admin
-        admin.decommission_broker(node_id)
+        admin.decommission_broker(pod_id)
waiter = NodeDecommissionWaiter(self.redpanda,
-                                        node_id,
+                                        pod_id,
self.logger,
progress_timeout=120)
waiter.wait_for_removal()
-        self.redpanda.stop_node(node)
-        assert topic_partitions_on_node() == 0
+        self.redpanda.stop_node(pod)
+        assert topic_partitions_on_pod() == 0
decomm_time = time.monotonic() - decomm_time

self.logger.info(f"Adding a node")
-        self.redpanda.clean_node(node,
+        self.redpanda.clean_node(pod,
preserve_logs=True,
preserve_current_install=True)
-        self.redpanda.start_node(node,
+        self.redpanda.start_node(pod,
auto_assign_node_id=False,
omit_seeds_on_idx_one=False)
wait_until(self.redpanda.healthy, timeout_sec=600, backoff_sec=1)
-        new_node_id = self.redpanda.node_id(node, force_refresh=True)
+        new_node_id = self.redpanda.node_id(pod, force_refresh=True)

self.logger.info(
f"Node added, new node_id: {new_node_id}, waiting for {int(nt_partitions_before/2)} partitions to move there in {int(decomm_time*2)} s"
)
wait_until(
-            lambda: topic_partitions_on_node() > nt_partitions_before / 2,
+            lambda: topic_partitions_on_pod() > nt_partitions_before / 2,
timeout_sec=max(120, decomm_time * 2),
backoff_sec=2,
err_msg=
f"{int(nt_partitions_before/2)} partitions failed to move to node {new_node_id} in {max(60, decomm_time*2)} s"
)
self.logger.info(f"{topic_partitions_on_node()} partitions moved")
self.logger.info(f"{topic_partitions_on_pod()} partitions moved")

@cluster(num_nodes=3, log_allow_list=NOS3_LOG_ALLOW_LIST)
def test_cloud_cache_thrash(self):
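The pivot of stage_decommission_and_add is counting how many replicas of the test topic live on the chosen pod. A minimal standalone sketch of that counting logic, assuming each partition object exposes a .replicas list whose entries expose .account (matching the test's usage):

    def replicas_on_pod(partitions, pod_account):
        # Count topic replicas hosted on the broker with this account
        return sum(1 for p in partitions
                   for r in p.replicas
                   if r.account == pod_account)

The test takes this count before decommissioning, asserts it reaches zero once the broker is removed, and then waits for at least half of the original count to migrate back onto the re-added node.
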
27 changes: 27 additions & 0 deletions tests/rptest/services/cloud_broker.py
@@ -0,0 +1,27 @@
+class CloudBroker():
+    def __init__(self, pod, kubectl, logger) -> None:
+        self.logger = logger
+        # Validate that a k8s Pod document was provided
+        if not isinstance(pod, dict) or pod['kind'] != 'Pod':
+            self.logger.error("Invalid pod data provided")
+        # Metadata
+        self.operating_system = 'k8s'
+        self._meta = pod['metadata']
+        self.name = self._meta['name']
+        self.slot_id = int(
+            self._meta['labels']['operator.redpanda.com/node-id'])
+        self.uuid = self._meta['uid']
+
+        # Save other data
+        self._spec = pod['spec']
+        self._status = pod['status']
+
+        # Save the kubectl client
+        self._kubeclient = kubectl
+
+        # Backward compatibility: tests address brokers via `account`
+        self.account = self._meta
+
+    # Backward compatibility: route ssh-style calls through `kubectl exec`
+    def ssh_output(self, cmd):
+        return self._kubeclient.exec(cmd)
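A hedged construction example: CloudBroker consumes one item from `kubectl get pods -o json` output, so a pod document trimmed to the fields the constructor actually reads is enough. The name, uid, and label values below are illustrative, and kubectl is left as None because ssh_output is not exercised here:

    import logging

    pod_doc = {
        'kind': 'Pod',
        'metadata': {
            'name': 'rp-abc123def456-0',  # follows the rp-<cluster-id>-<n> pattern seen above
            'uid': '00000000-0000-0000-0000-000000000000',
            'labels': {'operator.redpanda.com/node-id': '0'},
        },
        'spec': {},
        'status': {},
    }
    broker = CloudBroker(pod_doc, kubectl=None, logger=logging.getLogger())
    assert broker.slot_id == 0  # parsed from the operator label
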
79 changes: 25 additions & 54 deletions tests/rptest/services/redpanda.py
@@ -57,6 +57,7 @@
from rptest.services.admin import Admin
from rptest.services.redpanda_installer import RedpandaInstaller, VERSION_RE as RI_VERSION_RE, int_tuple as ri_int_tuple
from rptest.services.redpanda_cloud import CloudCluster, CloudTierName, get_config_profile_name
+from rptest.services.cloud_broker import CloudBroker
from rptest.services.rolling_restarter import RollingRestarter
from rptest.services.storage import ClusterStorage, NodeStorage, NodeCacheStorage
from rptest.services.storage_failure_injection import FailureInjectionConfig
@@ -1450,26 +1451,6 @@ class RedpandaServiceCloud(RedpandaServiceK8s):
use `RedpandaServiceCloud`.
"""

-    # This parameter used in make_redpanda_service
-    # and multiple other services for detection
-    # TODO: Update it to GLOBAL_CLOUD_CLUSTER_CONFIG
-    GLOBAL_CLOUD_OAUTH_URL = 'cloud_oauth_url'
-    # Deprecated. Left for future reference
-    # GLOBAL_CLOUD_OAUTH_CLIENT_ID = 'cloud_oauth_client_id'
-    # GLOBAL_CLOUD_OAUTH_CLIENT_SECRET = 'cloud_oauth_client_secret'
-    # GLOBAL_CLOUD_OAUTH_AUDIENCE = 'cloud_oauth_audience'
-    # GLOBAL_CLOUD_API_URL = 'cloud_api_url'
-    # GLOBAL_CLOUD_CLUSTER_ID = 'cloud_cluster_id'
-    # GLOBAL_CLOUD_DELETE_CLUSTER = 'cloud_delete_cluster'
-    # GLOBAL_TELEPORT_AUTH_SERVER = 'cloud_teleport_auth_server'
-    # GLOBAL_TELEPORT_BOT_TOKEN = 'cloud_teleport_bot_token'
-    # GLOBAL_CLOUD_CLUSTER_REGION = 'cloud_cluster_region'
-    # GLOBAL_CLOUD_CLUSTER_PROVIDER = 'cloud_provider'
-    # GLOBAL_CLOUD_CLUSTER_TYPE = 'cloud_cluster_type'
-    # GLOBAL_CLOUD_CLUSTER_NETWORK = 'cloud_cluster_network'
-    # GLOBAL_CLOUD_PEER_VPC_ID = 'cloud_cluster_peer_vpc_id'
-    # GLOBAL_CLOUD_PEER_OWNER_ID = 'cloud_cluster_peer_owner_id'
-
GLOBAL_CLOUD_CLUSTER_CONFIG = 'cloud_cluster'

def __init__(self,
@@ -1515,47 +1496,19 @@ def __init__(self,
# log cloud cluster id
self.logger.debug(f"initial cluster_id: {self._cc_config['id']}")

-        # Removed in favor to serialization
-        # self._cloud_oauth_url = context.globals.get(
-        #     self.GLOBAL_CLOUD_OAUTH_URL, None)
-        # self._cloud_oauth_client_id = context.globals.get(
-        #     self.GLOBAL_CLOUD_OAUTH_CLIENT_ID, None)
-        # self._cloud_oauth_client_secret = context.globals.get(
-        #     self.GLOBAL_CLOUD_OAUTH_CLIENT_SECRET, None)
-        # self._cloud_oauth_audience = context.globals.get(
-        #     self.GLOBAL_CLOUD_OAUTH_AUDIENCE, None)
-        # self._cloud_teleport_proxy = context.globals.get(
-        #     self.GLOBAL_TELEPORT_AUTH_SERVER, None)
-        # self._cloud_teleport_bot_token = context.globals.get(
-        #     self.GLOBAL_TELEPORT_BOT_TOKEN, None)
-        # self._cloud_api_url = context.globals.get(self.GLOBAL_CLOUD_API_URL,
-        #     None)
-        # self._cloud_cluster_id = context.globals.get(
-        #     self.GLOBAL_CLOUD_CLUSTER_ID, '')
-        # self._cloud_delete_cluster = context.globals.get(
-        #     self.GLOBAL_CLOUD_DELETE_CLUSTER, True)
-        # self._cloud_cluster_provider = context.globals.get(
-        #     self.GLOBAL_CLOUD_CLUSTER_PROVIDER, "AWS").upper()
-        # self._cloud_cluster_region = context.globals.get(
-        #     self.GLOBAL_CLOUD_CLUSTER_REGION, "us-west-2")
-        # self._cloud_cluster_type = context.globals.get(
-        #     self.GLOBAL_CLOUD_CLUSTER_TYPE, "FMC").upper()
-
-        # self._cloud_cluster_network = context.globals.get(
-        #     self.GLOBAL_CLOUD_CLUSTER_NETWORK, "public").lower()
-        # self._cloud_peer_vpc_id = context.globals.get(
-        #     self.GLOBAL_CLOUD_PEER_VPC_ID, None)
-        # self._cloud_peer_owner_id = context.globals.get(
-        #     self.GLOBAL_CLOUD_PEER_OWNER_ID, None)
-
# Create cluster class
self._cloud_cluster = CloudCluster(
context,
self.logger,
self._cc_config,
provider_config=self._provider_config)

# Prepare kubectl
self._kubectl = None

# Backward compatibility with RedpandaService
# Fake out sasl_enabled callable
self.sasl_enabled = lambda: True
# Always true for Cloud Cluster
self._dedicated_nodes = True
self.logger.info(
'ResourceSettings: setting dedicated_nodes=True because serving from redpanda cloud'
@@ -1587,6 +1540,24 @@ def start(self, **kwargs):
tp_proxy=self._cloud_cluster.config.teleport_auth_server,
tp_token=self._cloud_cluster.config.teleport_bot_token)

+        # Get pods and form the broker/node list
+        self.pods = []
+        _r = self._kubectl.run_kube_command("get pods -o json")
+        _pods = json.loads(_r.decode())
+        for p in _pods['items']:
+            if not p['metadata']['name'].startswith(
+                    f"rp-{self._cloud_cluster.config.id}"):
+                continue
+            else:
+                _node = CloudBroker(p, self._kubectl, self.logger)
+                self.pods.append(_node)
+
+    def get_node_by_id(self, id):
+        # Find the pod whose operator-assigned slot id matches
+        for p in self.pods:
+            if p.slot_id == id:
+                return p
+        return None
+
def stop_node(self, node, **kwargs):
pass

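Putting the pieces together: start() now discovers the cluster's pods, wraps each matching pod in a CloudBroker, and get_node_by_id() resolves a broker by its operator-assigned slot. A hedged usage sketch from a test, assuming `redpanda` is a started RedpandaServiceCloud and noting the rpk command is illustrative:

    broker = redpanda.get_node_by_id(0)  # slot ids come from the pod label
    if broker is None:
        raise RuntimeError('no pod with slot_id 0 found')
    # Backward-compatible path: ssh_output runs the command inside the
    # pod via `kubectl exec`
    out = broker.ssh_output('rpk cluster info')
    redpanda.logger.debug(out.decode())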
