Skip to content

Commit 554bca1

Browse files
committed
Update.
1 parent b83f416 commit 554bca1

28 files changed

+511
-187
lines changed

api/remote/client.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@ func NewClient(specificClient external.Caravela, clientNode common.Node) *Client
2222

2323
func (h *Client) getRequestContext(ctx context.Context) context.Context {
2424
if h.clientNode != nil {
25-
return context.WithValue(ctx, types.PartitionsStateKey, h.clientNode.GetSystemPartitionsState().PartitionsState())
25+
ctx = context.WithValue(ctx, types.PartitionsStateKey, h.clientNode.GetSystemPartitionsState().PartitionsState())
26+
ctx = context.WithValue(ctx, types.NodeGUIDKey, h.clientNode.GUID())
27+
return ctx
2628
} else {
2729
return context.Background()
2830
}

cli/init_node.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010
"github.com/strabox/caravela/docker"
1111
"github.com/strabox/caravela/node"
1212
"github.com/strabox/caravela/node/common/guid"
13-
"github.com/strabox/caravela/overlay"
13+
overlayFactory "github.com/strabox/caravela/overlay/factory"
1414
"strings"
1515
)
1616

@@ -49,7 +49,7 @@ func initNode(hostIP, configFilePath string, join bool, joinIP string) error {
4949
int64(systemConfigurations.GUIDScaleFactor()))
5050

5151
// Create Overlay Component
52-
overlayConfigured := overlay.Create(systemConfigurations)
52+
overlayConfigured := overlayFactory.Create(systemConfigurations)
5353

5454
// Create CARAVELA's Remote httpClient
5555
caravelaCli := remote.NewHttpClient(systemConfigurations.APIPort(), systemConfigurations.APITimeout())

node/common/local_node.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,5 @@ import "github.com/strabox/caravela/node/discovery/offering/partitions"
44

55
type Node interface {
66
GetSystemPartitionsState() *partitions.SystemResourcePartitions
7+
GUID() string
78
}

node/containers/manager.go

Lines changed: 75 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@ import (
1111
"github.com/strabox/caravela/node/common/resources"
1212
"github.com/strabox/caravela/node/external"
1313
"github.com/strabox/caravela/util"
14+
"github.com/strabox/caravela/util/debug"
1415
"sync"
16+
"unsafe"
1517
)
1618

1719
// Containers manager responsible for interacting with the Docker daemon and managing all the interaction with the
@@ -25,7 +27,7 @@ type Manager struct {
2527
supplier supplierLocal // Local Supplier component.
2628

2729
quitChan chan bool // Channel to alert that the node is stopping.
28-
containersMutex *sync.Mutex // Mutex to control access to containers map.
30+
containersMutex sync.Mutex // Mutex to control access to containers map.
2931
containersMap map[string]map[string]*localContainer // Collection of deployed containers (buyerIP->(containerID->Container)).
3032
}
3133

@@ -38,21 +40,21 @@ func NewManager(config *configuration.Configuration, dockerClient external.Docke
3840
supplier: supplier,
3941

4042
quitChan: make(chan bool),
41-
containersMutex: &sync.Mutex{},
43+
containersMutex: sync.Mutex{},
4244
containersMap: make(map[string]map[string]*localContainer),
4345
}
4446
}
4547

