diff --git a/core/aggsigdb/memory_internal_test.go b/core/aggsigdb/memory_internal_test.go index cd9c455ab1..2c722217f1 100644 --- a/core/aggsigdb/memory_internal_test.go +++ b/core/aggsigdb/memory_internal_test.go @@ -121,13 +121,13 @@ type testDeadliner struct { ch chan core.Duty } -func (d *testDeadliner) Add(duty core.Duty) bool { +func (d *testDeadliner) Add(duty core.Duty) core.DeadlineStatus { d.mu.Lock() defer d.mu.Unlock() d.added = append(d.added, duty) - return true + return core.DeadlineScheduled } func (d *testDeadliner) C() <-chan core.Duty { diff --git a/core/aggsigdb/memory_test.go b/core/aggsigdb/memory_test.go index 2ae491fe47..6c9d5dce1b 100644 --- a/core/aggsigdb/memory_test.go +++ b/core/aggsigdb/memory_test.go @@ -235,8 +235,8 @@ func newNoopDeadliner() core.Deadliner { type noopDeadliner struct{} -func (noopDeadliner) Add(core.Duty) bool { - return true +func (noopDeadliner) Add(core.Duty) core.DeadlineStatus { + return core.DeadlineScheduled } func (noopDeadliner) C() <-chan core.Duty { diff --git a/core/consensus/qbft/qbft.go b/core/consensus/qbft/qbft.go index b1af4510ff..b274cee9d0 100644 --- a/core/consensus/qbft/qbft.go +++ b/core/consensus/qbft/qbft.go @@ -530,9 +530,9 @@ func (c *Consensus) runInstance(parent context.Context, duty core.Duty) (err err ctx, span = core.StartDutyTrace(ctx, duty, "core/qbft.runInstance") - if !c.deadliner.Add(duty) { - span.AddEvent("Expired Duty Skipped") - log.Warn(ctx, "Skipping consensus for expired duty", nil) + if status := c.deadliner.Add(duty); status == core.DeadlineExpired || status == core.DeadlineExempt { + span.AddEvent("Expired/exempt duty skipped") + log.Warn(ctx, "Skipping consensus for expired/exempt duty", nil, z.Any("duty", duty)) return nil } @@ -710,8 +710,8 @@ func (c *Consensus) handle(ctx context.Context, _ peer.ID, req proto.Message) (p ) } - if !c.deadliner.Add(duty) { - return nil, false, errors.New("duty expired", z.Any("duty", duty), c.dropFilter) + if status := c.deadliner.Add(duty); status == core.DeadlineExpired || status == core.DeadlineExempt { + return nil, false, errors.New("duty expired or exempt", z.Any("duty", duty), c.dropFilter) } select { diff --git a/core/consensus/qbft/qbft_internal_test.go b/core/consensus/qbft/qbft_internal_test.go index 0e44872212..4d1d2f2344 100644 --- a/core/consensus/qbft/qbft_internal_test.go +++ b/core/consensus/qbft/qbft_internal_test.go @@ -616,7 +616,7 @@ func TestQBFTConsensus_handle(t *testing.T) { var tc Consensus deadliner := coremocks.NewDeadliner(t) - deadliner.On("Add", mock.Anything).Maybe().Return(true) + deadliner.On("Add", mock.Anything).Maybe().Return(core.DeadlineScheduled) tc.deadliner = deadliner tc.mutable.instances = make(map[core.Duty]*instance.IO[Msg]) tc.gaterFunc = func(core.Duty) bool { return true } @@ -717,7 +717,7 @@ func TestInstanceIO_MaybeStart(t *testing.T) { var c Consensus deadliner := coremocks.NewDeadliner(t) - deadliner.On("Add", mock.Anything).Return(true) + deadliner.On("Add", mock.Anything).Return(core.DeadlineScheduled) c.deadliner = deadliner c.gaterFunc = func(core.Duty) bool { return true } c.mutable.instances = make(map[core.Duty]*instance.IO[Msg]) @@ -749,7 +749,7 @@ func TestInstanceIO_MaybeStart(t *testing.T) { var c Consensus deadliner := coremocks.NewDeadliner(t) - deadliner.On("Add", mock.Anything).Return(true) + deadliner.On("Add", mock.Anything).Return(core.DeadlineScheduled) c.deadliner = deadliner c.gaterFunc = func(core.Duty) bool { return true } c.mutable.instances = make(map[core.Duty]*instance.IO[Msg]) diff --git a/core/consensus/qbft/qbft_test.go b/core/consensus/qbft/qbft_test.go index 137cc85509..318adba184 100644 --- a/core/consensus/qbft/qbft_test.go +++ b/core/consensus/qbft/qbft_test.go @@ -134,7 +134,7 @@ func testQBFTConsensus(t *testing.T, threshold, nodes int) { gaterFunc := func(core.Duty) bool { return true } deadliner := coremocks.NewDeadliner(t) - deadliner.On("Add", mock.Anything).Return(true) + deadliner.On("Add", mock.Anything).Return(core.DeadlineScheduled) deadliner.On("C").Return((<-chan core.Duty)(deadlineChan)) // Create a mock beacon client for test diff --git a/core/deadline.go b/core/deadline.go index 663e711fb9..8fbdb6c20d 100644 --- a/core/deadline.go +++ b/core/deadline.go @@ -25,14 +25,31 @@ const ( // DeadlineFunc is a function that returns the deadline for a duty. type DeadlineFunc func(Duty) (time.Time, bool) +// DeadlineStatus is the result of adding a duty to a Deadliner. +type DeadlineStatus int + +const ( + // DeadlineExpired indicates the duty's deadline has already passed, + // so it was not scheduled and will never be emitted on C(). + DeadlineExpired DeadlineStatus = iota + // DeadlineScheduled indicates the duty was scheduled for future deadline expiry + // and will eventually be emitted on C(). + DeadlineScheduled + // DeadlineExempt indicates the duty type never expires (e.g. exits), + // so it was not scheduled and will never be emitted on C(). + DeadlineExempt +) + // Deadliner provides duty Deadline functionality. The C method isn’t thread safe and // may only be used by a single goroutine. So, multiple instances are required // for different components and use cases. type Deadliner interface { - // Add returns true if the duty was added for future deadline scheduling. It is idempotent - // and returns true if the duty was previously added and still awaits deadline scheduling. It - // returns false if the duty has already expired and cannot therefore be added for scheduling. - Add(duty Duty) bool + // Add schedules the duty for future deadline expiry and returns DeadlineScheduled. + // It is idempotent and returns DeadlineScheduled if the duty was previously added and still awaits expiry. + // It returns DeadlineExpired if the duty's deadline has already passed. + // It returns DeadlineExempt if the duty type never expires. + // In the latter two cases the duty is not scheduled and will never be emitted on C(). + Add(duty Duty) DeadlineStatus // C returns the same read channel every time and contains deadlined duties. // It should only be called by a single goroutine. @@ -42,7 +59,7 @@ type Deadliner interface { // deadlineInput represents the input to inputChan. type deadlineInput struct { duty Duty - success chan<- bool + success chan<- DeadlineStatus } // deadliner implements the Deadliner interface. @@ -76,7 +93,7 @@ func NewDutyDeadlineFunc(ctx context.Context, eth2Cl eth2wrap.Client) (DeadlineF return nil, err } - slotDuration, _, err := eth2wrap.FetchSlotsConfig(ctx, eth2Cl) + slotDuration, slotsPerEpoch, err := eth2wrap.FetchSlotsConfig(ctx, eth2Cl) if err != nil { return nil, err } @@ -100,9 +117,11 @@ func NewDutyDeadlineFunc(ctx context.Context, eth2Cl eth2wrap.Client) (DeadlineF duration = slotDuration / 3 case DutySyncMessage: duration = 2 * slotDuration / 3 - case DutyAttester, DutyAggregator, DutyPrepareAggregator: - // Even though attestations and aggregations are acceptable even after 2 slots, the rewards are heavily diminished. - duration = 2 * slotDuration + case DutyAttester, DutyAggregator: + // Attestations and aggregations are kept for a full epoch so late partial signatures are not dropped. + duration = time.Duration(slotsPerEpoch) * slotDuration + case DutyPrepareAggregator, DutyPrepareSyncContribution: + duration = 2 * time.Duration(slotsPerEpoch) * slotDuration default: duration = slotDuration } @@ -157,19 +176,18 @@ func (d *deadliner) run(ctx context.Context, deadlineFunc DeadlineFunc) { deadline, canExpire := deadlineFunc(input.duty) if !canExpire { // Drop duties that never expire - input.success <- false + input.success <- DeadlineExempt continue } - expired := deadline.Before(d.clock.Now()) - - input.success <- !expired - - // Ignore expired duties - if expired { + // Ignore (and signal) duties that have already expired. + if deadline.Before(d.clock.Now()) { + input.success <- DeadlineExpired continue } + input.success <- DeadlineScheduled + duties[input.duty] = true if deadline.Before(currDeadline) { @@ -194,21 +212,22 @@ func (d *deadliner) run(ctx context.Context, deadlineFunc DeadlineFunc) { } } -// Add adds a duty to be notified of the deadline. It returns true if the duty was added successfully. -func (d *deadliner) Add(duty Duty) bool { - success := make(chan bool) +// Add adds a duty to be notified of the deadline. +// See the Deadliner interface for the meaning of the returned DeadlineStatus. +func (d *deadliner) Add(duty Duty) DeadlineStatus { + success := make(chan DeadlineStatus) select { case <-d.quit: - return false + return DeadlineExpired case d.inputChan <- deadlineInput{duty: duty, success: success}: } select { case <-d.quit: - return false - case ok := <-success: - return ok + return DeadlineExpired + case status := <-success: + return status } } diff --git a/core/deadline_test.go b/core/deadline_test.go index 2581587842..0a10efcce4 100644 --- a/core/deadline_test.go +++ b/core/deadline_test.go @@ -48,22 +48,22 @@ func TestDeadliner(t *testing.T) { wg := &sync.WaitGroup{} // Add our duties to the deadliner. - expectedFalseCh := make(chan bool, len(expiredDuties)) - expectedTrueCh := make(chan bool, len(nonExpiredDuties)+len(voluntaryExits)) + expectedExpiredCh := make(chan core.DeadlineStatus, len(expiredDuties)) + expectedScheduledCh := make(chan core.DeadlineStatus, len(nonExpiredDuties)+len(voluntaryExits)) - addDuties(t, wg, expiredDuties, expectedFalseCh, deadliner) - addDuties(t, wg, nonExpiredDuties, expectedTrueCh, deadliner) - addDuties(t, wg, voluntaryExits, expectedTrueCh, deadliner) + addDuties(t, wg, expiredDuties, expectedExpiredCh, deadliner) + addDuties(t, wg, nonExpiredDuties, expectedScheduledCh, deadliner) + addDuties(t, wg, voluntaryExits, expectedScheduledCh, deadliner) // Wait till all the duties are added to the deadliner. wg.Wait() for range len(expiredDuties) { - require.False(t, <-expectedFalseCh) + require.Equal(t, core.DeadlineExpired, <-expectedExpiredCh) } for range len(nonExpiredDuties) + len(voluntaryExits) { - require.True(t, <-expectedTrueCh) + require.Equal(t, core.DeadlineScheduled, <-expectedScheduledCh) } var maxSlot uint64 @@ -88,6 +88,31 @@ func TestDeadliner(t *testing.T) { require.Equal(t, nonExpiredDuties, actualDuties) } +// TestDeadlinerExempt verifies that a duty whose deadline func reports it as never-expiring +// returns DeadlineExempt from Add and is never emitted on C(). +func TestDeadlinerExempt(t *testing.T) { + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + clock := clockwork.NewFakeClock() + neverExpires := func(core.Duty) (time.Time, bool) { + return time.Time{}, false + } + + deadliner := core.NewDeadlinerForT(ctx, t, neverExpires, clock) + + require.Equal(t, core.DeadlineExempt, deadliner.Add(core.NewVoluntaryExit(123))) + + // Advance the clock well beyond any deadline; exempt duties must never be emitted on C(). + clock.Advance(time.Hour) + + select { + case got := <-deadliner.C(): + require.Failf(t, "exempt duty must not be emitted on C()", "got duty %v", got) + case <-time.After(100 * time.Millisecond): + } +} + func TestNewDutyDeadlineFunc(t *testing.T) { bmock, err := beaconmock.New(t.Context()) require.NoError(t, err) @@ -95,10 +120,12 @@ func TestNewDutyDeadlineFunc(t *testing.T) { genesisTime, err := eth2wrap.FetchGenesisTime(t.Context(), bmock) require.NoError(t, err) - slotDuration, _, err := eth2wrap.FetchSlotsConfig(t.Context(), bmock) + slotDuration, slotsPerEpoch, err := eth2wrap.FetchSlotsConfig(t.Context(), bmock) require.NoError(t, err) margin := slotDuration / 12 + oneEpoch := time.Duration(slotsPerEpoch) * slotDuration + twoEpochs := 2 * oneEpoch currentSlot := uint64(time.Since(genesisTime) / slotDuration) now := genesisTime.Add(time.Duration(currentSlot) * slotDuration) @@ -129,15 +156,15 @@ func TestNewDutyDeadlineFunc(t *testing.T) { }, { duty: core.NewAttesterDuty(currentSlot), - expectedDuration: 2*slotDuration + margin, + expectedDuration: oneEpoch + margin, }, { duty: core.NewAggregatorDuty(currentSlot), - expectedDuration: 2*slotDuration + margin, + expectedDuration: oneEpoch + margin, }, { duty: core.NewPrepareAggregatorDuty(currentSlot), - expectedDuration: 2*slotDuration + margin, + expectedDuration: twoEpochs + margin, }, { duty: core.NewSyncMessageDuty(currentSlot), @@ -157,7 +184,7 @@ func TestNewDutyDeadlineFunc(t *testing.T) { }, { duty: core.NewPrepareSyncContributionDuty(currentSlot), - expectedDuration: slotDuration + margin, + expectedDuration: twoEpochs + margin, }, } @@ -173,12 +200,12 @@ func TestNewDutyDeadlineFunc(t *testing.T) { } // addDuties runs a goroutine which adds the duties to the deadliner channel. -func addDuties(t *testing.T, wg *sync.WaitGroup, duties []core.Duty, expCh chan bool, deadliner core.Deadliner) { +func addDuties(t *testing.T, wg *sync.WaitGroup, duties []core.Duty, expCh chan core.DeadlineStatus, deadliner core.Deadliner) { t.Helper() wg.Add(1) - go func(duties []core.Duty, expCh chan bool) { + go func(duties []core.Duty, expCh chan core.DeadlineStatus) { defer wg.Done() for _, duty := range duties { diff --git a/core/dutydb/memory.go b/core/dutydb/memory.go index 2f2e0dca53..c831366718 100644 --- a/core/dutydb/memory.go +++ b/core/dutydb/memory.go @@ -74,8 +74,8 @@ func (db *MemDB) Store(_ context.Context, duty core.Duty, unsignedSet core.Unsig db.mu.Lock() defer db.mu.Unlock() - if !db.deadliner.Add(duty) { - return errors.New("not storing unsigned data for expired duty", z.Any("duty", duty)) + if status := db.deadliner.Add(duty); status == core.DeadlineExpired || status == core.DeadlineExempt { + return errors.New("not storing unsigned data for expired or exempt duty", z.Any("duty", duty)) } switch duty.Type { diff --git a/core/dutydb/memory_internal_test.go b/core/dutydb/memory_internal_test.go index c39394aa6f..c154ff5ffd 100644 --- a/core/dutydb/memory_internal_test.go +++ b/core/dutydb/memory_internal_test.go @@ -54,8 +54,8 @@ func TestCancelledQueries(t *testing.T) { type noopDeadliner struct{} -func (t noopDeadliner) Add(duty core.Duty) bool { - return true +func (t noopDeadliner) Add(duty core.Duty) core.DeadlineStatus { + return core.DeadlineScheduled } func (t noopDeadliner) C() <-chan core.Duty { diff --git a/core/dutydb/memory_test.go b/core/dutydb/memory_test.go index 428c7127da..8c263f7664 100644 --- a/core/dutydb/memory_test.go +++ b/core/dutydb/memory_test.go @@ -530,13 +530,13 @@ type testDeadliner struct { ch chan core.Duty } -func (d *testDeadliner) Add(duty core.Duty) bool { +func (d *testDeadliner) Add(duty core.Duty) core.DeadlineStatus { d.mu.Lock() defer d.mu.Unlock() d.added = append(d.added, duty) - return true + return core.DeadlineScheduled } func (d *testDeadliner) C() <-chan core.Duty { diff --git a/core/mocks/deadliner.go b/core/mocks/deadliner.go index e39ab8f6b0..a809b65c60 100644 --- a/core/mocks/deadliner.go +++ b/core/mocks/deadliner.go @@ -15,18 +15,18 @@ type Deadliner struct { } // Add provides a mock function with given fields: duty -func (_m *Deadliner) Add(duty core.Duty) bool { +func (_m *Deadliner) Add(duty core.Duty) core.DeadlineStatus { ret := _m.Called(duty) if len(ret) == 0 { panic("no return value specified for Add") } - var r0 bool - if rf, ok := ret.Get(0).(func(core.Duty) bool); ok { + var r0 core.DeadlineStatus + if rf, ok := ret.Get(0).(func(core.Duty) core.DeadlineStatus); ok { r0 = rf(duty) } else { - r0 = ret.Get(0).(bool) + r0 = ret.Get(0).(core.DeadlineStatus) } return r0 diff --git a/core/parsigdb/memory.go b/core/parsigdb/memory.go index 74f6f7365e..9590dd474d 100644 --- a/core/parsigdb/memory.go +++ b/core/parsigdb/memory.go @@ -16,6 +16,14 @@ import ( "github.com/obolnetwork/charon/core" ) +// maxExemptEntriesPerShare bounds how many distinct exempt-duty entries (e.g. exit epochs or +// builder registrations) are retained per (share index, validator, duty type). Exempt duties are +// never trimmed by the deadliner, so without a cap a single peer could store an unbounded number of +// them (e.g. by replaying a valid exit signature across arbitrary slots). A DV cluster only ever +// produces one such entry per validator per share, so 10 gives generous head-room for operator +// error while keeping memory bounded (N shares * 10 per validator per type). +const maxExemptEntriesPerShare = 10 + // NewMemDBMetadata returns a new in-memory partial signature database instance. func NewMemDBMetadata(slotDuration uint64, genesisTime time.Time) MemDBMetadata { return MemDBMetadata{ @@ -32,11 +40,12 @@ type MemDBMetadata struct { // NewMemDB returns a new in-memory partial signature database instance. func NewMemDB(threshold int, deadliner core.Deadliner, metadata MemDBMetadata) *MemDB { return &MemDB{ - entries: make(map[key][]core.ParSignedData), - keysByDuty: make(map[core.Duty][]key), - threshold: threshold, - deadliner: deadliner, - metadata: metadata, + entries: make(map[key][]core.ParSignedData), + keysByDuty: make(map[core.Duty][]key), + exemptEntries: make(map[exemptEntryKey][]key), + threshold: threshold, + deadliner: deadliner, + metadata: metadata, } } @@ -48,8 +57,12 @@ type MemDB struct { entries map[key][]core.ParSignedData keysByDuty map[core.Duty][]key - threshold int - deadliner core.Deadliner + // exemptEntries indexes exempt-duty entries (which the deadliner never trims) by + // (share index, validator, duty type) in insertion order, so they can be capped and + // evicted oldest-first to bound memory. + exemptEntries map[exemptEntryKey][]key + threshold int + deadliner core.Deadliner metadata MemDBMetadata } @@ -97,12 +110,31 @@ func (db *MemDB) StoreInternal(ctx context.Context, duty core.Duty, signedSet co // StoreExternal stores an externally received partially signed duty data set. func (db *MemDB) StoreExternal(ctx context.Context, duty core.Duty, signedSet core.ParSignedDataSet) error { - _ = db.deadliner.Add(duty) // TODO(corver): Distinguish between no deadline supported vs already expired. + // The deadliner drives all trimming: entries are only ever deleted when their duty is emitted on deadliner.C(). + // A duty whose deadline has already passed is never scheduled, so storing it would leak forever. + // Duties that never expire must still be stored. + status := db.deadliner.Add(duty) + if status == core.DeadlineExpired { + var shareIdx int + for _, sig := range signedSet { + shareIdx = sig.ShareIdx + break + } + + log.Warn(ctx, "Dropping partial signatures received for expired duty", nil, + z.Any("duty", duty), z.Int("share_idx", shareIdx)) + + return nil + } + + // Exempt duties (exits, builder registrations) are never trimmed by the deadliner, so their + // entries are capped per (share index, validator, duty type) in store to bound memory. + exempt := status == core.DeadlineExempt output := make(map[core.PubKey][]core.ParSignedData) for pubkey, sig := range signedSet { - sigs, ok, err := db.store(key{Duty: duty, PubKey: pubkey}, sig) + sigs, ok, err := db.store(ctx, key{Duty: duty, PubKey: pubkey}, sig, exempt) if err != nil { return err } else if !ok { @@ -159,7 +191,7 @@ func (db *MemDB) Trim(ctx context.Context) { // store returns true if the value was added to the list of signatures at the provided key // and returns a copy of the resulting list. -func (db *MemDB) store(k key, value core.ParSignedData) ([]core.ParSignedData, bool, error) { +func (db *MemDB) store(ctx context.Context, k key, value core.ParSignedData, exempt bool) ([]core.ParSignedData, bool, error) { db.mu.Lock() defer db.mu.Unlock() @@ -200,8 +232,19 @@ func (db *MemDB) store(k key, value core.ParSignedData) ([]core.ParSignedData, b return nil, false, err } + isNewKey := len(db.entries[k]) == 0 + db.entries[k] = append(db.entries[k], clone) - db.keysByDuty[k.Duty] = append(db.keysByDuty[k.Duty], k) + + if exempt { + // Exempt duties are never emitted on deadliner.C(), so they are tracked and capped + // here (under the same lock) instead of being trimmed via keysByDuty. + db.trackExemptUnsafe(ctx, k, value.ShareIdx) + } else if isNewKey { + // Index each key once; Trim deletes by key, so appending per share signature would + // only add redundant duplicates (O(validators*shares) instead of O(validators)). + db.keysByDuty[k.Duty] = append(db.keysByDuty[k.Duty], k) + } if k.Duty.Type == core.DutyExit { exitCounter.WithLabelValues(k.PubKey.String()).Inc() @@ -282,3 +325,67 @@ type key struct { Duty core.Duty PubKey core.PubKey } + +// exemptEntryKey indexes exempt-duty entries by share index, validator and duty type. +// Distinct entries within this key correspond to distinct duties (e.g. exit epochs). +type exemptEntryKey struct { + ShareIdx int + PubKey core.PubKey + DutyType core.DutyType +} + +// trackExemptUnsafe records a newly stored exempt-duty entry for capping. It assumes db.mu is held +// and that k was just added as a new entry for shareIdx. +func (db *MemDB) trackExemptUnsafe(ctx context.Context, k key, shareIdx int) { + ek := exemptEntryKey{ShareIdx: shareIdx, PubKey: k.PubKey, DutyType: k.Duty.Type} + + stored := db.exemptEntries[ek] + if len(stored) > 0 { + log.Warn(ctx, "Received exempt duty for validator with a different epoch/slot than already stored", nil, + z.Any("duty", k.Duty), + z.Any("pubkey", k.PubKey), + z.Int("share_idx", shareIdx), + z.Int("curr_sigs_per_share", len(stored)), + z.Int("max_allowed_sigs_per_share", maxExemptEntriesPerShare), + ) + } + + stored = append(stored, k) + + // We append one entry at a time and always trim back to the cap, so a single new entry can + // exceed it by at most one; evict the oldest (this share's signature) in that case. + if len(stored) > maxExemptEntriesPerShare { + db.evictExemptShareEntryUnsafe(ctx, stored[0], shareIdx) + stored = stored[1:] + } + + db.exemptEntries[ek] = stored +} + +// evictExemptShareEntryUnsafe removes the given share's partial signature from the entry at k, +// deleting the entry entirely if no other shares remain. It warns with the evicted data for +// forensics, since eviction only happens when a share exceeds the per-share cap. It assumes db.mu is held. +func (db *MemDB) evictExemptShareEntryUnsafe(ctx context.Context, k key, shareIdx int) { + // Log the evicted key (not the data) for forensics; this is a hot path, so avoid marshaling. + log.Warn(ctx, "Evicting oldest exempt partial signature exceeding per-share cap", nil, + z.Any("duty", k.Duty), + z.Any("pubkey", k.PubKey), + z.Int("share_idx", shareIdx), + z.Int("max_allowed_sigs_per_share", maxExemptEntriesPerShare), + ) + + sigs := db.entries[k] + + remaining := sigs[:0] + for _, sig := range sigs { + if sig.ShareIdx != shareIdx { + remaining = append(remaining, sig) + } + } + + if len(remaining) == 0 { + delete(db.entries, k) + } else { + db.entries[k] = remaining + } +} diff --git a/core/parsigdb/memory_internal_test.go b/core/parsigdb/memory_internal_test.go index c9a6cd9999..3dbe0c0cd8 100644 --- a/core/parsigdb/memory_internal_test.go +++ b/core/parsigdb/memory_internal_test.go @@ -154,6 +154,90 @@ func TestMemDBThreshold(t *testing.T) { require.Equal(t, 2, timesCalled) } +// TestMemDBStoreExternalExpired verifies that StoreExternal drops partial signatures for duties the +// deadliner reports as already expired (which can never be trimmed and would otherwise leak), while +// still storing scheduled and never-expiring duties. +func TestMemDBStoreExternalExpired(t *testing.T) { + tests := []struct { + name string + status core.DeadlineStatus + wantStored bool // stored in entries + wantInKeysByDuty bool // also tracked for deadliner trimming (non-exempt only) + }{ + {name: "expired_dropped", status: core.DeadlineExpired, wantStored: false, wantInKeysByDuty: false}, + {name: "scheduled_stored", status: core.DeadlineScheduled, wantStored: true, wantInKeysByDuty: true}, + // Exempt duties are stored but tracked via exemptEntries (capped), not keysByDuty. + {name: "exempt_stored", status: core.DeadlineExempt, wantStored: true, wantInKeysByDuty: false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + db := NewMemDB(7, fixedDeadliner{status: tt.status}, NewMemDBMetadata(eth2util.Mainnet.SlotDuration, time.Unix(eth2util.Mainnet.GenesisTimestamp, 0))) + + pubkey := testutil.RandomCorePubKey(t) + parAtt, err := core.NewPartialVersionedAttestation(testutil.RandomDenebVersionedAttestation(), 1) + require.NoError(t, err) + + err = db.StoreExternal(context.Background(), core.NewAttesterDuty(123), core.ParSignedDataSet{pubkey: parAtt}) + require.NoError(t, err) + + db.mu.Lock() + gotEntries, gotKeys := len(db.entries), len(db.keysByDuty) + db.mu.Unlock() + + if tt.wantStored { + require.Equal(t, 1, gotEntries) + } else { + require.Zero(t, gotEntries) + } + + if tt.wantInKeysByDuty { + require.Equal(t, 1, gotKeys) + } else { + require.Zero(t, gotKeys) + } + }) + } +} + +// TestMemDBExemptCap verifies that exempt-duty entries are capped per share index per validator +// (evicting oldest), bounding memory against a peer that replays many distinct epochs/slots. +func TestMemDBExemptCap(t *testing.T) { + const shareIdx = 1 + + // fixedDeadliner returns DeadlineExempt for every duty, so all stored entries are treated as + // exempt regardless of type, exercising the per-share cap. + db := NewMemDB(7, fixedDeadliner{status: core.DeadlineExempt}, NewMemDBMetadata(eth2util.Mainnet.SlotDuration, time.Unix(eth2util.Mainnet.GenesisTimestamp, 0))) + + pubkey := testutil.RandomCorePubKey(t) + att := testutil.RandomDenebVersionedAttestation() + + // Store more distinct slots than the cap, all for the same validator and share index. + const stored = maxExemptEntriesPerShare + 5 + for slot := range uint64(stored) { + parAtt, err := core.NewPartialVersionedAttestation(att, shareIdx) + require.NoError(t, err) + + err = db.StoreExternal(context.Background(), core.NewAttesterDuty(slot), core.ParSignedDataSet{pubkey: parAtt}) + require.NoError(t, err) + } + + db.mu.Lock() + defer db.mu.Unlock() + + require.Len(t, db.entries, maxExemptEntriesPerShare, "entries must be capped at maxExemptEntriesPerShare") + require.Len(t, db.exemptEntries[exemptEntryKey{ShareIdx: shareIdx, PubKey: pubkey, DutyType: core.DutyAttester}], maxExemptEntriesPerShare) +} + +// fixedDeadliner is a Deadliner that returns a fixed DeadlineStatus and never deadlines anything. +type fixedDeadliner struct { + status core.DeadlineStatus +} + +func (d fixedDeadliner) Add(core.Duty) core.DeadlineStatus { return d.status } + +func (fixedDeadliner) C() <-chan core.Duty { return nil } + func newTestDeadliner() *testDeadliner { return &testDeadliner{ ch: make(chan core.Duty), @@ -177,9 +261,9 @@ func (t *testDeadliner) Expire() bool { return true } -func (t *testDeadliner) Add(duty core.Duty) bool { +func (t *testDeadliner) Add(duty core.Duty) core.DeadlineStatus { t.added = append(t.added, duty) - return true + return core.DeadlineScheduled } func (t *testDeadliner) C() <-chan core.Duty { diff --git a/core/priority/prioritiser.go b/core/priority/prioritiser.go index 2a545c93f3..f8dfbe6027 100644 --- a/core/priority/prioritiser.go +++ b/core/priority/prioritiser.go @@ -186,8 +186,8 @@ func (p *Prioritiser) Prioritise(ctx context.Context, msg *pbv1.PriorityMsg) err duty := core.DutyFromProto(msg.GetDuty()) ctx = log.WithCtx(ctx, z.Any("duty", duty)) - if !p.deadliner.Add(duty) { - log.Warn(ctx, "Dropping priority protocol instance for expired duty", nil) + if status := p.deadliner.Add(duty); status == core.DeadlineExpired || status == core.DeadlineExempt { + log.Warn(ctx, "Dropping priority protocol instance for expired/exempt duty", nil, z.Any("duty", duty)) return nil } @@ -215,8 +215,8 @@ func (p *Prioritiser) handleRequest(ctx context.Context, pID peer.ID, msg *pbv1. duty := core.DutyFromProto(msg.GetDuty()) - if !p.deadliner.Add(duty) { - return nil, errors.New("duty expired") + if status := p.deadliner.Add(duty); status == core.DeadlineExpired || status == core.DeadlineExempt { + return nil, errors.New("duty expired or exempt", z.Any("duty", duty)) } reqBuffer := p.getReqBuffer(duty) diff --git a/core/tracker/tracker.go b/core/tracker/tracker.go index 473b8d9802..c1aba67f47 100644 --- a/core/tracker/tracker.go +++ b/core/tracker/tracker.go @@ -142,8 +142,13 @@ func (t *Tracker) Run(ctx context.Context) error { continue // Ignore events before from slot. } - if !t.deleter.Add(e.duty) || !t.analyser.Add(e.duty) { - continue // Ignore expired or never expiring duties + // Ignore expired or exempt duties. + if status := t.deleter.Add(e.duty); status == core.DeadlineExpired || status == core.DeadlineExempt { + continue + } + + if status := t.analyser.Add(e.duty); status == core.DeadlineExpired || status == core.DeadlineExempt { + continue } t.events[e.duty] = append(t.events[e.duty], e) diff --git a/core/tracker/tracker_internal_test.go b/core/tracker/tracker_internal_test.go index 919d3f4cf7..bdff35496e 100644 --- a/core/tracker/tracker_internal_test.go +++ b/core/tracker/tracker_internal_test.go @@ -675,8 +675,8 @@ type testDeadliner struct { deadlineChan chan core.Duty } -func (testDeadliner) Add(core.Duty) bool { - return true +func (testDeadliner) Add(core.Duty) core.DeadlineStatus { + return core.DeadlineScheduled } func (t testDeadliner) C() <-chan core.Duty { diff --git a/dkg/exchanger.go b/dkg/exchanger.go index e76aa46846..2e7f99efde 100644 --- a/dkg/exchanger.go +++ b/dkg/exchanger.go @@ -173,8 +173,8 @@ func (e *exchanger) pushPsigs(ctx context.Context, duty core.Duty, set map[core. // noopDeadliner is a deadliner that does nothing. type noopDeadliner struct{} -func (noopDeadliner) Add(core.Duty) bool { - return true +func (noopDeadliner) Add(core.Duty) core.DeadlineStatus { + return core.DeadlineScheduled } func (noopDeadliner) C() <-chan core.Duty {