[Spark] Refine background jobs (lakesoul-io#565)
* add more connections

Signed-off-by: chenxu <[email protected]>

* clean close connection

Signed-off-by: chenxu <[email protected]>

* close connection in clean task

Signed-off-by: chenxu <[email protected]>

* add compaction job name

Signed-off-by: chenxu <[email protected]>

* fix build

Signed-off-by: chenxu <[email protected]>

---------

Signed-off-by: chenxu <[email protected]>
Co-authored-by: chenxu <[email protected]>
xuchen-plus and dmetasoul01 authored Nov 24, 2024
1 parent cba53a2 commit dd54ddb
Showing 8 changed files with 47 additions and 40 deletions.
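
The diffs below replace the single long-lived metadata DB connection previously held by the cleaning jobs with pooled connections that are borrowed per statement and closed as soon as the statement completes, and they enlarge the HikariCP pool to match. A minimal sketch of how a caller uses the new CleanUtils helpers follows; the enclosing object and table path are placeholders, and the UPDATE statement mirrors cancelTableDataExpiredDays from this diff:

// Illustrative only: the enclosing object and the table path are hypothetical.
import com.dmetasoul.lakesoul.spark.clean.CleanUtils.executeMetaSql

object MetaSqlExample {
  def main(args: Array[String]): Unit = {
    val tablePath = "s3://bucket/lakesoul/demo_table" // placeholder path

    // executeMetaSql borrows a pooled connection, runs the statement, and
    // closes both the statement and the connection before returning,
    // instead of keeping a shared `conn` open for the life of the job.
    executeMetaSql(
      s"""
         |UPDATE table_info
         |SET properties = properties::jsonb - 'partition.ttl'
         |WHERE table_id = (SELECT table_id from table_info where table_path='$tablePath');
         |""".stripMargin)
  }
}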
@@ -286,9 +286,10 @@ public static LinkedHashMap<String, String> parsePartitionDesc(String partitionD
public static void fillDataSourceConfig(HikariConfig config) {
config.setConnectionTimeout(10000);
config.setIdleTimeout(10000);
-config.setMaximumPoolSize(2);
+config.setMaximumPoolSize(8);
config.setKeepaliveTime(5000);
config.setMinimumIdle(0);
+config.setMaxLifetime(1800000);
config.addDataSourceProperty("cachePrepStmts", "true");
config.addDataSourceProperty("prepStmtCacheSize", "250");
config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
@@ -396,4 +397,4 @@ public static long parseMemoryExpression(String memoryExpression) {
return 0;
}

-}
+}
@@ -79,11 +79,7 @@ object SparkMetaVersion {
dbManager.listTablePathsByNamespace(namespace.mkString("."))
}

-def getTableInfo(table_path: String): TableInfo = {
-getTableInfo(LakeSoulCatalog.showCurrentNamespace().mkString("."), table_path)
-}
-
-def getTableInfo(namespace: String, table_path: String): TableInfo = {
+def getTableInfoByPath(table_path: String): TableInfo = {
val path = SparkUtil.makeQualifiedPath(table_path).toUri.toString
val info = dbManager.getTableInfoByPath(path)
if (info == null) {
@@ -105,7 +101,7 @@
case _ => -1
}
TableInfo(
-namespace,
+info.getTableNamespace,
Some(table_path),
info.getTableId,
info.getTableSchema,
@@ -4,18 +4,16 @@

package com.dmetasoul.lakesoul.spark.clean

-import com.dmetasoul.lakesoul.meta.DBConnector
import com.dmetasoul.lakesoul.spark.ParametersTool
-import org.apache.spark.sql.SparkSession
-import com.dmetasoul.lakesoul.spark.clean.CleanUtils.sqlToDataframe
+import com.dmetasoul.lakesoul.spark.clean.CleanUtils.{executeMetaSql, sqlToDataframe}
import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.SparkSession

import java.time.{LocalDateTime, Period, ZoneId}
import java.util.TimeZone

object CleanExpiredData {

-private val conn = DBConnector.getConn
var serverTimeZone = TimeZone.getDefault.getID
private var defaultPartitionTTL: Int = -1
private var defaultRedundantTTL: Int = -1
@@ -206,8 +204,7 @@
| partition_desc='$partitionDesc')
|""".stripMargin

-val stmt = conn.prepareStatement(sql)
-stmt.execute()
+executeMetaSql(sql)
}

def cleanSinglePartitionInfo(tableId: String, partitionDesc: String, deadTimestamp: Long): Unit = {
@@ -221,8 +218,7 @@
|AND
| timestamp < $deadTimestamp
|""".stripMargin
-val stmt = conn.prepareStatement(sql)
-stmt.execute()
+executeMetaSql(sql)
}

def getLatestCommitTimestamp(table_id: String, partitionDesc: String, spark: SparkSession): Long = {
@@ -299,4 +295,4 @@
val expiredDateTimestamp = zeroTime.toInstant().toEpochMilli
expiredDateTimestamp
}
-}
+}
@@ -73,9 +73,12 @@ object CleanUtils {
}

def sqlToDataframe(sql: String, spark: SparkSession): DataFrame = {
-val stmt = conn.prepareStatement(sql)
-val resultSet = stmt.executeQuery()
-createResultSetToDF(resultSet, spark)
+tryWithResource(DBConnector.getConn) { conn =>
+tryWithResource(conn.prepareStatement(sql)) { stmt =>
+val resultSet = stmt.executeQuery()
+createResultSetToDF(resultSet, spark)
+}
+}
}

def setTableDataExpiredDays(tablePath: String, expiredDays: Int): Unit = {
@@ -85,8 +88,7 @@
|SET properties = properties::jsonb || '{"partition.ttl": "$expiredDays"}'::jsonb
|WHERE table_id = (SELECT table_id from table_info where table_path='$tablePath');
|""".stripMargin
-val stmt = conn.prepareStatement(sql)
-stmt.execute()
+executeMetaSql(sql)
}

def setCompactionExpiredDays(tablePath: String, expiredDays: Int): Unit = {
@@ -96,8 +98,7 @@
|SET properties = properties::jsonb || '{"compaction.ttl": "$expiredDays"}'::jsonb
|WHERE table_id = (SELECT table_id from table_info where table_path='$tablePath');
|""".stripMargin
-val stmt = conn.prepareStatement(sql)
-stmt.execute()
+executeMetaSql(sql)
}

def setTableOnlySaveOnceCompactionValue(tablePath: String, value: Boolean): Unit = {
@@ -107,8 +108,7 @@
|SET properties = properties::jsonb || '{"only_save_once_compaction": "$value"}'::jsonb
|WHERE table_id = (SELECT table_id from table_info where table_path='$tablePath');
|""".stripMargin
-val stmt = conn.prepareStatement(sql)
-stmt.execute()
+executeMetaSql(sql)
}

def cancelTableDataExpiredDays(tablePath: String): Unit = {
@@ -118,8 +118,7 @@
|SET properties = properties::jsonb - 'partition.ttl'
|WHERE table_id = (SELECT table_id from table_info where table_path='$tablePath');
|""".stripMargin
-val stmt = conn.prepareStatement(sql)
-stmt.execute()
+executeMetaSql(sql)
}

def cancelCompactionExpiredDays(tablePath: String): Unit = {
@@ -129,8 +128,7 @@
|SET properties = properties::jsonb - 'compaction.ttl'
|WHERE table_id = (SELECT table_id from table_info where table_path='$tablePath');
|""".stripMargin
-val stmt = conn.prepareStatement(sql)
-stmt.execute()
+executeMetaSql(sql)
}

def setPartitionInfoTimestamp(tableId: String, timestamp: Long, version: Int): Unit = {
@@ -141,8 +139,7 @@
|WHERE table_id = '$tableId'
|AND version = $version
|""".stripMargin
-val stmt = conn.prepareStatement(sql)
-stmt.execute()
+executeMetaSql(sql)
}

def readPartitionInfo(tableId: String, spark: SparkSession): DataFrame = {
@@ -164,4 +161,17 @@
|""".stripMargin
sqlToDataframe(sql, spark)
}
-}
+
+def tryWithResource[R <: AutoCloseable, T](createResource: => R)(f: R => T): T = {
+val resource = createResource
+try f.apply(resource) finally resource.close()
+}
+
+def executeMetaSql(sql: String): Unit = {
+tryWithResource(DBConnector.getConn) { conn =>
+tryWithResource(conn.prepareStatement(sql)) { stmt =>
+stmt.execute()
+}
+}
+}
+}
@@ -177,6 +177,10 @@ trait LakeSoulTableOperations extends AnalysisHelper {
fileNumLimit: Option[Int],
newBucketNum: Option[Int],
fileSizeLimit: Option[Long]): Unit = {
+val t = snapshotManagement.getTableInfoOnly
+sparkSession.sparkContext.setJobDescription(
+s"Compact(${t.namespace}.${t.short_table_name.getOrElse(t.table_path)}/$condition" +
+s",f=$force,c=$cleanOldCompaction,n=$fileNumLimit,s=$fileSizeLimit,b=$newBucketNum)")
toDataset(sparkSession, CompactionCommand(
snapshotManagement,
condition,
@@ -35,7 +35,7 @@ class SnapshotManagement(path: String, namespace: String) extends Logging {
def snapshot: Snapshot = currentSnapshot

private def createSnapshot: Snapshot = {
-val table_info = SparkMetaVersion.getTableInfo(table_namespace, table_path)
+val table_info = SparkMetaVersion.getTableInfoByPath(table_path)
val partition_info_arr = SparkMetaVersion.getAllPartitionInfo(table_info.table_id)

if (table_info.table_schema.isEmpty) {
@@ -83,7 +83,7 @@
//get table info only
def getTableInfoOnly: TableInfo = {
if (LakeSoulSourceUtils.isLakeSoulTableExists(table_path)) {
-SparkMetaVersion.getTableInfo(table_path)
+SparkMetaVersion.getTableInfoByPath(table_path)
} else {
val table_id = "table_" + UUID.randomUUID().toString
TableInfo(table_namespace, Some(table_path), table_id)
@@ -130,7 +130,7 @@ class LakeSoulCatalog(val spark: SparkSession) extends TableCatalog
case _ => identifier
}
if (isPathIdentifier(ident)) {
-val tableInfo = SparkMetaVersion.getTableInfo(ident.namespace().mkString("."), ident.name())
+val tableInfo = SparkMetaVersion.getTableInfoByPath(ident.name())
if (tableInfo == null) {
throw new NoSuchTableException(ident)
}
@@ -624,7 +624,7 @@

def listTables(namespaces: Array[String]): Array[Identifier] = {
SparkMetaVersion.listTables(namespaces).asScala.map(tablePath => {
-val tableInfo = SparkMetaVersion.getTableInfo(tablePath)
+val tableInfo = SparkMetaVersion.getTableInfoByPath(tablePath)
Identifier.of(namespaces, tableInfo.short_table_name.getOrElse(tableInfo.table_path.toUri.toString))
}).toArray
}
@@ -170,7 +170,7 @@ class LakeSoulSinkSuite extends StreamTest with LakeSoulTestUtils {
(1, 1000), (2, 2000), (3, 3000))

val snapshotManagement = SnapshotManagement(SparkUtil.makeQualifiedTablePath(new Path(outputDir.getCanonicalPath)).toString)
-val tableInfo = SparkMetaVersion.getTableInfo(snapshotManagement.table_path)
+val tableInfo = SparkMetaVersion.getTableInfoByPath(snapshotManagement.table_path)
assert(tableInfo.hash_column.equals("id")
&& tableInfo.range_column.isEmpty
&& tableInfo.bucket_num == 2)
@@ -237,7 +237,7 @@
(1, 1000), (2, 2000), (3, 3000))

val snapshotManagement = SnapshotManagement(SparkUtil.makeQualifiedTablePath(new Path(outputDir.getCanonicalPath)).toString)
-val tableInfo = SparkMetaVersion.getTableInfo(snapshotManagement.table_path)
+val tableInfo = SparkMetaVersion.getTableInfoByPath(snapshotManagement.table_path)
assert(tableInfo.range_column.equals("id")
&& tableInfo.hash_column.isEmpty
&& tableInfo.bucket_num == -1)
@@ -285,7 +285,7 @@
(1, 1, 1000), (2, 2, 2000), (3, 3, 3000))

val snapshotManagement = SnapshotManagement(SparkUtil.makeQualifiedTablePath(new Path(outputDir.getCanonicalPath)).toString)
-val tableInfo = SparkMetaVersion.getTableInfo(snapshotManagement.table_path)
+val tableInfo = SparkMetaVersion.getTableInfoByPath(snapshotManagement.table_path)
assert(tableInfo.range_column.equals("range")
&& tableInfo.hash_column.equals("hash")
&& tableInfo.bucket_num == 2)
