This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

Exit JVM if an error is thrown or the main loop exits #119

Open · wants to merge 3 commits into base: master
@@ -17,7 +17,9 @@ akka {
loglevel = "INFO"

logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"


logger-startup-timeout = 30s

http.client {

# The time period within which the TCP connecting process must be completed.
@@ -50,7 +50,7 @@ class KeepAliveFramework(settings: KeepAliveFrameWorkSettings, authorization: Op
import TaskState._
def activeTask(status: TaskStatus) = Seq(TASK_STAGING, TASK_STARTING, TASK_RUNNING).contains(status.getState)
// We're only interested in the bad task statuses for our pod
val failedTasks = taskStatuses.filterNot { case (id, status) => activeTask(status) }
val failedTasks = taskStatuses.filterNot { case (taskId, status) => activeTask(status) }
if (failedTasks.nonEmpty) {
logger.info(s"Restarting Pod $id")
val newId = KeepAlivePodSpecHelper.createNewIncarnationId(id)
@@ -110,6 +110,7 @@ class KeepAliveFramework(settings: KeepAliveFrameWorkSettings, authorization: Op
// We let the framework run "forever"
val result = Await.result(end, Duration.Inf)
logger.warn(s"Framework finished with $result")
Await.result(system.terminate(), Duration.Inf)
}

object KeepAliveFramework {
@@ -202,7 +203,14 @@ object KeepAliveFramework {
MesosClientSettings.fromConfig(conf).withMasters(Seq(mesosUrl.toURL)),
conf.getInt("keep-alive-framework.tasks-started"),
mesosRole)
new KeepAliveFramework(settings, provider)
try {
new KeepAliveFramework(settings, provider)
} catch {
case _: Throwable => {
Await.result(system.terminate(), Duration.Inf)
sys.exit(1)
}
}
case _ =>
sys.exit(1)
}
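
For context, here is a minimal, self-contained sketch (not code from this PR; `runFramework` is a placeholder) of the shutdown pattern this change relies on: a non-daemon Akka ActorSystem keeps the JVM alive, so whether startup throws or the main loop returns, the system must be terminated explicitly before calling sys.exit.

import akka.actor.ActorSystem

import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.util.control.NonFatal

object ExitOnFailureSketch {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("keep-alive-sketch")
    try {
      // Placeholder standing in for `new KeepAliveFramework(settings, provider)`
      // and the Await on its end future; any startup work that can throw goes here.
      runFramework(system)
      // Main loop returned: terminate the actor system so the JVM can exit.
      Await.result(system.terminate(), Duration.Inf)
    } catch {
      case NonFatal(_) =>
        // Terminate first, then exit non-zero; otherwise non-daemon Akka
        // threads keep the JVM alive.
        Await.result(system.terminate(), Duration.Inf)
        sys.exit(1)
    }
  }

  private def runFramework(system: ActorSystem): Unit = ()
}
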
@@ -5,11 +5,12 @@ import java.util.UUID
import com.mesosphere.usi.core.models.resources.{ResourceType, ScalarRequirement}
import com.mesosphere.usi.core.models.template.{RunTemplate, SimpleRunTemplateFactory}
import com.mesosphere.usi.core.models.{PodId, RunningPodSpec}
import com.typesafe.scalalogging.StrictLogging

/**
* This is a helper object that generates pod specs and snapshots.
*/
object KeepAlivePodSpecHelper {
object KeepAlivePodSpecHelper extends StrictLogging {

val runSpec: RunTemplate = SimpleRunTemplateFactory(
resourceRequirements = List(ScalarRequirement(ResourceType.CPUS, 0.001), ScalarRequirement(ResourceType.MEM, 32)),
@@ -18,8 +19,9 @@ object KeepAlivePodSpecHelper {
)

def generatePodSpec(): RunningPodSpec = {
val podId = PodId(s"hello-world.${UUID.randomUUID()}.1")
val podId = PodId(s"hello-world_${UUID.randomUUID()}_1")

logger.info(s"Generating PodSpec for '${podId}'")
val podSpec = RunningPodSpec(id = podId, runSpec = runSpec)
podSpec
}
@@ -28,12 +30,13 @@ object KeepAlivePodSpecHelper {
(1 to numberOfPods).map(_ => generatePodSpec())(collection.breakOut)

def createNewIncarnationId(podId: PodId): PodId = {
val idAndIncarnation = """^(.+\..*)\.(\d+)$""".r
val idAndIncarnation = """^(.+_.*)_(\d+)$""".r
val (podIdWithoutIncarnation, currentIncarnation) = podId.value match {
case idAndIncarnation(id, inc) =>
id -> inc.toLong
case _ => throw new IllegalArgumentException(s"Failed to create new incarnation id for ${podId.value}")
}
PodId(s"$podIdWithoutIncarnation.${currentIncarnation + 1}")
PodId(s"${podIdWithoutIncarnation}_${currentIncarnation + 1}")
}

}
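
As a quick illustration of the new underscore-based ID format (the UUID below is made up, not taken from this PR), bumping the incarnation now works like this:

object IncarnationIdSketch extends App {
  // Same pattern that createNewIncarnationId now uses.
  val idAndIncarnation = """^(.+_.*)_(\d+)$""".r

  val next = "hello-world_8f14e45f-ceea-4673-9c5e-2c0b1d3e4f5a_1" match {
    case idAndIncarnation(id, inc) => s"${id}_${inc.toLong + 1}"
  }

  println(next) // hello-world_8f14e45f-ceea-4673-9c5e-2c0b1d3e4f5a_2
}
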
2 changes: 1 addition & 1 deletion mesos-client/src/main/resources/reference.conf
@@ -33,4 +33,4 @@ akka {
# outgoing connection might be closed if no call is issued to Mesos for a while
idle-timeout = infinite
}
}
}
@@ -4,14 +4,16 @@ import java.net.URL
import akka.NotUsed
import akka.actor.{Actor, ActorRef, ActorSystem, Props, Stash, Status}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.headers.{Authorization, HttpCredentials}
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.{Authorization, HttpCredentials}
import akka.http.scaladsl.settings.ConnectionPoolSettings
import akka.stream.Materializer
import akka.stream.scaladsl.Flow
import akka.util.Timeout
import com.typesafe.scalalogging.StrictLogging

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future, TimeoutException}
import scala.util.{Failure, Success}

/**
@@ -25,7 +27,7 @@ import scala.util.{Failure, Success}
* @param authorization A [[CredentialsProvider]] if the connection is secured.
*/
case class Session(url: URL, streamId: String, authorization: Option[CredentialsProvider] = None)(
implicit askTimout: Timeout) {
implicit askTimout: Timeout) extends StrictLogging {

/**
* Construct a new [[HttpRequest]] for a serialized Mesos call and a set of authorization, ie session token.
@@ -52,6 +54,28 @@ case class Session(url: URL, streamId: String, authorization: Option[Credentials
Flow[Array[Byte]].map(createPostRequest(_, None)).via(connection)
}

private def poolSettings():Future[ConnectionPoolSettings] = Future {
Reviewer comment (Contributor): What is the reason for this being a future?

// Constructs the connection pool settings with defaults and overrides the max connections and pipelining limit so
// that only one request at a time is processed. See https://doc.akka.io/docs/akka-http/current/configuration.html
// for details.
// *IMPORTANT*: DO NOT CHANGE maxConnections OR pipeliningLimit! Otherwise, USI won't guarantee request order to Mesos!
ConnectionPoolSettings("").withMaxConnections(1).withPipeliningLimit(1)
}(ExecutionContext.global)

private def poolSettingsWithTimeout():ConnectionPoolSettings = {
// We create the pool settings with a timeout: by default, constructing the settings seems to require a reverse
// DNS lookup, which can take more than 5 seconds and thus trigger the Akka Streams subscription timeout.
// If we let the construction take that long here, we would fail to start without any reasonable log message.
try {
Await.result(poolSettings(), 2.seconds)
Reviewer comment (Contributor): I don't get this. I'm sorry. Why can we not document the fact that apps should increase the Akka stream subscription timeout? It feels we are just introducing complexity here.

} catch {
case e: TimeoutException => {
logger.error("Failed to create ConnectionPoolSettings. Is your reverse DNS lookup slow?", e)
throw e
}
}
}
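
For reference, a rough sketch of the alternative the reviewer suggests: rather than racing the settings construction against a two second deadline, an application could raise the Akka Streams subscription timeout in its configuration. The object name and the 30s value below are illustrative, not part of this PR.

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

object SubscriptionTimeoutSketch extends App {
  // Raise the stream materializer subscription timeout above its 5 second
  // default so that slow ConnectionPoolSettings construction (e.g. a slow
  // reverse DNS lookup) does not trip it.
  val config = ConfigFactory
    .parseString("""
      akka.stream.materializer.subscription-timeout {
        mode = cancel
        timeout = 30s
      }
    """)
    .withFallback(ConfigFactory.load())

  val system = ActorSystem("session-sketch", config)
}
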

/**
* A connection flow factory that will create a flow that only processes one request at a time.
*
Expand All @@ -60,19 +84,15 @@ case class Session(url: URL, streamId: String, authorization: Option[Credentials
* @return A connection flow for single requests.
*/
private def connection(implicit system: ActorSystem, mat: Materializer): Flow[HttpRequest, HttpResponse, NotUsed] = {
// Constructs the connection pool settings with defaults and overrides the max connections and pipelining limit so
// that only one request at a time is processed. See https://doc.akka.io/docs/akka-http/current/configuration.html
// for details.
// *IMPORTANT*: DO NOT CHANGE maxConnections OR pipeliningLimit! Otherwise, USI won't guarantee request order to Mesos!
val poolSettings = ConnectionPoolSettings("").withMaxConnections(1).withPipeliningLimit(1)
val settings = poolSettingsWithTimeout()

Flow[HttpRequest]
.map(_ -> NotUsed)
.via(if (Session.isSecured(url)) {
Http()
.newHostConnectionPoolHttps(host = url.getHost, port = Session.effectivePort(url), settings = poolSettings)
.newHostConnectionPoolHttps(host = url.getHost, port = Session.effectivePort(url), settings = settings)
} else {
Http().newHostConnectionPool(host = url.getHost, port = Session.effectivePort(url), settings = poolSettings)
Http().newHostConnectionPool(host = url.getHost, port = Session.effectivePort(url), settings = settings)
})
.map {
case (Success(response), NotUsed) => response