Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 115 additions & 46 deletions tariff/amber.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,12 @@ func (t *Amber) run(done chan error) {
continue
}

// Group by hour and average intervals within each hour
hourlyData := make(map[time.Time]*struct {
totalValue float64
totalDuration time.Duration
start time.Time
currentValue *float64 // Override with current interval if present (for accurate charging session costs)
})
// Create and sort time-ordered list of all Amber intervals
var intervals []struct {
start, end time.Time
value float64
isCurrent bool
}

for _, r := range res {
if t.channel == strings.ToLower(r.ChannelType) {
Expand All @@ -114,56 +113,126 @@ func (t *Amber) run(done chan error) {
value = -value
}

localStart := startTime.Local()
localEnd := endTime.Local()
hourStart := localStart.Truncate(time.Hour) // Preserve date+hour
duration := localEnd.Sub(localStart)

// Initialize hour entry if needed
if hourlyData[hourStart] == nil {
hourlyData[hourStart] = &struct {
totalValue float64
totalDuration time.Duration
start time.Time
currentValue *float64
}{start: hourStart}
}
intervals = append(intervals, struct {
start, end time.Time
value float64
isCurrent bool
}{
start: startTime.Local(),
end: endTime.Local(),
value: value,
isCurrent: r.Type == "CurrentInterval",
})
}
}

hr := hourlyData[hourStart]
if len(intervals) == 0 {
mergeRates(t.data, nil)
once.Do(func() { close(done) })
continue
}

// If this is the current interval, use its value directly for this hour
if r.Type == "CurrentInterval" {
hr.currentValue = &value
} else {
// Add to weighted average for forecast intervals
hr.totalValue += value * duration.Seconds()
hr.totalDuration += duration
}
// Sort intervals by start time to ensure correct processing
slices.SortFunc(intervals, func(a, b struct {
start, end time.Time
value float64
isCurrent bool
}) int {
return a.start.Compare(b.start)
})

data := t.buildSlotRates(intervals)

mergeRates(t.data, data)
once.Do(func() { close(done) })
}
}

// buildSlotRates converts Amber intervals into 15-minute slots using bucket sharding
// to avoid O(slots × intervals) complexity and only create slots with actual data
func (t *Amber) buildSlotRates(intervals []struct {
start, end time.Time
value float64
isCurrent bool
}) api.Rates {
// Build slot buckets using sharding approach
type bucket struct {
totalSecs float64
weightedSum float64
current *float64
}
buckets := make(map[time.Time]*bucket)

for _, iv := range intervals {
// Truncate start to slot boundary
slot := iv.start.Truncate(SlotDuration)
end := iv.end

for slot.Before(end) {
next := slot.Add(SlotDuration)

// Compute overlap [max(slot, iv.start), min(next, iv.end))
overlapStart := slot
if iv.start.After(slot) {
overlapStart = iv.start
}
}

// Convert to final hourly rates
data := make(api.Rates, 0, len(hourlyData))
for _, hr := range hourlyData {
var finalValue float64
if hr.currentValue != nil {
// Use current interval value if available
finalValue = *hr.currentValue
} else if hr.totalDuration > 0 {
// Otherwise use weighted average of forecast intervals
finalValue = hr.totalValue / hr.totalDuration.Seconds()
overlapEnd := next
if iv.end.Before(next) {
overlapEnd = iv.end
}

overlapSecs := overlapEnd.Sub(overlapStart).Seconds()

b, ok := buckets[slot]
if !ok {
b = &bucket{}
buckets[slot] = b
}

if iv.isCurrent {
// Current interval overrides the entire slot
b.current = &iv.value
} else {
// Add to weighted average
b.weightedSum += iv.value * overlapSecs
b.totalSecs += overlapSecs
}

slot = next
}
}

// Convert buckets to sorted rates, skipping empty slots
var data api.Rates
for start, b := range buckets {
var finalValue float64
hasValue := false

if b.current != nil {
finalValue = *b.current
hasValue = true
} else if b.totalSecs > 0 {
finalValue = b.weightedSum / b.totalSecs
hasValue = true
}

// Only add slots with actual data
if hasValue {
data = append(data, api.Rate{
Start: hr.start,
End: hr.start.Add(time.Hour),
Start: start,
End: start.Add(SlotDuration),
Value: finalValue,
})
}

mergeRates(t.data, data)
once.Do(func() { close(done) })
}

// Sort by start time
slices.SortFunc(data, func(a, b api.Rate) int {
return a.Start.Compare(b.Start)
})

return data
}

// Rates implements the api.Tariff interface
Expand Down
Loading