Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Blockchain service #2517

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions packages/node-core/src/blockchain.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors
// SPDX-License-Identifier: GPL-3.0

import { BaseCustomDataSource, BaseDataSource, IProjectNetworkConfig } from '@subql/types-core';
import { DatasourceParams, Header, IBaseIndexerWorker, IBlock, ISubqueryProject } from './indexer';

// TODO probably need to split this in 2 to have a worker specific subset

export interface ICoreBlockchainService<
DS extends BaseDataSource = BaseDataSource,
SubQueryProject extends ISubqueryProject<IProjectNetworkConfig, DS> = ISubqueryProject<IProjectNetworkConfig, DS>
> {
/* The semver of the node */
packageVersion: string;

// Project service
onProjectChange(project: SubQueryProject): Promise<void> | void;
/* Not all networks have a block timestamp, e.g. Shiden */
getBlockTimestamp(height: number): Promise<Date | undefined>;
}

export interface IBlockchainService<
DS extends BaseDataSource = BaseDataSource,
CDS extends DS & BaseCustomDataSource = BaseCustomDataSource & DS,
SubQueryProject extends ISubqueryProject<IProjectNetworkConfig, DS> = ISubqueryProject<IProjectNetworkConfig, DS>,
SafeAPI = any,
LightBlock = any,
FullBlock = any,
Worker extends IBaseIndexerWorker = IBaseIndexerWorker
> extends ICoreBlockchainService<DS, SubQueryProject> {
blockHandlerKind: string;
// TODO SubqueryProject methods

// Block dispatcher service
fetchBlocks(blockNums: number[]): Promise<IBlock<LightBlock>[] | IBlock<FullBlock>[]>; // TODO this probably needs to change to get light block type correct
/* This is the worker equivalent of fetchBlocks, it provides a context to allow syncing anything between workers */
fetchBlockWorker(worker: Worker, blockNum: number, context: { workers: Worker[] }): Promise<Header>;

// Project service
// onProjectChange(project: SubQueryProject): Promise<void> | void;
// /* Not all networks have a block timestamp, e.g. Shiden */
// getBlockTimestamp(height: number): Promise<Date | undefined>;

// Block dispatcher
/* Gets the size of the block, used to calculate a median */
getBlockSize(block: IBlock): number;

// Fetch service
/**
* The finalized header. If the chain doesn't have concrete finalization this could be a probablilistic finalization
* */
getFinalizedHeader(): Promise<Header>;
/**
* Gets the latest height of the chain, this should be unfinalized.
* Or if the chain has instant finalization this would be the same as the finalized height.
* */
getBestHeight(): Promise<number>;
/**
* The chain interval in milliseconds, if it is not consistent then provide a best estimate
* */
getChainInterval(): Promise<number>;

// Unfinalized blocks
getHeaderForHash(hash: string): Promise<Header>;
getHeaderForHeight(height: number): Promise<Header>;

// Dynamic Ds sevice
/**
* Applies and validates parameters to a template DS
* */
updateDynamicDs(params: DatasourceParams, template: DS | CDS): Promise<void>;

isCustomDs: (x: DS | CDS) => x is CDS;
isRuntimeDs: (x: DS | CDS) => x is DS;

// Indexer manager
/**
* Gets an API instance to a specific height so any state queries return data as represented at that height.
* */
getSafeApi(block: LightBlock | FullBlock): Promise<SafeAPI>;
}
1 change: 1 addition & 0 deletions packages/node-core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ export * from './indexer';
export * from './subcommands';
export * from './yargs';
export * from './admin';
export * from './blockchain.service';
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ export type ProcessBlockResponse = {
};

