Skip to content

Commit

Permalink
WM-2252: Configurable metadata write role (#7225)
Browse files Browse the repository at this point in the history
Co-authored-by: dvoet <[email protected]>
  • Loading branch information
cjllanwarne and dvoet authored Oct 2, 2023
1 parent a83ec6e commit fc9e3cc
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package cromwell.database.slick

import java.sql.Timestamp

import cats.syntax.functor._
import cats.instances.future._
import com.typesafe.config.{Config, ConfigFactory}
Expand All @@ -10,6 +9,7 @@ import cromwell.database.sql.MetadataSqlDatabase
import cromwell.database.sql.SqlConverters._
import cromwell.database.sql.joins.{CallOrWorkflowQuery, CallQuery, MetadataJobQueryValue, WorkflowQuery}
import cromwell.database.sql.tables.{CustomLabelEntry, InformationSchemaEntry, MetadataEntry, WorkflowMetadataSummaryEntry}
import net.ceedubs.ficus.Ficus._
import slick.basic.DatabasePublisher
import slick.jdbc.{ResultSetConcurrency, ResultSetType}

Expand Down Expand Up @@ -60,6 +60,8 @@ class MetadataSlickDatabase(originalDatabaseConfig: Config)
import dataAccess.driver.api._
import MetadataSlickDatabase._

lazy val pgLargeObjectWriteRole: Option[String] = originalDatabaseConfig.as[Option[String]]("pgLargeObjectWriteRole")

override def existsMetadataEntries()(implicit ec: ExecutionContext): Future[Boolean] = {
val action = dataAccess.metadataEntriesExists.result
runTransaction(action)
Expand Down Expand Up @@ -87,20 +89,22 @@ class MetadataSlickDatabase(originalDatabaseConfig: Config)
rootWorkflowIdKey,
labelMetadataKey)

val roleSet = pgLargeObjectWriteRole.map(role => sqlu"""SET ROLE TO "#$role"""")

// These entries also require a write to the summary queue.
def writeSummarizable(): Future[Unit] = if (partitioned.summarizableMetadata.isEmpty) Future.successful(()) else {
val batchesToWrite = partitioned.summarizableMetadata.grouped(insertBatchSize).toList
val insertActions = batchesToWrite.map { batch =>
val insertMetadata = dataAccess.metadataEntryIdsAutoInc ++= batch
insertMetadata.flatMap(ids => writeSummaryQueueEntries(ids))
}
runTransaction(DBIO.sequence(insertActions)).void
runTransaction(DBIO.sequence(roleSet ++ insertActions)).void
}

// Non-summarizable metadata that only needs to go to the metadata table can be written much more efficiently
// than summarizable metadata.
def writeNonSummarizable(): Future[Unit] = if (partitioned.nonSummarizableMetadata.isEmpty) Future.successful(()) else {
val action = DBIO.sequence(partitioned.nonSummarizableMetadata.grouped(insertBatchSize).map(dataAccess.metadataEntries ++= _))
val action = DBIO.sequence(roleSet ++ partitioned.nonSummarizableMetadata.grouped(insertBatchSize).map(dataAccess.metadataEntries ++= _))
runLobAction(action).void
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class WorkflowCallbackActorSpec
private val serviceRegistryActor = TestProbe("testServiceRegistryActor")
private val deathWatch = TestProbe("deathWatch")
private val mockUri = new URI("http://example.com")
private val basicConfig = WorkflowCallbackConfig.empty.copy(enabled = true)
private val basicConfig = WorkflowCallbackConfig.empty.copy(enabled = true).copy(retryBackoff = SimpleExponentialBackoff(100.millis, 200.millis, 1.1))
private val basicOutputs = WomMocks.mockOutputExpectations(List("foo" -> WomString("bar")).toMap)

private val httpSuccess = Future.successful(HttpResponse.apply(StatusCodes.OK))
Expand Down

0 comments on commit fc9e3cc

Please sign in to comment.