Skip to content

Commit

Permalink
Merge pull request #274 from briefercloud/lock-perf-pubsub
Browse files Browse the repository at this point in the history
improve lock perf by using pubsub
  • Loading branch information
vieiralucas authored Dec 2, 2024
2 parents 3c23d7f + 777d16d commit 2e34d60
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 159 deletions.
276 changes: 150 additions & 126 deletions apps/api/src/lock.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
import crypto from 'crypto'
import { v4 as uuidv4 } from 'uuid'
import prisma from '@briefer/database'
import prisma, { publish, subscribe } from '@briefer/database'
import { logger } from './logger.js'
import { z } from 'zod'
import { exhaustiveCheck } from '@briefer/types'

const EXPIRATION_TIME = 1000 * 5 // 5 seconds
const MAX_RETRY_TIMEOUT = 500 // 500ms
const DEFAULT_ACQUIRE_TIMEOUT = Infinity
const RETRY_TIMEOUT = 1000 * 30 // 30 seconds in case pubsub fails
const NUM_PARTITIONS = 32

function getPartition(name: string): number {
const hash = crypto.createHash('md5').update(name).digest('hex')
const hashValue = parseInt(hash.slice(0, 8), 16) // Use first 8 hex characters
return hashValue % NUM_PARTITIONS
}

