Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/containerd-shim-lcow-v2/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,8 @@ func (m *shimManager) Start(ctx context.Context, id string, opts shim.StartOpts)
// It reads and logs any panic messages written to panic.log, then tries to
// terminate the associated HCS compute system and waits up to 30 seconds for
// it to exit.
func (m *shimManager) Stop(_ context.Context, id string) (resp shim.StopStatus, err error) {
ctx, span := oc.StartSpan(context.Background(), "delete")
func (m *shimManager) Stop(ctx context.Context, id string) (resp shim.StopStatus, err error) {
ctx, span := oc.StartSpan(ctx, "delete")
defer span.End()
defer func() { oc.SetSpanStatus(span, err) }()

Expand Down
2 changes: 1 addition & 1 deletion internal/controller/vm/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func (c *Controller) StartVM(ctx context.Context, opts *StartOptions) (err error

// VM is started, entropy is seeded and log channel is up. Accept the
// GCS dial on the prepared listener and run the GCS protocol handshake.
err = c.guest.CreateConnection(ctx, opts.ConfigOptions...)
err = c.guest.CreateConnection(ctx, true, opts.ConfigOptions...)
if err != nil {
return fmt.Errorf("failed to create guest connection: %w", err)
}
Expand Down
92 changes: 74 additions & 18 deletions internal/gcs/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"io"
"net"
"sync"
"sync/atomic"
"time"

"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -48,16 +49,18 @@ type bridge struct {
// Timeout is the time a synchronous RPC must respond within.
Timeout time.Duration

mu sync.Mutex
nextID int64
rpcs map[int64]*rpc
conn io.ReadWriteCloser
rpcCh chan *rpc
notify notifyFunc
closed bool
log *logrus.Entry
brdgErr error
waitCh chan struct{}
mu sync.Mutex
nextID int64
rpcs map[int64]*rpc
conn io.ReadWriteCloser
rpcCh chan *rpc
notify notifyFunc
closed bool
log *logrus.Entry
brdgErr error
waitCh chan struct{}
migrating atomic.Bool
resumeCh chan struct{}
}

var ErrBridgeClosed = fmt.Errorf("bridge closed: %w", net.ErrClosed)
Expand All @@ -74,13 +77,14 @@ type notifyFunc func(*prot.ContainerNotification) error
// traces using `log`.
func newBridge(conn io.ReadWriteCloser, notify notifyFunc, log *logrus.Entry) *bridge {
return &bridge{
conn: conn,
rpcs: make(map[int64]*rpc),
rpcCh: make(chan *rpc),
waitCh: make(chan struct{}),
notify: notify,
log: log,
Timeout: bridgeFailureTimeout,
conn: conn,
rpcs: make(map[int64]*rpc),
rpcCh: make(chan *rpc),
waitCh: make(chan struct{}),
resumeCh: make(chan struct{}, 1),
notify: notify,
log: log,
Timeout: bridgeFailureTimeout,
}
}

Expand Down Expand Up @@ -129,6 +133,37 @@ func (brdg *bridge) Wait() error {
return brdg.brdgErr
}

// SetMigrating switches the bridge's live-migration tolerance on or off.
// While the flag is set, transport-level failures around the migration
// blackout do not tear the bridge down; an explicit [bridge.Close] and the
// RPC timeout kill still do.
func (brdg *bridge) SetMigrating(migrating bool) {
	// The flag is stored atomically so it can be flipped without taking brdg.mu.
	flag := &brdg.migrating
	flag.Store(migrating)
}

// ResumeOnConn swaps the bridge transport onto newConn and wakes the recv
// loop without dropping outstanding RPCs (brdg.rpcs is not touched here).
//
// It returns an error and leaves the bridge unchanged when newConn is nil,
// and returns [ErrBridgeClosed] when the bridge has already been closed. On
// any error the bridge has not taken ownership of newConn, so the caller
// remains responsible for closing it.
//
// NOTE(review): brdg.conn is replaced under brdg.mu; confirm that the send
// and receive loops (not visible here) read brdg.conn under the same lock.
func (brdg *bridge) ResumeOnConn(newConn io.ReadWriteCloser) error {
	// Reject a nil transport up front: otherwise it would close the current
	// conn, be installed as the bridge transport, and fail on first use.
	if newConn == nil {
		return fmt.Errorf("bridge resume: nil connection")
	}

	brdg.mu.Lock()
	defer brdg.mu.Unlock()

	if brdg.closed {
		return ErrBridgeClosed
	}

	if brdg.conn != nil {
		// Force any in-progress recvLoop off the stale conn so it can restart
		// on the new one. The close error is deliberately ignored: the old
		// transport is being discarded either way.
		_ = brdg.conn.Close()
	}
	brdg.conn = newConn

	// Non-blocking send: if a wake-up is already pending on resumeCh, the
	// default case drops this one instead of blocking while holding brdg.mu.
	select {
	case brdg.resumeCh <- struct{}{}:
	default:
	}
	return nil
}

// AsyncRPC sends an RPC request to the guest but does not wait for a response.
// If the message cannot be sent before the context is done, then an error is
// returned.
Expand Down Expand Up @@ -239,7 +274,23 @@ func (brdg *bridge) RPC(ctx context.Context, proc prot.RPCProc, req requestMessa
}

func (brdg *bridge) recvLoopRoutine() {
brdg.kill(brdg.recvLoop())
for {
err := brdg.recvLoop()

if !brdg.migrating.Load() {
brdg.kill(err)
break
}
// Park until [bridge.ResumeOnConn] swaps the conn or [bridge.Close] fires.
brdg.log.WithError(err).Info("bridge transport down during migration; awaiting resume or close")
select {
case <-brdg.resumeCh:
continue
case <-brdg.waitCh:
}
break
}

// Fail any remaining RPCs.
brdg.mu.Lock()
rpcs := brdg.rpcs
Expand Down Expand Up @@ -365,6 +416,11 @@ func (brdg *bridge) sendLoop() {
case call := <-brdg.rpcCh:
err := brdg.sendRPC(&buf, enc, call)
if err != nil {
if brdg.migrating.Load() {
// Blackout drop: sendRPC already failed this call; hold the bridge open.
brdg.log.WithError(err).Debug("bridge send failed during migration; bridge held open")
continue
}
brdg.kill(err)
return
}
Expand Down
18 changes: 18 additions & 0 deletions internal/gcs/guestconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,24 @@ func (gc *GuestConnection) Close() error {
return gc.brdg.Close()
}

// SetMigrating passes the migrating flag through to [bridge.SetMigrating].
// It does nothing when the connection has no bridge yet.
func (gc *GuestConnection) SetMigrating(migrating bool) {
	if b := gc.brdg; b != nil {
		b.SetMigrating(migrating)
	}
}

// ResumeOnConn resumes the bridge after swapping its transport onto conn,
// without dropping outstanding RPCs. It forwards to [bridge.ResumeOnConn]
// and returns [ErrBridgeClosed] when the connection has no bridge.
func (gc *GuestConnection) ResumeOnConn(conn io.ReadWriteCloser) error {
	b := gc.brdg
	if b == nil {
		return ErrBridgeClosed
	}
	return b.ResumeOnConn(conn)
}

// CreateProcess creates a process in the container host.
func (gc *GuestConnection) CreateProcess(ctx context.Context, settings interface{}) (_ cow.Process, err error) {
ctx, span := oc.StartSpan(ctx, "gcs::GuestConnection::CreateProcess", oc.WithClientSpanKind)
Expand Down
5 changes: 5 additions & 0 deletions internal/logfields/fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ const (

VMShimOperation = "vmshim-op"

// migration

SessionID = "session-id"
Action = "action"

// logging and tracing

TraceID = "traceID"
Expand Down
2 changes: 1 addition & 1 deletion internal/vm/guestmanager/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ GCS connection:
if err != nil { // handle error }
if err := g.PrepareConnection(gcsServiceID); err != nil { // handle error }
// (start the UVM here)
if err := g.CreateConnection(ctx); err != nil { // handle error }
if err := g.CreateConnection(ctx, true); err != nil { // handle error }

After the connection is established, use the manager interfaces for guest-side changes:

Expand Down
43 changes: 40 additions & 3 deletions internal/vm/guestmanager/guest.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,9 @@ func (gm *Guest) PrepareConnection(GCSServiceID guid.GUID) error {

// CreateConnection accepts the GCS dial on the prepared listener and runs
// the GCS protocol handshake. Must be called after VM start. Idempotent if
// a connection already exists.
func (gm *Guest) CreateConnection(ctx context.Context, opts ...ConfigOption) error {
// a connection already exists. Pass coldStart=true for a fresh boot and
// coldStart=false for a live-migration destination (guest already running).
func (gm *Guest) CreateConnection(ctx context.Context, coldStart bool, opts ...ConfigOption) error {
gm.mu.Lock()
defer gm.mu.Unlock()

Expand Down Expand Up @@ -136,7 +137,7 @@ func (gm *Guest) CreateConnection(ctx context.Context, opts ...ConfigOption) err
}

// Start the GCS protocol
gm.gc, err = gcc.Connect(ctx, true)
gm.gc, err = gcc.Connect(ctx, coldStart)
if err != nil {
return fmt.Errorf("failed to connect to GCS: %w", err)
}
Expand All @@ -161,3 +162,39 @@ func (gm *Guest) CloseConnection() error {
}
return err
}

// SetMigrating flags the guest as migrating (or not) so that transient
// connection errors are tolerated during the migration window. Callers must
// clear the flag on finalize. It is a no-op when there is no guest connection.
func (gm *Guest) SetMigrating(migrating bool) {
	gm.mu.RLock()
	defer gm.mu.RUnlock()

	// Only gm.gc is read here, so the read lock is sufficient.
	if conn := gm.gc; conn != nil {
		conn.SetMigrating(migrating)
	}
}

// ResumeConnection accepts a fresh hvsock on the prepared listener and
// swaps it into the existing GCS bridge, preserving in-flight RPCs.
//
// It returns an error when there is no active guest connection, when no
// listener has been prepared, when the accept fails, or when the bridge
// rejects the new transport. In the last case the accepted connection is
// closed before returning so it is not leaked.
//
// The prepared listener is detached from the Guest before accepting and is
// not restored on failure.
// NOTE(review): the final `true` argument to AcceptConnection presumably
// closes the listener after the accept — confirm against vmmanager.
func (gm *Guest) ResumeConnection(ctx context.Context) error {
	gm.mu.Lock()
	defer gm.mu.Unlock()

	if gm.gc == nil {
		return fmt.Errorf("ResumeConnection called without active connection")
	}
	if gm.gcListener == nil {
		return fmt.Errorf("ResumeConnection called before PrepareConnection")
	}

	l := gm.gcListener
	gm.gcListener = nil

	conn, err := vmmanager.AcceptConnection(ctx, gm.uvm, l, true)
	if err != nil {
		return fmt.Errorf("failed to accept resumed guest connection: %w", err)
	}

	if err = gm.gc.ResumeOnConn(conn); err != nil {
		// The bridge only takes ownership of conn on success; release the
		// accepted socket here so a rejected resume does not leak it.
		_ = conn.Close()
		return err
	}
	return nil
}
6 changes: 5 additions & 1 deletion internal/vm/guestmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,15 @@ import (
"github.com/Microsoft/hcsshim/internal/gcs"
)

// Capabilities returns the capabilities of the guest connection.
// Capabilities returns the capabilities of the guest connection, or nil
// if no GCS connection is active.
func (gm *Guest) Capabilities() gcs.GuestDefinedCapabilities {
	gm.mu.RLock()
	defer gm.mu.RUnlock()

	// With an active GCS connection, report what it advertises; otherwise
	// there is nothing to query and nil is returned.
	if conn := gm.gc; conn != nil {
		return conn.Capabilities()
	}
	return nil
}

Expand Down
6 changes: 4 additions & 2 deletions internal/vm/guestmanager/request_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
"errors"
)

var errGuestConnectionUnavailable = errors.New("guest connection not initialized")
// ErrGuestConnectionUnavailable is returned when the guest connection is nil
// (not yet established, or already closed by [Guest.CloseConnection]).
var ErrGuestConnectionUnavailable = errors.New("guest connection not initialized")

// modify sends a guest modification request via the guest connection.
// This is a helper method to avoid having to check for a nil guest connection in every method that needs to send a request.
Expand All @@ -16,7 +18,7 @@ func (gm *Guest) modify(ctx context.Context, req interface{}) error {
defer gm.mu.Unlock()

if gm.gc == nil {
return errGuestConnectionUnavailable
return ErrGuestConnectionUnavailable
}
return gm.gc.Modify(ctx, req)
}
Loading