From fc9e3cc9ceb081396d3b17027b11ddef14dced93 Mon Sep 17 00:00:00 2001 From: Chris Llanwarne Date: Mon, 2 Oct 2023 10:35:30 -0400 Subject: [PATCH] WM-2252: Configurable metadata write role (#7225) Co-authored-by: dvoet --- .../database/slick/MetadataSlickDatabase.scala | 10 +++++++--- .../finalization/WorkflowCallbackActorSpec.scala | 2 +- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/database/sql/src/main/scala/cromwell/database/slick/MetadataSlickDatabase.scala b/database/sql/src/main/scala/cromwell/database/slick/MetadataSlickDatabase.scala index 0a33e4b6b72..eb87f88d101 100644 --- a/database/sql/src/main/scala/cromwell/database/slick/MetadataSlickDatabase.scala +++ b/database/sql/src/main/scala/cromwell/database/slick/MetadataSlickDatabase.scala @@ -1,7 +1,6 @@ package cromwell.database.slick import java.sql.Timestamp - import cats.syntax.functor._ import cats.instances.future._ import com.typesafe.config.{Config, ConfigFactory} @@ -10,6 +9,7 @@ import cromwell.database.sql.MetadataSqlDatabase import cromwell.database.sql.SqlConverters._ import cromwell.database.sql.joins.{CallOrWorkflowQuery, CallQuery, MetadataJobQueryValue, WorkflowQuery} import cromwell.database.sql.tables.{CustomLabelEntry, InformationSchemaEntry, MetadataEntry, WorkflowMetadataSummaryEntry} +import net.ceedubs.ficus.Ficus._ import slick.basic.DatabasePublisher import slick.jdbc.{ResultSetConcurrency, ResultSetType} @@ -60,6 +60,8 @@ class MetadataSlickDatabase(originalDatabaseConfig: Config) import dataAccess.driver.api._ import MetadataSlickDatabase._ + lazy val pgLargeObjectWriteRole: Option[String] = originalDatabaseConfig.as[Option[String]]("pgLargeObjectWriteRole") + override def existsMetadataEntries()(implicit ec: ExecutionContext): Future[Boolean] = { val action = dataAccess.metadataEntriesExists.result runTransaction(action) @@ -87,6 +89,8 @@ class MetadataSlickDatabase(originalDatabaseConfig: Config) rootWorkflowIdKey, labelMetadataKey) + val roleSet = pgLargeObjectWriteRole.map(role => sqlu"""SET ROLE TO "#$role"""") + // These entries also require a write to the summary queue. def writeSummarizable(): Future[Unit] = if (partitioned.summarizableMetadata.isEmpty) Future.successful(()) else { val batchesToWrite = partitioned.summarizableMetadata.grouped(insertBatchSize).toList @@ -94,13 +98,13 @@ class MetadataSlickDatabase(originalDatabaseConfig: Config) val insertMetadata = dataAccess.metadataEntryIdsAutoInc ++= batch insertMetadata.flatMap(ids => writeSummaryQueueEntries(ids)) } - runTransaction(DBIO.sequence(insertActions)).void + runTransaction(DBIO.sequence(roleSet ++ insertActions)).void } // Non-summarizable metadata that only needs to go to the metadata table can be written much more efficiently // than summarizable metadata. def writeNonSummarizable(): Future[Unit] = if (partitioned.nonSummarizableMetadata.isEmpty) Future.successful(()) else { - val action = DBIO.sequence(partitioned.nonSummarizableMetadata.grouped(insertBatchSize).map(dataAccess.metadataEntries ++= _)) + val action = DBIO.sequence(roleSet ++ partitioned.nonSummarizableMetadata.grouped(insertBatchSize).map(dataAccess.metadataEntries ++= _)) runLobAction(action).void } diff --git a/engine/src/test/scala/cromwell/engine/workflow/lifecycle/finalization/WorkflowCallbackActorSpec.scala b/engine/src/test/scala/cromwell/engine/workflow/lifecycle/finalization/WorkflowCallbackActorSpec.scala index c469d1dcaa2..97479d348ae 100644 --- a/engine/src/test/scala/cromwell/engine/workflow/lifecycle/finalization/WorkflowCallbackActorSpec.scala +++ b/engine/src/test/scala/cromwell/engine/workflow/lifecycle/finalization/WorkflowCallbackActorSpec.scala @@ -35,7 +35,7 @@ class WorkflowCallbackActorSpec private val serviceRegistryActor = TestProbe("testServiceRegistryActor") private val deathWatch = TestProbe("deathWatch") private val mockUri = new URI("http://example.com") - private val basicConfig = WorkflowCallbackConfig.empty.copy(enabled = true) + private val basicConfig = WorkflowCallbackConfig.empty.copy(enabled = true).copy(retryBackoff = SimpleExponentialBackoff(100.millis, 200.millis, 1.1)) private val basicOutputs = WomMocks.mockOutputExpectations(List("foo" -> WomString("bar")).toMap) private val httpSuccess = Future.successful(HttpResponse.apply(StatusCodes.OK))