Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 21 additions & 13 deletions cmd/gsw_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,10 @@ func decomInitialize(ctx context.Context) map[int]chan []byte {
channelMap[packet.Port] = finalOutputChannel

go func(packet tlm.TelemetryPacket, ch chan []byte) {
proc.TelemetryPacketWriter(packet, finalOutputChannel, *shmDir)
<-ctx.Done()
err := proc.TelemetryPacketWriter(ctx, packet, finalOutputChannel, *shmDir)
if err != nil && !errors.Is(err, context.Canceled) {
logger.Error("error initializing packet writer", zap.Error(err))
}
close(ch)
}(packet, finalOutputChannel)
}
Expand All @@ -98,16 +100,13 @@ func dbInitialize(ctx context.Context, channelMap map[int]chan []byte, host stri
dbHandler := db.InfluxDBV1Handler{}
err := dbHandler.Initialize(host, port)
if err != nil {
logger.Warn("Warning. Telemetry packets will not be published to database")
return err
}

for _, packet := range proc.GswConfig.TelemetryPackets {
go func(dbHandler db.Handler, packet tlm.TelemetryPacket, ch chan []byte) {
proc.DatabaseWriter(dbHandler, packet, ch)
<-ctx.Done()
close(ch)
}(&dbHandler, packet, channelMap[packet.Port])
go func(packet tlm.TelemetryPacket, ch chan []byte) {
proc.DatabaseWriter(&dbHandler, packet, ch)
}(packet, channelMap[packet.Port])
}

return nil
Expand All @@ -124,7 +123,7 @@ func readConfig() (*viper.Viper, int) {

if err != nil {
if _, ok := err.(viper.ConfigFileNotFoundError); !ok {
logger.Fatal("Error reading GSW config: %w", zap.Error(err))
logger.Fatal("Error reading GSW config", zap.Error(err))
} else {
logger.Warn("Config file not found, reading config from environment variables")
}
Expand Down Expand Up @@ -169,7 +168,7 @@ func main() {

go func() {
sig := <-sigs
logger.Debug("Received signal: ", zap.String("signal", sig.String()))
logger.Debug("Received signal", zap.String("signal", sig.String()))
cancel()
}()

Expand All @@ -181,12 +180,21 @@ func main() {
defer telemetryConfigCleanup()

channelMap := decomInitialize(ctx)
err = dbInitialize(ctx, channelMap, config.GetString("database_host_name"), config.GetInt("database_port_number"))
if err != nil {
logger.Warn("DB Initialization failed", zap.Error(err))
if config.IsSet("database_host_name") && config.IsSet("database_port_number") {
err = dbInitialize(ctx, channelMap, config.GetString("database_host_name"), config.GetInt("database_port_number"))
if err != nil {
logger.Warn("DB Initialization failed, telemetry packets will not be published to the database", zap.Error(err))
}
} else {
logger.Warn("database_host_name or database_port_number is not set, telemetry packets will not be published to the database")
}

// Wait for context cancellation or signal handling
<-ctx.Done()
logger.Info("Shutting down GSW...")

for i, channel := range channelMap {
<-channel
logger.Info("channel closed", zap.Int("port", i))
}
}
5 changes: 5 additions & 0 deletions lib/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,3 +235,8 @@ func Error(message string, fields ...zap.Field) {
func Panic(message string, fields ...zap.Field) {
logger.Panic(message, fields...)
}

// Log retrieves the underlying zap logger
func Log() *zap.Logger {
return logger
}
10 changes: 7 additions & 3 deletions proc/db_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,28 @@ package proc

import (
"fmt"
"time"

"github.com/AarC10/GSW-V2/lib/db"
"github.com/AarC10/GSW-V2/lib/logger"
"github.com/AarC10/GSW-V2/lib/tlm"
"time"
"go.uber.org/zap"
)

// DatabaseWriter writes telemetry data to the database
// It reads data from the channel and writes it to the database
func DatabaseWriter(handler db.Handler, packet tlm.TelemetryPacket, channel chan []byte) {
log := logger.Log().Named("database").With(zap.String("packet", packet.Name))
measGroup := initMeasurementGroup(packet)
fmt.Println("Started database writer for", packet.Name)
log.Info("Started database writer")

for {
data := <-channel
UpdateMeasurementGroup(packet, measGroup, data)

err := handler.Insert(measGroup)
if err != nil {
fmt.Printf("%s", err)
log.Error("couldn't insert measurement group", zap.Error(err))
}
}
}
Expand Down
57 changes: 38 additions & 19 deletions proc/decom.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
package proc

import (
"context"
"errors"
"fmt"
"net"
"strconv"
"sync"

"github.com/AarC10/GSW-V2/lib/ipc"
"github.com/AarC10/GSW-V2/lib/logger"
"github.com/AarC10/GSW-V2/lib/tlm"
"go.uber.org/zap"
)

// newIpcShmHandlerForPacket creates a shared memory IPC handler for a telemetry packet
Expand All @@ -22,50 +27,64 @@ func newIpcShmHandlerForPacket(packet tlm.TelemetryPacket, write bool, shmDir st
}

// TelemetryPacketWriter is a goroutine that listens for telemetry data on a UDP port and writes it to shared memory
func TelemetryPacketWriter(packet tlm.TelemetryPacket, outChannel chan []byte, shmDir string) {
func TelemetryPacketWriter(ctx context.Context, packet tlm.TelemetryPacket, outChannel chan []byte, shmDir string) error {
log := logger.Log().Named("decom").With(zap.String("packet", packet.Name))
packetSize := GetPacketSize(packet)
shmWriter, _ := newIpcShmHandlerForPacket(packet, true, shmDir)
shmWriter, err := newIpcShmHandlerForPacket(packet, true, shmDir)
if shmWriter == nil {
fmt.Printf("Failed to create shared memory writer\n")
return
return fmt.Errorf("creating shared memory writer: %w", err)
}
defer shmWriter.Cleanup()

fmt.Printf("Packet size for port %d: %d bytes %d bits\n", packet.Port, packetSize, packetSize*8)
log.Info(fmt.Sprintf("Packet size: %d bytes %d bits", packetSize, packetSize*8))

addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf(":%d", packet.Port))
if err != nil {
fmt.Printf("Error resolving UDP address: %v\n", err)
return
return fmt.Errorf("resolving listen address: %w", err)
}

conn, err := net.ListenUDP("udp", addr)
if err != nil {
fmt.Printf("Error listening on UDP: %v\n", err)
return
return fmt.Errorf("listening: %w", err)
}
defer func(conn *net.UDPConn) {
err := conn.Close()
if err != nil {
fmt.Printf("Error closing UDP connection: %v\n", err)
}
}(conn)

fmt.Printf("Listening on port %d for telemetry packet...\n", packet.Port)
var closeConnOnce sync.Once
closeConn := func() {
closeConnOnce.Do(func() {
if err := conn.Close(); err != nil {
log.Error("error closing UDP connection", zap.Error(err))
}
})
}
defer closeConn()

stopf := context.AfterFunc(ctx, closeConn)
defer stopf()

log.Info(fmt.Sprintf("Listening on %d for telemetry packet...", packet.Port))

// Receive data
buffer := make([]byte, packetSize)
for {
n, _, err := conn.ReadFromUDP(buffer)
if err != nil {
fmt.Printf("Error reading from UDP: %v\n", err)
if ctx.Err() != nil {
return ctx.Err()
}

// a closed connection would be unrecoverable, so return the error
if errors.Is(err, net.ErrClosed) {
return err
}

log.Error("error reading from UDP", zap.Error(err))
continue
}

if n == packetSize {
err := shmWriter.Write(buffer)
if err != nil {
fmt.Printf("Error writing to shared memory: %v\n", err)
log.Error("error writing to shared memory", zap.Error(err))
}

select {
Expand All @@ -75,7 +94,7 @@ func TelemetryPacketWriter(packet tlm.TelemetryPacket, outChannel chan []byte, s
break
}
} else {
fmt.Printf("Received packet of incorrect size. Expected: %d, Received: %d\n", packetSize, n)
log.Error("received packet of incorrect size", zap.Int("expected", packetSize), zap.Int("received", n))
}
}
}
Expand Down
Loading