Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/aggsigdb/memory_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,13 @@ type testDeadliner struct {
ch chan core.Duty
}

func (d *testDeadliner) Add(duty core.Duty) bool {
func (d *testDeadliner) Add(duty core.Duty) core.DeadlineStatus {
d.mu.Lock()
defer d.mu.Unlock()

d.added = append(d.added, duty)

return true
return core.DeadlineScheduled
}

func (d *testDeadliner) C() <-chan core.Duty {
Expand Down
4 changes: 2 additions & 2 deletions core/aggsigdb/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,8 @@ func newNoopDeadliner() core.Deadliner {

type noopDeadliner struct{}

func (noopDeadliner) Add(core.Duty) bool {
return true
func (noopDeadliner) Add(core.Duty) core.DeadlineStatus {
return core.DeadlineScheduled
}

func (noopDeadliner) C() <-chan core.Duty {
Expand Down
10 changes: 5 additions & 5 deletions core/consensus/qbft/qbft.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,9 +530,9 @@ func (c *Consensus) runInstance(parent context.Context, duty core.Duty) (err err

ctx, span = core.StartDutyTrace(ctx, duty, "core/qbft.runInstance")

if !c.deadliner.Add(duty) {
span.AddEvent("Expired Duty Skipped")
log.Warn(ctx, "Skipping consensus for expired duty", nil)
if status := c.deadliner.Add(duty); status == core.DeadlineExpired || status == core.DeadlineExempt {
span.AddEvent("Expired/exempt duty skipped")
log.Warn(ctx, "Skipping consensus for expired/exempt duty", nil, z.Any("duty", duty))

return nil
}
Expand Down Expand Up @@ -710,8 +710,8 @@ func (c *Consensus) handle(ctx context.Context, _ peer.ID, req proto.Message) (p
)
}

if !c.deadliner.Add(duty) {
return nil, false, errors.New("duty expired", z.Any("duty", duty), c.dropFilter)
if status := c.deadliner.Add(duty); status == core.DeadlineExpired || status == core.DeadlineExempt {
return nil, false, errors.New("duty expired or exempt", z.Any("duty", duty), c.dropFilter)
}

select {
Expand Down
6 changes: 3 additions & 3 deletions core/consensus/qbft/qbft_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,7 @@ func TestQBFTConsensus_handle(t *testing.T) {
var tc Consensus

deadliner := coremocks.NewDeadliner(t)
deadliner.On("Add", mock.Anything).Maybe().Return(true)
deadliner.On("Add", mock.Anything).Maybe().Return(core.DeadlineScheduled)
tc.deadliner = deadliner
tc.mutable.instances = make(map[core.Duty]*instance.IO[Msg])
tc.gaterFunc = func(core.Duty) bool { return true }
Expand Down Expand Up @@ -717,7 +717,7 @@ func TestInstanceIO_MaybeStart(t *testing.T) {
var c Consensus

deadliner := coremocks.NewDeadliner(t)
deadliner.On("Add", mock.Anything).Return(true)
deadliner.On("Add", mock.Anything).Return(core.DeadlineScheduled)
c.deadliner = deadliner
c.gaterFunc = func(core.Duty) bool { return true }
c.mutable.instances = make(map[core.Duty]*instance.IO[Msg])
Expand Down Expand Up @@ -749,7 +749,7 @@ func TestInstanceIO_MaybeStart(t *testing.T) {
var c Consensus

deadliner := coremocks.NewDeadliner(t)
deadliner.On("Add", mock.Anything).Return(true)
deadliner.On("Add", mock.Anything).Return(core.DeadlineScheduled)
c.deadliner = deadliner
c.gaterFunc = func(core.Duty) bool { return true }
c.mutable.instances = make(map[core.Duty]*instance.IO[Msg])
Expand Down
2 changes: 1 addition & 1 deletion core/consensus/qbft/qbft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func testQBFTConsensus(t *testing.T, threshold, nodes int) {
gaterFunc := func(core.Duty) bool { return true }

deadliner := coremocks.NewDeadliner(t)
deadliner.On("Add", mock.Anything).Return(true)
deadliner.On("Add", mock.Anything).Return(core.DeadlineScheduled)
deadliner.On("C").Return((<-chan core.Duty)(deadlineChan))

// Create a mock beacon client for test
Expand Down
65 changes: 42 additions & 23 deletions core/deadline.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,31 @@ const (
// DeadlineFunc is a function that returns the deadline for a duty.
type DeadlineFunc func(Duty) (time.Time, bool)

// DeadlineStatus is the result of adding a duty to a Deadliner.
type DeadlineStatus int

const (
// DeadlineExpired indicates the duty's deadline has already passed,
// so it was not scheduled and will never be emitted on C().
DeadlineExpired DeadlineStatus = iota
// DeadlineScheduled indicates the duty was scheduled for future deadline expiry
// and will eventually be emitted on C().
DeadlineScheduled
// DeadlineExempt indicates the duty type never expires (e.g. exits),
// so it was not scheduled and will never be emitted on C().
DeadlineExempt
)

// Deadliner provides duty Deadline functionality. The C method isn’t thread safe and
// may only be used by a single goroutine. So, multiple instances are required
// for different components and use cases.
type Deadliner interface {
// Add returns true if the duty was added for future deadline scheduling. It is idempotent
// and returns true if the duty was previously added and still awaits deadline scheduling. It
// returns false if the duty has already expired and cannot therefore be added for scheduling.
Add(duty Duty) bool
// Add schedules the duty for future deadline expiry and returns DeadlineScheduled.
// It is idempotent and returns DeadlineScheduled if the duty was previously added and still awaits expiry.
// It returns DeadlineExpired if the duty's deadline has already passed.
// It returns DeadlineExempt if the duty type never expires.
// In the latter two cases the duty is not scheduled and will never be emitted on C().
Add(duty Duty) DeadlineStatus
Comment thread
KaloyanTanev marked this conversation as resolved.

// C returns the same read channel every time and contains deadlined duties.
// It should only be called by a single goroutine.
Expand All @@ -42,7 +59,7 @@ type Deadliner interface {
// deadlineInput represents the input to inputChan.
type deadlineInput struct {
duty Duty
success chan<- bool
success chan<- DeadlineStatus
}

// deadliner implements the Deadliner interface.
Expand Down Expand Up @@ -76,7 +93,7 @@ func NewDutyDeadlineFunc(ctx context.Context, eth2Cl eth2wrap.Client) (DeadlineF
return nil, err
}

slotDuration, _, err := eth2wrap.FetchSlotsConfig(ctx, eth2Cl)
slotDuration, slotsPerEpoch, err := eth2wrap.FetchSlotsConfig(ctx, eth2Cl)
if err != nil {
return nil, err
}
Expand All @@ -100,9 +117,11 @@ func NewDutyDeadlineFunc(ctx context.Context, eth2Cl eth2wrap.Client) (DeadlineF
duration = slotDuration / 3
case DutySyncMessage:
duration = 2 * slotDuration / 3
case DutyAttester, DutyAggregator, DutyPrepareAggregator:
// Even though attestations and aggregations are acceptable even after 2 slots, the rewards are heavily diminished.
duration = 2 * slotDuration
case DutyAttester, DutyAggregator:
// Attestations and aggregations are kept for a full epoch so late partial signatures are not dropped.
duration = time.Duration(slotsPerEpoch) * slotDuration
case DutyPrepareAggregator, DutyPrepareSyncContribution:
duration = 2 * time.Duration(slotsPerEpoch) * slotDuration
Comment thread
KaloyanTanev marked this conversation as resolved.
default:
duration = slotDuration
}
Expand Down Expand Up @@ -157,19 +176,18 @@ func (d *deadliner) run(ctx context.Context, deadlineFunc DeadlineFunc) {
deadline, canExpire := deadlineFunc(input.duty)
if !canExpire {
// Drop duties that never expire
input.success <- false
input.success <- DeadlineExempt
continue
}

expired := deadline.Before(d.clock.Now())

input.success <- !expired

// Ignore expired duties
if expired {
// Ignore (and signal) duties that have already expired.
if deadline.Before(d.clock.Now()) {
input.success <- DeadlineExpired
continue
}

input.success <- DeadlineScheduled

duties[input.duty] = true

if deadline.Before(currDeadline) {
Expand All @@ -194,21 +212,22 @@ func (d *deadliner) run(ctx context.Context, deadlineFunc DeadlineFunc) {
}
}

// Add adds a duty to be notified of the deadline. It returns true if the duty was added successfully.
func (d *deadliner) Add(duty Duty) bool {
success := make(chan bool)
// Add adds a duty to be notified of the deadline.
// See the Deadliner interface for the meaning of the returned DeadlineStatus.
func (d *deadliner) Add(duty Duty) DeadlineStatus {
success := make(chan DeadlineStatus)

select {
case <-d.quit:
return false
return DeadlineExpired
case d.inputChan <- deadlineInput{duty: duty, success: success}:
}

select {
case <-d.quit:
return false
case ok := <-success:
return ok
return DeadlineExpired
case status := <-success:
return status
}
}

Expand Down
55 changes: 41 additions & 14 deletions core/deadline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,22 +48,22 @@ func TestDeadliner(t *testing.T) {
wg := &sync.WaitGroup{}

// Add our duties to the deadliner.
expectedFalseCh := make(chan bool, len(expiredDuties))
expectedTrueCh := make(chan bool, len(nonExpiredDuties)+len(voluntaryExits))
expectedExpiredCh := make(chan core.DeadlineStatus, len(expiredDuties))
expectedScheduledCh := make(chan core.DeadlineStatus, len(nonExpiredDuties)+len(voluntaryExits))

addDuties(t, wg, expiredDuties, expectedFalseCh, deadliner)
addDuties(t, wg, nonExpiredDuties, expectedTrueCh, deadliner)
addDuties(t, wg, voluntaryExits, expectedTrueCh, deadliner)
addDuties(t, wg, expiredDuties, expectedExpiredCh, deadliner)
addDuties(t, wg, nonExpiredDuties, expectedScheduledCh, deadliner)
addDuties(t, wg, voluntaryExits, expectedScheduledCh, deadliner)

// Wait till all the duties are added to the deadliner.
wg.Wait()

for range len(expiredDuties) {
require.False(t, <-expectedFalseCh)
require.Equal(t, core.DeadlineExpired, <-expectedExpiredCh)
}

for range len(nonExpiredDuties) + len(voluntaryExits) {
require.True(t, <-expectedTrueCh)
require.Equal(t, core.DeadlineScheduled, <-expectedScheduledCh)
}
Comment thread
KaloyanTanev marked this conversation as resolved.

var maxSlot uint64
Expand All @@ -88,17 +88,44 @@ func TestDeadliner(t *testing.T) {
require.Equal(t, nonExpiredDuties, actualDuties)
}

// TestDeadlinerExempt verifies that a duty whose deadline func reports it as never-expiring
// returns DeadlineExempt from Add and is never emitted on C().
func TestDeadlinerExempt(t *testing.T) {
ctx, cancel := context.WithCancel(t.Context())
defer cancel()

clock := clockwork.NewFakeClock()
neverExpires := func(core.Duty) (time.Time, bool) {
return time.Time{}, false
}

deadliner := core.NewDeadlinerForT(ctx, t, neverExpires, clock)

require.Equal(t, core.DeadlineExempt, deadliner.Add(core.NewVoluntaryExit(123)))

// Advance the clock well beyond any deadline; exempt duties must never be emitted on C().
clock.Advance(time.Hour)

select {
case got := <-deadliner.C():
require.Failf(t, "exempt duty must not be emitted on C()", "got duty %v", got)
case <-time.After(100 * time.Millisecond):
}
}

func TestNewDutyDeadlineFunc(t *testing.T) {
bmock, err := beaconmock.New(t.Context())
require.NoError(t, err)

genesisTime, err := eth2wrap.FetchGenesisTime(t.Context(), bmock)
require.NoError(t, err)

slotDuration, _, err := eth2wrap.FetchSlotsConfig(t.Context(), bmock)
slotDuration, slotsPerEpoch, err := eth2wrap.FetchSlotsConfig(t.Context(), bmock)
require.NoError(t, err)

margin := slotDuration / 12
oneEpoch := time.Duration(slotsPerEpoch) * slotDuration
twoEpochs := 2 * oneEpoch
currentSlot := uint64(time.Since(genesisTime) / slotDuration)
now := genesisTime.Add(time.Duration(currentSlot) * slotDuration)

Expand Down Expand Up @@ -129,15 +156,15 @@ func TestNewDutyDeadlineFunc(t *testing.T) {
},
{
duty: core.NewAttesterDuty(currentSlot),
expectedDuration: 2*slotDuration + margin,
expectedDuration: oneEpoch + margin,
},
{
duty: core.NewAggregatorDuty(currentSlot),
expectedDuration: 2*slotDuration + margin,
expectedDuration: oneEpoch + margin,
},
{
duty: core.NewPrepareAggregatorDuty(currentSlot),
expectedDuration: 2*slotDuration + margin,
expectedDuration: twoEpochs + margin,
},
{
duty: core.NewSyncMessageDuty(currentSlot),
Expand All @@ -157,7 +184,7 @@ func TestNewDutyDeadlineFunc(t *testing.T) {
},
{
duty: core.NewPrepareSyncContributionDuty(currentSlot),
expectedDuration: slotDuration + margin,
expectedDuration: twoEpochs + margin,
},
}

Expand All @@ -173,12 +200,12 @@ func TestNewDutyDeadlineFunc(t *testing.T) {
}

// addDuties runs a goroutine which adds the duties to the deadliner channel.
func addDuties(t *testing.T, wg *sync.WaitGroup, duties []core.Duty, expCh chan bool, deadliner core.Deadliner) {
func addDuties(t *testing.T, wg *sync.WaitGroup, duties []core.Duty, expCh chan core.DeadlineStatus, deadliner core.Deadliner) {
t.Helper()

wg.Add(1)

go func(duties []core.Duty, expCh chan bool) {
go func(duties []core.Duty, expCh chan core.DeadlineStatus) {
defer wg.Done()

for _, duty := range duties {
Expand Down
4 changes: 2 additions & 2 deletions core/dutydb/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ func (db *MemDB) Store(_ context.Context, duty core.Duty, unsignedSet core.Unsig
db.mu.Lock()
defer db.mu.Unlock()

if !db.deadliner.Add(duty) {
return errors.New("not storing unsigned data for expired duty", z.Any("duty", duty))
if status := db.deadliner.Add(duty); status == core.DeadlineExpired || status == core.DeadlineExempt {
return errors.New("not storing unsigned data for expired or exempt duty", z.Any("duty", duty))
}

switch duty.Type {
Expand Down
4 changes: 2 additions & 2 deletions core/dutydb/memory_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ func TestCancelledQueries(t *testing.T) {

type noopDeadliner struct{}

func (t noopDeadliner) Add(duty core.Duty) bool {
return true
func (t noopDeadliner) Add(duty core.Duty) core.DeadlineStatus {
return core.DeadlineScheduled
}

func (t noopDeadliner) C() <-chan core.Duty {
Expand Down
4 changes: 2 additions & 2 deletions core/dutydb/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,13 +530,13 @@ type testDeadliner struct {
ch chan core.Duty
}

func (d *testDeadliner) Add(duty core.Duty) bool {
func (d *testDeadliner) Add(duty core.Duty) core.DeadlineStatus {
d.mu.Lock()
defer d.mu.Unlock()

d.added = append(d.added, duty)

return true
return core.DeadlineScheduled
}

func (d *testDeadliner) C() <-chan core.Duty {
Expand Down
8 changes: 4 additions & 4 deletions core/mocks/deadliner.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading