Merged
28 changes: 15 additions & 13 deletions block/internal/executing/executor.go
@@ -270,6 +270,13 @@ func (e *Executor) initializeState() error {
e.logger.Info().Uint64("height", state.LastBlockHeight).
Str("chain_id", state.ChainID).Msg("initialized state")

+	// Migrate any old-style pending block (stored at height N+1 via SaveBlockData
+	// with empty signature) to the new metadata-key format.
+	// Todo remove in the future: https://github.com/evstack/ev-node/issues/2795

Review comment (Member): Can we link #2795 as a todo.
+	if err := e.migrateLegacyPendingBlock(e.ctx); err != nil {
+		return fmt.Errorf("failed to migrate legacy pending block: %w", err)
+	}

// Determine sync target: use Raft height if node is behind Raft consensus
syncTargetHeight := state.LastBlockHeight
if e.raftNode != nil {
@@ -429,12 +436,12 @@ func (e *Executor) ProduceBlock(ctx context.Context) error {

// Check if there's an already stored block at the newHeight
// If there is use that instead of creating a new block
-	pendingHeader, pendingData, err := e.store.GetBlockData(ctx, newHeight)
-	if err == nil {
+	pendingHeader, pendingData, err := e.getPendingBlock(ctx)
+	if err == nil && pendingHeader != nil && pendingHeader.Height() == newHeight {
e.logger.Info().Uint64("height", newHeight).Msg("using pending block")
header = pendingHeader
data = pendingData
-	} else if !errors.Is(err, datastore.ErrNotFound) {
+	} else if err != nil && !errors.Is(err, datastore.ErrNotFound) {
return fmt.Errorf("failed to get block data: %w", err)
} else {
// get batch from sequencer
@@ -452,18 +459,9 @@ func (e *Executor) ProduceBlock(ctx context.Context) error {
if err != nil {
return fmt.Errorf("failed to create block: %w", err)
}

-	// saved early for crash recovery, will be overwritten later with the final signature
-	batch, err := e.store.NewBatch(ctx)
-	if err != nil {
-		return fmt.Errorf("failed to create batch for early save: %w", err)
-	}
-	if err = batch.SaveBlockData(header, data, &types.Signature{}); err != nil {
+	if err := e.savePendingBlock(ctx, header, data); err != nil {
		return fmt.Errorf("failed to save block data: %w", err)
	}
-	if err = batch.Commit(); err != nil {
-		return fmt.Errorf("failed to commit early save batch: %w", err)
-	}
}

if e.raftNode != nil && !e.raftNode.HasQuorum() {
@@ -535,6 +533,10 @@ func (e *Executor) ProduceBlock(ctx context.Context) error {
}
e.logger.Debug().Uint64("height", newHeight).Msg("proposed block to raft")
}
+	if err := e.deletePendingBlock(batch); err != nil {
+		e.logger.Warn().Err(err).Uint64("height", newHeight).Msg("failed to delete pending block metadata")
+	}

if err := batch.Commit(); err != nil {
return fmt.Errorf("failed to commit batch: %w", err)
}
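Taken together, the executor changes replace the old crash-recovery trick (an early SaveBlockData at height N+1 with an empty signature) with dedicated pending-block metadata that is written before signing and deleted in the same batch that commits the final block. A minimal sketch of the resulting flow, where createBlock and signAndSaveBlock are hypothetical stand-ins for the creation, signing, and raft logic elided above:

```go
// Sketch only: createBlock and signAndSaveBlock are hypothetical stand-ins;
// getPendingBlock, savePendingBlock, and deletePendingBlock are from pending.go.
func (e *Executor) produceBlockSketch(ctx context.Context, newHeight uint64) error {
	// Reuse a pending block left over from a crash, if one matches this height.
	header, data, err := e.getPendingBlock(ctx)
	if err != nil {
		return fmt.Errorf("failed to get pending block: %w", err)
	}
	if header == nil || header.Height() != newHeight {
		// No usable pending block: create a fresh one and persist it before
		// signing, so a restart cannot produce (and sign) a different block
		// at the same height.
		if header, data, err = e.createBlock(ctx, newHeight); err != nil {
			return err
		}
		if err := e.savePendingBlock(ctx, header, data); err != nil {
			return fmt.Errorf("failed to save block data: %w", err)
		}
	}

	batch, err := e.store.NewBatch(ctx)
	if err != nil {
		return err
	}
	if err := e.signAndSaveBlock(batch, header, data); err != nil {
		return err
	}
	// Delete the pending keys in the same batch that commits the block,
	// so the store never holds both a final and a pending copy.
	if err := e.deletePendingBlock(batch); err != nil {
		e.logger.Warn().Err(err).Uint64("height", newHeight).Msg("failed to delete pending block metadata")
	}
	return batch.Commit()
}
```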
20 changes: 8 additions & 12 deletions block/internal/executing/executor_restart_test.go
@@ -79,7 +79,7 @@ func TestExecutor_RestartUsesPendingHeader(t *testing.T) {
require.NoError(t, exec1.initializeState())

// Set up context for first executor
-	exec1.ctx, exec1.cancel = context.WithCancel(context.Background())
+	exec1.ctx, exec1.cancel = context.WithCancel(t.Context())

// First executor produces a block normally
mockSeq1.EXPECT().GetNextBatch(mock.Anything, mock.AnythingOfType("sequencer.GetNextBatchRequest")).
@@ -101,12 +101,12 @@ func TestExecutor_RestartUsesPendingHeader(t *testing.T) {
require.NoError(t, err)

// Verify first block was produced
-	h1, err := memStore.Height(context.Background())
+	h1, err := memStore.Height(t.Context())
require.NoError(t, err)
assert.Equal(t, uint64(1), h1)

// Store the produced block data for later verification
-	originalHeader, originalData, err := memStore.GetBlockData(context.Background(), 1)
+	originalHeader, originalData, err := memStore.GetBlockData(t.Context(), 1)
require.NoError(t, err)
assert.Equal(t, 2, len(originalData.Txs), "first block should have 2 transactions")

@@ -158,11 +158,7 @@ func TestExecutor_RestartUsesPendingHeader(t *testing.T) {
pendingHeader.DataHash = pendingData.DACommitment()

// Save pending block data (this is what would happen during a crash)
-	batch, err := memStore.NewBatch(context.Background())
-	require.NoError(t, err)
-	err = batch.SaveBlockData(pendingHeader, pendingData, &types.Signature{})
-	require.NoError(t, err)
-	err = batch.Commit()
+	err = exec1.savePendingBlock(t.Context(), pendingHeader, pendingData)
require.NoError(t, err)

// Stop first executor (simulating crash/restart)
@@ -199,7 +195,7 @@ func TestExecutor_RestartUsesPendingHeader(t *testing.T) {
require.NoError(t, exec2.initializeState())

// Set up context for second executor
-	exec2.ctx, exec2.cancel = context.WithCancel(context.Background())
+	exec2.ctx, exec2.cancel = context.WithCancel(t.Context())
defer exec2.cancel()

// Verify that the state is at height 1 (pending block at height 2 wasn't committed)
@@ -221,12 +217,12 @@ func TestExecutor_RestartUsesPendingHeader(t *testing.T) {
require.NoError(t, err)

// Verify height advanced to 2
-	h2, err := memStore.Height(context.Background())
+	h2, err := memStore.Height(t.Context())
require.NoError(t, err)
assert.Equal(t, uint64(2), h2, "height should advance to 2 using pending block")

// Verify the block at height 2 matches the pending block data
-	finalHeader, finalData, err := memStore.GetBlockData(context.Background(), 2)
+	finalHeader, finalData, err := memStore.GetBlockData(t.Context(), 2)
require.NoError(t, err)
assert.Equal(t, 3, len(finalData.Txs), "should use pending block with 3 transactions")
assert.Equal(t, []byte("pending_tx1"), []byte(finalData.Txs[0]))
@@ -388,7 +384,7 @@ func TestExecutor_RestartNoPendingHeader(t *testing.T) {
require.NoError(t, err)

// Verify normal operation
-	h, err := memStore.Height(context.Background())
+	h, err := memStore.Height(t.Context())
require.NoError(t, err)
assert.Equal(t, uint64(numBlocks+1), h)

147 changes: 147 additions & 0 deletions block/internal/executing/pending.go
@@ -0,0 +1,147 @@
package executing

import (
"context"
"crypto/sha256"
"errors"
"fmt"

"github.com/evstack/ev-node/pkg/store"
"github.com/evstack/ev-node/types"
ds "github.com/ipfs/go-datastore"
)

const (
headerKey = "pending_header"
dataKey = "pending_data"
)

// getPendingBlock retrieves the pending block from metadata if it exists
func (e *Executor) getPendingBlock(ctx context.Context) (*types.SignedHeader, *types.Data, error) {
headerBytes, err := e.store.GetMetadata(ctx, headerKey)
if err != nil {
if errors.Is(err, ds.ErrNotFound) {
return nil, nil, nil
}
return nil, nil, err
}

dataBytes, err := e.store.GetMetadata(ctx, dataKey)
if err != nil {
if errors.Is(err, ds.ErrNotFound) {
return nil, nil, fmt.Errorf("pending header exists but data is missing: corrupt state")
}
return nil, nil, err
}

header := new(types.SignedHeader)
if err := header.UnmarshalBinary(headerBytes); err != nil {
return nil, nil, fmt.Errorf("unmarshal pending header: %w", err)
}

data := new(types.Data)
if err := data.UnmarshalBinary(dataBytes); err != nil {
return nil, nil, fmt.Errorf("unmarshal pending data: %w", err)
}
return header, data, nil
}

// savePendingBlock saves a block to metadata as pending
func (e *Executor) savePendingBlock(ctx context.Context, header *types.SignedHeader, data *types.Data) error {
headerBytes, err := header.MarshalBinary()
if err != nil {
return fmt.Errorf("marshal header: %w", err)
}

dataBytes, err := data.MarshalBinary()
if err != nil {
return fmt.Errorf("marshal data: %w", err)
}

batch, err := e.store.NewBatch(ctx)
if err != nil {
return fmt.Errorf("create batch for early save: %w", err)
}

if err := batch.Put(ds.NewKey(store.GetMetaKey(headerKey)), headerBytes); err != nil {
return fmt.Errorf("save pending header: %w", err)
}

if err := batch.Put(ds.NewKey(store.GetMetaKey(dataKey)), dataBytes); err != nil {
return fmt.Errorf("save pending data: %w", err)
}

if err := batch.Commit(); err != nil {
return fmt.Errorf("commit pending block: %w", err)
}
return nil
}

// deletePendingBlock removes pending block metadata
func (e *Executor) deletePendingBlock(batch store.Batch) error {
if err := batch.Delete(ds.NewKey(store.GetMetaKey(headerKey))); err != nil {
return fmt.Errorf("delete pending header: %w", err)
}

if err := batch.Delete(ds.NewKey(store.GetMetaKey(dataKey))); err != nil {
return fmt.Errorf("delete pending data: %w", err)
}
return nil
}

// migrateLegacyPendingBlock detects old-style pending blocks that were stored
// at height N+1 via SaveBlockData with an empty signature (pre-upgrade format)
// and migrates them to the new metadata-key format (m/pending_header, m/pending_data).
//
// This prevents double-signing when a node is upgraded: without migration the
// new code would not find the pending block and would create+sign a new one at
// the same height.
func (e *Executor) migrateLegacyPendingBlock(ctx context.Context) error {
candidateHeight := e.getLastState().LastBlockHeight + 1
pendingHeader, pendingData, err := e.store.GetBlockData(ctx, candidateHeight)
if err != nil {
if !errors.Is(err, ds.ErrNotFound) {
return fmt.Errorf("get block data: %w", err)
}
return nil
}
if len(pendingHeader.Signature) != 0 {
return errors.New("pending block with signatures found")
}
// Migrate: write header+data to the new metadata keys.
if err := e.savePendingBlock(ctx, pendingHeader, pendingData); err != nil {
return fmt.Errorf("save migrated pending block: %w", err)
}

// Clean up old-style keys.
batch, err := e.store.NewBatch(ctx)
if err != nil {
return fmt.Errorf("create cleanup batch: %w", err)
}

headerBytes, err := pendingHeader.MarshalBinary()
if err != nil {
return fmt.Errorf("marshal header for hash: %w", err)
}
headerHash := sha256.Sum256(headerBytes)

for _, key := range []string{
store.GetHeaderKey(candidateHeight),
store.GetDataKey(candidateHeight),
store.GetSignatureKey(candidateHeight),
store.GetIndexKey(headerHash[:]),
} {
if err := batch.Delete(ds.NewKey(key)); err != nil && !errors.Is(err, ds.ErrNotFound) {
return fmt.Errorf("delete legacy key %s: %w", key, err)
}
}

if err := batch.Commit(); err != nil {
return fmt.Errorf("commit cleanup batch: %w", err)
}

e.logger.Info().
Uint64("height", candidateHeight).
Msg("migrated legacy pending block to metadata format")
return nil
}
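A round-trip of these helpers, as a test might exercise them. This is a sketch: newTestExecutor and makePendingFixture are hypothetical fixtures, while getPendingBlock, savePendingBlock, and deletePendingBlock are the helpers defined above:

```go
func TestPendingBlockRoundTrip(t *testing.T) {
	exec := newTestExecutor(t)               // hypothetical fixture
	header, data := makePendingFixture(t, 2) // hypothetical fixture for height 2

	// Nothing pending yet: ErrNotFound is swallowed and all results are nil.
	h, d, err := exec.getPendingBlock(t.Context())
	require.NoError(t, err)
	require.Nil(t, h)
	require.Nil(t, d)

	// Save, then read back the same block.
	require.NoError(t, exec.savePendingBlock(t.Context(), header, data))
	h, _, err = exec.getPendingBlock(t.Context())
	require.NoError(t, err)
	require.Equal(t, header.Height(), h.Height())

	// Deletion is batched so it can ride along with the final block commit.
	batch, err := exec.store.NewBatch(t.Context())
	require.NoError(t, err)
	require.NoError(t, exec.deletePendingBlock(batch))
	require.NoError(t, batch.Commit())
}
```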
2 changes: 1 addition & 1 deletion pkg/store/header_store_adapter_test.go
@@ -599,7 +599,7 @@ func TestHeaderStoreAdapter_HeadPrefersPending(t *testing.T) {

func TestHeaderStoreAdapter_GetFromPendingByHash(t *testing.T) {
t.Parallel()
-	ctx := context.Background()
+	ctx := t.Context()

ds, err := NewTestInMemoryKVStore()
require.NoError(t, err)
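A side note on the recurring context change in these tests: t.Context(), added in Go 1.24, returns a context that is canceled automatically before the test's cleanup functions run, so helpers and goroutines bound to it cannot outlive the test; context.Background() gave no such guarantee. For example:

```go
func TestAutoCancel(t *testing.T) {
	ctx := t.Context() // canceled automatically when the test finishes
	go func() {
		<-ctx.Done() // guaranteed to fire, so this goroutine cannot leak
	}()
}
```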
23 changes: 18 additions & 5 deletions pkg/store/keys.go
@@ -39,30 +39,43 @@ const (
heightPrefix = "t"
)

-func getHeaderKey(height uint64) string {
+// GetHeaderKey returns the store key for a block header at the given height.
+func GetHeaderKey(height uint64) string {
	return GenerateKey([]string{headerPrefix, strconv.FormatUint(height, 10)})
}

+func getHeaderKey(height uint64) string { return GetHeaderKey(height) }

-func getDataKey(height uint64) string {
+// GetDataKey returns the store key for block data at the given height.
+func GetDataKey(height uint64) string {
	return GenerateKey([]string{dataPrefix, strconv.FormatUint(height, 10)})
}

+func getDataKey(height uint64) string { return GetDataKey(height) }

-func getSignatureKey(height uint64) string {
+// GetSignatureKey returns the store key for a block signature at the given height.
+func GetSignatureKey(height uint64) string {
	return GenerateKey([]string{signaturePrefix, strconv.FormatUint(height, 10)})
}

+func getSignatureKey(height uint64) string { return GetSignatureKey(height) }

func getStateAtHeightKey(height uint64) string {
	return GenerateKey([]string{statePrefix, strconv.FormatUint(height, 10)})
}

-func getMetaKey(key string) string {
+// GetMetaKey returns the store key for a metadata entry.
+func GetMetaKey(key string) string {
	return GenerateKey([]string{metaPrefix, key})
}

-func getIndexKey(hash types.Hash) string {
+// GetIndexKey returns the store key for indexing a block by its hash.
+func GetIndexKey(hash types.Hash) string {
	return GenerateKey([]string{indexPrefix, hash.String()})
}

+func getIndexKey(hash types.Hash) string { return GetIndexKey(hash) }

func getHeightKey() string {
	return GenerateKey([]string{heightPrefix})
}
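Exporting these wrappers is what lets pending.go construct raw datastore keys for Put/Delete in a batch. A small sketch of the intended usage; the printed values are assumptions, since they depend on GenerateKey's separator and the one-letter prefixes defined above:

```go
package main

import (
	"fmt"

	"github.com/evstack/ev-node/pkg/store"
)

func main() {
	// Assumed outputs, e.g. "/m/pending_header" and "/h/42"; the exact
	// strings depend on how GenerateKey joins prefix and segments.
	fmt.Println(store.GetMetaKey("pending_header"))
	fmt.Println(store.GetHeaderKey(42))
}
```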