From 41c60ddf6fd3fd53c73e54d29bf05d0964d0ff09 Mon Sep 17 00:00:00 2001 From: Ian He Date: Thu, 26 Feb 2026 18:39:32 +1300 Subject: [PATCH 1/6] fix: handle non-sequential unfinalized blocks with backfill --- .../indexer/unfinalizedBlocks.service.spec.ts | 310 ++++++++++++++++-- .../src/indexer/unfinalizedBlocks.service.ts | 118 ++++++- 2 files changed, 398 insertions(+), 30 deletions(-) diff --git a/packages/node-core/src/indexer/unfinalizedBlocks.service.spec.ts b/packages/node-core/src/indexer/unfinalizedBlocks.service.spec.ts index e489079b55..6a70dddec7 100644 --- a/packages/node-core/src/indexer/unfinalizedBlocks.service.spec.ts +++ b/packages/node-core/src/indexer/unfinalizedBlocks.service.spec.ts @@ -1,10 +1,10 @@ // Copyright 2020-2025 SubQuery Pte Ltd authors & contributors // SPDX-License-Identifier: GPL-3.0 -import { EventEmitter2 } from '@nestjs/event-emitter'; -import { IBlockchainService } from '../blockchain.service'; -import { Header, IBlock } from '../indexer'; -import { StoreCacheService, CacheMetadataModel } from './storeModelProvider'; +import {EventEmitter2} from '@nestjs/event-emitter'; +import {IBlockchainService} from '../blockchain.service'; +import {Header, IBlock} from '../indexer'; +import {StoreCacheService, CacheMetadataModel} from './storeModelProvider'; import { METADATA_LAST_FINALIZED_PROCESSED_KEY, METADATA_UNFINALIZED_BLOCKS_KEY, @@ -46,8 +46,8 @@ const BlockchainService = { function getMockMetadata(): any { const data: Record = {}; return { - upsert: ({ key, value }: any) => (data[key] = value), - findOne: ({ where: { key } }: any) => ({ value: data[key] }), + upsert: ({key, value}: any) => (data[key] = value), + findOne: ({where: {key}}: any) => ({value: data[key]}), findByPk: (key: string) => data[key], find: (key: string) => data[key], } as any; @@ -62,7 +62,7 @@ function mockStoreCache(): StoreCacheService { function mockBlock(height: number, hash: string, parentHash?: string): IBlock { return { getHeader: () => { - return { blockHeight: height, parentHash: parentHash ?? '', blockHash: hash, timestamp: new Date() }; + return {blockHeight: height, parentHash: parentHash ?? '', blockHash: hash, timestamp: new Date()}; }, block: { header: { @@ -79,7 +79,7 @@ describe('UnfinalizedBlocksService', () => { beforeEach(async () => { unfinalizedBlocksService = new UnfinalizedBlocksService( - { unfinalizedBlocks: true } as any, + {unfinalizedBlocks: true} as any, mockStoreCache(), BlockchainService ); @@ -153,7 +153,7 @@ describe('UnfinalizedBlocksService', () => { const res = await unfinalizedBlocksService.processUnfinalizedBlocks(mockBlock(113, '0xabc113')); // Last valid block - expect(res).toMatchObject({ blockHash: '0xabc111', blockHeight: 111, parentHash: '' }); + expect(res).toMatchObject({blockHash: '0xabc111', blockHeight: 111, parentHash: ''}); // After this the call stack is something like: // indexerManager -> blockDispatcher -> project -> project -> reindex -> blockDispatcher.resetUnfinalizedBlocks @@ -178,7 +178,21 @@ describe('UnfinalizedBlocksService', () => { const res = await unfinalizedBlocksService.processUnfinalizedBlocks(mockBlock(117, '0xabc117')); // Last valid block - expect(res).toMatchObject({ blockHash: '0xabc112', blockHeight: 112, parentHash: '' }); + expect(res).toMatchObject({blockHash: '0xabc112', blockHeight: 112, parentHash: ''}); + }); + + it('can handle a fork when latest unfinalized block has different parent', async () => { + unfinalizedBlocksService.registerFinalizedBlock(mockBlock(110, '0xabcd').block.header); + + await unfinalizedBlocksService.processUnfinalizedBlocks(mockBlock(111, '0xabc111', '0xabcd')); + await unfinalizedBlocksService.processUnfinalizedBlocks(mockBlock(112, '0xabc112', '0xabc111')); + await unfinalizedBlocksService.processUnfinalizedBlocks(mockBlock(113, '0xabc113', '0xabc112')); + await unfinalizedBlocksService.processUnfinalizedBlocks(mockBlock(114, '0xabc114', '0xabc113')); + await unfinalizedBlocksService.processUnfinalizedBlocks(mockBlock(115, '0xabc115', '0xabc114')); + const res = await unfinalizedBlocksService.processUnfinalizedBlocks(mockBlock(116, '0xabc116', '0xabc115f')); + + // Last valid block + expect(res).toMatchObject({blockHash: '0xabc110f', blockHeight: 110, parentHash: '0xabc109f'}); }); it('can handle a fork when all unfinalized blocks are invalid', async () => { @@ -193,7 +207,7 @@ describe('UnfinalizedBlocksService', () => { const res = await unfinalizedBlocksService.processUnfinalizedBlocks(mockBlock(113, '0xabc113')); // Last valid block - expect(res).toMatchObject({ blockHash: '0xabc110f', blockHeight: 110, parentHash: '0xabc109f' }); + expect(res).toMatchObject({blockHash: '0xabc110f', blockHeight: 110, parentHash: '0xabc109f'}); }); it('can handle a fork and when unfinalized blocks < finalized head', async () => { @@ -208,7 +222,7 @@ describe('UnfinalizedBlocksService', () => { const res = await unfinalizedBlocksService.processUnfinalizedBlocks(mockBlock(113, '0xabc113')); // Last valid block - expect(res).toMatchObject({ blockHash: '0xabc110f', blockHeight: 110, parentHash: '0xabc109f' }); + expect(res).toMatchObject({blockHash: '0xabc110f', blockHeight: 110, parentHash: '0xabc109f'}); }); it('can handle a fork and when unfinalized blocks < finalized head 2', async () => { @@ -229,7 +243,7 @@ describe('UnfinalizedBlocksService', () => { const res = await unfinalizedBlocksService.processUnfinalizedBlocks(mockBlock(113, '0xabc113')); // Last valid block - expect(res).toMatchObject({ blockHash: '0xabc110f', blockHeight: 110, parentHash: '0xabc109f' }); + expect(res).toMatchObject({blockHash: '0xabc110f', blockHeight: 110, parentHash: '0xabc109f'}); }); it('can handle a fork and when unfinalized blocks < finalized head with a large difference', async () => { @@ -244,25 +258,25 @@ describe('UnfinalizedBlocksService', () => { const res = await unfinalizedBlocksService.processUnfinalizedBlocks(mockBlock(113, '0xabc113')); // Last valid block - expect(res).toMatchObject({ blockHash: '0xabc110f', blockHeight: 110, parentHash: '0xabc109f' }); + expect(res).toMatchObject({blockHash: '0xabc110f', blockHeight: 110, parentHash: '0xabc109f'}); }); it('can rewind any unfinalized blocks when restarted and unfinalized blocks is disabled', async () => { - const storeCache = new StoreCacheService(null as any, { storeCacheThreshold: 300 } as any, new EventEmitter2()); + const storeCache = new StoreCacheService(null as any, {storeCacheThreshold: 300} as any, new EventEmitter2()); storeCache.init('height', {} as any, undefined); await storeCache.metadata.set( METADATA_UNFINALIZED_BLOCKS_KEY, JSON.stringify([ - { blockHeight: 90, blockHash: '0xabcd' }, - { blockHeight: 91, blockHash: '0xabc91' }, - { blockHeight: 92, blockHash: '0xabc92' }, + {blockHeight: 90, blockHash: '0xabcd'}, + {blockHeight: 91, blockHash: '0xabc91'}, + {blockHeight: 92, blockHash: '0xabc92'}, ]) ); await storeCache.metadata.set(METADATA_LAST_FINALIZED_PROCESSED_KEY, 90); const unfinalizedBlocksService2 = new UnfinalizedBlocksService( - { unfinalizedBlocks: false } as any, + {unfinalizedBlocks: false} as any, storeCache, BlockchainService ); @@ -272,8 +286,264 @@ describe('UnfinalizedBlocksService', () => { await unfinalizedBlocksService2.init(reindex); expect(reindex).toHaveBeenCalledWith( - expect.objectContaining({ blockHash: '0xabc90f', blockHeight: 90, parentHash: '0xabc89f' }) + expect.objectContaining({blockHash: '0xabc90f', blockHeight: 90, parentHash: '0xabc89f'}) ); expect((unfinalizedBlocksService2 as any).lastCheckedBlockHeight).toBe(90); }); + + describe('backfill functionality', () => { + it('backfills small gap when non-sequential block is registered', async () => { + // Create a custom blockchain service that returns proper parentHash chain + const customBlockchainService = { + ...BlockchainService, + async getHeaderForHeight(height: number): Promise
{ + return Promise.resolve({ + blockHeight: height, + blockHash: `0xabc${height}`, + parentHash: `0xabc${height - 1}`, + timestamp: new Date(), + }); + }, + } as IBlockchainService; + + const service = new UnfinalizedBlocksService( + {unfinalizedBlocks: true} as any, + mockStoreCache(), + customBlockchainService + ); + + await service.init(() => Promise.resolve()); + service.registerFinalizedBlock(mockBlock(110, '0xabcd').block.header); + + // Register block 111 with correct parentHash + await service.processUnfinalizedBlocks(mockBlock(111, '0xabc111', '0xabc110')); + + // Register block 115 (gap of 3 blocks: 112, 113, 114) + await service.processUnfinalizedBlocks(mockBlock(115, '0xabc115', '0xabc114')); + + // Verify all blocks are present including backfilled ones + const unfinalizedBlocks = (service as any).unfinalizedBlocks; + expect(unfinalizedBlocks.length).toBe(5); + expect(unfinalizedBlocks[0].blockHeight).toBe(111); + expect(unfinalizedBlocks[1].blockHeight).toBe(112); + expect(unfinalizedBlocks[2].blockHeight).toBe(113); + expect(unfinalizedBlocks[3].blockHeight).toBe(114); + expect(unfinalizedBlocks[4].blockHeight).toBe(115); + }); + + it('resets chain when gap exceeds UNFINALIZED_THRESHOLD', async () => { + // UNFINALIZED_THRESHOLD is 200 + const service = new UnfinalizedBlocksService( + {unfinalizedBlocks: true} as any, + mockStoreCache(), + BlockchainService + ); + + await service.init(() => Promise.resolve()); + service.registerFinalizedBlock(mockBlock(110, '0xabcd').block.header); + + // Register block 111 + await service.processUnfinalizedBlocks(mockBlock(111, '0xabc111')); + + // Register block 500 (gap of 389 blocks, exceeds threshold of 200) + // safeHeight = 500 - 200 = 300 + // lastUnfinalizedHeight (111) < 300, so chain should be reset + await service.processUnfinalizedBlocks(mockBlock(500, '0xabc500')); + + // Verify only block 500 is present (chain was reset) + const unfinalizedBlocks = (service as any).unfinalizedBlocks; + expect(unfinalizedBlocks.length).toBe(1); + expect(unfinalizedBlocks[0].blockHeight).toBe(500); + }); + + it('backfills when gap is within UNFINALIZED_THRESHOLD', async () => { + const customBlockchainService = { + ...BlockchainService, + async getHeaderForHeight(height: number): Promise
{ + return Promise.resolve({ + blockHeight: height, + blockHash: `0xabc${height}`, + parentHash: `0xabc${height - 1}`, + timestamp: new Date(), + }); + }, + } as IBlockchainService; + + const service = new UnfinalizedBlocksService( + {unfinalizedBlocks: true} as any, + mockStoreCache(), + customBlockchainService + ); + + await service.init(() => Promise.resolve()); + service.registerFinalizedBlock(mockBlock(110, '0xabcd').block.header); + + // Register block 111 + await service.processUnfinalizedBlocks(mockBlock(111, '0xabc111', '0xabc110')); + + // Register block 250 (gap of 139 blocks, within threshold of 200) + // safeHeight = 250 - 200 = 50 + // lastUnfinalizedHeight (111) > 50, so backfill is needed + await service.processUnfinalizedBlocks(mockBlock(250, '0xabc250', '0xabc249')); + + // Verify all blocks are present including backfilled ones + const unfinalizedBlocks = (service as any).unfinalizedBlocks; + expect(unfinalizedBlocks.length).toBe(140); // 111 + 139 backfilled + 250 + expect(unfinalizedBlocks[0].blockHeight).toBe(111); + expect(unfinalizedBlocks[139].blockHeight).toBe(250); + }); + + it('detects fork during backfill when parentHash chain is broken', async () => { + // Create a blockchain service that returns wrong parentHash for block 113 + const customBlockchainService = { + ...BlockchainService, + async getHeaderForHeight(height: number): Promise
{ + // Return broken parentHash for block 113 + if (height === 113) { + return Promise.resolve({ + blockHeight: height, + blockHash: `0xabc${height}`, + parentHash: '0xwrong', + timestamp: new Date(), + }); + } + return Promise.resolve({ + blockHeight: height, + blockHash: `0xabc${height}`, + parentHash: `0xabc${height - 1}`, + timestamp: new Date(), + }); + }, + } as IBlockchainService; + + const service = new UnfinalizedBlocksService( + {unfinalizedBlocks: true} as any, + mockStoreCache(), + customBlockchainService + ); + + await service.init(() => Promise.resolve()); + service.registerFinalizedBlock(mockBlock(110, '0xabcd').block.header); + + // Register block 111 + await service.processUnfinalizedBlocks(mockBlock(111, '0xabc111', '0xabc110')); + + // Register block 115 - should detect fork during backfill + const result = await service.processUnfinalizedBlocks(mockBlock(115, '0xabc115', '0xabc114')); + + // Should return the finalized block (110) since all unfinalized blocks are > finalized height + // and getLastCorrectFinalizedBlock falls back to finalizedHeader + expect(result).toBeDefined(); + expect(result?.blockHeight).toBe(110); + }); + + it('detects fork after backfill when new block has wrong parentHash', async () => { + // Create a blockchain service that returns proper chain for backfill + const customBlockchainService = { + ...BlockchainService, + async getHeaderForHeight(height: number): Promise
{ + return Promise.resolve({ + blockHeight: height, + blockHash: `0xabc${height}`, + parentHash: `0xabc${height - 1}`, + timestamp: new Date(), + }); + }, + } as IBlockchainService; + + const service = new UnfinalizedBlocksService( + {unfinalizedBlocks: true} as any, + mockStoreCache(), + customBlockchainService + ); + + await service.init(() => Promise.resolve()); + service.registerFinalizedBlock(mockBlock(110, '0xabcd').block.header); + + // Register block 111 + await service.processUnfinalizedBlocks(mockBlock(111, '0xabc111', '0xabc110')); + + // Register block 115 with wrong parentHash (should connect to 0xabc114) + const result = await service.processUnfinalizedBlocks(mockBlock(115, '0xabc115', '0xwrong')); + + // Should return the finalized block (110) since all unfinalized blocks are > finalized height + // and getLastCorrectFinalizedBlock falls back to finalizedHeader + expect(result).toBeDefined(); + expect(result?.blockHeight).toBe(110); + }); + + it('throws error when block fetch fails during backfill', async () => { + // Create a blockchain service that fails to fetch block 113 + const customBlockchainService = { + ...BlockchainService, + async getHeaderForHeight(height: number): Promise
{ + if (height === 113) { + throw new Error('Network error'); + } + return Promise.resolve({ + blockHeight: height, + blockHash: `0xabc${height}`, + parentHash: `0xabc${height - 1}`, + timestamp: new Date(), + }); + }, + } as IBlockchainService; + + const service = new UnfinalizedBlocksService( + {unfinalizedBlocks: true} as any, + mockStoreCache(), + customBlockchainService + ); + + await service.init(() => Promise.resolve()); + service.registerFinalizedBlock(mockBlock(110, '0xabcd').block.header); + + // Register block 111 + await service.processUnfinalizedBlocks(mockBlock(111, '0xabc111', '0xabc110')); + + // Register block 115 - should throw during backfill + await expect(service.processUnfinalizedBlocks(mockBlock(115, '0xabc115', '0xabc114'))).rejects.toThrow( + 'Failed to backfill missing unfinalized block at height 113' + ); + }); + + it('continues chain correctly after successful backfill', async () => { + const customBlockchainService = { + ...BlockchainService, + async getHeaderForHeight(height: number): Promise
{ + return Promise.resolve({ + blockHeight: height, + blockHash: `0xabc${height}`, + parentHash: `0xabc${height - 1}`, + timestamp: new Date(), + }); + }, + } as IBlockchainService; + + const service = new UnfinalizedBlocksService( + {unfinalizedBlocks: true} as any, + mockStoreCache(), + customBlockchainService + ); + + await service.init(() => Promise.resolve()); + service.registerFinalizedBlock(mockBlock(110, '0xabcd').block.header); + + // Register blocks with gaps and then continue with sequential + await service.processUnfinalizedBlocks(mockBlock(111, '0xabc111', '0xabc110')); + await service.processUnfinalizedBlocks(mockBlock(115, '0xabc115', '0xabc114')); + await service.processUnfinalizedBlocks(mockBlock(116, '0xabc116', '0xabc115')); + await service.processUnfinalizedBlocks(mockBlock(117, '0xabc117', '0xabc116')); + + const unfinalizedBlocks = (service as any).unfinalizedBlocks; + expect(unfinalizedBlocks.length).toBe(7); + expect(unfinalizedBlocks[0].blockHeight).toBe(111); + expect(unfinalizedBlocks[6].blockHeight).toBe(117); + + // Verify chain connectivity + for (let i = 1; i < unfinalizedBlocks.length; i++) { + expect(unfinalizedBlocks[i].parentHash).toBe(unfinalizedBlocks[i - 1].blockHash); + } + }); + }); }); diff --git a/packages/node-core/src/indexer/unfinalizedBlocks.service.ts b/packages/node-core/src/indexer/unfinalizedBlocks.service.ts index d0852bd358..1d3c3deced 100644 --- a/packages/node-core/src/indexer/unfinalizedBlocks.service.ts +++ b/packages/node-core/src/indexer/unfinalizedBlocks.service.ts @@ -9,7 +9,6 @@ import {IBlockchainService} from '../blockchain.service'; import {NodeConfig} from '../configure'; import {Header, IBlock} from '../indexer/types'; import {getLogger} from '../logger'; -import {exitWithError} from '../process'; import {mainThreadOnly} from '../utils'; import {ProofOfIndex} from './entities'; import {PoiBlock} from './poi'; @@ -98,11 +97,13 @@ export class UnfinalizedBlocksService implements IUnfinalizedBlocksServ // If not for workers this could be private async processUnfinalizedBlockHeader(header?: Header): Promise
{ + let forkedHeader; if (header) { - await this.registerUnfinalizedBlock(header); + forkedHeader = await this.registerUnfinalizedBlock(header); + } + if (!forkedHeader) { + forkedHeader = await this.hasForked(); } - - const forkedHeader = await this.hasForked(); if (!forkedHeader) { // Remove blocks that are now confirmed finalized @@ -126,20 +127,113 @@ export class UnfinalizedBlocksService implements IUnfinalizedBlocksServ this._finalizedHeader = header; } - private async registerUnfinalizedBlock(header: Header): Promise { + private async registerUnfinalizedBlock(header: Header): Promise
{ if (header.blockHeight <= this.finalizedBlockNumber) return; - // Ensure order + const lastUnfinalized = last(this.unfinalizedBlocks); const lastUnfinalizedHeight = last(this.unfinalizedBlocks)?.blockHeight; - if (lastUnfinalizedHeight !== undefined && lastUnfinalizedHeight + 1 !== header.blockHeight) { - exitWithError( - `Unfinalized block is not sequential, lastUnfinalizedBlock='${lastUnfinalizedHeight}', newUnfinalizedBlock='${header.blockHeight}'`, - logger + + // If this is the first unfinalized block or it's sequential, just add it + if (lastUnfinalizedHeight === undefined || lastUnfinalizedHeight + 1 === header.blockHeight) { + this.unfinalizedBlocks.push(header); + if (lastUnfinalized && lastUnfinalized.blockHash !== header.parentHash) { + return header; + } + await this.saveUnfinalizedBlocks(this.unfinalizedBlocks); + return; + } + + // Non-sequential block detected + const safeHeight = header.blockHeight - UNFINALIZED_THRESHOLD; + + // If the last unfinalized block is below the safe height, it's already covered by finalization + // We can safely drop all existing unfinalized blocks and start fresh with the new block + if (lastUnfinalizedHeight < safeHeight) { + logger.info( + `Resetting unfinalized blocks chain. ` + + `Last unfinalized block ${lastUnfinalizedHeight} is below safe height ${safeHeight}. ` + + `Dropping ${this.unfinalizedBlocks.length} blocks and adding block ${header.blockHeight}` ); + this._unfinalizedBlocks = []; + this.unfinalizedBlocks.push(header); + await this.saveUnfinalizedBlocks(this.unfinalizedBlocks); + return; } + // Gap is within threshold, need to backfill to maintain parentHash chain for fork detection + const gapStart = lastUnfinalizedHeight + 1; + const gapEnd = header.blockHeight - 1; + const gapSize = gapEnd - gapStart + 1; + + logger.info(`Backfilling missing unfinalized blocks from ${gapStart} to ${gapEnd} (${gapSize} blocks)`); + + // Backfill missing blocks + const backfillResult = await this.backfillBlocks(gapStart, gapEnd, header); + + // Add the original header after successful backfill this.unfinalizedBlocks.push(header); + + if (backfillResult.forkDetected) { + return backfillResult.forkHeader; + } await this.saveUnfinalizedBlocks(this.unfinalizedBlocks); + return; + } + + /** + * Backfills missing blocks between the last unfinalized block and a new block. + * Validates parentHash chain during backfill to detect forks. + * + * @param startHeight - The first missing block height + * @param endHeight - The last missing block height + * @param nextHeader - The next header that triggered backfill (for validation) + * @returns Object indicating if fork was detected and the fork header if so + */ + private async backfillBlocks( + startHeight: number, + endHeight: number, + nextHeader: Header + ): Promise<{forkDetected: boolean; forkHeader?: Header}> { + // Fetch and validate each missing block + for (let height = startHeight; height <= endHeight; height++) { + try { + const header = await this.blockchainService.getHeaderForHeight(height); + + // Validate parentHash chain + const previousHeader = last(this.unfinalizedBlocks); + + if (previousHeader && header.parentHash !== previousHeader.blockHash) { + logger.warn( + `Fork detected during backfill at height ${height}. ` + + `Expected parentHash: ${previousHeader.blockHash}, ` + + `Got: ${header.parentHash}` + ); + // Return the previous header (last valid block before the fork) + return {forkDetected: true, forkHeader: previousHeader}; + } + + this.unfinalizedBlocks.push(header); + } catch (e: any) { + logger.error(`Failed to fetch block ${height} during backfill: ${e.message}`); + throw new Error(`Failed to backfill missing unfinalized block at height ${height}: ${e.message}`); + } + } + + // Validate the next header connects properly to the last backfilled block + const lastBackfilledHeader = last(this.unfinalizedBlocks); + if (lastBackfilledHeader && nextHeader.parentHash !== lastBackfilledHeader.blockHash) { + logger.warn( + `Fork detected: new block ${nextHeader.blockHeight} doesn't connect to backfilled chain. ` + + `Expected parentHash: ${lastBackfilledHeader.blockHash}, ` + + `Got: ${nextHeader.parentHash}` + ); + // Return the last backfilled header as the fork point + return {forkDetected: true, forkHeader: lastBackfilledHeader}; + } + + logger.info(`Successfully backfilled ${endHeight - startHeight + 1} missing blocks`); + + return {forkDetected: false}; } private async deleteFinalizedBlock(): Promise { @@ -221,6 +315,10 @@ export class UnfinalizedBlocksService implements IUnfinalizedBlocksServ // Work backwards through the blocks until we find a matching hash for (const bestHeader of bestVerifiableBlocks.reverse()) { + assert( + bestHeader.blockHeight === checkingHeader.blockHeight, + 'Expect best header and checking header to be at the same height' + ); if (bestHeader.blockHash === checkingHeader.blockHash || bestHeader.blockHash === checkingHeader.parentHash) { return bestHeader; } From 88da4929439202b683290e5d175588290f3ed6c7 Mon Sep 17 00:00:00 2001 From: Ian He Date: Thu, 26 Feb 2026 19:56:34 +1300 Subject: [PATCH 2/6] further guard the timing of backfill --- .../indexer/unfinalizedBlocks.service.spec.ts | 26 ++++++- .../src/indexer/unfinalizedBlocks.service.ts | 77 ++++++++++++------- 2 files changed, 73 insertions(+), 30 deletions(-) diff --git a/packages/node-core/src/indexer/unfinalizedBlocks.service.spec.ts b/packages/node-core/src/indexer/unfinalizedBlocks.service.spec.ts index 6a70dddec7..411be7aceb 100644 --- a/packages/node-core/src/indexer/unfinalizedBlocks.service.spec.ts +++ b/packages/node-core/src/indexer/unfinalizedBlocks.service.spec.ts @@ -15,6 +15,8 @@ import { * Block hashes all have the format '0xabc' + block number * If they are forked they will have an `f` at the end */ +let mockBestHeight = 150; + const BlockchainService = { async getFinalizedHeader(): Promise
{ return Promise.resolve({ @@ -41,6 +43,10 @@ const BlockchainService = { timestamp: new Date(), }); }, + // eslint-disable-next-line @typescript-eslint/require-await + async getBestHeight(): Promise { + return mockBestHeight; + }, } as IBlockchainService; function getMockMetadata(): any { @@ -78,6 +84,7 @@ describe('UnfinalizedBlocksService', () => { let unfinalizedBlocksService: UnfinalizedBlocksService; beforeEach(async () => { + mockBestHeight = 150; unfinalizedBlocksService = new UnfinalizedBlocksService( {unfinalizedBlocks: true} as any, mockStoreCache(), @@ -261,6 +268,23 @@ describe('UnfinalizedBlocksService', () => { expect(res).toMatchObject({blockHash: '0xabc110f', blockHeight: 110, parentHash: '0xabc109f'}); }); + it('discards unfinalized blocks older than best height window before backfill', async () => { + unfinalizedBlocksService.registerFinalizedBlock(mockBlock(110, '0xabcd').block.header); + + await unfinalizedBlocksService.processUnfinalizedBlocks(mockBlock(201, '0xabc201')); + await unfinalizedBlocksService.processUnfinalizedBlocks(mockBlock(202, '0xabc202')); + + expect((unfinalizedBlocksService as any).unfinalizedBlocks).toHaveLength(2); + + mockBestHeight = 800; + + await unfinalizedBlocksService.processUnfinalizedBlocks(mockBlock(650, '0xabc650')); + + expect((unfinalizedBlocksService as any).unfinalizedBlocks).toMatchObject([ + mockBlock(650, '0xabc650').block.header, + ]); + }); + it('can rewind any unfinalized blocks when restarted and unfinalized blocks is disabled', async () => { const storeCache = new StoreCacheService(null as any, {storeCacheThreshold: 300} as any, new EventEmitter2()); @@ -388,7 +412,7 @@ describe('UnfinalizedBlocksService', () => { // Verify all blocks are present including backfilled ones const unfinalizedBlocks = (service as any).unfinalizedBlocks; - expect(unfinalizedBlocks.length).toBe(140); // 111 + 139 backfilled + 250 + expect(unfinalizedBlocks.length).toBe(140); // 111 + 138 backfilled + 250 expect(unfinalizedBlocks[0].blockHeight).toBe(111); expect(unfinalizedBlocks[139].blockHeight).toBe(250); }); diff --git a/packages/node-core/src/indexer/unfinalizedBlocks.service.ts b/packages/node-core/src/indexer/unfinalizedBlocks.service.ts index 1d3c3deced..e2c6a2ae81 100644 --- a/packages/node-core/src/indexer/unfinalizedBlocks.service.ts +++ b/packages/node-core/src/indexer/unfinalizedBlocks.service.ts @@ -3,10 +3,12 @@ import assert from 'assert'; import {Inject, Injectable} from '@nestjs/common'; +import {OnEvent} from '@nestjs/event-emitter'; import {Transaction} from '@subql/x-sequelize'; import {isEqual, last} from 'lodash'; import {IBlockchainService} from '../blockchain.service'; import {NodeConfig} from '../configure'; +import {IndexerEvent} from '../events'; import {Header, IBlock} from '../indexer/types'; import {getLogger} from '../logger'; import {mainThreadOnly} from '../utils'; @@ -43,6 +45,7 @@ export class UnfinalizedBlocksService implements IUnfinalizedBlocksServ private _unfinalizedBlocks?: UnfinalizedBlocks; private _finalizedHeader?: Header; protected lastCheckedBlockHeight?: number; + private _latestBestHeight?: number; @mainThreadOnly() private blockToHeader(block: IBlock): Header { @@ -71,6 +74,7 @@ export class UnfinalizedBlocksService implements IUnfinalizedBlocksServ this._unfinalizedBlocks = await this.getMetadataUnfinalizedBlocks(); this.lastCheckedBlockHeight = await this.getLastFinalizedVerifiedHeight(); this._finalizedHeader = await this.blockchainService.getFinalizedHeader(); + this._latestBestHeight = this._finalizedHeader.blockHeight; if (this.unfinalizedBlocks.length) { logger.info('Processing unfinalized blocks'); @@ -127,55 +131,70 @@ export class UnfinalizedBlocksService implements IUnfinalizedBlocksServ this._finalizedHeader = header; } + @OnEvent(IndexerEvent.BlockBest) + updateBestHeight(payload: {height: number}): void { + this._latestBestHeight = payload.height; + } + + private get bestHeight(): number { + return this._latestBestHeight ?? this.finalizedBlockNumber; + } + private async registerUnfinalizedBlock(header: Header): Promise
{ if (header.blockHeight <= this.finalizedBlockNumber) return; + const bestHeight = Math.max(this.bestHeight, header.blockHeight); + const safeHeight = Math.max(bestHeight - UNFINALIZED_THRESHOLD, 0); + + const currentBlocks = this.unfinalizedBlocks; + if (currentBlocks.length) { + const filteredBlocks = currentBlocks.filter(({blockHeight}) => blockHeight >= safeHeight); + if (filteredBlocks.length !== currentBlocks.length) { + logger.info( + `Dropping ${currentBlocks.length - filteredBlocks.length} unfinalized blocks below safe height ${safeHeight}` + ); + this._unfinalizedBlocks = filteredBlocks; + } + } + const lastUnfinalized = last(this.unfinalizedBlocks); - const lastUnfinalizedHeight = last(this.unfinalizedBlocks)?.blockHeight; + const lastUnfinalizedHeight = lastUnfinalized?.blockHeight; // If this is the first unfinalized block or it's sequential, just add it if (lastUnfinalizedHeight === undefined || lastUnfinalizedHeight + 1 === header.blockHeight) { this.unfinalizedBlocks.push(header); - if (lastUnfinalized && lastUnfinalized.blockHash !== header.parentHash) { + if (lastUnfinalized && header.parentHash && lastUnfinalized.blockHash !== header.parentHash) { return header; } await this.saveUnfinalizedBlocks(this.unfinalizedBlocks); return; } - // Non-sequential block detected - const safeHeight = header.blockHeight - UNFINALIZED_THRESHOLD; - - // If the last unfinalized block is below the safe height, it's already covered by finalization - // We can safely drop all existing unfinalized blocks and start fresh with the new block - if (lastUnfinalizedHeight < safeHeight) { - logger.info( - `Resetting unfinalized blocks chain. ` + - `Last unfinalized block ${lastUnfinalizedHeight} is below safe height ${safeHeight}. ` + - `Dropping ${this.unfinalizedBlocks.length} blocks and adding block ${header.blockHeight}` - ); - this._unfinalizedBlocks = []; - this.unfinalizedBlocks.push(header); - await this.saveUnfinalizedBlocks(this.unfinalizedBlocks); - return; - } - - // Gap is within threshold, need to backfill to maintain parentHash chain for fork detection - const gapStart = lastUnfinalizedHeight + 1; + // Calculate gap start, only backfill from safeHeight onwards + const gapStart = Math.max(lastUnfinalizedHeight + 1, safeHeight); const gapEnd = header.blockHeight - 1; - const gapSize = gapEnd - gapStart + 1; - logger.info(`Backfilling missing unfinalized blocks from ${gapStart} to ${gapEnd} (${gapSize} blocks)`); + // If there's still a gap to backfill + if (gapStart <= gapEnd) { + const gapSize = gapEnd - gapStart + 1; + logger.info(`Backfilling missing unfinalized blocks from ${gapStart} to ${gapEnd} (${gapSize} blocks)`); - // Backfill missing blocks - const backfillResult = await this.backfillBlocks(gapStart, gapEnd, header); + // Backfill missing blocks + const backfillResult = await this.backfillBlocks(gapStart, gapEnd, header); - // Add the original header after successful backfill - this.unfinalizedBlocks.push(header); + // Add the original header after successful backfill + // Note: We push even if fork was detected. The in-memory state will be reset + // when the caller processes the fork and triggers a reindex. + this.unfinalizedBlocks.push(header); - if (backfillResult.forkDetected) { - return backfillResult.forkHeader; + if (backfillResult.forkDetected) { + return backfillResult.forkHeader; + } + } else { + // No gap to backfill, just add the new header + this.unfinalizedBlocks.push(header); } + await this.saveUnfinalizedBlocks(this.unfinalizedBlocks); return; } From faaeacd6f84caea3efad55ddafd65fc1893a9797 Mon Sep 17 00:00:00 2001 From: Ian He Date: Thu, 26 Feb 2026 19:58:51 +1300 Subject: [PATCH 3/6] bump version --- packages/node-core/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/node-core/package.json b/packages/node-core/package.json index 951a60c104..317770f369 100644 --- a/packages/node-core/package.json +++ b/packages/node-core/package.json @@ -1,6 +1,6 @@ { "name": "@subql/node-core", - "version": "19.1.0", + "version": "19.2.0-0", "description": "Common node features that are agnostic to blockchains", "homepage": "https://github.com/subquery/subql", "repository": "github:subquery/subql", From 6e539880d9cc1df7bb7f5479df5b0bcb69055fe4 Mon Sep 17 00:00:00 2001 From: Ian He Date: Thu, 26 Feb 2026 20:01:37 +1300 Subject: [PATCH 4/6] revert version bump --- packages/node-core/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/node-core/package.json b/packages/node-core/package.json index 317770f369..951a60c104 100644 --- a/packages/node-core/package.json +++ b/packages/node-core/package.json @@ -1,6 +1,6 @@ { "name": "@subql/node-core", - "version": "19.2.0-0", + "version": "19.1.0", "description": "Common node features that are agnostic to blockchains", "homepage": "https://github.com/subquery/subql", "repository": "github:subquery/subql", From 2d9ab538166bd7fcf70ffb86c352c55d967ca75d Mon Sep 17 00:00:00 2001 From: Ian He Date: Thu, 26 Feb 2026 20:04:05 +1300 Subject: [PATCH 5/6] fix issue --- packages/node-core/src/indexer/unfinalizedBlocks.service.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/node-core/src/indexer/unfinalizedBlocks.service.ts b/packages/node-core/src/indexer/unfinalizedBlocks.service.ts index e2c6a2ae81..9a0f21b76e 100644 --- a/packages/node-core/src/indexer/unfinalizedBlocks.service.ts +++ b/packages/node-core/src/indexer/unfinalizedBlocks.service.ts @@ -74,7 +74,7 @@ export class UnfinalizedBlocksService implements IUnfinalizedBlocksServ this._unfinalizedBlocks = await this.getMetadataUnfinalizedBlocks(); this.lastCheckedBlockHeight = await this.getLastFinalizedVerifiedHeight(); this._finalizedHeader = await this.blockchainService.getFinalizedHeader(); - this._latestBestHeight = this._finalizedHeader.blockHeight; + this._latestBestHeight = await this.blockchainService.getBestHeight(); if (this.unfinalizedBlocks.length) { logger.info('Processing unfinalized blocks'); From 1c4a4976a97ae76708683e0de9dae9befd39b12f Mon Sep 17 00:00:00 2001 From: Ian He Date: Thu, 26 Feb 2026 20:07:16 +1300 Subject: [PATCH 6/6] fix issue --- .../src/indexer/unfinalizedBlocks.service.ts | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/packages/node-core/src/indexer/unfinalizedBlocks.service.ts b/packages/node-core/src/indexer/unfinalizedBlocks.service.ts index 9a0f21b76e..bc6a408a96 100644 --- a/packages/node-core/src/indexer/unfinalizedBlocks.service.ts +++ b/packages/node-core/src/indexer/unfinalizedBlocks.service.ts @@ -74,7 +74,6 @@ export class UnfinalizedBlocksService implements IUnfinalizedBlocksServ this._unfinalizedBlocks = await this.getMetadataUnfinalizedBlocks(); this.lastCheckedBlockHeight = await this.getLastFinalizedVerifiedHeight(); this._finalizedHeader = await this.blockchainService.getFinalizedHeader(); - this._latestBestHeight = await this.blockchainService.getBestHeight(); if (this.unfinalizedBlocks.length) { logger.info('Processing unfinalized blocks'); @@ -136,14 +135,14 @@ export class UnfinalizedBlocksService implements IUnfinalizedBlocksServ this._latestBestHeight = payload.height; } - private get bestHeight(): number { - return this._latestBestHeight ?? this.finalizedBlockNumber; - } - private async registerUnfinalizedBlock(header: Header): Promise
{ if (header.blockHeight <= this.finalizedBlockNumber) return; - const bestHeight = Math.max(this.bestHeight, header.blockHeight); + if (this._latestBestHeight === undefined) { + this._latestBestHeight = await this.blockchainService.getBestHeight(); + } + + const bestHeight = Math.max(this._latestBestHeight, header.blockHeight); const safeHeight = Math.max(bestHeight - UNFINALIZED_THRESHOLD, 0); const currentBlocks = this.unfinalizedBlocks;