Skip to content

Commit

Permalink
Add job queue infrastructure
Browse files Browse the repository at this point in the history
We want to process bulk uploads asynchronously, and so this means we
need a worker queue.  Since this initial task is going to be infrequent,
we felt it did not warrant adding a new dependency to our stack (e.g.
reddis or rabbitmq), and so we're going to just leverage postgres as our
backend.

I explored two tools for this purpose: pg-boss, and graphile-worker.
Both packages appeared to meet our technical needs, and both seem to be
maintained and similarly popular.  I decided to go with graphile-worker
as it appears to be part of a suite of related tools and the primary
developer appears to have built a community around that suite of tools
(and earns some income to support the development of the project).

Issue #559 Set up a queue / processing system for handling bulk proposal uploads
  • Loading branch information
slifty committed Nov 28, 2023
1 parent 03497c0 commit e014f49
Show file tree
Hide file tree
Showing 11 changed files with 450 additions and 165 deletions.
502 changes: 337 additions & 165 deletions package-lock.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
"dotenv": "^16.3.1",
"express": "^4.18.2",
"express-jwt": "^8.4.1",
"graphile-worker": "^0.15.1",
"jwks-rsa": "^3.1.0",
"pino": "^8.16.2",
"pino-http": "^8.5.1",
Expand Down
8 changes: 8 additions & 0 deletions src/handlers/bulkUploadsHandlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import {
import {
DatabaseError,
InputValidationError,
NotFoundError,
} from '../errors';
import { addProcessBulkUploadJob } from '../jobQueue';
import type {
Request,
Response,
Expand Down Expand Up @@ -39,6 +41,12 @@ const createBulkUpload = (
status: BulkUploadStatus.PENDING,
});
const bulkUpload = bulkUploadsQueryResult.rows[0];
if (!bulkUpload) {
throw new NotFoundError('The database did not return an entity after bulk upload creation.');

Check warning on line 45 in src/handlers/bulkUploadsHandlers.ts

View check run for this annotation

Codecov / codecov/patch

src/handlers/bulkUploadsHandlers.ts#L45

Added line #L45 was not covered by tests
}
await addProcessBulkUploadJob({
bulkUploadId: bulkUpload.id,
});
res.status(201)
.contentType('application/json')
.send(bulkUpload);
Expand Down
5 changes: 5 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { app } from './app';
import { startJobQueue } from './jobQueue';

Check warning on line 2 in src/index.ts

View check run for this annotation

Codecov / codecov/patch

src/index.ts#L2

Added line #L2 was not covered by tests
import { getLogger } from './logger';

const logger = getLogger(__filename);
Expand All @@ -16,3 +17,7 @@ app.listen(
logger.info(`Server running on http://${host}:${port}`);
},
);

startJobQueue().catch((err) => {
logger.error(err, 'Job queue failed to start');

Check warning on line 22 in src/index.ts

View check run for this annotation

Codecov / codecov/patch

src/index.ts#L21-L22

Added lines #L21 - L22 were not covered by tests
});
40 changes: 40 additions & 0 deletions src/jobQueue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import {
run,
quickAddJob,
} from 'graphile-worker';
import { processBulkUpload } from './tasks';
import type { ProcessBulkUploadJobPayload } from './types';

enum JobType {
PROCESS_BULK_UPLOAD = 'processBulkUpload',
}

export const startJobQueue = async () => {
const runner = await run({

Check warning on line 13 in src/jobQueue.ts

View check run for this annotation

Codecov / codecov/patch

src/jobQueue.ts#L13

Added line #L13 was not covered by tests
concurrency: 5,
noHandleSignals: false,
pollInterval: 1000,
taskList: {
processBulkUpload,
},
});
await runner.promise;

Check warning on line 21 in src/jobQueue.ts

View check run for this annotation

Codecov / codecov/patch

src/jobQueue.ts#L21

Added line #L21 was not covered by tests
};

const addJob = async (
jobType: JobType,
payload: unknown,
) => {
await quickAddJob(
{},
jobType,
payload,
);
};

export const addProcessBulkUploadJob = async (payload: ProcessBulkUploadJobPayload) => (
addJob(
JobType.PROCESS_BULK_UPLOAD,
payload,
)
);
8 changes: 8 additions & 0 deletions src/tasks/__tests__/processBulkUpload.unit.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import { processBulkUpload } from '../processBulkUpload';
import { InternalValidationError } from '../../errors';

describe('processBulkUpload', () => {
it('should error when passed an invalid payload', async () => {
await expect(processBulkUpload({})).rejects.toBeInstanceOf(InternalValidationError);
});
});
1 change: 1 addition & 0 deletions src/tasks/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './processBulkUpload';
18 changes: 18 additions & 0 deletions src/tasks/processBulkUpload.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { getLogger } from '../logger';
import { isProcessBulkUploadJobPayload } from '../types';
import { InternalValidationError } from '../errors';

const logger = getLogger(__filename);

export const processBulkUpload = async (
payload: unknown,
): Promise<void> => {
if (!isProcessBulkUploadJobPayload(payload)) {
logger.debug(isProcessBulkUploadJobPayload.errors, 'Malformed bulk upload job payload');
throw new InternalValidationError(
'The bulk upload job payload is not properly formed',
isProcessBulkUploadJobPayload.errors ?? [],
);
}
logger.debug(`Started processBulkUpload Job for Bulk Upload ID ${payload.bulkUploadId}`);

Check warning on line 17 in src/tasks/processBulkUpload.ts

View check run for this annotation

Codecov / codecov/patch

src/tasks/processBulkUpload.ts#L17

Added line #L17 was not covered by tests
};
11 changes: 11 additions & 0 deletions src/test/integrationSuiteSetup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,17 @@ import {
} from './harnessFunctions';
import { mockJwks } from './mockJwt';

// This mock prevents our queue manager from actually being invoked.
// It's necessary because of the way we leverage PGOPTIONS to specify
// the schema / search path when preparing the test worker to interact
// with specific schemas.
//
// We may eventually want to be able to write tests that interact with the queue
// and we may eventually face issues with blunt mocking of graphile-worker.
// When that happens, we'll need to remove this mock and change the way we're
// setting the schema / path.
jest.mock('graphile-worker');

beforeAll(async () => {
mockJwks.start();
});
Expand Down
20 changes: 20 additions & 0 deletions src/types/ProcessBulkUploadJobPayload.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { ajv } from '../ajv';
import type { JSONSchemaType } from 'ajv';

export interface ProcessBulkUploadJobPayload {
bulkUploadId: number;
}

export const processBulkUploadJobPayloadSchema: JSONSchemaType<ProcessBulkUploadJobPayload> = {
type: 'object',
properties: {
bulkUploadId: {
type: 'integer',
},
},
required: [
'bulkUploadId',
],
};

export const isProcessBulkUploadJobPayload = ajv.compile(processBulkUploadJobPayloadSchema);
1 change: 1 addition & 0 deletions src/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export * from './PaginationParametersQuery';
export * from './PlatformProviderResponse';
export * from './PostgresErrorCode';
export * from './PresignedPostRequest';
export * from './ProcessBulkUploadJobPayload';
export * from './Proposal';
export * from './ProposalFieldValue';
export * from './ProposalVersion';
Expand Down

0 comments on commit e014f49

Please sign in to comment.