diff --git a/resource-managers/kubernetes/core/pom.xml b/resource-managers/kubernetes/core/pom.xml index 11420fe7fa76c..61ba640291902 100644 --- a/resource-managers/kubernetes/core/pom.xml +++ b/resource-managers/kubernetes/core/pom.xml @@ -117,7 +117,8 @@ org.mockito - mockito-core + mockito-scala_2.12 + 1.17.22 test diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientUtils.scala index dc52babc5f32e..d9ee575e46ad1 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientUtils.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientUtils.scala @@ -43,7 +43,7 @@ private[spark] object KubernetesClientUtils extends Logging { val configMapNameExecutor: String = configMapName(s"spark-exec-${KubernetesUtils.uniqueID()}") - val configMapNameDriver: String = configMapName(s"spark-drv-${KubernetesUtils.uniqueID()}") + def configMapNameDriver: String = configMapName(s"spark-drv-${KubernetesUtils.uniqueID()}") private def buildStringFromPropertiesMap(configMapName: String, propertiesMap: Map[String, String]): String = { diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala index 25970e918ec42..8151ee34b19ff 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala @@ -90,14 +90,37 @@ class ExecutorPodsAllocator( protected val shouldDeleteExecutors = conf.get(KUBERNETES_DELETE_EXECUTORS) + + // Retry 300 times waiting 2 seconds (10 minutes) + private def getDriverPodWithRetries(name: String): Option[Pod] = { + + def getDriverPod(retriesLeft: Int): Option[Pod] = { + try { + Option(kubernetesClient.pods() + .inNamespace(namespace) + .withName(name) + .get()) + } catch { + case e: Throwable if retriesLeft > 0 => + logWarning(s"Couldn't get Spark Driver pod $name in namespace $namespace" + + s". Trying again in 2 seconds. $retriesLeft retries left.", e) + Thread.sleep(2000) + getDriverPod(retriesLeft - 1) + } + } + + getDriverPod(300) + } + + val driverPod = kubernetesDriverPodName - .map(name => Option(kubernetesClient.pods() - .inNamespace(namespace) - .withName(name) - .get()) - .getOrElse(throw new SparkException( - s"No pod was found named $name in the cluster in the " + - s"namespace $namespace (this was supposed to be the driver pod.)."))) + .map { name => + getDriverPodWithRetries(name) + .getOrElse(throw new SparkException( + s"No pod was found named $name in the cluster in the " + + s"namespace $namespace (this was supposed to be the driver pod.).") + ) + } // Executor IDs that have been requested from Kubernetes but have not been detected in any // snapshot yet. Mapped to the (ResourceProfile id, timestamp) when they were created. @@ -160,6 +183,31 @@ class ExecutorPodsAllocator( } } + // Retry 300 times waiting 2 seconds (10 minutes) + private def createExecutorPodWithRetries(pod: Pod): Pod = { + + def createExecutorPod(retriesLeft: Int): Pod = { + try { + kubernetesClient.pods() + .inNamespace(namespace) + .resource(pod) + .create() + } catch { + case e: Throwable if retriesLeft > 0 => + logWarning(s"Couldn't create Spark Executor pod $pod in namespace $namespace. " + + s"Trying again in 2 seconds. $retriesLeft retries left.", e) + Thread.sleep(2000) + createExecutorPod(retriesLeft - 1) + case e: Throwable => + throw new SparkException(s"Couldn't create Spark Executor pod $pod " + + s"in namespace $namespace", e) + } + } + + createExecutorPod(300) + } + + def isDeleted(executorId: String): Boolean = deletedExecutorIds.contains(executorId.toLong) private[k8s] def stopApplication(exitCode: Int): Unit = { @@ -211,7 +259,7 @@ class ExecutorPodsAllocator( logWarning(s"Executors with ids ${timedOut.mkString(",")} were not detected in the" + s" Kubernetes cluster after $podCreationTimeout ms despite the fact that a previous" + " allocation attempt tried to create them. The executors may have been deleted but the" + - " application missed the deletion event.") + s" application missed the deletion event.") newlyCreatedExecutors --= timedOut if (shouldDeleteExecutors) { @@ -466,8 +514,7 @@ class ExecutorPodsAllocator( .build() val resources = replacePVCsIfNeeded( podWithAttachedContainer, resolvedExecutorSpec.executorKubernetesResources, reusablePVCs) - val createdExecutorPod = - kubernetesClient.pods().inNamespace(namespace).resource(podWithAttachedContainer).create() + val createdExecutorPod = createExecutorPodWithRetries(podWithAttachedContainer) try { addOwnerReference(createdExecutorPod, resources) resources diff --git a/resource-managers/kubernetes/core/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/resource-managers/kubernetes/core/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker new file mode 100644 index 0000000000000..ca6ee9cea8ec1 --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker @@ -0,0 +1 @@ +mock-maker-inline \ No newline at end of file diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala index a813b3a876f87..d31dac82c4152 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala @@ -19,9 +19,7 @@ package org.apache.spark.deploy.k8s.submit import java.io.File import java.nio.charset.StandardCharsets import java.nio.file.Files - import scala.collection.JavaConverters._ - import io.fabric8.kubernetes.api.model._ import io.fabric8.kubernetes.api.model.apiextensions.v1.{CustomResourceDefinition, CustomResourceDefinitionBuilder} import io.fabric8.kubernetes.client.{KubernetesClient, Watch} @@ -30,7 +28,6 @@ import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations} import org.mockito.Mockito.{verify, when} import org.scalatest.BeforeAndAfter import org.scalatestplus.mockito.MockitoSugar._ - import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s.{Config, _} import org.apache.spark.deploy.k8s.Config.WAIT_FOR_APP_COMPLETION @@ -38,6 +35,7 @@ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.Fabric8Aliases._ import org.apache.spark.deploy.k8s.submit.Client.submissionId import org.apache.spark.util.Utils +import org.mockito.MockitoSugar.withObjectSpied class ClientSuite extends SparkFunSuite with BeforeAndAfter { @@ -93,7 +91,9 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter { private val KEY_TO_PATH = new KeyToPath(SPARK_CONF_FILE_NAME, 420, SPARK_CONF_FILE_NAME) - private def fullExpectedPod(keyToPaths: List[KeyToPath] = List(KEY_TO_PATH)) = + private val driverConfigMapName = KubernetesClientUtils.configMapNameDriver + + private def fullExpectedPod(keyToPaths: List[KeyToPath] = List(KEY_TO_PATH)) = { new PodBuilder(BUILT_DRIVER_POD) .editSpec() .addToContainers(FULL_EXPECTED_CONTAINER) @@ -101,11 +101,12 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter { .withName(SPARK_CONF_VOLUME_DRIVER) .withNewConfigMap() .withItems(keyToPaths.asJava) - .withName(KubernetesClientUtils.configMapNameDriver) + .withName(driverConfigMapName) .endConfigMap() .endVolume() .endSpec() .build() + } private def podWithOwnerReference(keyToPaths: List[KeyToPath] = List(KEY_TO_PATH)) = new PodBuilder(fullExpectedPod(keyToPaths)) @@ -193,84 +194,96 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter { } test("The client should configure the pod using the builder.") { - val submissionClient = new Client( - kconf, - driverBuilder, - kubernetesClient, - loggingPodStatusWatcher) - submissionClient.run() - verify(podsWithNamespace).resource(fullExpectedPod()) - verify(namedPods).create() + withObjectSpied[KubernetesClientUtils.type] { + when(KubernetesClientUtils.configMapNameDriver).thenReturn(driverConfigMapName) + + val submissionClient = new Client( + kconf, + driverBuilder, + kubernetesClient, + loggingPodStatusWatcher) + submissionClient.run() + verify(podsWithNamespace).resource(fullExpectedPod()) + verify(namedPods).create() + } } test("The client should create Kubernetes resources") { - val submissionClient = new Client( - kconf, - driverBuilder, - kubernetesClient, - loggingPodStatusWatcher) - submissionClient.run() - val otherCreatedResources = createdResourcesArgumentCaptor.getAllValues - assert(otherCreatedResources.size === 2) - val secrets = otherCreatedResources.toArray.filter(_.isInstanceOf[Secret]).toSeq - assert(secrets === ADDITIONAL_RESOURCES_WITH_OWNER_REFERENCES) - val configMaps = otherCreatedResources.toArray - .filter(_.isInstanceOf[ConfigMap]).map(_.asInstanceOf[ConfigMap]) - assert(secrets.nonEmpty) - assert(configMaps.nonEmpty) - val configMap = configMaps.head - assert(configMap.getMetadata.getName === - KubernetesClientUtils.configMapNameDriver) - assert(configMap.getImmutable()) - assert(configMap.getData.containsKey(SPARK_CONF_FILE_NAME)) - assert(configMap.getData.get(SPARK_CONF_FILE_NAME).contains("conf1key=conf1value")) - assert(configMap.getData.get(SPARK_CONF_FILE_NAME).contains("conf2key=conf2value")) + withObjectSpied[KubernetesClientUtils.type] { + when(KubernetesClientUtils.configMapNameDriver).thenReturn(driverConfigMapName) + + val submissionClient = new Client( + kconf, + driverBuilder, + kubernetesClient, + loggingPodStatusWatcher) + submissionClient.run() + val otherCreatedResources = createdResourcesArgumentCaptor.getAllValues + assert(otherCreatedResources.size === 2) + val secrets = otherCreatedResources.toArray.filter(_.isInstanceOf[Secret]).toSeq + assert(secrets === ADDITIONAL_RESOURCES_WITH_OWNER_REFERENCES) + val configMaps = otherCreatedResources.toArray + .filter(_.isInstanceOf[ConfigMap]).map(_.asInstanceOf[ConfigMap]) + assert(secrets.nonEmpty) + assert(configMaps.nonEmpty) + val configMap = configMaps.head + assert(configMap.getMetadata.getName === + KubernetesClientUtils.configMapNameDriver) + assert(configMap.getImmutable()) + assert(configMap.getData.containsKey(SPARK_CONF_FILE_NAME)) + assert(configMap.getData.get(SPARK_CONF_FILE_NAME).contains("conf1key=conf1value")) + assert(configMap.getData.get(SPARK_CONF_FILE_NAME).contains("conf2key=conf2value")) + } } test("SPARK-37331: The client should create Kubernetes resources with pre resources") { - val sparkConf = new SparkConf(false) - .set(Config.CONTAINER_IMAGE, "spark-executor:latest") - .set(Config.KUBERNETES_DRIVER_POD_FEATURE_STEPS.key, - "org.apache.spark.deploy.k8s.TestStepTwo," + - "org.apache.spark.deploy.k8s.TestStep") - val preResKconf: KubernetesDriverConf = KubernetesTestConf.createDriverConf( - sparkConf = sparkConf, - resourceNamePrefix = Some(KUBERNETES_RESOURCE_PREFIX) - ) - - when(driverBuilder.buildFromFeatures(preResKconf, kubernetesClient)) - .thenReturn(BUILT_KUBERNETES_SPEC_WITH_PRERES) - val submissionClient = new Client( - preResKconf, - driverBuilder, - kubernetesClient, - loggingPodStatusWatcher) - submissionClient.run() - val otherCreatedResources = createdResourcesArgumentCaptor.getAllValues - - // 2 for pre-resource creation/update, 1 for resource creation, 1 for config map - assert(otherCreatedResources.size === 4) - val preRes = otherCreatedResources.toArray - .filter(_.isInstanceOf[CustomResourceDefinition]).toSeq - - // Make sure pre-resource creation/owner reference as expected - assert(preRes.size === 2) - assert(preRes.last === PRE_ADDITIONAL_RESOURCES_WITH_OWNER_REFERENCES.head) - - // Make sure original resource and config map process are not affected - val secrets = otherCreatedResources.toArray.filter(_.isInstanceOf[Secret]).toSeq - assert(secrets === ADDITIONAL_RESOURCES_WITH_OWNER_REFERENCES) - val configMaps = otherCreatedResources.toArray - .filter(_.isInstanceOf[ConfigMap]).map(_.asInstanceOf[ConfigMap]) - assert(secrets.nonEmpty) - assert(configMaps.nonEmpty) - val configMap = configMaps.head - assert(configMap.getMetadata.getName === - KubernetesClientUtils.configMapNameDriver) - assert(configMap.getImmutable()) - assert(configMap.getData.containsKey(SPARK_CONF_FILE_NAME)) - assert(configMap.getData.get(SPARK_CONF_FILE_NAME).contains("conf1key=conf1value")) - assert(configMap.getData.get(SPARK_CONF_FILE_NAME).contains("conf2key=conf2value")) + withObjectSpied[KubernetesClientUtils.type] { + when(KubernetesClientUtils.configMapNameDriver).thenReturn(driverConfigMapName) + + val sparkConf = new SparkConf(false) + .set(Config.CONTAINER_IMAGE, "spark-executor:latest") + .set(Config.KUBERNETES_DRIVER_POD_FEATURE_STEPS.key, + "org.apache.spark.deploy.k8s.TestStepTwo," + + "org.apache.spark.deploy.k8s.TestStep") + val preResKconf: KubernetesDriverConf = KubernetesTestConf.createDriverConf( + sparkConf = sparkConf, + resourceNamePrefix = Some(KUBERNETES_RESOURCE_PREFIX) + ) + + when(driverBuilder.buildFromFeatures(preResKconf, kubernetesClient)) + .thenReturn(BUILT_KUBERNETES_SPEC_WITH_PRERES) + val submissionClient = new Client( + preResKconf, + driverBuilder, + kubernetesClient, + loggingPodStatusWatcher) + submissionClient.run() + val otherCreatedResources = createdResourcesArgumentCaptor.getAllValues + + // 2 for pre-resource creation/update, 1 for resource creation, 1 for config map + assert(otherCreatedResources.size === 4) + val preRes = otherCreatedResources.toArray + .filter(_.isInstanceOf[CustomResourceDefinition]).toSeq + + // Make sure pre-resource creation/owner reference as expected + assert(preRes.size === 2) + assert(preRes.last === PRE_ADDITIONAL_RESOURCES_WITH_OWNER_REFERENCES.head) + + // Make sure original resource and config map process are not affected + val secrets = otherCreatedResources.toArray.filter(_.isInstanceOf[Secret]).toSeq + assert(secrets === ADDITIONAL_RESOURCES_WITH_OWNER_REFERENCES) + val configMaps = otherCreatedResources.toArray + .filter(_.isInstanceOf[ConfigMap]).map(_.asInstanceOf[ConfigMap]) + assert(secrets.nonEmpty) + assert(configMaps.nonEmpty) + val configMap = configMaps.head + assert(configMap.getMetadata.getName === + KubernetesClientUtils.configMapNameDriver) + assert(configMap.getImmutable()) + assert(configMap.getData.containsKey(SPARK_CONF_FILE_NAME)) + assert(configMap.getData.get(SPARK_CONF_FILE_NAME).contains("conf1key=conf1value")) + assert(configMap.getData.get(SPARK_CONF_FILE_NAME).contains("conf2key=conf2value")) + } } test("All files from SPARK_CONF_DIR, " + @@ -304,76 +317,88 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter { (sparkConf, expectedConfFiles) } - val (sparkConf: SparkConf, expectedConfFiles: Seq[String]) = testSetup + withObjectSpied[KubernetesClientUtils.type] { + when(KubernetesClientUtils.configMapNameDriver).thenReturn(driverConfigMapName) - val expectedKeyToPaths = (expectedConfFiles.map(x => new KeyToPath(x, 420, x)).toList ++ - List(KEY_TO_PATH)).sortBy(x => x.getKey) + val (sparkConf: SparkConf, expectedConfFiles: Seq[String]) = testSetup - when(podsWithNamespace.resource(fullExpectedPod(expectedKeyToPaths))) - .thenReturn(namedPods) - when(namedPods.forceConflicts()).thenReturn(namedPods) - when(namedPods.serverSideApply()).thenReturn(podWithOwnerReference(expectedKeyToPaths)) + val expectedKeyToPaths = (expectedConfFiles.map(x => new KeyToPath(x, 420, x)).toList ++ + List(KEY_TO_PATH)).sortBy(x => x.getKey) - kconf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf, - resourceNamePrefix = Some(KUBERNETES_RESOURCE_PREFIX)) + when(podsWithNamespace.resource(fullExpectedPod(expectedKeyToPaths))) + .thenReturn(namedPods) + when(namedPods.forceConflicts()).thenReturn(namedPods) + when(namedPods.serverSideApply()).thenReturn(podWithOwnerReference(expectedKeyToPaths)) - assert(kconf.sparkConf.getOption("spark.home").isDefined) - when(driverBuilder.buildFromFeatures(kconf, kubernetesClient)).thenReturn(BUILT_KUBERNETES_SPEC) + kconf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf, + resourceNamePrefix = Some(KUBERNETES_RESOURCE_PREFIX)) + + assert(kconf.sparkConf.getOption("spark.home").isDefined) + when(driverBuilder.buildFromFeatures(kconf, kubernetesClient)).thenReturn(BUILT_KUBERNETES_SPEC) - val submissionClient = new Client( - kconf, - driverBuilder, - kubernetesClient, - loggingPodStatusWatcher) - submissionClient.run() - val otherCreatedResources = createdResourcesArgumentCaptor.getAllValues - - val configMaps = otherCreatedResources.toArray - .filter(_.isInstanceOf[ConfigMap]).map(_.asInstanceOf[ConfigMap]) - assert(configMaps.nonEmpty) - val configMapName = KubernetesClientUtils.configMapNameDriver - val configMap: ConfigMap = configMaps.head - assert(configMap.getMetadata.getName == configMapName) - val configMapLoadedFiles = configMap.getData.keySet().asScala.toSet - + val submissionClient = new Client( + kconf, + driverBuilder, + kubernetesClient, + loggingPodStatusWatcher) + submissionClient.run() + val otherCreatedResources = createdResourcesArgumentCaptor.getAllValues + + val configMaps = otherCreatedResources.toArray + .filter(_.isInstanceOf[ConfigMap]).map(_.asInstanceOf[ConfigMap]) + assert(configMaps.nonEmpty) + val configMapName = KubernetesClientUtils.configMapNameDriver + val configMap: ConfigMap = configMaps.head + assert(configMap.getMetadata.getName == configMapName) + val configMapLoadedFiles = configMap.getData.keySet().asScala.toSet - Config.KUBERNETES_NAMESPACE.key - assert(configMapLoadedFiles === expectedConfFiles.toSet ++ Set(SPARK_CONF_FILE_NAME)) - for (f <- configMapLoadedFiles) { - assert(configMap.getData.get(f).contains("conf1key=conf1value")) + assert(configMapLoadedFiles === expectedConfFiles.toSet ++ Set(SPARK_CONF_FILE_NAME)) + for (f <- configMapLoadedFiles) { + assert(configMap.getData.get(f).contains("conf1key=conf1value")) + } } } test("Waiting for app completion should stall on the watcher") { - val submissionClient = new Client( - kconf, - driverBuilder, - kubernetesClient, - loggingPodStatusWatcher) - submissionClient.run() - verify(loggingPodStatusWatcher).watchOrStop(submissionId(kconf.namespace, POD_NAME)) - } + withObjectSpied[KubernetesClientUtils.type] { + when(KubernetesClientUtils.configMapNameDriver).thenReturn(driverConfigMapName) - test("SPARK-42813: Print application info when waitAppCompletion is false") { - val appName = "SPARK-42813" - val logAppender = new LogAppender - withLogAppender(logAppender) { - val sparkConf = new SparkConf(loadDefaults = false) - .set("spark.app.name", appName) - .set(WAIT_FOR_APP_COMPLETION, false) - kconf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf, - resourceNamePrefix = Some(KUBERNETES_RESOURCE_PREFIX)) - when(driverBuilder.buildFromFeatures(kconf, kubernetesClient)) - .thenReturn(BUILT_KUBERNETES_SPEC) val submissionClient = new Client( kconf, driverBuilder, kubernetesClient, loggingPodStatusWatcher) submissionClient.run() + verify(loggingPodStatusWatcher).watchOrStop(submissionId(kconf.namespace, POD_NAME)) + } + } + + test("SPARK-42813: Print application info when waitAppCompletion is false") { + withObjectSpied[KubernetesClientUtils.type] { + when(KubernetesClientUtils.configMapNameDriver).thenReturn(driverConfigMapName) + + val appName = "SPARK-42813" + val logAppender = new LogAppender + withLogAppender(logAppender) { + val sparkConf = new SparkConf(loadDefaults = false) + .set("spark.app.name", appName) + .set(WAIT_FOR_APP_COMPLETION, false) + kconf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf, + resourceNamePrefix = Some(KUBERNETES_RESOURCE_PREFIX)) + when(driverBuilder.buildFromFeatures(kconf, kubernetesClient)) + .thenReturn(BUILT_KUBERNETES_SPEC) + val submissionClient = new Client( + kconf, + driverBuilder, + kubernetesClient, + loggingPodStatusWatcher) + submissionClient.run() + } + val appId = KubernetesTestConf.APP_ID + val sId = submissionId(kconf.namespace, POD_NAME) + assert(logAppender.loggingEvents.map(_.getMessage.getFormattedMessage).contains( + s"Deployed Spark application $appName with application ID $appId " + + s"and submission ID $sId into Kubernetes")) } - val appId = KubernetesTestConf.APP_ID - val sId = submissionId(kconf.namespace, POD_NAME) - assert(logAppender.loggingEvents.map(_.getMessage.getFormattedMessage).contains( - s"Deployed Spark application $appName with application ID $appId " + - s"and submission ID $sId into Kubernetes")) } }