diff --git a/cmd/containerd-shim-lcow-v2/manager.go b/cmd/containerd-shim-lcow-v2/manager.go index d17397a8da..abf9b010ed 100644 --- a/cmd/containerd-shim-lcow-v2/manager.go +++ b/cmd/containerd-shim-lcow-v2/manager.go @@ -183,8 +183,8 @@ func (m *shimManager) Start(ctx context.Context, id string, opts shim.StartOpts) // It reads and logs any panic messages written to panic.log, then tries to // terminate the associated HCS compute system and waits up to 30 seconds for // it to exit. -func (m *shimManager) Stop(_ context.Context, id string) (resp shim.StopStatus, err error) { - ctx, span := oc.StartSpan(context.Background(), "delete") +func (m *shimManager) Stop(ctx context.Context, id string) (resp shim.StopStatus, err error) { + ctx, span := oc.StartSpan(ctx, "delete") defer span.End() defer func() { oc.SetSpanStatus(span, err) }() diff --git a/internal/controller/vm/vm.go b/internal/controller/vm/vm.go index 68ac56dcb4..518e2d00c1 100644 --- a/internal/controller/vm/vm.go +++ b/internal/controller/vm/vm.go @@ -230,7 +230,7 @@ func (c *Controller) StartVM(ctx context.Context, opts *StartOptions) (err error // VM is started, entropy is seeded and log channel is up. Accept the // GCS dial on the prepared listener and run the GCS protocol handshake. - err = c.guest.CreateConnection(ctx, opts.ConfigOptions...) + err = c.guest.CreateConnection(ctx, true, opts.ConfigOptions...) if err != nil { return fmt.Errorf("failed to create guest connection: %w", err) } diff --git a/internal/gcs/bridge.go b/internal/gcs/bridge.go index 5b611f262e..13febd2da5 100644 --- a/internal/gcs/bridge.go +++ b/internal/gcs/bridge.go @@ -13,6 +13,7 @@ import ( "io" "net" "sync" + "sync/atomic" "time" "github.com/sirupsen/logrus" @@ -48,16 +49,18 @@ type bridge struct { // Timeout is the time a synchronous RPC must respond within. Timeout time.Duration - mu sync.Mutex - nextID int64 - rpcs map[int64]*rpc - conn io.ReadWriteCloser - rpcCh chan *rpc - notify notifyFunc - closed bool - log *logrus.Entry - brdgErr error - waitCh chan struct{} + mu sync.Mutex + nextID int64 + rpcs map[int64]*rpc + conn io.ReadWriteCloser + rpcCh chan *rpc + notify notifyFunc + closed bool + log *logrus.Entry + brdgErr error + waitCh chan struct{} + migrating atomic.Bool + resumeCh chan struct{} } var ErrBridgeClosed = fmt.Errorf("bridge closed: %w", net.ErrClosed) @@ -74,13 +77,14 @@ type notifyFunc func(*prot.ContainerNotification) error // traces using `log`. func newBridge(conn io.ReadWriteCloser, notify notifyFunc, log *logrus.Entry) *bridge { return &bridge{ - conn: conn, - rpcs: make(map[int64]*rpc), - rpcCh: make(chan *rpc), - waitCh: make(chan struct{}), - notify: notify, - log: log, - Timeout: bridgeFailureTimeout, + conn: conn, + rpcs: make(map[int64]*rpc), + rpcCh: make(chan *rpc), + waitCh: make(chan struct{}), + resumeCh: make(chan struct{}, 1), + notify: notify, + log: log, + Timeout: bridgeFailureTimeout, } } @@ -129,6 +133,37 @@ func (brdg *bridge) Wait() error { return brdg.brdgErr } +// SetMigrating toggles tolerance of transport-level failures around a +// live-migration blackout. Explicit [bridge.Close] and the RPC timeout +// kill still tear the bridge down. +func (brdg *bridge) SetMigrating(migrating bool) { + brdg.migrating.Store(migrating) +} + +// ResumeOnConn swaps the bridge transport onto conn and wakes the recv +// loop without dropping outstanding RPCs. +func (brdg *bridge) ResumeOnConn(newConn io.ReadWriteCloser) error { + brdg.mu.Lock() + defer brdg.mu.Unlock() + + if brdg.closed { + return ErrBridgeClosed + } + + if brdg.conn != nil { + // Force any in-progress recvLoop off the stale conn so it can restart on the new one. + _ = brdg.conn.Close() + } + + brdg.conn = newConn + + select { + case brdg.resumeCh <- struct{}{}: + default: + } + return nil +} + // AsyncRPC sends an RPC request to the guest but does not wait for a response. // If the message cannot be sent before the context is done, then an error is // returned. @@ -239,7 +274,23 @@ func (brdg *bridge) RPC(ctx context.Context, proc prot.RPCProc, req requestMessa } func (brdg *bridge) recvLoopRoutine() { - brdg.kill(brdg.recvLoop()) + for { + err := brdg.recvLoop() + + if !brdg.migrating.Load() { + brdg.kill(err) + break + } + // Park until [bridge.ResumeOnConn] swaps the conn or [bridge.Close] fires. + brdg.log.WithError(err).Info("bridge transport down during migration; awaiting resume or close") + select { + case <-brdg.resumeCh: + continue + case <-brdg.waitCh: + } + break + } + // Fail any remaining RPCs. brdg.mu.Lock() rpcs := brdg.rpcs @@ -365,6 +416,11 @@ func (brdg *bridge) sendLoop() { case call := <-brdg.rpcCh: err := brdg.sendRPC(&buf, enc, call) if err != nil { + if brdg.migrating.Load() { + // Blackout drop: sendRPC already failed this call; hold the bridge open. + brdg.log.WithError(err).Debug("bridge send failed during migration; bridge held open") + continue + } brdg.kill(err) return } diff --git a/internal/gcs/guestconnection.go b/internal/gcs/guestconnection.go index 35e6709d15..f5c94486d3 100644 --- a/internal/gcs/guestconnection.go +++ b/internal/gcs/guestconnection.go @@ -233,6 +233,24 @@ func (gc *GuestConnection) Close() error { return gc.brdg.Close() } +// SetMigrating forwards to [bridge.SetMigrating]. No-op if uninitialized. +func (gc *GuestConnection) SetMigrating(migrating bool) { + if gc.brdg == nil { + return + } + + gc.brdg.SetMigrating(migrating) +} + +// ResumeOnConn resumes the bridge after swaping the bridge +// transport without dropping outstanding RPCs. +func (gc *GuestConnection) ResumeOnConn(conn io.ReadWriteCloser) error { + if gc.brdg == nil { + return ErrBridgeClosed + } + return gc.brdg.ResumeOnConn(conn) +} + // CreateProcess creates a process in the container host. func (gc *GuestConnection) CreateProcess(ctx context.Context, settings interface{}) (_ cow.Process, err error) { ctx, span := oc.StartSpan(ctx, "gcs::GuestConnection::CreateProcess", oc.WithClientSpanKind) diff --git a/internal/logfields/fields.go b/internal/logfields/fields.go index f36417d7fd..445202554d 100644 --- a/internal/logfields/fields.go +++ b/internal/logfields/fields.go @@ -105,6 +105,11 @@ const ( VMShimOperation = "vmshim-op" + // migration + + SessionID = "session-id" + Action = "action" + // logging and tracing TraceID = "traceID" diff --git a/internal/vm/guestmanager/doc.go b/internal/vm/guestmanager/doc.go index c901a77248..f24dc68f73 100644 --- a/internal/vm/guestmanager/doc.go +++ b/internal/vm/guestmanager/doc.go @@ -18,7 +18,7 @@ GCS connection: if err != nil { // handle error } if err := g.PrepareConnection(gcsServiceID); err != nil { // handle error } // (start the UVM here) - if err := g.CreateConnection(ctx); err != nil { // handle error } + if err := g.CreateConnection(ctx, true); err != nil { // handle error } After the connection is established, use the manager interfaces for guest-side changes: diff --git a/internal/vm/guestmanager/guest.go b/internal/vm/guestmanager/guest.go index a95e5b9265..2c08aeb53d 100644 --- a/internal/vm/guestmanager/guest.go +++ b/internal/vm/guestmanager/guest.go @@ -100,8 +100,9 @@ func (gm *Guest) PrepareConnection(GCSServiceID guid.GUID) error { // CreateConnection accepts the GCS dial on the prepared listener and runs // the GCS protocol handshake. Must be called after VM start. Idempotent if -// a connection already exists. -func (gm *Guest) CreateConnection(ctx context.Context, opts ...ConfigOption) error { +// a connection already exists. Pass coldStart=true for a fresh boot and +// coldStart=false for a live-migration destination (guest already running). +func (gm *Guest) CreateConnection(ctx context.Context, coldStart bool, opts ...ConfigOption) error { gm.mu.Lock() defer gm.mu.Unlock() @@ -136,7 +137,7 @@ func (gm *Guest) CreateConnection(ctx context.Context, opts ...ConfigOption) err } // Start the GCS protocol - gm.gc, err = gcc.Connect(ctx, true) + gm.gc, err = gcc.Connect(ctx, coldStart) if err != nil { return fmt.Errorf("failed to connect to GCS: %w", err) } @@ -161,3 +162,39 @@ func (gm *Guest) CloseConnection() error { } return err } + +// SetMigrating marks the guest as migrating so transient connection errors are +// tolerated during the migration window. Callers must clear the flag on finalize. +func (gm *Guest) SetMigrating(migrating bool) { + gm.mu.RLock() + defer gm.mu.RUnlock() + + if gm.gc == nil { + return + } + + gm.gc.SetMigrating(migrating) +} + +// ResumeConnection accepts a fresh hvsock on the prepared listener and +// swaps it into the existing GCS bridge, preserving in-flight RPCs. +func (gm *Guest) ResumeConnection(ctx context.Context) error { + gm.mu.Lock() + defer gm.mu.Unlock() + + if gm.gc == nil { + return fmt.Errorf("ResumeConnection called without active connection") + } + if gm.gcListener == nil { + return fmt.Errorf("ResumeConnection called before PrepareConnection") + } + + l := gm.gcListener + gm.gcListener = nil + + conn, err := vmmanager.AcceptConnection(ctx, gm.uvm, l, true) + if err != nil { + return fmt.Errorf("failed to accept resumed guest connection: %w", err) + } + return gm.gc.ResumeOnConn(conn) +} diff --git a/internal/vm/guestmanager/manager.go b/internal/vm/guestmanager/manager.go index 573a641399..c2052094be 100644 --- a/internal/vm/guestmanager/manager.go +++ b/internal/vm/guestmanager/manager.go @@ -10,11 +10,15 @@ import ( "github.com/Microsoft/hcsshim/internal/gcs" ) -// Capabilities returns the capabilities of the guest connection. +// Capabilities returns the capabilities of the guest connection, or nil +// if no GCS connection is active. func (gm *Guest) Capabilities() gcs.GuestDefinedCapabilities { gm.mu.RLock() defer gm.mu.RUnlock() + if gm.gc == nil { + return nil + } return gm.gc.Capabilities() } diff --git a/internal/vm/guestmanager/request_helpers.go b/internal/vm/guestmanager/request_helpers.go index a8876b27d8..f6a6b9dcbe 100644 --- a/internal/vm/guestmanager/request_helpers.go +++ b/internal/vm/guestmanager/request_helpers.go @@ -7,7 +7,9 @@ import ( "errors" ) -var errGuestConnectionUnavailable = errors.New("guest connection not initialized") +// ErrGuestConnectionUnavailable is returned when the guest connection is nil +// (not yet established, or already closed by [Guest.CloseConnection]). +var ErrGuestConnectionUnavailable = errors.New("guest connection not initialized") // modify sends a guest modification request via the guest connection. // This is a helper method to avoid having to check for a nil guest connection in every method that needs to send a request. @@ -16,7 +18,7 @@ func (gm *Guest) modify(ctx context.Context, req interface{}) error { defer gm.mu.Unlock() if gm.gc == nil { - return errGuestConnectionUnavailable + return ErrGuestConnectionUnavailable } return gm.gc.Modify(ctx, req) }