Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add worker queue #630

Merged
merged 1 commit into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
502 changes: 337 additions & 165 deletions package-lock.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
"dotenv": "^16.3.1",
"express": "^4.18.2",
"express-jwt": "^8.4.1",
"graphile-worker": "^0.15.1",
"jwks-rsa": "^3.1.0",
"pino": "^8.16.2",
"pino-http": "^8.5.1",
Expand Down
2 changes: 2 additions & 0 deletions src/database/migrate.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import path from 'path';
import { migrate as pgMigrate } from 'postgres-schema-migrations';
import { runJobQueueMigrations } from '../jobQueue';
import { db } from './db';

export const migrate = async (schema = 'public'): Promise<void> => {
Expand All @@ -10,6 +11,7 @@ export const migrate = async (schema = 'public'): Promise<void> => {
path.resolve(__dirname, 'migrations'),
{ schema },
);
await runJobQueueMigrations();
} finally {
client.release();
}
Expand Down
1 change: 1 addition & 0 deletions src/errors/JobQueueStateError.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export class JobQueueStateError extends Error {}

Check warning on line 1 in src/errors/JobQueueStateError.ts

View check run for this annotation

Codecov / codecov/patch

src/errors/JobQueueStateError.ts#L1

Added line #L1 was not covered by tests
8 changes: 8 additions & 0 deletions src/handlers/bulkUploadsHandlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@
import {
DatabaseError,
InputValidationError,
NotFoundError,
} from '../errors';
import {
extractPaginationParameters,
} from '../queryParameters';
import { addProcessBulkUploadJob } from '../jobQueue';
import type {
Request,
Response,
Expand Down Expand Up @@ -46,6 +48,12 @@
status: BulkUploadStatus.PENDING,
});
const bulkUpload = bulkUploadsQueryResult.rows[0];
if (!bulkUpload) {
throw new NotFoundError('The database did not return an entity after bulk upload creation.');

Check warning on line 52 in src/handlers/bulkUploadsHandlers.ts

View check run for this annotation

Codecov / codecov/patch

src/handlers/bulkUploadsHandlers.ts#L52

Added line #L52 was not covered by tests
}
await addProcessBulkUploadJob({
bulkUploadId: bulkUpload.id,
});
res.status(201)
.contentType('application/json')
.send(bulkUpload);
Expand Down
5 changes: 5 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { app } from './app';
import { startJobQueue } from './jobQueue';

Check warning on line 2 in src/index.ts

View check run for this annotation

Codecov / codecov/patch

src/index.ts#L2

Added line #L2 was not covered by tests
import { getLogger } from './logger';

const logger = getLogger(__filename);
Expand All @@ -16,3 +17,7 @@
logger.info(`Server running on http://${host}:${port}`);
},
);

startJobQueue().catch((err) => {
logger.error(err, 'Job queue failed to start');

Check warning on line 22 in src/index.ts

View check run for this annotation

Codecov / codecov/patch

src/index.ts#L21-L22

Added lines #L21 - L22 were not covered by tests
});
79 changes: 79 additions & 0 deletions src/jobQueue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import {
Logger,
quickAddJob,
run,
runMigrations,
} from 'graphile-worker';
import { getLogger } from './logger';
import { db } from './database/db';
import { processBulkUpload } from './tasks';
import type { ProcessBulkUploadJobPayload } from './types';

const logger = getLogger(__filename);

enum JobType {
PROCESS_BULK_UPLOAD = 'processBulkUpload',
}

export const jobQueueLogger = new Logger((scope) => (
(level, message, meta) => {

Check warning on line 19 in src/jobQueue.ts

View check run for this annotation

Codecov / codecov/patch

src/jobQueue.ts#L19

Added line #L19 was not covered by tests
switch (level.valueOf()) {
case 'error':
logger.error({ meta, scope }, message);
break;
case 'warn':
logger.warn({ meta, scope }, message);
break;
case 'info':
logger.info({ meta, scope }, message);
break;
case 'debug':
logger.debug({ meta, scope }, message);
break;
default:
logger.info({ meta, scope }, message);

Check warning on line 34 in src/jobQueue.ts

View check run for this annotation

Codecov / codecov/patch

src/jobQueue.ts#L21-L34

Added lines #L21 - L34 were not covered by tests
}
}
));

export const startJobQueue = async () => {
const runner = await run({

Check warning on line 40 in src/jobQueue.ts

View check run for this annotation

Codecov / codecov/patch

src/jobQueue.ts#L40

Added line #L40 was not covered by tests
slifty marked this conversation as resolved.
Show resolved Hide resolved
logger: jobQueueLogger,
pgPool: db.pool,
concurrency: 5,
noHandleSignals: false,
pollInterval: 1000,
taskList: {
processBulkUpload,
},
});
await runner.promise;

Check warning on line 50 in src/jobQueue.ts

View check run for this annotation

Codecov / codecov/patch

src/jobQueue.ts#L50

Added line #L50 was not covered by tests
};