class AlreadyAcquiredError extends Error {
constructor(public readonly lockName: string) {
Expand All @@ -15,146 +22,163 @@ class AlreadyAcquiredError extends Error {
}
}

export class AcquireLockTimeoutError extends Error {
constructor(
public readonly name: string,
public readonly ownerId: string,
public readonly startTime: number,
public readonly acquireTimeout: number,
public readonly attempt: number
) {
super(
`Failed to acquire lock ${name} with ownerId ${ownerId} after ${acquireTimeout}ms and ${attempt} attempts.`
)
this.name = 'AcquireLockTimeoutError'
}
}

export async function acquireLock<T>(
name: string,
cb: () => Promise<T>,
{ acquireTimeout = DEFAULT_ACQUIRE_TIMEOUT }: { acquireTimeout?: number } = {}
cb: () => Promise<T>
): Promise<T> {
const startTime = Date.now()
const ownerId = uuidv4()
let acquired = false
let attempt = 0
let timeout: NodeJS.Timeout | null = null

const inner = async (attempt: number): Promise<T> => {
if (Date.now() - startTime > acquireTimeout) {
throw new AcquireLockTimeoutError(
name,
ownerId,
startTime,
acquireTimeout,
attempt
)
}
const channel = `lock_releases_${getPartition(name)}`

logger().trace({ name, ownerId, attempt }, 'Acquiring lock')
try {
const lock = await prisma().lock.findFirst({
where: {
name,
},
})
if (!lock) {
// this is safe because if someone else creates the lock in the meantime
// this will raise a unique constraint error that we catch below to retry
await prisma().lock.create({
data: {
name,
isLocked: true,
ownerId,
expiresAt: new Date(Date.now() + EXPIRATION_TIME),
acquiredAt: new Date(),
},
})
} else if (!lock.isLocked || lock.expiresAt < new Date()) {
// this is safe because if someone else updates the lock in the meantime
// this will fail to find the lock to update because expiresAt will be changed
// that will raise a not found error that we catch below to retry
await prisma().lock.update({
return new Promise<T>(async (resolve, reject) => {
const cleanSubscription = await subscribe(channel, async (event) => {
if (acquired) {
return
}

if (event === name) {
logger().trace(
{ name, ownerId, channel },
'Got lock released message. Anticipating lock acquisition attempt'
)
tryAcquire()
}
})

const tryAcquire = async () => {
if (acquired) {
return
}

attempt++
logger().trace({ name, ownerId, attempt, channel }, 'Acquiring lock')
try {
const lock = await prisma().lock.findFirst({
where: {
id: lock.id,
expiresAt: lock.expiresAt,
},
data: {
isLocked: true,
ownerId,
expiresAt: new Date(Date.now() + EXPIRATION_TIME),
acquiredAt: new Date(),
name,
},
})
} else {
// lock is already acquired
throw new AlreadyAcquiredError(name)
}
} catch (err) {
let code = ''
if (err instanceof AlreadyAcquiredError) {
code = 'AlreadyAcquiredError'
} else {
const parsed = z
.object({ code: z.union([z.literal('P2002'), z.literal('P2025')]) })
.safeParse(err)
if (parsed.success) {
switch (parsed.data.code) {
case 'P2002':
code = 'UniqueConstraintError'
break
case 'P2025':
code = 'NotFound'
break
default:
exhaustiveCheck(parsed.data.code)
if (!lock) {
// this is safe because if someone else creates the lock in the meantime
// this will raise a unique constraint error that we catch below to retry
await prisma().lock.create({
data: {
name,
isLocked: true,
ownerId,
expiresAt: new Date(Date.now() + EXPIRATION_TIME),
acquiredAt: new Date(),
},
})
} else if (!lock.isLocked || lock.expiresAt < new Date()) {
// this is safe because if someone else updates the lock in the meantime
// this will fail to find the lock to update because expiresAt will be changed
// that will raise a not found error that we catch below to retry
await prisma().lock.update({
where: {
id: lock.id,
expiresAt: lock.expiresAt,
},
data: {
isLocked: true,
ownerId,
expiresAt: new Date(Date.now() + EXPIRATION_TIME),
acquiredAt: new Date(),
},
})
} else {
// lock is already acquired
throw new AlreadyAcquiredError(name)
}
} catch (err) {
let code = ''
if (err instanceof AlreadyAcquiredError) {
code = 'AlreadyAcquiredError'
} else {
const parsed = z
.object({ code: z.union([z.literal('P2002'), z.literal('P2025')]) })
.safeParse(err)
if (parsed.success) {
switch (parsed.data.code) {
case 'P2002':
code = 'UniqueConstraintError'
break
case 'P2025':
code = 'NotFound'
break
default:
exhaustiveCheck(parsed.data.code)
}
}
}
}

if (code !== '') {
const timeout = Math.min(MAX_RETRY_TIMEOUT, Math.pow(2, attempt) * 100)
logger().trace(
{ name, ownerId, attempt, code, timeout },
'Lock is already acquired. Retrying.'
if (code !== '') {
logger().trace(
{
name,
ownerId,
attempt,
code,
retryTimeout: RETRY_TIMEOUT,
channel,
},
`Lock is already acquired. Retrying in ${RETRY_TIMEOUT}.`
)
if (timeout) {
clearTimeout(timeout)
}
timeout = setTimeout(tryAcquire, RETRY_TIMEOUT)
return
}

logger().error(
{ name, ownerId, channel, err },
'Failed to acquire lock'
)
await new Promise((resolve) => setTimeout(resolve, timeout))
return inner(attempt + 1)
reject(err)
return
}

logger().error({ name, ownerId, err }, 'Failed to acquire lock')
throw err
}

const extendExpirationInterval = setInterval(async () => {
await prisma().lock.updateMany({
where: {
name,
ownerId,
},
data: {
expiresAt: new Date(Date.now() + EXPIRATION_TIME),
},
})
}, EXPIRATION_TIME / 3)
const extendExpirationInterval = setInterval(async () => {
await prisma().lock.updateMany({
where: {
name,
ownerId,
},
data: {
expiresAt: new Date(Date.now() + EXPIRATION_TIME),
},
})
}, EXPIRATION_TIME / 3)

logger().debug({ name, ownerId }, 'Lock acquired')
logger().debug({ name, ownerId, channel }, 'Lock acquired')
acquired = true
await cleanSubscription()

try {
return await cb()
} finally {
logger().trace({ name, ownerId }, 'Releasing lock')
clearInterval(extendExpirationInterval)
await prisma().lock.updateMany({
where: {
name,
ownerId,
},
data: {
isLocked: false,
},
})
logger().debug({ name, ownerId }, 'Lock released')
try {
resolve(await cb())
} catch (err) {
reject(err)
} finally {
logger().trace({ name, ownerId, channel }, 'Releasing lock')
clearInterval(extendExpirationInterval)
await prisma().lock.updateMany({
where: {
name,
ownerId,
},
data: {
isLocked: false,
},
})
await publish(channel, name)
logger().debug({ name, ownerId, channel }, 'Lock released')
}
}
}

return inner(0)
tryAcquire()
})
}
54 changes: 25 additions & 29 deletions apps/api/src/schedule/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -201,40 +201,36 @@ export async function runSchedule(socketServer: IOServer) {
let stop = false
const loop = new Promise<void>(async (resolve) => {
logger().trace('Acquiring lock to be the schedule executor')
await acquireLock(
'schedule-executor',
async () => {
await acquireLock('schedule-executor', async () => {
if (stop) {
logger().trace(
'Schedule executor lock acquired but server is shutting down'
)
return
}

logger().trace('Schedule executor lock acquired')
while (true) {
if (stop) {
logger().trace(
'Schedule executor lock acquired but server is shutting down'
)
return
break
}

logger().trace('Schedule executor lock acquired')
while (true) {
if (stop) {
break
}

try {
await updateSchedule()
} catch (err) {
logger().error(
{ err, module: 'schedule' },
'Failed to update schedule'
)
}

if (stop) {
break
}
try {
await updateSchedule()
} catch (err) {
logger().error(
{ err, module: 'schedule' },
'Failed to update schedule'
)
}

await new Promise((resolve) => setTimeout(resolve, 5000))
if (stop) {
break
}
},
{ acquireTimeout: Infinity }
)

await new Promise((resolve) => setTimeout(resolve, 5000))
}
})

resolve()
})
Expand Down
3 changes: 1 addition & 2 deletions apps/api/src/yjs/v2/executor/ai/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,7 @@ export class AIExecutor {
}

tick()
}),
{ acquireTimeout: Infinity }
})
)
} catch (err) {
logger().error(
Expand Down
3 changes: 1 addition & 2 deletions apps/api/src/yjs/v2/executor/executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,7 @@ export class Executor {
}

tick()
}),
{ acquireTimeout: Infinity }
})
)
} catch (err) {
logger().error(
Expand Down

0 comments on commit 2e34d60

Please sign in to comment.