Skip to content

Commit f1ee27a

Browse files
committed
WIP
1 parent 2a33f47 commit f1ee27a

File tree

2 files changed

+118
-100
lines changed

2 files changed

+118
-100
lines changed

agent/immortalstreams/manager.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@ import (
1919
const (
2020
// MaxStreams is the maximum number of immortal streams allowed per agent
2121
MaxStreams = 32
22-
// BufferSize is the size of the ring buffer for each stream (64 MiB)
23-
BufferSize = 64 * 1024 * 1024
2422
)
2523

2624
// Manager manages immortal streams for an agent

agent/immortalstreams/stream.go

Lines changed: 118 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,9 @@ type Stream struct {
5252

5353
// Shutdown signal
5454
shutdownChan chan struct{}
55+
56+
// Context cancellation for BackedPipe
57+
cancel context.CancelFunc
5558
}
5659

5760
// reconnectRequest represents a pending reconnection request
@@ -67,8 +70,73 @@ type reconnectResponse struct {
6770
err error
6871
}
6972

73+
// streamReconnector implements backedpipe.Reconnector interface for Stream
74+
type streamReconnector struct {
75+
s *Stream
76+
}
77+
78+
// Reconnect implements the backedpipe.Reconnector interface
79+
func (r *streamReconnector) Reconnect(ctx context.Context, writerSeqNum uint64) (io.ReadWriteCloser, uint64, error) {
80+
r.s.mu.Lock()
81+
82+
// If there's already a pending reconnect, this is a concurrent call.
83+
// We should return an error to let the BackedPipe retry later.
84+
if r.s.pendingReconnect != nil {
85+
r.s.mu.Unlock()
86+
return nil, 0, xerrors.New("reconnection already in progress")
87+
}
88+
89+
// Fast path: if the stream is already shutting down, abort immediately
90+
if r.s.closed {
91+
r.s.mu.Unlock()
92+
return nil, 0, xerrors.New("stream is shutting down")
93+
}
94+
95+
// Wait for HandleReconnect to be called with a new connection
96+
responseChan := make(chan reconnectResponse, 1)
97+
r.s.pendingReconnect = &reconnectRequest{
98+
writerSeqNum: writerSeqNum,
99+
response: responseChan,
100+
}
101+
r.s.handshakePending = true
102+
// Mark disconnected if we previously had a client connection
103+
if r.s.connected {
104+
r.s.connected = false
105+
r.s.lastDisconnectionAt = time.Now()
106+
}
107+
r.s.logger.Info(context.Background(), "pending reconnect set",
108+
slog.F("writer_seq", writerSeqNum))
109+
// Signal waiters a reconnect request is pending
110+
r.s.reconnectCond.Broadcast()
111+
r.s.mu.Unlock()
112+
113+
// Wait for response from HandleReconnect or context cancellation
114+
r.s.logger.Info(context.Background(), "reconnect function waiting for response")
115+
select {
116+
case resp := <-responseChan:
117+
r.s.logger.Info(context.Background(), "reconnect function got response",
118+
slog.F("has_conn", resp.conn != nil),
119+
slog.F("read_seq", resp.readSeq),
120+
slog.Error(resp.err))
121+
return resp.conn, resp.readSeq, resp.err
122+
case <-ctx.Done():
123+
// Context was canceled, return error immediately
124+
// The stream's Close() method will handle cleanup
125+
r.s.logger.Info(context.Background(), "reconnect function context canceled", slog.Error(ctx.Err()))
126+
return nil, 0, ctx.Err()
127+
case <-r.s.shutdownChan:
128+
// Stream is being shut down, return error immediately
129+
// The stream's Close() method will handle cleanup
130+
r.s.logger.Info(context.Background(), "reconnect function shutdown signal received")
131+
return nil, 0, xerrors.New("stream is shutting down")
132+
}
133+
}
134+
70135
// NewStream creates a new immortal stream
71136
func NewStream(id uuid.UUID, name string, port int, logger slog.Logger) *Stream {
137+
// Create a context that will be canceled when the stream is closed
138+
ctx, cancel := context.WithCancel(context.Background())
139+
72140
stream := &Stream{
73141
id: id,
74142
name: name,
@@ -78,72 +146,13 @@ func NewStream(id uuid.UUID, name string, port int, logger slog.Logger) *Stream
78146
disconnectChan: make(chan struct{}, 1),
79147
shutdownChan: make(chan struct{}),
80148
reconnectReq: make(chan struct{}, 1),
149+
cancel: cancel, // Store cancel function for cleanup
81150
}
82151
stream.reconnectCond = sync.NewCond(&stream.mu)
83152

84-
// Create a reconnect function that waits for a client connection
85-
reconnectFn := func(ctx context.Context, writerSeqNum uint64) (io.ReadWriteCloser, uint64, error) {
86-
// Wait for HandleReconnect to be called with a new connection
87-
responseChan := make(chan reconnectResponse, 1)
88-
89-
stream.mu.Lock()
90-
stream.pendingReconnect = &reconnectRequest{
91-
writerSeqNum: writerSeqNum,
92-
response: responseChan,
93-
}
94-
stream.handshakePending = true
95-
// Mark disconnected if we previously had a client connection
96-
if stream.connected {
97-
stream.connected = false
98-
stream.lastDisconnectionAt = time.Now()
99-
}
100-
stream.logger.Info(context.Background(), "pending reconnect set",
101-
slog.F("writer_seq", writerSeqNum))
102-
// Signal waiters a reconnect request is pending
103-
stream.reconnectCond.Broadcast()
104-
stream.mu.Unlock()
105-
106-
// Fast path: if the stream is already shutting down, abort immediately
107-
select {
108-
case <-stream.shutdownChan:
109-
stream.mu.Lock()
110-
// Clear the pending request since we're aborting
111-
if stream.pendingReconnect != nil {
112-
stream.pendingReconnect = nil
113-
}
114-
stream.mu.Unlock()
115-
return nil, 0, xerrors.New("stream is shutting down")
116-
default:
117-
}
118-
119-
// Wait for response from HandleReconnect or context cancellation
120-
stream.logger.Info(context.Background(), "reconnect function waiting for response")
121-
select {
122-
case resp := <-responseChan:
123-
stream.logger.Info(context.Background(), "reconnect function got response",
124-
slog.F("has_conn", resp.conn != nil),
125-
slog.F("read_seq", resp.readSeq),
126-
slog.Error(resp.err))
127-
return resp.conn, resp.readSeq, resp.err
128-
case <-ctx.Done():
129-
// Context was canceled, clear pending request and return error
130-
stream.mu.Lock()
131-
stream.pendingReconnect = nil
132-
stream.handshakePending = false
133-
stream.mu.Unlock()
134-
return nil, 0, ctx.Err()
135-
case <-stream.shutdownChan:
136-
// Stream is being shut down, clear pending request and return error
137-
stream.mu.Lock()
138-
stream.pendingReconnect = nil
139-
stream.handshakePending = false
140-
stream.mu.Unlock()
141-
return nil, 0, xerrors.New("stream is shutting down")
142-
}
143-
}
144-
145-
// Create BackedPipe with background context
146-
stream.pipe = backedpipe.NewBackedPipe(context.Background(), reconnectFn)
153+
// Create BackedPipe with streamReconnector
154+
reconnector := &streamReconnector{s: stream}
155+
stream.pipe = backedpipe.NewBackedPipe(ctx, reconnector)
147156

148157
// Start reconnect worker: dedupe pokes and call ForceReconnect when safe.
149158
go func() {
@@ -152,7 +161,7 @@ func NewStream(id uuid.UUID, name string, port int, logger slog.Logger) *Stream
152161
case <-stream.shutdownChan:
153162
return
154163
case <-stream.reconnectReq:
155-
// Drain extra pokes to coalesce
164+
// Drain extra pokes
156165
for {
157166
select {
158167
case <-stream.reconnectReq:
@@ -240,22 +249,10 @@ func (s *Stream) HandleReconnect(clientConn io.ReadWriteCloser, readSeqNum uint6
240249
s.mu.Unlock()
241250
respCh <- reconnectResponse{conn: clientConn, readSeq: readSeqNum, err: nil}
242251

243-
// Wait until the pipe reports a connected state so the handshake fully completes.
244-
// Use a bounded timeout to avoid hanging forever in pathological cases.
245-
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
246-
err := s.pipe.WaitForConnection(ctx)
247-
cancel()
248-
if err != nil {
249-
s.mu.Lock()
250-
s.connected = false
251-
if s.reconnectCond != nil {
252-
s.reconnectCond.Broadcast()
253-
}
254-
s.mu.Unlock()
255-
s.logger.Warn(context.Background(), "failed to connect backed pipe", slog.Error(err))
256-
return xerrors.Errorf("failed to establish connection: %w", err)
257-
}
258-
252+
// The connection has been provided to the BackedPipe via the response channel.
253+
// The BackedPipe will establish the connection, and since we control the
254+
// reconnection process, we know it will succeed (or the Reconnect method
255+
// would have returned an error).
259256
s.mu.Lock()
260257
s.lastConnectionAt = time.Now()
261258
s.connected = true
@@ -307,6 +304,11 @@ func (s *Stream) Close() error {
307304
s.closed = true
308305
s.connected = false
309306

307+
// Cancel the context to interrupt any pending BackedPipe operations
308+
if s.cancel != nil {
309+
s.cancel()
310+
}
311+
310312
// Signal shutdown to any pending reconnect attempts and listeners
311313
// Closing the channel wakes all waiters exactly once
312314
select {
@@ -424,15 +426,44 @@ func (s *Stream) startCopyingLocked() {
424426
// The BackedPipe will block when no client is connected
425427
buf := make([]byte, 32*1024)
426428
for {
429+
// Check if we should shut down before attempting to read
430+
select {
431+
case <-s.shutdownChan:
432+
s.logger.Debug(context.Background(), "shutdown signal received, exiting copy goroutine")
433+
return
434+
default:
435+
}
436+
427437
// Use a buffer for copying
428438
n, err := s.pipe.Read(buf)
429-
// Log significant events
430-
if errors.Is(err, io.EOF) {
431-
s.logger.Debug(context.Background(), "got EOF from pipe")
432-
s.SignalDisconnect()
433-
} else if err != nil && !errors.Is(err, io.ErrClosedPipe) {
434-
s.logger.Debug(context.Background(), "error reading from pipe", slog.Error(err))
435-
s.SignalDisconnect()
439+
440+
if err != nil {
441+
// Check for fatal errors that should terminate the goroutine
442+
if xerrors.Is(err, io.ErrClosedPipe) {
443+
// The pipe itself is closed, we're done
444+
s.logger.Debug(context.Background(), "pipe closed, exiting copy goroutine")
445+
s.SignalDisconnect()
446+
return
447+
}
448+
449+
// Check for BackedPipe specific errors
450+
if xerrors.Is(err, backedpipe.ErrPipeClosed) {
451+
s.logger.Debug(context.Background(), "backed pipe closed, exiting copy goroutine")
452+
s.SignalDisconnect()
453+
return
454+
}
455+
456+
// Log other errors but continue
457+
if errors.Is(err, io.EOF) {
458+
s.logger.Debug(context.Background(), "got EOF from pipe")
459+
s.SignalDisconnect()
460+
} else {
461+
s.logger.Debug(context.Background(), "error reading from pipe", slog.Error(err))
462+
s.SignalDisconnect()
463+
}
464+
465+
// For non-fatal errors, continue the loop
466+
continue
436467
}
437468

438469
if n > 0 {
@@ -445,17 +476,6 @@ func (s *Stream) startCopyingLocked() {
445476
return
446477
}
447478
}
448-
449-
if err != nil {
450-
// Check if this is a fatal error
451-
if xerrors.Is(err, io.ErrClosedPipe) {
452-
// The pipe itself is closed, we're done
453-
s.logger.Debug(context.Background(), "pipe closed, exiting copy goroutine")
454-
s.SignalDisconnect()
455-
return
456-
}
457-
// Any other error (including EOF) is handled by BackedPipe; continue
458-
}
459479
}
460480
}()
461481

0 commit comments

Comments
 (0)