export const runJobQueueMigrations = async () => (
runMigrations({
logger: jobQueueLogger,
pgPool: db.pool,
})
);

export const addJob = async (
jobType: JobType,
payload: unknown,
) => (
quickAddJob(
{
logger: jobQueueLogger,
pgPool: db.pool,
},
jobType,
payload,
)
);

export const addProcessBulkUploadJob = async (payload: ProcessBulkUploadJobPayload) => (
addJob(
JobType.PROCESS_BULK_UPLOAD,
payload,
)
);
12 changes: 12 additions & 0 deletions src/tasks/__tests__/processBulkUpload.unit.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { getMockJobHelpers } from '../../test/mockGraphileWorker';
import { processBulkUpload } from '../processBulkUpload';
import { InternalValidationError } from '../../errors';

describe('processBulkUpload', () => {
it('should error when passed an invalid payload', async () => {
await expect(processBulkUpload(
{},
getMockJobHelpers(),
)).rejects.toBeInstanceOf(InternalValidationError);
});
});
1 change: 1 addition & 0 deletions src/tasks/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './processBulkUpload';
17 changes: 17 additions & 0 deletions src/tasks/processBulkUpload.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { isProcessBulkUploadJobPayload } from '../types';
import { InternalValidationError } from '../errors';
import type { JobHelpers } from 'graphile-worker';

export const processBulkUpload = async (
payload: unknown,
helpers: JobHelpers,
): Promise<void> => {
if (!isProcessBulkUploadJobPayload(payload)) {
helpers.logger.debug('Malformed bulk upload job payload', { errors: isProcessBulkUploadJobPayload.errors ?? [] });
throw new InternalValidationError(
jasonaowen marked this conversation as resolved.
Show resolved Hide resolved
'The bulk upload job payload is not properly formed',
isProcessBulkUploadJobPayload.errors ?? [],
);
}
helpers.logger.debug(`Started processBulkUpload Job for Bulk Upload ID ${payload.bulkUploadId}`);

Check warning on line 16 in src/tasks/processBulkUpload.ts

View check run for this annotation

Codecov / codecov/patch

src/tasks/processBulkUpload.ts#L16

Added line #L16 was not covered by tests
};
11 changes: 11 additions & 0 deletions src/test/integrationSuiteSetup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,17 @@ import {
} from './harnessFunctions';
import { mockJwks } from './mockJwt';

// This mock prevents our queue manager from actually being invoked.
// It's necessary because of the way we leverage PGOPTIONS to specify
// the schema / search path when preparing the test worker to interact
// with specific schemas.
//
// We may eventually want to be able to write tests that interact with the queue
// and we may eventually face issues with blunt mocking of graphile-worker.
// When that happens, we'll need to remove this mock and change the way we're
// setting the schema / path.
jest.mock('graphile-worker');

beforeAll(async () => {
mockJwks.start();
});
Expand Down
6 changes: 6 additions & 0 deletions src/test/mockGraphileWorker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import { jobQueueLogger } from '../jobQueue';
import type { JobHelpers } from 'graphile-worker';

export const getMockJobHelpers = (): JobHelpers => ({
Copy link
Member Author

@slifty slifty Nov 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jasonaowen this is what I did for mocking job helpers -- we only use the logger helper, and since this is JUST for test code I think it's OK to be a little more "fast and loose" / not try to mock all the other things that JobHelpers can do. Worst case a future test fails and we expand the mock at that point.

logger: jobQueueLogger,
} as JobHelpers);
20 changes: 20 additions & 0 deletions src/types/ProcessBulkUploadJobPayload.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { ajv } from '../ajv';
import type { JSONSchemaType } from 'ajv';

export interface ProcessBulkUploadJobPayload {
bulkUploadId: number;
}

export const processBulkUploadJobPayloadSchema: JSONSchemaType<ProcessBulkUploadJobPayload> = {
type: 'object',
properties: {
bulkUploadId: {
type: 'integer',
},
},
required: [
'bulkUploadId',
],
};

export const isProcessBulkUploadJobPayload = ajv.compile(processBulkUploadJobPayloadSchema);
1 change: 1 addition & 0 deletions src/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export * from './PaginationParametersQuery';
export * from './PlatformProviderResponse';
export * from './PostgresErrorCode';
export * from './PresignedPostRequest';
export * from './ProcessBulkUploadJobPayload';
export * from './Proposal';
export * from './ProposalFieldValue';
export * from './ProposalVersion';
Expand Down