Skip to content

Commit

Permalink
Merge pull request #15 from devilcove/cleanup
Browse files Browse the repository at this point in the history
cleanup
  • Loading branch information
mattkasun authored Apr 4, 2024
2 parents 3ed2bdd + 80687ff commit aa8845e
Show file tree
Hide file tree
Showing 15 changed files with 27 additions and 230 deletions.
5 changes: 1 addition & 4 deletions agent/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,6 @@ func startBroker() (*server.Server, *nats.EncodedConn) {
}

func subcribe(ec *nats.EncodedConn) {
//ec.Subscribe(">", func(subj string, msg *any) {
//slog.Debug("received nats message", "subject", subj, "data", *msg)
//})
_, _ = ec.Subscribe(Agent+plexus.Status, func(subject, reply string, data any) {
slog.Debug("status request received")
if err := ec.Publish(reply, processStatus()); err != nil {
Expand Down Expand Up @@ -237,7 +234,7 @@ func subcribeToServerTopics(self Device) {
return
}
if err := startInterface(self, network); err != nil {
slog.Error("error starting interace", "interface", network.Interface, "network", network.Name, "error", err)
slog.Error("error starting interface", "interface", network.Interface, "network", network.Name, "error", err)
return
}
})
Expand Down
17 changes: 7 additions & 10 deletions agent/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ import (
"github.com/nats-io/nkeys"
)

var ()

func Run() {
plexus.SetLogging(Config.Verbosity)
if err := boltdb.Initialize(path+"plexus-agent.db", []string{deviceTable, networkTable}); err != nil {
Expand All @@ -34,7 +32,7 @@ func Run() {
}
startAllInterfaces(self)
checkinTicker := time.NewTicker(checkinTime)
//serverTicker := time.NewTicker(serverCheckTime)
serverTicker := time.NewTicker(serverCheckTime)
for {
select {
case <-quit:
Expand All @@ -55,11 +53,13 @@ func Run() {
return
case <-checkinTicker.C:
checkin()
//case <-serverTicker.C:
case <-serverTicker.C:
// reconnect to servers in case server was down when tried to connect earlier
//slog.Debug("refreshing server connection")
//closeServerConnections()
//connectToServers()
slog.Debug("refreshing server connection")
closeServerConnections()
if err := connectToServer(self); err != nil {
slog.Error("server connection", "error", err)
}
}
}
}
Expand Down Expand Up @@ -120,11 +120,8 @@ func checkin() {
slog.Error("get device", "error", err)
return
}
//stunCheck(self, checkPort(self.ListenPort))
checkinData.ID = self.WGPublicKey
checkinData.Version = self.Version
//checkinData.ListenPort = self.ListenPort
//checkinData.PublicListenPort = self.PublicListenPort
checkinData.Endpoint = self.Endpoint
serverEC := serverConn.Load()
if serverEC == nil {
Expand Down
9 changes: 0 additions & 9 deletions agent/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,6 @@ type StatusResponse struct {
Networks []Network
}

type ServerConnection struct {
Server string
Connected string
}

type NetData struct {
Name string
}

type LeaveServerRequest struct {
Force bool
}
12 changes: 5 additions & 7 deletions agent/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,11 @@ func publishDeviceUpdate(self *Device) {
return
}
if err := serverEC.Publish(self.WGPublicKey+plexus.UpdatePeer, plexus.Peer{
WGPublicKey: self.WGPublicKey,
PubNkey: self.PubNkey,
Version: self.Version,
Name: self.Name,
OS: self.OS,
//ListenPort: self.ListenPort,
//PublicListenPort: self.PublicListenPort,
WGPublicKey: self.WGPublicKey,
PubNkey: self.PubNkey,
Version: self.Version,
Name: self.Name,
OS: self.OS,
Endpoint: self.Endpoint,
NatsConnected: true,
},
Expand Down
8 changes: 3 additions & 5 deletions agent/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,9 @@ func createPeer() (*plexus.Peer, *wgtypes.Key, string, error) {
PubNkey: nkey,
Name: name,
Version: "v0.1.0",
//ListenPort: port,
//PublicListenPort: stunAddr.Port,
Endpoint: stunAddr.IP,
OS: runtime.GOOS,
Updated: time.Now(),
Endpoint: stunAddr.IP,
OS: runtime.GOOS,
Updated: time.Now(),
}
return peer, privKey, string(seed), nil
}
Expand Down
2 changes: 1 addition & 1 deletion app/plexus-agent/cmd/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,5 +69,5 @@ func init() {

// Cobra supports local flags which will only run when this command
// is called directly, e.g.:
versionCmd.Flags().BoolVarP(&long, "long", "l", false, "display server(s)/agent version infomation")
versionCmd.Flags().BoolVarP(&long, "long", "l", false, "display server(s)/agent version information")
}
2 changes: 1 addition & 1 deletion app/plexus-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
Expand Down
48 changes: 0 additions & 48 deletions app/plexus-server/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ func broker(ctx context.Context, wg *sync.WaitGroup, tls *tls.Config) {
brokerfail <- 1
return
}
//TODO :: add users
// users := GetUsers()
tokensUsers := getTokenUsers()
deviceUsers := getDeviceUsers()
natsOptions = &server.Options{
Expand Down Expand Up @@ -63,21 +61,10 @@ func broker(ctx context.Context, wg *sync.WaitGroup, tls *tls.Config) {
slog.Error("not ready for connection", "error", err)
return
}
//connectOpts := nats.Options{
// Url: nats.DefaultURL,
// //Url: "nats://localhost:4222",
// Nkey: adminPublicKey,
// Name: "nats-test-nkey",
// SignatureCB: func(nonce []byte) ([]byte, error) {
// return adminKey.Sign(nonce)
// },
//}
SignatureCB := func(nonce []byte) ([]byte, error) {
return adminKey.Sign(nonce)
}
opts := []nats.Option{nats.Nkey(adminPublicKey, SignatureCB)}
//opts = append(opts, )
//natsConn, err = connectOpts.Connect()
natsConn, err = nats.Connect(fmt.Sprintf("nats://%s:4222", config.FQDN), opts...)
if err != nil {
slog.Error("nats connect", "error", err)
Expand All @@ -91,26 +78,6 @@ func broker(ctx context.Context, wg *sync.WaitGroup, tls *tls.Config) {

subscrptions := serverSubcriptions()

//configSub, err := encodedConn.Subscribe("config.*", func(sub, reply string, request any) {
//response := configHandler(sub)
// if err := encodedConn.Publish(reply, response); err != nil {
// slog.Error("pub response to config request", "error", err)
// }
//})
//if err != nil {
// slog.Error("subcribe config", "error", err)
//}
//leaveSub, err := encodedConn.Subscribe("leave.*", func(subj, reply string, request *plexus.AgentRequest) {
// response := processLeave(request)
// slog.Debug("publish leave reply", "response", response)
// if err := encodedConn.Publish(reply, response); err != nil {
// slog.Error("leave reply", "error", err)
// }
//})
//if err != nil {
// slog.Error("subscribe leave", "error", err)
//}

slog.Info("broker started")
pingTicker := time.NewTicker(pingTick)
keyTicker := time.NewTicker(keyTick)
Expand All @@ -123,11 +90,6 @@ func broker(ctx context.Context, wg *sync.WaitGroup, tls *tls.Config) {
for _, sub := range subscrptions {
_ = sub.Drain()
}
//registerSub.Drain()
//checkinSub.Drain()
//updateSub.Drain()
//configSub.Drain()
//leaveSub.Drain()
return
case token := <-newDevice:
slog.Info("new login device", "device", token)
Expand Down Expand Up @@ -269,17 +231,7 @@ func serverSubcriptions() []*nats.Subscription {
subcriptions = append(subcriptions, register)

//device subscriptions
//general
//general, err := eConn.Subscribe(">", func(subj, repl string, request *any) {
// slog.Debug("received request", "subject", subj, "message", request)
//})
//if err != nil {
// slog.Error("subcribe general", "error", err)
//}
//subcriptions = append(subcriptions, general)

//checkin

checkin, err := eConn.Subscribe("*"+plexus.Checkin, func(subj, reply string, request *plexus.CheckinData) {
if len(subj) != 52 {
slog.Error("invalid subj", "subj", subj)
Expand Down
2 changes: 1 addition & 1 deletion app/plexus-server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func configureServer() (*tls.Config, error) {
}

}
// initalize database
// initialize database
if err := os.MkdirAll(config.DBPath, os.ModePerm); err != nil {
return nil, err
}
Expand Down
23 changes: 0 additions & 23 deletions app/plexus-server/nathandlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,14 +159,6 @@ func processCheckin(data *plexus.CheckinData) plexus.MessageResponse {
if peer.Version != data.Version {
peer.Version = data.Version
}
//if peer.PublicListenPort != data.PublicListenPort {
// peer.PublicListenPort = data.PublicListenPort
// publishUpdate = true
//}
//if peer.ListenPort != data.ListenPort {
// peer.ListenPort = data.ListenPort
// publishUpdate = true
//}
if !peer.Endpoint.Equal(data.Endpoint) {
peer.Endpoint = data.Endpoint
publishUpdate = true
Expand Down Expand Up @@ -257,21 +249,6 @@ func processLeave(id string, request *plexus.LeaveRequest) plexus.MessageRespons
}
}

//func connectToNetwork(request *plexus.AgentRequest) plexus.ServerResponse {
// errResponse := plexus.ServerResponse{Error: true}
// response := plexus.ServerResponse{}
// slog.Debug("join request", "peer", request.Peer.WGPublicKey, "network", request.Network)
// network, err := addPeerToNetwork(request.Peer.WGPublicKey, request.Network)
// if err != nil {
// slog.Debug("add peer to network", "error", err)
// errResponse.Message = err.Error()
// return errResponse
// }
// response.Networks = append(response.Networks, network)
// response.Message = fmt.Sprintf("%s added to network %s", request.Peer.WGPublicKey, request.Network)
// return response
//}

func publishNetworkPeerUpdate(peer plexus.Peer) error {
networks, err := boltdb.GetAll[plexus.Network](networkTable)
if err != nil {
Expand Down
40 changes: 0 additions & 40 deletions app/plexus-server/peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,43 +170,3 @@ func savePeer(peer plexus.Peer) {
}
}
}

//func processUpdate(request *plexus.AgentRequest) plexus.ServerResponse {
// network, err := boltdb.Get[plexus.Network](request.Network, networkTable)
// if err != nil {
// return plexus.ServerResponse{
// Error: true,
// Message: "invalid network " + request.Network,
// }
// }
// found := false
// for i, peer := range network.Peers {
// if peer.WGPublicKey == request.NetworkPeer.WGPublicKey {
// network.Peers[i] = request.NetworkPeer
// found = true
// break
// }
// }
// if !found {
// return plexus.ServerResponse{
// Error: true,
// Message: "invalid peer" + request.NetworkPeer.HostName,
// }
// }
// if err := boltdb.Save(network, network.Name, networkTable); err != nil {
// return plexus.ServerResponse{
// Error: true,
// Message: "network not updated: " + err.Error(),
// }
// }
// slog.Debug("publishing network update", "network", request.Network, "action", plexus.UpdateNetworkPeer)
// if err := eConn.Publish("network."+network.Name, plexus.NetworkUpdate{
// Action: plexus.UpdateNetworkPeer,
// Peer: request.NetworkPeer,
// }); err != nil {
// slog.Error("publish network update", "error", err)
// }
// return plexus.ServerResponse{
// Message: fmt.Sprintf("network %s update", request.Network),
// }
//}
5 changes: 0 additions & 5 deletions app/plexus-server/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,6 @@ func setupRouter() *gin.Engine {
c.FileFromFS(c.Request.URL.Path, http.FS(f))
})
_ = router.SetTrustedProxies(nil)
//config := sloggin.Config{
// DefaultLevel: slog.LevelDebug,
// ClientErrorLevel: slog.LevelWarn,
// ServerErrorLevel: slog.LevelError,
//}
router.Use(gin.Recovery(), session, weblogger())
router.GET("/", displayMain)
router.POST("/", login)
Expand Down
1 change: 0 additions & 1 deletion log.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ func SetLogging(v string) {
TimeFormat: time.Kitchen,
Level: logLevel,
}))
//logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{AddSource: true, ReplaceAttr: replace, Level: logLevel}))
slog.SetDefault(logger)
switch strings.ToUpper(v) {
case "DEBUG":
Expand Down
Loading

0 comments on commit aa8845e

Please sign in to comment.