Skip to content

Commit 1e15ee8

Browse files
committed
WIP
1 parent f1ee27a commit 1e15ee8

File tree

3 files changed

+43
-41
lines changed

3 files changed

+43
-41
lines changed

agent/immortalstreams/manager_test.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ import (
44
"context"
55
"io"
66
"net"
7+
"runtime"
78
"sync"
89
"testing"
9-
"time"
1010

1111
"github.com/google/uuid"
1212

@@ -387,8 +387,13 @@ func TestManager_Eviction(t *testing.T) {
387387
}
388388

389389
// Close the first connection to make it disconnected
390-
time.Sleep(100 * time.Millisecond) // Let connections establish
390+
// Wait for connections to be established
391391
connMu.Lock()
392+
for len(conns) == 0 {
393+
connMu.Unlock()
394+
runtime.Gosched()
395+
connMu.Lock()
396+
}
392397
require.Greater(t, len(conns), 0)
393398
_ = conns[0].Close()
394399
connMu.Unlock()
@@ -400,8 +405,10 @@ func TestManager_Eviction(t *testing.T) {
400405
// Manually trigger disconnection since the automatic detection isn't working
401406
firstStream.SignalDisconnect()
402407

403-
// Wait a bit for the disconnection to be processed
404-
time.Sleep(50 * time.Millisecond)
408+
// Wait for the disconnection to be processed
409+
for firstStream.IsConnected() {
410+
runtime.Gosched()
411+
}
405412

406413
// Verify the first stream is now disconnected
407414
require.False(t, firstStream.IsConnected(), "First stream should be disconnected")

agent/immortalstreams/stream.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,7 @@ func (s *Stream) Close() error {
304304
s.closed = true
305305
s.connected = false
306306

307-
// Cancel the context to interrupt any pending BackedPipe operations
307+
// Cancel will interrupt any pending BackedPipe operations
308308
if s.cancel != nil {
309309
s.cancel()
310310
}

agent/immortalstreams/stream_test.go

Lines changed: 31 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package immortalstreams_test
22

33
import (
44
"bytes"
5+
"context"
56
"fmt"
67
"io"
78
"net"
@@ -129,10 +130,10 @@ func TestStream_HandleReconnect(t *testing.T) {
129130
_ = fromServerWrite1.Close()
130131

131132
// Wait until the stream is marked disconnected
132-
deadline0 := time.Now().Add(2 * time.Second)
133-
for stream.IsConnected() && time.Now().Before(deadline0) {
134-
time.Sleep(10 * time.Millisecond)
135-
}
133+
ctx := testutil.Context(t, testutil.WaitShort)
134+
testutil.Eventually(ctx, t, func(ctx context.Context) bool {
135+
return !stream.IsConnected()
136+
}, testutil.IntervalFast)
136137
require.False(t, stream.IsConnected())
137138

138139
// Create new client connection (full-duplex)
@@ -297,23 +298,23 @@ func TestStream_ConcurrentAccess(t *testing.T) {
297298
defer wg.Done()
298299
for i := 0; i < 100; i++ {
299300
_ = stream.IsConnected()
300-
time.Sleep(time.Microsecond)
301+
runtime.Gosched() // Yield to other goroutines
301302
}
302303
}()
303304

304305
go func() {
305306
defer wg.Done()
306307
for i := 0; i < 100; i++ {
307308
_ = stream.ToAPI()
308-
time.Sleep(time.Microsecond)
309+
runtime.Gosched() // Yield to other goroutines
309310
}
310311
}()
311312

312313
go func() {
313314
defer wg.Done()
314315
for i := 0; i < 100; i++ {
315316
_ = stream.LastDisconnectionAt()
316-
time.Sleep(time.Microsecond)
317+
runtime.Gosched() // Yield to other goroutines
317318
}
318319
}()
319320

@@ -323,7 +324,7 @@ func TestStream_ConcurrentAccess(t *testing.T) {
323324
// Test other concurrent operations instead
324325
_ = stream.IsConnected()
325326
_ = stream.ToAPI()
326-
time.Sleep(time.Microsecond)
327+
runtime.Gosched() // Yield to other goroutines
327328
}
328329
}()
329330

@@ -403,9 +404,9 @@ func BenchmarkImmortalStream_ReconnectLatency(b *testing.B) {
403404
require.NoError(b, stream.HandleReconnect(c1, 0))
404405
// Ensure disconnected before starting benchmark loop
405406
_ = r1.Close()
406-
deadline := time.Now().Add(2 * time.Second)
407-
for stream.IsConnected() && time.Now().Before(deadline) {
408-
time.Sleep(5 * time.Millisecond)
407+
// Use a simple loop for benchmarks to avoid overhead
408+
for stream.IsConnected() {
409+
runtime.Gosched()
409410
}
410411

411412
b.ReportAllocs()
@@ -425,10 +426,9 @@ func BenchmarkImmortalStream_ReconnectLatency(b *testing.B) {
425426

426427
// Immediately disconnect for next iteration
427428
_ = remote.Close()
428-
// Wait until disconnected
429-
deadline := time.Now().Add(2 * time.Second)
430-
for stream.IsConnected() && time.Now().Before(deadline) {
431-
time.Sleep(5 * time.Millisecond)
429+
// Wait until disconnected - use a simple loop with runtime.Gosched for benchmarks
430+
for stream.IsConnected() {
431+
runtime.Gosched()
432432
}
433433
}
434434
}
@@ -544,10 +544,10 @@ func TestStream_ReconnectionScenarios(t *testing.T) {
544544
_ = fromServerWriteA.Close()
545545

546546
// Wait until the stream is marked disconnected
547-
deadline := time.Now().Add(2 * time.Second)
548-
for stream2.IsConnected() && time.Now().Before(deadline) {
549-
time.Sleep(10 * time.Millisecond)
550-
}
547+
ctx := testutil.Context(t, testutil.WaitShort)
548+
testutil.Eventually(ctx, t, func(ctx context.Context) bool {
549+
return !stream2.IsConnected()
550+
}, testutil.IntervalFast)
551551
require.False(t, stream2.IsConnected())
552552

553553
// Reconnect with new client
@@ -580,7 +580,7 @@ func TestStream_ReconnectionScenarios(t *testing.T) {
580580
require.NoError(t, err)
581581
require.True(t, stream2.IsConnected())
582582

583-
// Wait for replay to complete
583+
// Wait for replay to complete - this ensures the connection is fully established
584584
<-replayDone
585585
require.Equal(t, testData, replayBuf, "should receive replayed data")
586586

@@ -649,10 +649,10 @@ func TestStream_ReconnectionScenarios(t *testing.T) {
649649
_ = fromServerWrite.Close()
650650

651651
// Wait until the stream is marked disconnected
652-
deadline := time.Now().Add(2 * time.Second)
653-
for stream3.IsConnected() && time.Now().Before(deadline) {
654-
time.Sleep(10 * time.Millisecond)
655-
}
652+
ctx := testutil.Context(t, testutil.WaitShort)
653+
testutil.Eventually(ctx, t, func(ctx context.Context) bool {
654+
return !stream3.IsConnected()
655+
}, testutil.IntervalFast)
656656
require.False(t, stream3.IsConnected())
657657
}
658658
})
@@ -771,10 +771,9 @@ func TestStream_SequenceNumberReconnection_WithSequenceNumbers(t *testing.T) {
771771
// Simulate disconnection and wait for detection
772772
_ = clientConn1.Close()
773773
_ = serverConn1.Close()
774-
deadline1 := time.Now().Add(2 * time.Second)
775-
for stream.IsConnected() && time.Now().Before(deadline1) {
776-
time.Sleep(10 * time.Millisecond)
777-
}
774+
require.Eventually(t, func() bool {
775+
return !stream.IsConnected()
776+
}, testutil.WaitShort, testutil.IntervalFast)
778777
require.False(t, stream.IsConnected())
779778

780779
// Client reconnects with its sequence numbers
@@ -872,10 +871,7 @@ func TestStream_SequenceNumberReconnection_WithDataLoss(t *testing.T) {
872871
require.NoError(t, err)
873872
require.True(t, stream.IsConnected())
874873

875-
// Wait a bit for the connection to be fully established
876-
time.Sleep(100 * time.Millisecond)
877-
878-
// Send some data
874+
// Send some data - this will verify the connection is fully established
879875
testData1 := []byte("first message")
880876
_, err = serverConn1.Write(testData1)
881877
require.NoError(t, err)
@@ -892,10 +888,9 @@ func TestStream_SequenceNumberReconnection_WithDataLoss(t *testing.T) {
892888
// Simulate disconnection and wait for detection
893889
_ = clientConn1.Close()
894890
_ = serverConn1.Close()
895-
deadline2 := time.Now().Add(2 * time.Second)
896-
for stream.IsConnected() && time.Now().Before(deadline2) {
897-
time.Sleep(10 * time.Millisecond)
898-
}
891+
require.Eventually(t, func() bool {
892+
return !stream.IsConnected()
893+
}, testutil.WaitShort, testutil.IntervalFast)
899894
require.False(t, stream.IsConnected())
900895

901896
// Client reconnects with its sequence numbers

0 commit comments

Comments
 (0)