Refactor to support "agent manager" pattern #379

Closed · wants to merge 5 commits
2 changes: 1 addition & 1 deletion Taskfile.yml
@@ -43,7 +43,7 @@ tasks:
sources:
- "*.go"
cmds:
- go build -tags netgo,debug -ldflags '-extldflags "-static"'
- go build -tags netgo -ldflags '-extldflags "-static"'

rootfs:
dir: nex
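The Taskfile change drops the `debug` tag from the static build. As a reminder of how Go build tags gate compilation, here is a small illustrative file (not from this repository) of the kind such a change would exclude from the binary:

```go
//go:build debug

// debug.go: compiled only when "go build -tags debug" is used.
// Dropping ",debug" from the Taskfile's -tags list, as this commit
// does, excludes files like this one from the static build.
package main

import "log"

func init() {
	log.Println("debug instrumentation enabled")
}
```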
92 changes: 68 additions & 24 deletions agent/agent.go
@@ -46,16 +46,16 @@ type Agent struct {
ctx context.Context
sigs chan os.Signal

provider providers.ExecutionProvider
subz []*nats.Subscription
providers map[string]providers.ExecutionProvider
subz []*nats.Subscription

cacheBucket nats.ObjectStore
md *agentapi.MachineMetadata
nc *nats.Conn
started time.Time

sandboxed bool
undeploying *atomic.Bool
undeploying map[string]*atomic.Bool
}

// Initialize a new agent to facilitate communications with the host
@@ -79,16 +79,16 @@ func NewAgent(ctx context.Context, cancelF context.CancelFunc) (*Agent, error) {
}

return &Agent{
agentLogs: make(chan *agentapi.LogEntry, 64),
eventLogs: make(chan *cloudevents.Event, 64),
// sandbox defaults to true, only way to override that is with an explicit 'false'
agentLogs: make(chan *agentapi.LogEntry, 64),
eventLogs: make(chan *cloudevents.Event, 64),
cancelF: cancelF,
ctx: ctx,
sandboxed: isSandboxed(),
md: metadata,
providers: make(map[string]providers.ExecutionProvider),
sandboxed: isSandboxed(), // sandbox defaults to true, only way to override that is with an explicit 'false'
started: time.Now().UTC(),
subz: make([]*nats.Subscription, 0),
undeploying: &atomic.Bool{},
undeploying: make(map[string]*atomic.Bool),
}, nil
}

@@ -164,7 +164,7 @@ func (a *Agent) requestHandshake() error {
return nil
}

return errors.New("Failed to obtain handshake from host")
return errors.New("failed to obtain handshake from host")
}

func (a *Agent) Version() string {
@@ -176,14 +176,15 @@ func (a *Agent) Version() string {
// temporary file and make it executable; this method returns the full
// path to the cached artifact if successful
func (a *Agent) cacheExecutableArtifact(req *agentapi.AgentWorkloadInfo) (*string, error) {
fileName := fmt.Sprintf("workload-%s", *a.md.VmID)
tempFile := path.Join(os.TempDir(), fileName)
// bucket := req.Location.Host
key := strings.Trim(req.Location.Path, "/")
tempFile := path.Join(os.TempDir(), key)

if strings.EqualFold(runtime.GOOS, "windows") && req.WorkloadType == controlapi.NexWorkloadNative {
tempFile = fmt.Sprintf("%s.exe", tempFile)
}

err := a.cacheBucket.GetFile(*a.md.VmID, tempFile)
err := a.cacheBucket.GetFile(key, tempFile)
if err != nil {
msg := fmt.Sprintf("Failed to get and write workload artifact to temp dir: %s", err)
a.submitLog(msg, slog.LevelError)
@@ -205,7 +206,9 @@ func (a *Agent) cacheExecutableArtifact(req *agentapi.AgentWorkloadInfo) (*string, error) {
func (a *Agent) deleteExecutableArtifact() error {
fileName := fmt.Sprintf("workload-%s", *a.md.VmID)
tempFile := path.Join(os.TempDir(), fileName)
_ = os.Remove(tempFile)

tempFile = path.Join(os.TempDir(), "workload-*")
_ = os.Remove(tempFile)

if a.cacheBucket != nil {
@@ -275,6 +278,14 @@ func (a *Agent) handleDeploy(m *nats.Msg) {
return
}

var id string

if info.FunctionID == nil {
id = *info.ID
} else {
id = *info.FunctionID
}

tmpFile, err := a.cacheExecutableArtifact(&info)
if err != nil {
_ = a.workAck(m, false, err.Error())
@@ -294,15 +305,15 @@ func (a *Agent) handleDeploy(m *nats.Msg) {
_ = a.workAck(m, false, msg)
return
}
a.provider = provider
a.providers[id] = provider

shouldValidate := true
if !a.sandboxed && info.WorkloadType == controlapi.NexWorkloadNative {
shouldValidate = false
}

if shouldValidate {
err = a.provider.Validate()
err = a.providers[id].Validate()
if err != nil {
msg := fmt.Sprintf("Failed to validate workload: %s", err)
a.submitLog(msg, slog.LevelError)
@@ -311,7 +322,9 @@ func (a *Agent) handleDeploy(m *nats.Msg) {
}
}

err = a.provider.Deploy()
a.undeploying[id] = new(atomic.Bool)

err = a.providers[id].Deploy()
if err != nil {
a.submitLog(fmt.Sprintf("Failed to deploy workload: %s", err), slog.LevelError)
} else {
@@ -320,20 +333,34 @@ func (a *Agent) handleDeploy(m *nats.Msg) {
}

func (a *Agent) handleUndeploy(m *nats.Msg) {
if a.provider == nil {
if len(a.providers) == 0 {
a.submitLog("Received undeploy workload request on agent without deployed workload", slog.LevelDebug)
_ = m.Respond([]byte{})
return
}

if a.undeploying.Load() {
var id *string // if present, attempts to undeploy a single workload tenant

tokens := strings.Split(m.Subject, ".")
if len(tokens) == 4 {
id = &tokens[3]
}

var undeploying *atomic.Bool
if id == nil {
undeploying = a.undeploying[*a.md.VmID]
} else {
undeploying = a.undeploying[*id]
}

if undeploying.Load() {
a.submitLog("Received additional undeploy workload request on agent", slog.LevelWarn)
return
}

a.undeploying.Store(true)
undeploying.Store(true)

err := a.provider.Undeploy()
err := a.providers[*id].Undeploy()
if err != nil {
// don't return an error here so worst-case scenario is an ungraceful shutdown,
// not a failure
@@ -354,6 +381,7 @@ func (a *Agent) handlePing(m *nats.Msg) {
//
// - agentint.<agent_id>.deploy
// - agentint.<agent_id>.undeploy
// - agentint.<agent_id>.<workload_id>.undeploy
// - agentint.<agent_id>.ping
func (a *Agent) init() error {
a.installSignalHandlers()
@@ -391,6 +419,14 @@ func (a *Agent) init() error {
}
a.subz = append(a.subz, sub)

udsubject = fmt.Sprintf("agentint.%s.*.undeploy", *a.md.VmID)
sub, err = a.nc.Subscribe(udsubject, a.handleUndeploy)
if err != nil {
a.submitLog(fmt.Sprintf("Failed to subscribe to agent undeploy subject: %s", err), slog.LevelError)
return err
}
a.subz = append(a.subz, sub)

pingSubject := fmt.Sprintf("agentint.%s.ping", *a.md.VmID)
sub, err = a.nc.Subscribe(pingSubject, a.handlePing)
if err != nil {
@@ -469,7 +505,9 @@ func (a *Agent) newExecutionProviderParams(info *agentapi.AgentWorkloadInfo, tmp
Stderr: &logEmitter{stderr: true, name: *info.WorkloadName, logs: a.agentLogs},
Stdout: &logEmitter{stderr: false, name: *info.WorkloadName, logs: a.agentLogs},
TmpFilename: &tmpFile,
VmID: *a.md.VmID,

VmID: *a.md.VmID,
FunctionID: info.FunctionID,

Fail: make(chan bool),
Run: make(chan bool),
@@ -517,6 +555,10 @@ func (a *Agent) shutdown() {
_ = a.deleteExecutableArtifact()

for _, sub := range a.subz {
if !sub.IsValid() {
continue
}

_ = sub.Drain()
}

@@ -527,10 +569,12 @@ }
}
}

if a.provider != nil && !a.undeploying.Load() {
err := a.provider.Undeploy()
if err != nil {
fmt.Printf("failed to undeploy workload: %s\n", err)
if len(a.providers) > 0 && !a.undeploying[*a.md.VmID].Load() {
for id := range a.providers {
err := a.providers[id].Undeploy()
if err != nil {
fmt.Printf("failed to undeploy workload: %s\n", err)
}
}
}

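Taken together, the agent.go changes replace the single provider and lone undeploying flag with maps keyed by workload (or function) ID, which is the essence of the "agent manager" pattern in the PR title. Below is a minimal, self-contained sketch of that pattern under illustrative names (Manager, Provider, and noopProvider are not types from this PR); unlike the diff itself, the sketch adds a mutex and a CompareAndSwap guard for clarity:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Provider is a stand-in for the repo's providers.ExecutionProvider.
type Provider interface {
	Deploy() error
	Undeploy() error
}

// noopProvider is a trivial Provider used only for the demo below.
type noopProvider struct{}

func (noopProvider) Deploy() error   { return nil }
func (noopProvider) Undeploy() error { return nil }

// Manager keeps one provider and one undeploy guard per workload ID,
// mirroring the providers/undeploying maps added to the Agent struct.
type Manager struct {
	mu          sync.Mutex
	providers   map[string]Provider
	undeploying map[string]*atomic.Bool
}

func NewManager() *Manager {
	return &Manager{
		providers:   make(map[string]Provider),
		undeploying: make(map[string]*atomic.Bool),
	}
}

func (m *Manager) Deploy(id string, p Provider) error {
	m.mu.Lock()
	m.providers[id] = p
	m.undeploying[id] = new(atomic.Bool)
	m.mu.Unlock()
	return p.Deploy()
}

// Undeploy is idempotent per workload: CompareAndSwap ensures a second
// undeploy request for the same ID is silently ignored, which is what
// the undeploying map in handleUndeploy is protecting against.
func (m *Manager) Undeploy(id string) error {
	m.mu.Lock()
	p, ok := m.providers[id]
	guard := m.undeploying[id]
	m.mu.Unlock()
	if !ok {
		return fmt.Errorf("no workload deployed with id %s", id)
	}
	if !guard.CompareAndSwap(false, true) {
		return nil // already undeploying
	}
	return p.Undeploy()
}

func main() {
	m := NewManager()
	_ = m.Deploy("fn-xyz789", noopProvider{})
	fmt.Println(m.Undeploy("fn-xyz789")) // <nil>
	fmt.Println(m.Undeploy("fn-xyz789")) // <nil>, guarded duplicate is a no-op
}
```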
2 changes: 1 addition & 1 deletion agent/providers/api.go
@@ -20,7 +20,7 @@ type ExecutionProvider interface {
// Execute a deployed function, if supported by the execution provider implementation (e.g., "v8" and "wasm" types)
Execute(ctx context.Context, payload []byte) ([]byte, error)

// Undeploy a workload, giving it a chance to gracefully clean up after itself (if applicable)
// Undeploy a workload (or a single tenant by id), giving it a chance to gracefully clean up after itself (if applicable)
Undeploy() error

// Validate the executable artifact, e.g., specific characteristics of a
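Note that Undeploy() keeps its parameterless signature, so a provider serving a single tenant has to carry its own ID, as the V8 provider below does with its new id field. A hypothetical minimal implementation (tenantProvider is illustrative, not a type from this PR):

```go
package main

import (
	"fmt"

	"github.com/nats-io/nats.go"
)

// tenantProvider is a hypothetical single-tenant provider: its tenant
// ID is fixed at construction time, which is why Undeploy() can stay
// parameterless even in a multi-workload agent.
type tenantProvider struct {
	id  string
	sub *nats.Subscription // this tenant's trigger subscription
}

func (p *tenantProvider) Undeploy() error {
	if p.sub != nil && p.sub.IsValid() {
		// Drain stops new deliveries and lets in-flight handlers finish.
		return p.sub.Drain()
	}
	return nil
}

func main() {
	p := &tenantProvider{id: "fn-xyz789"} // no live subscription in this sketch
	fmt.Println(p.Undeploy())             // <nil>
}
```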
15 changes: 12 additions & 3 deletions agent/providers/lib/v8.go
@@ -70,7 +70,9 @@ type V8 struct {
namespace string
tmpFilename string
totalBytes int32
vmID string

vmID string
id *string

fail chan bool
run chan bool
@@ -95,7 +97,12 @@ func (v *V8) Deploy() error {
return fmt.Errorf("invalid state for execution; no compiled code available for vm: %s", v.name)
}

subject := fmt.Sprintf("agentint.%s.trigger", v.vmID)
if v.id == nil {
return fmt.Errorf("invalid state for execution; no workload id provided: %s", v.name)
}

subject := fmt.Sprintf("agentint.%s.%s.trigger", v.vmID, *v.id)

_, err := v.nc.Subscribe(subject, func(msg *nats.Msg) {
ctx := context.WithValue(context.Background(), agentapi.NexTriggerSubject, msg.Header.Get(agentapi.NexTriggerSubject)) //nolint:all
ctx = otel.GetTextMapPropagator().Extract(ctx, propagation.HeaderCarrier(msg.Header))
@@ -903,7 +910,9 @@ func InitNexExecutionProviderV8(params *agentapi.ExecutionProviderParams) (*V8,
namespace: *params.Namespace,
tmpFilename: *params.TmpFilename,
totalBytes: 0, // FIXME
vmID: params.VmID,

vmID: params.VmID,
id: params.FunctionID,

stderr: params.Stderr,
stdout: params.Stdout,
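With this change the V8 provider derives its trigger subject from both the VM ID and the workload's function ID, so multiple functions can coexist on one agent. A sketch of the resulting request/reply flow over NATS, using placeholder IDs and the default connection URL (illustrative, not the PR's code):

```go
package main

import (
	"fmt"
	"log"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL) // placeholder URL
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	vmID, functionID := "vm-abc123", "fn-xyz789" // hypothetical IDs

	// Provider side: each function gets its own trigger subject.
	subject := fmt.Sprintf("agentint.%s.%s.trigger", vmID, functionID)
	_, err = nc.Subscribe(subject, func(msg *nats.Msg) {
		// ...execute the function here, then reply with its output...
		_ = msg.Respond([]byte("result"))
	})
	if err != nil {
		log.Fatal(err)
	}

	// Host side: trigger exactly one tenant by addressing its subject.
	resp, err := nc.Request(subject, []byte("payload"), nats.DefaultTimeout)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("trigger response: %s", resp.Data)
}
```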
2 changes: 1 addition & 1 deletion control-api/types.go
@@ -68,7 +68,7 @@ type PingResponse struct {
Uptime string `json:"uptime"`
TargetXkey string `json:"target_xkey"`
Tags map[string]string `json:"tags,omitempty"`
RunningMachines int `json:"running_machines"`
RunningMachines int32 `json:"running_machines"`
}

type WorkloadPingResponse struct {
7 changes: 4 additions & 3 deletions host-services/server.go
@@ -49,11 +49,12 @@ func (h *HostServicesServer) SetHostServicesConnection(workloadId string, nc *na
// Adds a secondary host services client used by providers like messaging to connect to the same subject
// space as the workload's corresponding triggers
func (h *HostServicesServer) AddHostServicesConnection(workloadId string, purpose string, nc *nats.Conn) {
if _, ok := h.hsClientConnections[workloadId]; !ok {
h.hsClientConnections[workloadId] = map[string]*nats.Conn{DefaultConnection: nc}
}

if conns, ok := h.hsClientConnections[workloadId]; ok {
conns[purpose] = nc
} else {
h.log.Warn("Attempted to add a host service connection without a default connection")
return
}
}

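The host-services fix changes AddHostServicesConnection so it no longer creates the workload's connection map on the fly: a default connection registered via SetHostServicesConnection must already exist, otherwise it warns and returns. A minimal sketch of that invariant with simplified types (Conn stands in for *nats.Conn):

```go
package main

import "fmt"

type Conn struct{ name string } // stand-in for *nats.Conn

const DefaultConnection = "default"

type server struct {
	// workload ID -> purpose ("default", "messaging", ...) -> connection
	conns map[string]map[string]*Conn
}

// Set registers the default connection, resetting any prior map.
func (s *server) Set(workloadID string, nc *Conn) {
	s.conns[workloadID] = map[string]*Conn{DefaultConnection: nc}
}

// Add attaches a secondary, purpose-specific connection, but only if
// the workload already has a default, mirroring the PR's warning path.
func (s *server) Add(workloadID, purpose string, nc *Conn) error {
	conns, ok := s.conns[workloadID]
	if !ok {
		return fmt.Errorf("no default connection for workload %s", workloadID)
	}
	conns[purpose] = nc
	return nil
}

func main() {
	s := &server{conns: make(map[string]map[string]*Conn)}
	s.Set("wl-1", &Conn{name: "default"})
	if err := s.Add("wl-1", "messaging", &Conn{name: "messaging"}); err != nil {
		fmt.Println(err)
	}
}
```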
43 changes: 34 additions & 9 deletions internal/agent-api/client.go
@@ -154,7 +154,7 @@ func (a *AgentClient) DeployWorkload(request *AgentWorkloadInfo) (*DeployRespons
if errors.Is(err, os.ErrDeadlineExceeded) {
return nil, errors.New("timed out waiting for acknowledgement of workload deployment")
} else if errors.Is(err, nats.ErrNoResponders) {
time.Sleep(time.Millisecond * 100)
time.Sleep(time.Millisecond * 250)
a.deployRetryCount += 1
return a.DeployWorkload(request)
} else {
@@ -183,6 +183,10 @@ func (a *AgentClient) WorkloadBytes() uint64 {
// with the agent client
func (a *AgentClient) Drain() error {
for _, sub := range a.subz {
if !sub.IsValid() {
continue
}

err := sub.Drain()
if err != nil {
a.log.Warn("failed to drain subscription associated with agent client",
@@ -205,23 +209,33 @@ func (a *AgentClient) Drain() error {

// Stop the agent client instance
// Currently this method simply sets the stopping flag, e.g., allowing any pending handshakes to be aborted
func (a *AgentClient) Stop() error {
func (a *AgentClient) Stop(functionID *string) error {
if atomic.AddUint32(&a.stopping, 1) == 1 {
return nil
}

return errors.New("agent client already stopping")
}

func (a *AgentClient) Undeploy() error {
_ = a.Stop()
func (a *AgentClient) Undeploy(functionID *string) error {
_ = a.Stop(functionID)

var subject string

subject := fmt.Sprintf("agentint.%s.undeploy", a.agentID)
if functionID == nil {
subject = fmt.Sprintf("agentint.%s.undeploy", a.agentID)
} else {
subject = fmt.Sprintf("agentint.%s.%s.undeploy", a.agentID, *functionID)
}

a.log.Debug("sending undeploy request to agent via internal NATS connection",
attrs := []any{
slog.String("subject", subject),
slog.String("agent_id", a.agentID),
)
}
if functionID != nil {
attrs = append(attrs, slog.String("function_id", *functionID))
}
a.log.Debug("sending undeploy request to agent via internal NATS connection", attrs...)

_, err := a.nc.Request(subject, []byte{}, 500*time.Millisecond) // FIXME-- allow this timeout to be configurable... 500ms is likely not enough
if err != nil {
@@ -274,8 +288,15 @@ func (a *AgentClient) MarkUnselected() {
a.selected = false
}

func (a *AgentClient) RunTrigger(ctx context.Context, tracer trace.Tracer, subject string, data []byte) (*nats.Msg, error) {
intmsg := nats.NewMsg(fmt.Sprintf("agentint.%s.trigger", a.agentID))
func (a *AgentClient) RunTrigger(ctx context.Context, tracer trace.Tracer, id *string, subject string, data []byte) (*nats.Msg, error) {
var intsub string
if id == nil {
intsub = fmt.Sprintf("agentint.%s.trigger", a.agentID)
} else {
intsub = fmt.Sprintf("agentint.%s.%s.trigger", a.agentID, *id)
}

intmsg := nats.NewMsg(intsub)
intmsg.Header.Add(string(NexTriggerSubject), subject)
intmsg.Data = data

@@ -363,6 +384,8 @@ func (a *AgentClient) handleAgentEvent(msg *nats.Msg) {
}

a.log.Info("Received agent event", slog.String("agent_id", agentID), slog.String("type", evt.Type()))

// FIXME-- send function id if appropriate
a.eventReceived(agentID, evt)
}

@@ -378,6 +401,8 @@ func (a *AgentClient) handleAgentLog(msg *nats.Msg) {
}

a.log.Log(context.TODO(), shandler.LevelTrace, "Received agent log", slog.String("agent_id", agentID), slog.String("log", logentry.Text))

// FIXME-- send function id if appropriate
a.logReceived(agentID, logentry)
}

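On the host side, the client now scopes its internal subjects by an optional function ID: nil addresses the whole agent, non-nil addresses a single tenant. A small sketch of the subject scheme (subjectFor is an illustrative helper, not part of this PR):

```go
package main

import "fmt"

// subjectFor returns the internal subject for a verb ("undeploy",
// "trigger"), scoped to one function when functionID is non-nil.
func subjectFor(agentID string, functionID *string, verb string) string {
	if functionID == nil {
		return fmt.Sprintf("agentint.%s.%s", agentID, verb)
	}
	return fmt.Sprintf("agentint.%s.%s.%s", agentID, *functionID, verb)
}

func main() {
	fnID := "fn-xyz789" // hypothetical
	fmt.Println(subjectFor("agent-1", nil, "undeploy"))   // agentint.agent-1.undeploy
	fmt.Println(subjectFor("agent-1", &fnID, "undeploy")) // agentint.agent-1.fn-xyz789.undeploy
	fmt.Println(subjectFor("agent-1", &fnID, "trigger"))  // agentint.agent-1.fn-xyz789.trigger
}
```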