Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Session log: close unfinished sessions on startup #10246

Merged
merged 8 commits into from
Oct 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/loadpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ type Loadpoint struct {
progress *Progress // Step-wise progress indicator

// session log
db session.Database
db *session.DB
session *session.Session

tasks *util.Queue[Task] // tasks to be executed
Expand Down
96 changes: 96 additions & 0 deletions core/loadpoint_session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,99 @@ func TestSession(t *testing.T) {
assert.Len(t, s, 1)
t.Logf("session: %+v", s)
}

func TestCloseSessionsOnStartup_emptyDb(t *testing.T) {
var err error
serverdb.Instance, err = serverdb.New("sqlite", ":memory:")
assert.NoError(t, err)

db, err := session.NewStore("foo", serverdb.Instance)
assert.NoError(t, err)

// assert empty DB is no problem
err = db.ClosePendingSessionsInHistory(1000)
assert.NoError(t, err)
}

func TestCloseSessionsOnStartup(t *testing.T) {
var err error
serverdb.Instance, err = serverdb.New("sqlite", ":memory:")
assert.NoError(t, err)

db1, err := session.NewStore("foo", serverdb.Instance)
assert.NoError(t, err)

db2, err := session.NewStore("bar", serverdb.Instance)
assert.NoError(t, err)

clock := clock.NewMock()

//test data, creates 6 sessions for each loadpoint, 3rd and 6th are "unfinished"
var sessions1 []*session.Session = createMockSessions(db1, clock)
var sessions2 []*session.Session = createMockSessions(db2, clock)

//write interleaved for two loadpoints
for index, session := range sessions1 {
db1.Persist(session)
db2.Persist(sessions2[index])
}

err = db1.ClosePendingSessionsInHistory(1000)
assert.NoError(t, err)

//check fixed sessions for db1
var db1Sessions session.Sessions
err = serverdb.Instance.Where("Loadpoint = ?", "foo").Order("ID").Find(&db1Sessions).Error
assert.NoError(t, err)
assert.Len(t, db1Sessions, 6)

//check fixed history
for _, s := range db1Sessions[:5] {
assert.NotEmpty(t, s.MeterStop)
assert.Equal(t, float64(10), s.ChargedEnergy)
t.Logf("session: %+v", s)
}

//check fixed most recent record
assert.NotEmpty(t, db1Sessions[5].MeterStop)
assert.Equal(t, float64(940), db1Sessions[5].ChargedEnergy)

//ensure no side effects on loadpoint 2 data, i.e. data left unfixed
var db2Sessions session.Sessions
err = serverdb.Instance.Where("Loadpoint = ?", "bar").Order("ID").Find(&db2Sessions).Error
assert.NoError(t, err)
assert.Len(t, db2Sessions, 6)

for i, s := range db2Sessions {
if (i+1)%3 == 0 {
assert.Empty(t, s.MeterStop)
assert.Empty(t, s.ChargedEnergy)
continue
}
assert.NotEmpty(t, s.MeterStop)
assert.Equal(t, float64(10), s.ChargedEnergy)
}
}

func createMockSessions(db *session.DB, clock *clock.Mock) []*session.Session {
var sessions []*session.Session
for i := 1; i <= 6; i++ {

var meter1Start float64 = float64(i * 10)
session := db.New(meter1Start)
session.Created = clock.Now().Add(1 * time.Minute)

//create every third session as incomplete
if i%3 == 0 {
sessions = append(sessions, session)
continue
}

session.Finished = clock.Now().Add(2 * time.Minute)
meterStop := float64(meter1Start + 10)
session.MeterStop = &meterStop
session.ChargedEnergy = 10
sessions = append(sessions, session)
}
return sessions
}
33 changes: 28 additions & 5 deletions core/session/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,6 @@ type DB struct {
name string
}

type Database interface {
New(startEnergy float64) *Session
Persist(session interface{})
}

// NewStore creates a session store
func NewStore(name string, db *gorm.DB) (*DB, error) {
err := db.AutoMigrate(new(Session))
Expand Down Expand Up @@ -57,3 +52,31 @@ func (s *DB) Sessions() (Sessions, error) {
tx := s.db.Find(&res)
return res, tx.Error
}

func (s *DB) ClosePendingSessionsInHistory(chargeMeterTotal float64) error {
var res Sessions
if tx := s.db.Find(&res, map[string]interface{}{"finished": "0001-01-01 00:00:00+00:00", "Loadpoint": s.name}); tx.Error != nil {
andig marked this conversation as resolved.
Show resolved Hide resolved
return tx.Error
}

for _, session := range res {
var nextSession Session

var tx *gorm.DB
if tx = s.db.Limit(1).Order("ID").Find(&nextSession, "ID > ? AND Loadpoint = ?", session.ID, s.name); tx.Error != nil {
andig marked this conversation as resolved.
Show resolved Hide resolved
return tx.Error
}

if tx.RowsAffected == 0 {
// no successor, this is the most recent session and it is open
session.MeterStop = &chargeMeterTotal
} else {
session.MeterStop = nextSession.MeterStart
}

session.ChargedEnergy = *session.MeterStop - *session.MeterStart
s.Persist(session)
}

return nil
}
4 changes: 4 additions & 0 deletions core/site.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,10 @@ func NewSiteFromConfig(
if lp.db, err = session.NewStore(lp.Title(), db.Instance); err != nil {
return nil, err
}
// Fix any dangling history
if err := lp.db.ClosePendingSessionsInHistory(lp.chargeMeterTotal()); err != nil {
return nil, err
}

// NOTE: this requires stopSession to respect async access
shutdown.Register(lp.stopSession)
Expand Down