Commit

Merge pull request #630 from PhilanthropyDataCommons/559-add-worker-queue

Add worker queue
slifty authored Nov 30, 2023
2 parents 8f82c68 + 00bc5ce commit f800538
Showing 14 changed files with 501 additions and 165 deletions.
502 changes: 337 additions & 165 deletions package-lock.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions package.json
@@ -79,6 +79,7 @@
"dotenv": "^16.3.1",
"express": "^4.18.2",
"express-jwt": "^8.4.1",
"graphile-worker": "^0.15.1",
"jwks-rsa": "^3.1.0",
"pino": "^8.16.2",
"pino-http": "^8.5.1",
2 changes: 2 additions & 0 deletions src/database/migrate.ts
@@ -1,5 +1,6 @@
import path from 'path';
import { migrate as pgMigrate } from 'postgres-schema-migrations';
import { runJobQueueMigrations } from '../jobQueue';
import { db } from './db';

export const migrate = async (schema = 'public'): Promise<void> => {
@@ -10,6 +11,7 @@ export const migrate = async (schema = 'public'): Promise<void> => {
      path.resolve(__dirname, 'migrations'),
      { schema },
    );
    await runJobQueueMigrations();
  } finally {
    client.release();
  }
1 change: 1 addition & 0 deletions src/errors/JobQueueStateError.ts
@@ -0,0 +1 @@
export class JobQueueStateError extends Error {}
8 changes: 8 additions & 0 deletions src/handlers/bulkUploadsHandlers.ts
@@ -11,10 +11,12 @@ import {
import {
  DatabaseError,
  InputValidationError,
  NotFoundError,
} from '../errors';
import {
  extractPaginationParameters,
} from '../queryParameters';
import { addProcessBulkUploadJob } from '../jobQueue';
import type {
  Request,
  Response,
@@ -46,6 +48,12 @@
    status: BulkUploadStatus.PENDING,
  });
  const bulkUpload = bulkUploadsQueryResult.rows[0];
  if (!bulkUpload) {
    throw new NotFoundError('The database did not return an entity after bulk upload creation.');
  }
  await addProcessBulkUploadJob({
    bulkUploadId: bulkUpload.id,
  });
  res.status(201)
    .contentType('application/json')
    .send(bulkUpload);
5 changes: 5 additions & 0 deletions src/index.ts
@@ -1,4 +1,5 @@
import { app } from './app';
import { startJobQueue } from './jobQueue';
import { getLogger } from './logger';

const logger = getLogger(__filename);
@@ -16,3 +17,7 @@ app.listen(
    logger.info(`Server running on http://${host}:${port}`);
  },
);

startJobQueue().catch((err) => {
  logger.error(err, 'Job queue failed to start');
});
79 changes: 79 additions & 0 deletions src/jobQueue.ts
@@ -0,0 +1,79 @@
import {
  Logger,
  quickAddJob,
  run,
  runMigrations,
} from 'graphile-worker';
import { getLogger } from './logger';
import { db } from './database/db';
import { processBulkUpload } from './tasks';
import type { ProcessBulkUploadJobPayload } from './types';

const logger = getLogger(__filename);

enum JobType {
  PROCESS_BULK_UPLOAD = 'processBulkUpload',
}

export const jobQueueLogger = new Logger((scope) => (
  (level, message, meta) => {
    switch (level.valueOf()) {
      case 'error':
        logger.error({ meta, scope }, message);
        break;
      case 'warn':
        logger.warn({ meta, scope }, message);
        break;
      case 'info':
        logger.info({ meta, scope }, message);
        break;
      case 'debug':
        logger.debug({ meta, scope }, message);
        break;
      default:
        logger.info({ meta, scope }, message);
    }
  }
));

export const startJobQueue = async () => {
  const runner = await run({
    logger: jobQueueLogger,
    pgPool: db.pool,
    concurrency: 5,
    noHandleSignals: false,
    pollInterval: 1000,
    taskList: {
      processBulkUpload,
    },
  });
  await runner.promise;
};

export const runJobQueueMigrations = async () => (
  runMigrations({
    logger: jobQueueLogger,
    pgPool: db.pool,
  })
);

export const addJob = async (
  jobType: JobType,
  payload: unknown,
) => (
  quickAddJob(
    {
      logger: jobQueueLogger,
      pgPool: db.pool,
    },
    jobType,
    payload,
  )
);

export const addProcessBulkUploadJob = async (payload: ProcessBulkUploadJobPayload) => (
  addJob(
    JobType.PROCESS_BULK_UPLOAD,
    payload,
  )
);
12 changes: 12 additions & 0 deletions src/tasks/__tests__/processBulkUpload.unit.test.ts
@@ -0,0 +1,12 @@
import { getMockJobHelpers } from '../../test/mockGraphileWorker';
import { processBulkUpload } from '../processBulkUpload';
import { InternalValidationError } from '../../errors';

describe('processBulkUpload', () => {
  it('should error when passed an invalid payload', async () => {
    await expect(processBulkUpload(
      {},
      getMockJobHelpers(),
    )).rejects.toBeInstanceOf(InternalValidationError);
  });
});
1 change: 1 addition & 0 deletions src/tasks/index.ts
@@ -0,0 +1 @@
export * from './processBulkUpload';
17 changes: 17 additions & 0 deletions src/tasks/processBulkUpload.ts
@@ -0,0 +1,17 @@
import { isProcessBulkUploadJobPayload } from '../types';
import { InternalValidationError } from '../errors';
import type { JobHelpers } from 'graphile-worker';

export const processBulkUpload = async (
  payload: unknown,
  helpers: JobHelpers,
): Promise<void> => {
  if (!isProcessBulkUploadJobPayload(payload)) {
    helpers.logger.debug('Malformed bulk upload job payload', { errors: isProcessBulkUploadJobPayload.errors ?? [] });
    throw new InternalValidationError(
      'The bulk upload job payload is not properly formed',
      isProcessBulkUploadJobPayload.errors ?? [],
    );
  }
  helpers.logger.debug(`Started processBulkUpload Job for Bulk Upload ID ${payload.bulkUploadId}`);
};
11 changes: 11 additions & 0 deletions src/test/integrationSuiteSetup.ts
@@ -9,6 +9,17 @@ import {
} from './harnessFunctions';
import { mockJwks } from './mockJwt';

// This mock prevents our queue manager from actually being invoked.
// It's necessary because of the way we leverage PGOPTIONS to specify
// the schema / search path when preparing the test worker to interact
// with specific schemas.
//
// We may eventually want to be able to write tests that interact with the queue
// and we may eventually face issues with blunt mocking of graphile-worker.
// When that happens, we'll need to remove this mock and change the way we're
// setting the schema / path.
jest.mock('graphile-worker');

beforeAll(async () => {
  mockJwks.start();
});
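The blunt jest.mock('graphile-worker') above swaps every export for an automatic mock, which is what keeps the queue from ever running against the database during integration tests. If tests later need to exercise the queue, one possible refinement (a sketch under assumptions, not part of this commit) is a manual module factory that stubs only the entry points src/jobQueue.ts actually calls (run, runMigrations, quickAddJob) while re-exporting the real Logger so jobQueueLogger still constructs:

jest.mock('graphile-worker', () => ({
  // Keep the real exports (Logger in particular) and override only the
  // functions this codebase calls.
  ...jest.requireActual<typeof import('graphile-worker')>('graphile-worker'),
  // startJobQueue awaits run(...) and then runner.promise, so return that shape.
  run: jest.fn(async () => ({ promise: Promise.resolve() })),
  // runJobQueueMigrations simply awaits runMigrations(...).
  runMigrations: jest.fn(async () => undefined),
  // addJob awaits quickAddJob(...); an empty object stands in for the Job row.
  quickAddJob: jest.fn(async () => ({})),
}));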
6 changes: 6 additions & 0 deletions src/test/mockGraphileWorker.ts
@@ -0,0 +1,6 @@
import { jobQueueLogger } from '../jobQueue';
import type { JobHelpers } from 'graphile-worker';

export const getMockJobHelpers = (): JobHelpers => ({
  logger: jobQueueLogger,
} as JobHelpers);
20 changes: 20 additions & 0 deletions src/types/ProcessBulkUploadJobPayload.ts
@@ -0,0 +1,20 @@
import { ajv } from '../ajv';
import type { JSONSchemaType } from 'ajv';

export interface ProcessBulkUploadJobPayload {
  bulkUploadId: number;
}

export const processBulkUploadJobPayloadSchema: JSONSchemaType<ProcessBulkUploadJobPayload> = {
  type: 'object',
  properties: {
    bulkUploadId: {
      type: 'integer',
    },
  },
  required: [
    'bulkUploadId',
  ],
};

export const isProcessBulkUploadJobPayload = ajv.compile(processBulkUploadJobPayloadSchema);
1 change: 1 addition & 0 deletions src/types/index.ts
@@ -11,6 +11,7 @@ export * from './PaginationParametersQuery';
export * from './PlatformProviderResponse';
export * from './PostgresErrorCode';
export * from './PresignedPostRequest';
export * from './ProcessBulkUploadJobPayload';
export * from './Proposal';
export * from './ProposalFieldValue';
export * from './ProposalVersion';
