Skip to content

[tmpnet] Enable runtime-specific restart behavior #3882

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 55 additions & 54 deletions tests/fixture/tmpnet/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,16 @@ func RestartNetwork(ctx context.Context, log logging.Logger, dir string) error {
return network.Restart(ctx)
}

// Restart the provided nodes. Blocks on the nodes accepting API requests but not their health.
func restartNodes(ctx context.Context, nodes ...*Node) error {
	for i := range nodes {
		node := nodes[i]
		err := node.Restart(ctx)
		if err != nil {
			return fmt.Errorf("failed to restart node %s: %w", node.NodeID, err)
		}
	}
	return nil
}

// Reads a network from the provided directory.
func ReadNetwork(ctx context.Context, log logging.Logger, dir string) (*Network, error) {
canonicalDir, err := toCanonicalDir(dir)
Expand Down Expand Up @@ -441,26 +451,20 @@ func (n *Network) Bootstrap(ctx context.Context, log logging.Logger) error {
bootstrapNode.Flags[config.SybilProtectionEnabledKey] = *existingSybilProtectionValue
}

// Ensure the bootstrap node is restarted to pick up subnet and chain configuration
//
// TODO(marun) This restart might be unnecessary if:
// - sybil protection didn't change
// - the node is not a subnet validator
log.Info("restarting bootstrap node",
zap.Stringer("nodeID", bootstrapNode.NodeID),
)

if len(n.Nodes) == 1 {
// Ensure the node is restarted to pick up subnet and chain configuration
return n.RestartNode(ctx, bootstrapNode)
if err := bootstrapNode.Restart(ctx); err != nil {
return err
}

// TODO(marun) This last restart of the bootstrap node might be unnecessary if:
// - sybil protection didn't change
// - the node is not a subnet validator

// Ensure the bootstrap node is restarted to pick up configuration changes. Avoid using
// RestartNode since the node won't be able to report healthy until other nodes are started.
if err := bootstrapNode.Stop(ctx); err != nil {
return fmt.Errorf("failed to stop node %s: %w", bootstrapNode.NodeID, err)
}
if err := n.StartNode(ctx, bootstrapNode); err != nil {
return fmt.Errorf("failed to start node %s: %w", bootstrapNode.NodeID, err)
if len(n.Nodes) == 1 {
return nil
}

log.Info("starting remaining nodes")
Expand All @@ -486,31 +490,6 @@ func (n *Network) StartNode(ctx context.Context, node *Node) error {
return nil
}

// Restart a single node.
func (n *Network) RestartNode(ctx context.Context, node *Node) error {
	cfg := node.getRuntimeConfig()
	if cfg.Process != nil && cfg.Process.ReuseDynamicPorts {
		// Attempt to save the API port currently being used so the
		// restarted node can reuse it. This may result in the node
		// failing to start if the operating system allocates the port
		// to a different process between node stop and start.
		if err := node.SaveAPIPort(); err != nil {
			return err
		}
	}

	// Stop, then start, wrapping each failure with the node's identity.
	if err := node.Stop(ctx); err != nil {
		return fmt.Errorf("failed to stop node %s: %w", node.NodeID, err)
	}
	if err := n.StartNode(ctx, node); err != nil {
		return fmt.Errorf("failed to start node %s: %w", node.NodeID, err)
	}

	n.log.Info("waiting for node to report healthy",
		zap.Stringer("nodeID", node.NodeID),
	)
	return node.WaitForHealthy(ctx)
}

// Stops all nodes in the network.
func (n *Network) Stop(ctx context.Context) error {
// Ensure the node state is up-to-date
Expand Down Expand Up @@ -540,11 +519,22 @@ func (n *Network) Stop(ctx context.Context) error {
return nil
}

// Restarts all nodes in the network.
// Restarts all non-ephemeral nodes in the network.
func (n *Network) Restart(ctx context.Context) error {
n.log.Info("restarting network")
for _, node := range n.Nodes {
if err := n.RestartNode(ctx, node); err != nil {
if err := restartNodes(ctx, n.Nodes...); err != nil {
return err
}
return WaitForHealthyNodes(ctx, n.log, n.Nodes...)
}

// Waits for the provided nodes to become healthy.
func WaitForHealthyNodes(ctx context.Context, log logging.Logger, nodes ...*Node) error {
for _, node := range nodes {
log.Info("waiting for node to become healthy",
zap.Stringer("nodeID", node.NodeID),
)
if err := node.WaitForHealthy(ctx); err != nil {
return err
}
}
Expand Down Expand Up @@ -669,15 +659,20 @@ func (n *Network) CreateSubnets(ctx context.Context, log logging.Logger, apiURI
if restartRequired {
log.Info("restarting node(s) to enable them to track the new subnet(s)")

runningNodes := make([]*Node, 0, len(reconfiguredNodes))
for _, node := range reconfiguredNodes {
if len(node.URI) == 0 {
// Only running nodes should be restarted
continue
}
if err := n.RestartNode(ctx, node); err != nil {
return err
if len(node.URI) > 0 {
runningNodes = append(runningNodes, node)
}
}

if err := restartNodes(ctx, runningNodes...); err != nil {
return err
}

if err := WaitForHealthyNodes(ctx, n.log, runningNodes...); err != nil {
return err
}
}

// Add validators for the subnet
Expand Down Expand Up @@ -738,15 +733,21 @@ func (n *Network) CreateSubnets(ctx context.Context, log logging.Logger, apiURI
log.Info("restarting node(s) to pick up chain configuration")

// Restart nodes to allow configuration for the new chains to take effect
nodesToRestart := make([]*Node, 0, len(n.Nodes))
for _, node := range n.Nodes {
if !validatorsToRestart.Contains(node.NodeID) {
continue
}
if err := n.RestartNode(ctx, node); err != nil {
return err
if validatorsToRestart.Contains(node.NodeID) {
nodesToRestart = append(nodesToRestart, node)
}
}

if err := restartNodes(ctx, nodesToRestart...); err != nil {
return err
}

if err := WaitForHealthyNodes(ctx, log, nodesToRestart...); err != nil {
return err
}

return nil
}

Expand Down
22 changes: 5 additions & 17 deletions tests/fixture/tmpnet/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"fmt"
"io"
"maps"
"net"
"net/http"
"net/netip"
"os"
Expand Down Expand Up @@ -58,6 +57,7 @@ type NodeRuntime interface {
Start(ctx context.Context) error
InitiateStop(ctx context.Context) error
WaitForStopped(ctx context.Context) error
Restart(ctx context.Context) error
IsHealthy(ctx context.Context) (bool, error)
}

Expand Down Expand Up @@ -165,6 +165,10 @@ func (n *Node) WaitForStopped(ctx context.Context) error {
return n.getRuntime().WaitForStopped(ctx)
}

// Restart delegates to the node's runtime implementation of NodeRuntime.Restart.
func (n *Node) Restart(ctx context.Context) error {
	runtime := n.getRuntime()
	return runtime.Restart(ctx)
}

func (n *Node) readState(ctx context.Context) error {
return n.getRuntime().readState(ctx)
}
Expand Down Expand Up @@ -383,22 +387,6 @@ func (n *Node) composeFlags() (FlagsMap, error) {
return flags, nil
}

// Saves the currently allocated API port to the node's configuration
// for use across restarts.
func (n *Node) SaveAPIPort() error {
	hostPort := strings.TrimPrefix(n.URI, "http://")
	if hostPort == "" {
		// Without an API URI there is nothing to save
		return nil
	}
	// Extract just the port; the host portion is not persisted.
	_, port, err := net.SplitHostPort(hostPort)
	if err != nil {
		return err
	}
	n.Flags[config.HTTPPortKey] = port
	return nil
}

// WaitForHealthy blocks until node health is true or an error (including context timeout) is observed.
func (n *Node) WaitForHealthy(ctx context.Context) error {
if _, ok := ctx.Deadline(); !ok {
Expand Down
37 changes: 37 additions & 0 deletions tests/fixture/tmpnet/process_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"io"
"io/fs"
"maps"
"net"
"net/netip"
"os"
"os/exec"
Expand Down Expand Up @@ -235,6 +236,26 @@ func (p *ProcessRuntime) WaitForStopped(ctx context.Context) error {
}
}

// Restarts the node
func (p *ProcessRuntime) Restart(ctx context.Context) error {
	if p.getRuntimeConfig().ReuseDynamicPorts {
		// Attempt to save the API port currently being used so the
		// restarted node can reuse it. This may result in the node
		// failing to start if the operating system allocates the port
		// to a different process between node stop and start.
		if err := p.saveAPIPort(); err != nil {
			return err
		}
	}

	stopErr := p.node.Stop(ctx)
	if stopErr != nil {
		return fmt.Errorf("failed to stop node %s: %w", p.node.NodeID, stopErr)
	}

	startErr := p.Start(ctx)
	if startErr != nil {
		return fmt.Errorf("failed to start node %s: %w", p.node.NodeID, startErr)
	}
	return nil
}

func (p *ProcessRuntime) IsHealthy(ctx context.Context) (bool, error) {
// Check that the node process is running as a precondition for
// checking health. getProcess will also ensure that the node's
Expand Down Expand Up @@ -404,6 +425,22 @@ func (p *ProcessRuntime) GetLocalStakingAddress(_ context.Context) (netip.AddrPo
return p.node.StakingAddress, func() {}, nil
}

// Saves the currently allocated API port to the node's configuration
// for use across restarts.
func (p *ProcessRuntime) saveAPIPort() error {
	uri := p.node.URI
	hostPort := strings.TrimPrefix(uri, "http://")
	if hostPort == "" {
		// Without an API URI there is nothing to save
		return nil
	}
	// Only the port is persisted; the host portion is discarded.
	_, port, err := net.SplitHostPort(hostPort)
	if err != nil {
		return err
	}
	p.node.Flags[config.HTTPPortKey] = port
	return nil
}

// watchLogFileForFatal waits for the specified file path to exist and then checks each of
// its lines for the string 'FATAL' until such a line is observed or the provided context
// is canceled. If line containing 'FATAL' is encountered, it will be provided as an error
Expand Down