export interface IBlockDispatcher<B> {
init(onDynamicDsCreated: (height: number) => void): Promise<void>;
// now within enqueueBlock should handle getLatestBufferHeight
enqueueBlocks(heights: (IBlock<B> | number)[], latestBufferHeight: number): void | Promise<void>;
queueSize: number;
Expand Down
64 changes: 34 additions & 30 deletions packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors
// SPDX-License-Identifier: GPL-3.0

import {OnApplicationShutdown} from '@nestjs/common';
import {EventEmitter2} from '@nestjs/event-emitter';
import {Interval} from '@nestjs/schedule';
import {NodeConfig} from '../../configure';
import {IProjectUpgradeService} from '../../configure/ProjectUpgrade.service';
import {IndexerEvent} from '../../events';
import {getBlockHeight, IBlock, PoiSyncService} from '../../indexer';
import {getLogger} from '../../logger';
import {exitWithError, monitorWrite} from '../../process';
import {profilerWrap} from '../../profiler';
import {Queue, AutoQueue, RampQueue, delay, isTaskFlushedError} from '../../utils';
import {StoreService} from '../store.service';
import {IStoreModelProvider} from '../storeModelProvider';
import {IProjectService, ISubqueryProject} from '../types';
import {BaseBlockDispatcher, ProcessBlockResponse} from './base-block-dispatcher';
import { OnApplicationShutdown } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { Interval } from '@nestjs/schedule';
import { BaseDataSource } from '@subql/types-core';
import { IBlockchainService } from '../../blockchain.service';
import { NodeConfig } from '../../configure';
import { IProjectUpgradeService } from '../../configure/ProjectUpgrade.service';
import { IndexerEvent } from '../../events';
import { getBlockHeight, IBlock, PoiSyncService, StoreService } from '../../indexer';
import { getLogger } from '../../logger';
import { exitWithError, monitorWrite } from '../../process';
import { profilerWrap } from '../../profiler';
import { Queue, AutoQueue, RampQueue, delay, isTaskFlushedError } from '../../utils';
import { IStoreModelProvider } from '../storeModelProvider';
import { IIndexerManager, IProjectService, ISubqueryProject } from '../types';
import { BaseBlockDispatcher } from './base-block-dispatcher';

const logger = getLogger('BlockDispatcherService');

Expand All @@ -24,10 +25,9 @@ type BatchBlockFetcher<B> = (heights: number[]) => Promise<IBlock<B>[]>;
/**
* @description Intended to behave the same as WorkerBlockDispatcherService but doesn't use worker threads or any parallel processing
*/
export abstract class BlockDispatcher<B, DS>
export class BlockDispatcher<B, DS extends BaseDataSource>
extends BaseBlockDispatcher<Queue<IBlock<B> | number>, DS, B>
implements OnApplicationShutdown
{
implements OnApplicationShutdown {
private fetchQueue: AutoQueue<IBlock<B>>;
private processQueue: AutoQueue<void>;

Expand All @@ -36,9 +36,6 @@ export abstract class BlockDispatcher<B, DS>
private fetching = false;
private isShutdown = false;

protected abstract getBlockSize(block: IBlock<B>): number;
protected abstract indexBlock(block: IBlock<B>): Promise<ProcessBlockResponse>;

constructor(
nodeConfig: NodeConfig,
eventEmitter: EventEmitter2,
Expand All @@ -48,7 +45,8 @@ export abstract class BlockDispatcher<B, DS>
storeModelProvider: IStoreModelProvider,
poiSyncService: PoiSyncService,
project: ISubqueryProject,
fetchBlocksBatches: BatchBlockFetcher<B>
blockchainService: IBlockchainService<DS>,
private indexerManager: IIndexerManager<B, DS>
) {
super(
nodeConfig,
Expand All @@ -63,16 +61,20 @@ export abstract class BlockDispatcher<B, DS>
);
this.processQueue = new AutoQueue(nodeConfig.batchSize * 3, 1, nodeConfig.timeout, 'Process');
this.fetchQueue = new RampQueue(
this.getBlockSize.bind(this),
blockchainService.getBlockSize.bind(this),
nodeConfig.batchSize,
nodeConfig.batchSize * 3,
nodeConfig.timeout,
'Fetch'
);
if (this.nodeConfig.profiler) {
this.fetchBlocksBatches = profilerWrap(fetchBlocksBatches, 'BlockDispatcher', 'fetchBlocksBatches');
this.fetchBlocksBatches = profilerWrap(
blockchainService.fetchBlocks.bind(blockchainService),
'BlockDispatcher',
'fetchBlocksBatches'
);
} else {
this.fetchBlocksBatches = fetchBlocksBatches;
this.fetchBlocksBatches = blockchainService.fetchBlocks.bind(blockchainService);
}
}

Expand Down Expand Up @@ -161,7 +163,10 @@ export abstract class BlockDispatcher<B, DS>
await this.preProcessBlock(header);
monitorWrite(`Processing from main thread`);
// Inject runtimeVersion here to enhance api.at preparation
const processBlockResponse = await this.indexBlock(block);
const processBlockResponse = await this.indexerManager.indexBlock(
block,
await this.projectService.getDataSources(block.getHeader().blockHeight)
);
await this.postProcessBlock(header, processBlockResponse);
//set block to null for garbage collection
(block as any) = null;
Expand All @@ -172,8 +177,7 @@ export abstract class BlockDispatcher<B, DS>
}
logger.error(
e,
`Failed to index block at height ${header.blockHeight} ${
e.handler ? `${e.handler}(${e.stack ?? ''})` : ''
`Failed to index block at height ${header.blockHeight} ${e.handler ? `${e.handler}(${e.stack ?? ''})` : ''
}`
);
throw e;
Expand All @@ -194,7 +198,7 @@ export abstract class BlockDispatcher<B, DS>
// Do nothing, fetching the block was flushed, this could be caused by forked blocks or dynamic datasources
return;
}
exitWithError(new Error(`Failed to enqueue fetched block to process`, {cause: e}), logger);
exitWithError(new Error(`Failed to enqueue fetched block to process`, { cause: e }), logger);
});

this.eventEmitter.emit(IndexerEvent.BlockQueueSize, {
Expand All @@ -203,7 +207,7 @@ export abstract class BlockDispatcher<B, DS>
}
} catch (e: any) {
if (!this.isShutdown) {
exitWithError(new Error(`Failed to process blocks from queue`, {cause: e}), logger);
exitWithError(new Error(`Failed to process blocks from queue`, { cause: e }), logger);
}
} finally {
this.fetching = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,39 @@
// SPDX-License-Identifier: GPL-3.0

import assert from 'assert';
import {OnApplicationShutdown} from '@nestjs/common';
import {EventEmitter2} from '@nestjs/event-emitter';
import {Interval} from '@nestjs/schedule';
import {last} from 'lodash';
import {NodeConfig} from '../../configure';
import {IProjectUpgradeService} from '../../configure/ProjectUpgrade.service';
import {IndexerEvent} from '../../events';
import {IBlock, PoiSyncService, WorkerStatusResponse} from '../../indexer';
import {getLogger} from '../../logger';
import {monitorWrite} from '../../process';
import {AutoQueue, isTaskFlushedError} from '../../utils';
import {MonitorServiceInterface} from '../monitor.service';
import {StoreService} from '../store.service';
import {IStoreModelProvider} from '../storeModelProvider';
import {ISubqueryProject, IProjectService, Header} from '../types';
import {isBlockUnavailableError} from '../worker/utils';
import {BaseBlockDispatcher, ProcessBlockResponse} from './base-block-dispatcher';
import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { Interval } from '@nestjs/schedule';
import { BaseDataSource } from '@subql/types-core';
import { last } from 'lodash';
import { IApiConnectionSpecific } from '../../api.service';
import { IBlockchainService } from '../../blockchain.service';
import { NodeConfig } from '../../configure';
import { IProjectUpgradeService } from '../../configure/ProjectUpgrade.service';
import { IndexerEvent } from '../../events';
import {
ConnectionPoolStateManager,
createIndexerWorker,
DynamicDsService,
IBaseIndexerWorker,
IBlock,
InMemoryCacheService,
PoiSyncService,
TerminateableWorker,
UnfinalizedBlocksService,
} from '../../indexer';
import { getLogger } from '../../logger';
import { monitorWrite } from '../../process';
import { AutoQueue, isTaskFlushedError } from '../../utils';
import { MonitorServiceInterface } from '../monitor.service';
import { StoreService } from '../store.service';
import { IStoreModelProvider } from '../storeModelProvider';
import { ISubqueryProject, IProjectService } from '../types';
import { isBlockUnavailableError } from '../worker/utils';
import { BaseBlockDispatcher } from './base-block-dispatcher';

const logger = getLogger('WorkerBlockDispatcherService');

type Worker = {
processBlock: (height: number) => Promise<ProcessBlockResponse>;
getStatus: () => Promise<WorkerStatusResponse>;
getMemoryLeft: () => Promise<number>;
terminate: () => Promise<number>;
};

function initAutoQueue<T>(
workers: number | undefined,
batchSize: number,
Expand All @@ -39,26 +45,37 @@ function initAutoQueue<T>(
return new AutoQueue(workers * batchSize * 2, 1, timeout, name);
}

export abstract class WorkerBlockDispatcher<DS, W extends Worker, B>
extends BaseBlockDispatcher<AutoQueue<void>, DS, B>
implements OnApplicationShutdown
{
protected workers: W[] = [];
@Injectable()
export class WorkerBlockDispatcher<
DS extends BaseDataSource = BaseDataSource,
Worker extends IBaseIndexerWorker = IBaseIndexerWorker,
Block = any,
ApiConn extends IApiConnectionSpecific = IApiConnectionSpecific
>
extends BaseBlockDispatcher<AutoQueue<void>, DS, Block>
implements OnApplicationShutdown {
protected workers: TerminateableWorker<Worker>[] = [];
private numWorkers: number;
private isShutdown = false;

protected abstract fetchBlock(worker: W, height: number): Promise<Header>;
private createWorker: () => Promise<TerminateableWorker<Worker>>;

constructor(
nodeConfig: NodeConfig,
eventEmitter: EventEmitter2,
projectService: IProjectService<DS>,
projectUpgradeService: IProjectUpgradeService,
@Inject('IProjectService') projectService: IProjectService<DS>,
@Inject('IProjectUpgradeService') projectUpgradeService: IProjectUpgradeService,
storeService: StoreService,
storeModelProvider: IStoreModelProvider,
cacheService: InMemoryCacheService,
poiSyncService: PoiSyncService,
project: ISubqueryProject,
private createIndexerWorker: () => Promise<W>,
dynamicDsService: DynamicDsService<DS>,
unfinalizedBlocksService: UnfinalizedBlocksService<Block>,
connectionPoolState: ConnectionPoolStateManager<ApiConn>,
@Inject('ISubqueryProject') project: ISubqueryProject,
@Inject('IBlockchainService') private blockchainService: IBlockchainService<DS>,
workerPath: string,
workerFns: Parameters<typeof createIndexerWorker<Worker, ApiConn, Block, DS>>[1],
monitorService?: MonitorServiceInterface
) {
super(
Expand All @@ -73,13 +90,27 @@ export abstract class WorkerBlockDispatcher<DS, W extends Worker, B>
poiSyncService,
monitorService
);

this.createWorker = () =>
createIndexerWorker<Worker, ApiConn, Block, DS>(
workerPath,
workerFns,
storeService.getStore(),
cacheService.getCache(),
dynamicDsService,
unfinalizedBlocksService,
connectionPoolState,
project.root,
projectService.startHeight,
monitorService
);
// initAutoQueue will assert that workers is set. unfortunately we cant do anything before the super call
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.numWorkers = nodeConfig.workers!;
}

async init(onDynamicDsCreated: (height: number) => void): Promise<void> {
this.workers = await Promise.all(new Array(this.numWorkers).fill(0).map(() => this.createIndexerWorker()));
this.workers = await Promise.all(new Array(this.numWorkers).fill(0).map(() => this.createWorker()));
return super.init(onDynamicDsCreated);
}

Expand All @@ -93,7 +124,8 @@ export abstract class WorkerBlockDispatcher<DS, W extends Worker, B>
await Promise.all(this.workers.map((w) => w.terminate()));
}
}
async enqueueBlocks(heights: (IBlock<B> | number)[], latestBufferHeight?: number): Promise<void> {

async enqueueBlocks(heights: (IBlock<Block> | number)[], latestBufferHeight?: number): Promise<void> {
assert(
heights.every((h) => typeof h === 'number'),
'Worker block dispatcher only supports enqueuing numbers, not blocks.'
Expand Down Expand Up @@ -137,7 +169,8 @@ export abstract class WorkerBlockDispatcher<DS, W extends Worker, B>

// Used to compare before and after as a way to check if queue was flushed
const bufferedHeight = this.latestBufferedHeight;
const pendingBlock = this.fetchBlock(worker, height);

const pendingBlock = this.blockchainService.fetchBlockWorker(worker, height, { workers: this.workers });

const processBlock = async () => {
try {
Expand All @@ -150,7 +183,7 @@ export abstract class WorkerBlockDispatcher<DS, W extends Worker, B>
await this.preProcessBlock(header);

monitorWrite(`Processing from worker #${workerIdx}`);
const {dynamicDsCreated, reindexBlockHeader} = await worker.processBlock(height);
const { dynamicDsCreated, reindexBlockHeader } = await worker.processBlock(height);

await this.postProcessBlock(header, {
dynamicDsCreated,
Expand Down
Loading
Loading