4648
// receiveDockerEvents
47-
func (man *Manager) receiveDockerEvents(eventsChan <-chan *events.Event) {
49+
func (m *Manager) receiveDockerEvents(eventsChan <-chan *events.Event) {
4850
go func() {
4951
for {
5052
select {
5153
case event := <-eventsChan:
5254
if event.Type == events.ContainerDied {
53-
man.StopContainer(event.Value)
55+
m.StopContainer(event.Value)
5456
}
55-
case quit := <-man.quitChan: // Stopping the containers management
57+
case quit := <-m.quitChan: // Stopping the containers management
5658
if quit {
5759
log.Infof(util.LogTag("CONTAINER") + "STOPPED")
5860
return
@@ -63,18 +65,18 @@ func (man *Manager) receiveDockerEvents(eventsChan <-chan *events.Event) {
6365
}
6466

6567
// Verify if the offer is valid and alert the supplier and after that start the container in the Docker engine.
66-
func (man *Manager) StartContainer(fromBuyer *types.Node, offer *types.Offer, containersConfigs []types.ContainerConfig,
68+
func (m *Manager) StartContainer(fromBuyer *types.Node, offer *types.Offer, containersConfigs []types.ContainerConfig,
6769
totalResourcesNecessary resources.Resources) ([]types.ContainerStatus, error) {
68-
if !man.IsWorking() {
70+
if !m.IsWorking() {
6971
panic(fmt.Errorf("can't start container, container manager not working"))
7072
}
7173

72-
man.containersMutex.Lock()
73-
defer man.containersMutex.Unlock()
74+
m.containersMutex.Lock()
75+
defer m.containersMutex.Unlock()
7476

7577
// =================== Obtain the resources from the offer ==================
7678

77-
obtained := man.supplier.ObtainResources(offer.ID, totalResourcesNecessary, len(containersConfigs))
79+
obtained := m.supplier.ObtainResources(offer.ID, totalResourcesNecessary, len(containersConfigs))
7880
if !obtained {
7981
log.Debugf(util.LogTag("CONTAINER")+"Container NOT RUNNING, invalid offer: %d", offer.ID)
8082
return nil, fmt.Errorf("can't start container, invalid offer: %d", offer.ID)
@@ -85,11 +87,11 @@ func (man *Manager) StartContainer(fromBuyer *types.Node, offer *types.Offer, co
8587
deployedContStatus := make([]types.ContainerStatus, 0)
8688

8789
for _, contConfig := range containersConfigs {
88-
containerStatus, err := man.dockerClient.RunContainer(contConfig)
90+
containerStatus, err := m.dockerClient.RunContainer(contConfig)
8991
if err != nil { // If can't deploy a container remove all the other containers.
90-
man.supplier.ReturnResources(totalResourcesNecessary, len(containersConfigs))
92+
m.supplier.ReturnResources(totalResourcesNecessary, len(containersConfigs))
9193
for _, contStatus := range deployedContStatus {
92-
man.StopContainer(contStatus.ContainerID)
94+
m.StopContainer(contStatus.ContainerID)
9395
}
9496
return nil, err
9597
}
@@ -104,15 +106,15 @@ func (man *Manager) StartContainer(fromBuyer *types.Node, offer *types.Offer, co
104106
newContainer := newContainer(contConfig.Name, contConfig.ImageKey, contConfig.Args, contConfig.PortMappings,
105107
*contResources, containerID, fromBuyer.IP)
106108

107-
if _, ok := man.containersMap[fromBuyer.IP]; !ok {
109+
if _, ok := m.containersMap[fromBuyer.IP]; !ok {
108110
userContainersMap := make(map[string]*localContainer)
109111
userContainersMap[containerID] = newContainer
110-
man.containersMap[fromBuyer.IP] = userContainersMap
112+
m.containersMap[fromBuyer.IP] = userContainersMap
111113
} else {
112-
man.containersMap[fromBuyer.IP][containerID] = newContainer
114+
m.containersMap[fromBuyer.IP][containerID] = newContainer
113115
}
114116

115-
deployedContStatus[i].SupplierIP = man.config.HostIP() // Set the container's supplier's IP!
117+
deployedContStatus[i].SupplierIP = m.config.HostIP() // Set the container's supplier's IP!
116118

117119
log.Debugf(util.LogTag("CONTAINER")+"[%d] Container %s RUNNING, Img: %s, Args: %v, Res: <%d,%d>",
118120
i, containerID[0:12], contConfig.ImageKey, contConfig.Args, contResources.CPUs(),
@@ -123,21 +125,21 @@ func (man *Manager) StartContainer(fromBuyer *types.Node, offer *types.Offer, co
123125
}
124126

125127
// StopContainer stop a local container in the Docker engine and remove it.
126-
func (man *Manager) StopContainer(containerIDToStop string) error {
127-
man.containersMutex.Lock()
128-
defer man.containersMutex.Unlock()
128+
func (m *Manager) StopContainer(containerIDToStop string) error {
129+
m.containersMutex.Lock()
130+
defer m.containersMutex.Unlock()
129131

130-
for buyerIP, containersMap := range man.containersMap {
132+
for buyerIP, containersMap := range m.containersMap {
131133
for containerID, container := range containersMap {
132134
if containerID == containerIDToStop {
133-
man.dockerClient.RemoveContainer(containerIDToStop)
134-
man.supplier.ReturnResources(container.Resources(), 1)
135+
m.dockerClient.RemoveContainer(containerIDToStop)
136+
m.supplier.ReturnResources(container.Resources(), 1)
135137
delete(containersMap, containerID)
136138
return nil
137139
}
138140
}
139141
if containersMap == nil || len(containersMap) == 0 {
140-
delete(man.containersMap, buyerIP)
142+
delete(m.containersMap, buyerIP)
141143
}
142144
}
143145

@@ -148,32 +150,67 @@ func (man *Manager) StopContainer(containerIDToStop string) error {
148150
// = SubComponent Interface =
149151
// ===============================================================================
150152

151-
func (man *Manager) Start() {
152-
man.Started(man.config.Simulation(), func() {
153-
if !man.config.Simulation() {
154-
eventsChan := man.dockerClient.Start()
155-
man.receiveDockerEvents(eventsChan)
153+
func (m *Manager) Start() {
154+
m.Started(m.config.Simulation(), func() {
155+
if !m.config.Simulation() {
156+
eventsChan := m.dockerClient.Start()
157+
m.receiveDockerEvents(eventsChan)
156158
}
157159
})
158160
}
159161

160-
func (man *Manager) Stop() {
161-
man.Stopped(func() {
162-
man.containersMutex.Lock()
163-
defer man.containersMutex.Unlock()
162+
func (m *Manager) Stop() {
163+
m.Stopped(func() {
164+
m.containersMutex.Lock()
165+
defer m.containersMutex.Unlock()
164166

165167
// Stop and remove all the running containers from the docker engine
166-
for _, containers := range man.containersMap {
168+
for _, containers := range m.containersMap {
167169
for containerID := range containers {
168-
man.dockerClient.RemoveContainer(containerID)
170+
m.dockerClient.RemoveContainer(containerID)
169171
log.Debugf(util.LogTag("CONTAINER")+"Container, %s STOPPED and REMOVED", containerID)
170172
}
171173
}
172174

173-
man.quitChan <- true
175+
m.quitChan <- true
174176
})
175177
}
176178

177-
func (man *Manager) IsWorking() bool {
178-
return man.Working()
179+
func (m *Manager) IsWorking() bool {
180+
return m.Working()
181+
}
182+
183+
// ===============================================================================
184+
// = Debug Methods =
185+
// ===============================================================================
186+
187+
func (m *Manager) DebugSizeBytes() int {
188+
localContainerSize := func(container *localContainer) uintptr {
189+
contSizeBytes := unsafe.Sizeof(*container)
190+
contSizeBytes += debug.DebugSizeofString(container.buyerIP)
191+
// common.Container
192+
contSizeBytes += unsafe.Sizeof(*container.Container)
193+
contSizeBytes += debug.DebugSizeofString(container.Name())
194+
contSizeBytes += debug.DebugSizeofString(container.ImageKey())
195+
contSizeBytes += debug.DebugSizeofString(container.ID())
196+
contSizeBytes += debug.DebugSizeofStringSlice(container.Args())
197+
contSizeBytes += debug.DebugSizeofPortMappings(container.PortMappings())
198+
return contSizeBytes
199+
}
200+
201+
contManagerSizeBytes := unsafe.Sizeof(*m)
202+
for k, v := range m.containersMap {
203+
contManagerSizeBytes += unsafe.Sizeof(k)
204+
contManagerSizeBytes += debug.DebugSizeofString(k)
205+
contManagerSizeBytes += unsafe.Sizeof(v)
206+
if v != nil {
207+
for k2, v2 := range v {
208+
contManagerSizeBytes += unsafe.Sizeof(k2)
209+
contManagerSizeBytes += debug.DebugSizeofString(k2)
210+
contManagerSizeBytes += unsafe.Sizeof(v2)
211+
contManagerSizeBytes += localContainerSize(v2)
212+
}
213+
}
214+
}
215+
return int(contManagerSizeBytes)
179216
}

node/discovery/backend/discovery.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ type Discovery interface {
1717
// Component is the interface that all "runnable" Components in caravela must adhere to.
1818
common.Component
1919

20+
//
21+
GUID() string
22+
2023
// =========================== Internal Services (Mandatory to Implement) =====================
2124
//
2225
AddTrader(traderGUID guid.GUID)
@@ -43,9 +46,13 @@ type Discovery interface {
4346

4447
// ========================== External/Remote Services (Only Simulation) =======================
4548
//
46-
NodeInformationSim() (types.Resources, types.Resources, int)
49+
NodeInformationSim() (types.Resources, types.Resources, int, int)
4750
//
4851
RefreshOffersSim()
4952
//
5053
SpreadOffersSim()
54+
55+
// ===================================== Debug Methods =========================================
56+
//
57+
DebugSizeBytes() int
5158
}

node/discovery/discovery_factory.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,12 @@ import (
1212
"github.com/strabox/caravela/node/discovery/random"
1313
"github.com/strabox/caravela/node/discovery/swarm"
1414
"github.com/strabox/caravela/node/external"
15+
"github.com/strabox/caravela/overlay"
1516
"strings"
1617
)
1718

1819
// DiscoveryBackendFactory represents a method that creates a new discovery backend.
19-
type BackendFactory func(node common.Node, config *configuration.Configuration, overlay external.Overlay,
20+
type BackendFactory func(node common.Node, config *configuration.Configuration, overlay overlay.Overlay,
2021
client external.Caravela, resourcesMap *resources.Mapping, maxResources resources.Resources) (backend.Discovery, error)
2122

2223
// discoveryBackends holds all the registered discovery backends available.
@@ -44,7 +45,7 @@ func RegisterDiscoveryBackend(discBackendName string, factory BackendFactory) {
4445
}
4546

4647
// CreateDiscoveryBackend is used to obtain a discovery backend based on the configurations.
47-
func CreateDiscoveryBackend(node common.Node, config *configuration.Configuration, overlay external.Overlay,
48+
func CreateDiscoveryBackend(node common.Node, config *configuration.Configuration, overlay overlay.Overlay,
4849
client external.Caravela, resourcesMap *resources.Mapping, maxResources resources.Resources) backend.Discovery {
4950
configuredDiscoveryBackend := config.DiscoveryBackend()
5051

node/discovery/offering/offering_discovery.go

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,11 @@ import (
1212
"github.com/strabox/caravela/node/discovery/offering/supplier"
1313
"github.com/strabox/caravela/node/discovery/offering/trader"
1414
"github.com/strabox/caravela/node/external"
15+
"github.com/strabox/caravela/overlay"
1516
"github.com/strabox/caravela/util"
17+
"github.com/strabox/caravela/util/debug"
1618
"sync"
19+
"unsafe"
1720
)
1821

1922
// Discovery is responsible for dealing with the resource management local and remote.
@@ -22,33 +25,39 @@ type Discovery struct {
2225
common.NodeComponent // Base component
2326

2427
config *configuration.Configuration // System's configurations.
25-
overlay external.Overlay // Overlay component.
28+
overlay overlay.Overlay // Overlay component.
2629
client external.Caravela // Remote caravela's client.
2730

31+
nodeGUID *guid.GUID
2832
resourcesMap *resources.Mapping // GUID<->FreeResources mapping
2933
supplier *supplier.Supplier // Supplier for managing the offers locally and remotely
3034
traders sync.Map // Node can have multiple "virtual" traders in several places of the overlay
3135
}
3236

33-
func NewOfferingDiscovery(node common.Node, config *configuration.Configuration, overlay external.Overlay,
37+
func NewOfferingDiscovery(node common.Node, config *configuration.Configuration, overlay overlay.Overlay,
3438
client external.Caravela, resourcesMap *resources.Mapping, maxResources resources.Resources) (backend.Discovery, error) {
3539

3640
return &Discovery{
3741
config: config,
3842
overlay: overlay,
3943
client: client,
4044

45+
nodeGUID: nil,
4146
resourcesMap: resourcesMap,
4247
supplier: supplier.NewSupplier(node, config, overlay, client, resourcesMap, maxResources),
4348
traders: sync.Map{},
4449
}, nil
4550
}
4651

52+
func (d *Discovery) GUID() string {
53+
return d.nodeGUID.String()
54+
}
55+
4756
// ====================== Local Services (Consumed by other Components) ============================
4857

4958
// Adds a new local "virtual" trader when the overlay notifies its presence.
5059
func (d *Discovery) AddTrader(traderGUID guid.GUID) {
51-
d.supplier.SetNodeGUID(traderGUID)
60+
d.nodeGUID = &traderGUID
5261

5362
newTrader := trader.NewTrader(d.config, d.overlay, d.client, traderGUID, d.resourcesMap)
5463
d.traders.Store(traderGUID.String(), newTrader)
@@ -121,16 +130,16 @@ func (d *Discovery) AdvertiseNeighborOffers(fromTrader, toNeighborTrader, trader
121130
// ======================= External Services (Consumed during simulation ONLY) =========================
122131

123132
// Simulation
124-
func (d *Discovery) NodeInformationSim() (types.Resources, types.Resources, int) {
125-
numActiveOffers := 0
133+
func (d *Discovery) NodeInformationSim() (types.Resources, types.Resources, int, int) {
134+
traderActiveOffers := 0
126135
d.traders.Range(func(_, value interface{}) bool {
127136
currentTrader, ok := value.(*trader.Trader)
128137
if ok {
129-
numActiveOffers = currentTrader.NumActiveOffers()
138+
traderActiveOffers = currentTrader.NumActiveOffers()
130139
}
131140
return true
132141
})
133-
return d.supplier.AvailableResources(), d.supplier.MaximumResources(), numActiveOffers
142+
return d.supplier.AvailableResources(), d.supplier.MaximumResources(), traderActiveOffers, d.supplier.NumActiveOffers()
134143
}
135144

136145
// Simulation
@@ -181,3 +190,25 @@ func (d *Discovery) Stop() {
181190
func (d *Discovery) IsWorking() bool {
182191
return d.Working()
183192
}
193+
194+
// ===============================================================================
195+
// = Debug Methods =
196+
// ===============================================================================
197+
198+
func (d *Discovery) DebugSizeBytes() int {
199+
discoverySizeBytes := unsafe.Sizeof(*d)
200+
discoverySizeBytes += debug.DebugSizeofGUID(d.nodeGUID)
201+
// Resources<->GUIDMap
202+
discoverySizeBytes += 500 // Hack!
203+
// Traders.
204+
d.traders.Range(func(key, value interface{}) bool {
205+
discoverySizeBytes += unsafe.Sizeof(key.(string))
206+
discoverySizeBytes += debug.DebugSizeofString(key.(string))
207+
discoverySizeBytes += unsafe.Sizeof(value.(*trader.Trader))
208+
discoverySizeBytes += uintptr(value.(*trader.Trader).DebugSizeBytes())
209+
return true
210+
})
211+
// Supplier.
212+
discoverySizeBytes += uintptr(d.supplier.DebugSizeBytes())
213+
return int(discoverySizeBytes)
214+
}

0 commit comments

Comments
 (0)