Skip to content

Commit

Permalink
fix(tef): add retry for k8s executor and driver pods operations & ran…
Browse files Browse the repository at this point in the history
…dom driver cm name
  • Loading branch information
ejblanco authored and pradomota committed Sep 19, 2023
1 parent ce5ddad commit a3102cf
Show file tree
Hide file tree
Showing 5 changed files with 216 additions and 142 deletions.
3 changes: 2 additions & 1 deletion resource-managers/kubernetes/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<artifactId>mockito-scala_2.12</artifactId>
<version>1.17.22</version>
<scope>test</scope>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ private[spark] object KubernetesClientUtils extends Logging {

val configMapNameExecutor: String = configMapName(s"spark-exec-${KubernetesUtils.uniqueID()}")

val configMapNameDriver: String = configMapName(s"spark-drv-${KubernetesUtils.uniqueID()}")
def configMapNameDriver: String = configMapName(s"spark-drv-${KubernetesUtils.uniqueID()}")

private def buildStringFromPropertiesMap(configMapName: String,
propertiesMap: Map[String, String]): String = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,37 @@ class ExecutorPodsAllocator(

protected val shouldDeleteExecutors = conf.get(KUBERNETES_DELETE_EXECUTORS)


// Retry 300 times waiting 2 seconds (10 minutes)
private def getDriverPodWithRetries(name: String): Option[Pod] = {

def getDriverPod(retriesLeft: Int): Option[Pod] = {
try {
Option(kubernetesClient.pods()
.inNamespace(namespace)
.withName(name)
.get())
} catch {
case e: Throwable if retriesLeft > 0 =>
logWarning(s"Couldn't get Spark Driver pod $name in namespace $namespace" +
s". Trying again in 2 seconds. $retriesLeft retries left.", e)
Thread.sleep(2000)
getDriverPod(retriesLeft - 1)
}
}

getDriverPod(300)
}


val driverPod = kubernetesDriverPodName
.map(name => Option(kubernetesClient.pods()
.inNamespace(namespace)
.withName(name)
.get())
.getOrElse(throw new SparkException(
s"No pod was found named $name in the cluster in the " +
s"namespace $namespace (this was supposed to be the driver pod.).")))
.map { name =>
getDriverPodWithRetries(name)
.getOrElse(throw new SparkException(
s"No pod was found named $name in the cluster in the " +
s"namespace $namespace (this was supposed to be the driver pod.).")
)
}

// Executor IDs that have been requested from Kubernetes but have not been detected in any
// snapshot yet. Mapped to the (ResourceProfile id, timestamp) when they were created.
Expand Down Expand Up @@ -160,6 +183,31 @@ class ExecutorPodsAllocator(
}
}

// Retry 300 times waiting 2 seconds (10 minutes)
private def createExecutorPodWithRetries(pod: Pod): Pod = {

def createExecutorPod(retriesLeft: Int): Pod = {
try {
kubernetesClient.pods()
.inNamespace(namespace)
.resource(pod)
.create()
} catch {
case e: Throwable if retriesLeft > 0 =>
logWarning(s"Couldn't create Spark Executor pod $pod in namespace $namespace. " +
s"Trying again in 2 seconds. $retriesLeft retries left.", e)
Thread.sleep(2000)
createExecutorPod(retriesLeft - 1)
case e: Throwable =>
throw new SparkException(s"Couldn't create Spark Executor pod $pod " +
s"in namespace $namespace", e)
}
}

createExecutorPod(300)
}


def isDeleted(executorId: String): Boolean = deletedExecutorIds.contains(executorId.toLong)

private[k8s] def stopApplication(exitCode: Int): Unit = {
Expand Down Expand Up @@ -211,7 +259,7 @@ class ExecutorPodsAllocator(
logWarning(s"Executors with ids ${timedOut.mkString(",")} were not detected in the" +
s" Kubernetes cluster after $podCreationTimeout ms despite the fact that a previous" +
" allocation attempt tried to create them. The executors may have been deleted but the" +
" application missed the deletion event.")
s" application missed the deletion event.")

newlyCreatedExecutors --= timedOut
if (shouldDeleteExecutors) {
Expand Down Expand Up @@ -466,8 +514,7 @@ class ExecutorPodsAllocator(
.build()
val resources = replacePVCsIfNeeded(
podWithAttachedContainer, resolvedExecutorSpec.executorKubernetesResources, reusablePVCs)
val createdExecutorPod =
kubernetesClient.pods().inNamespace(namespace).resource(podWithAttachedContainer).create()
val createdExecutorPod = createExecutorPodWithRetries(podWithAttachedContainer)
try {
addOwnerReference(createdExecutorPod, resources)
resources
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
mock-maker-inline
Loading

0 comments on commit a3102cf

Please sign in to comment.