
Commit 44e2576

Author: tohanov (committed)

MicroService handling of last tick, GPUService subscription to events, partial GPU handling of events

1 parent 906101f · commit 44e2576

File tree: 4 files changed, +154 -109 lines


pom.xml

Lines changed: 1 addition & 1 deletion
@@ -34,7 +34,7 @@
 			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-jar-plugin</artifactId>
-				<version>3.0.2</version>
+				<version>3.2.0</version>
 				<configuration>
 					<archive>
 						<manifest>

src/main/java/bgu/spl/mics/MicroService.java

Lines changed: 12 additions & 16 deletions
@@ -30,7 +30,7 @@ public abstract class MicroService implements Runnable {
 	private boolean terminated = false;
 	private final String name;
 	private MessageBusImpl messageBus;
-	private HashMap<Class<? extends Object>, Callback<? extends Object>> callbackHashMap;
+	private HashMap<Class<? extends Message>, Callback<? extends Message>> callbackHashMap;
 	private ConcurrentLinkedQueue<Callback<Message>> callbackQueue;
 
 
@@ -71,7 +71,7 @@ public MicroService(String name) {
 	 */
 	protected final <T, E extends Event<T>> void subscribeEvent(Class<E> type, Callback<E> callback) {
 		messageBus.subscribeEvent(type, this);
-		callbackHashMap.put((Class<? extends Object>)type, (Callback<? extends Object>)callback);
+		callbackHashMap.put((Class<? extends Message>)type, (Callback<? extends Message>)callback);
 	}
 
 
@@ -97,7 +97,7 @@ protected final <T, E extends Event<T>> void subscribeEvent(Class<E> type, Callb
 	 */
 	protected final <B extends Broadcast> void subscribeBroadcast(Class<B> type, Callback<B> callback) {
 		messageBus.subscribeBroadcast(type, this);
-		callbackHashMap.put((Class<? extends Object>)type, (Callback<? extends Object>)callback);
+		callbackHashMap.put((Class<? extends Message>)type, (Callback<? extends Message>)callback);
 	}
 
 
@@ -187,20 +187,16 @@ public final void run() {
 			try {
 				Message message = messageBus.awaitMessage(this);
 				Class<? extends Message> messageClass = message.getClass();
-				/*
-				if (messageClass != TickBroadcast.class) {
-					// TODO: put aside the message into an internal queue
-				}
-				else { // got a tick
-					if (((TickBroadcast)message).isLast()) {
-						terminate();
-					}
-					else {
-						// TODO: operate on the mesages that were put aside to the internal queue
-					}
+
+				// TODO: think if should let the services handle the last tick as well or terminate everything without that
+				((Callback<Message>)callbackHashMap.get(messageClass)).call(message);
+
+				if (message instanceof TickBroadcast && ((TickBroadcast)message).isLast()) {
+					terminate();
 				}
-				*/
-				callbackHashMap.get(messageClass).call(message);
+				// else {
+				// 	((Callback<Message>)callbackHashMap.get(messageClass)).call(message);
+				// }
 			}
 			catch (InterruptedException e) {
 				synchronized (System.out) {
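In effect, every message a MicroService pulls off the bus is now dispatched through the callback registered for its class, and only afterwards does the service check whether that message was the final TickBroadcast. A condensed sketch of the resulting loop (the try/catch shown in the hunk is omitted here, and the enclosing while (!terminated) loop is assumed from the terminated field declared at line 30, not from this hunk):

    while (!terminated) {
        // block until the MessageBus has a message for this service
        Message message = messageBus.awaitMessage(this);
        Class<? extends Message> messageClass = message.getClass();

        // run the callback registered via subscribeEvent/subscribeBroadcast
        ((Callback<Message>) callbackHashMap.get(messageClass)).call(message);

        // let the callback see the last tick too, then shut the service down
        if (message instanceof TickBroadcast && ((TickBroadcast) message).isLast()) {
            terminate();
        }
    }

Dispatching before the isLast() check means the service still reacts to the final tick before terminate() (which presumably just raises the terminated flag) ends the loop.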
src/main/java/bgu/spl/mics/application/objects/GPU.java

Lines changed: 122 additions & 53 deletions
@@ -1,34 +1,53 @@
 package bgu.spl.mics.application.objects;
 
+import java.util.ArrayDeque;
 import java.util.Collection;
+import java.util.Queue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.experimental.theories.Theories;
 
 import bgu.spl.mics.MessageBusImpl;
 import bgu.spl.mics.application.services.GPUService;
+import bgu.spl.mics.application.messages.TestModelEvent;
+import bgu.spl.mics.application.messages.TickBroadcast;
+import bgu.spl.mics.application.messages.TrainModelEvent;
+import bgu.spl.mics.Callback;
+import bgu.spl.mics.Event;
 
 /**
  * Passive object representing a single GPU.
  * Add all the fields described in the assignment as private fields.
  * Add fields and methods to this class as you see fit (including public methods and constructors).
  */
 public class GPU {
+
 	/**
 	 * Enum representing the type of the GPU.
 	 */
-	public enum Type {RTX3090, RTX2080, GTX1080}
+	public enum Type { RTX3090, RTX2080, GTX1080 }
 
 
 	// region According to assignment instructions
 	private Type type;
-	private Cluster cluster;
-	private Model model;
+	private Model model; // the model being taken care of currently
+	private static Cluster cluster = Cluster.getInstance();
 	// endregion According to assignment instructions
 
 
 	// region Added fields
-	private GPUService service;
+	// private GPUService service;
 	// private Collection<DataBatch> processedBatches;
-	private final byte vRAM;
-	private int processedBatchesNum;
+	private Queue<Event<Model>> modelEventsQueue;
+	private final int trainingDelay; // according to the type of the gpu
+	private final byte vRAM; // according to the type of the gpu
+	private int storedProcessedBatchesNumber;
+	private int ticksToTrainBatch;
+	private boolean training;
+	private boolean testing;
 	// endregion Added fields
 
 	/**
@@ -58,7 +77,57 @@ public enum Type {RTX3090, RTX2080, GTX1080}
 	// }
 
 
-	private Type typeFromString(String _type) {
+	/**
+	 * @return The type of the GPU
+	 */
+	// public Type getType () {
+	// 	return type;
+	// }
+
+
+	// public void runService() { // TODO remove ?
+	// 	service.run();
+	// }
+
+
+	/**
+	 * @return Reference to the corresponding GPUService
+	 */
+	// public GPUService getService() {
+	// 	return service;
+	// }
+
+
+	// public Collection<DataBatch> getProcessedBatches() {
+	// 	return processedBatches;
+	// }
+
+
+	// region for serialization from json
+	public GPU(String _type) {
+		this.type = typeFromString(_type);
+		model = null;
+		storedProcessedBatchesNumber = 0;
+
+		switch(type) {
+			case GTX1080:
+				vRAM = 8;
+				trainingDelay = 4;
+				break;
+			case RTX2080:
+				vRAM = 16;
+				trainingDelay = 2;
+				break;
+			default : // RTX3090:
+				vRAM = 32;
+				trainingDelay = 1;
+		}
+
+		modelEventsQueue = new ArrayDeque<>();
+	}
+
+
+	private Type typeFromString(String _type) {
 		Type returnType;
 		String uppercaseType = _type.toUpperCase();
 
@@ -71,75 +140,75 @@ else if (uppercaseType == "RTX2080")
 
 		return returnType;
 	}
+	// endregion for serialization from json
 
 
-	/**
-	 * @return The type of the GPU
-	 */
-	public Type getType () {
-		return type;
-	}
+	public void gotTick() {
+		// TODO: treat the case of last tick??
 
+		// TODO: split to smaller functions (queries + actions)
+		if (training) { // && storedProcessedBatchesNumber != 0) {
+			--ticksToTrainBatch;
+
+			if (ticksToTrainBatch == 0) { // if finished training batch
+				ticksToTrainBatch = trainingDelay; // reset the counter
+				finishTrainingBatch(); //
 
-	// public void runService() { // TODO remove ?
-	// 	service.run();
-	// }
+				if (storedProcessedBatchesNumber == 0) {
+					training = false; // TODO: maybe incorrect since someone could tell me to test a model while i haven't gotten all the batches from cpu yet
+					// TODO set Future to done / send more batches to cpu for processing
+				}
+			}
+		}
+		else if (testing) {
+			// TODO
+		}
+		// else if (storedProcessedBatchesNumber != 0) {
+		// 	training = true;
+		// 	--ticksToTrainBatch;
+
+		// 	if (trainingDelay == 0) {
+		// 		gpu.finishTrainingBatch();
+
+		// 		if (gpu.getProcessedBatchesNum() == 0) {
+		// 			// TODO set Future to done
+		// 		}
+		// 	}
+		// }
+	}
 
 
-	/**
-	 * @return Reference to the corresponding GPUService
-	 */
-	public GPUService getService() {
-		return service;
+	public void gotModelEvent(Event<Model> modelEvent) {
+		// TODO: put aside to wait for ticks
+		modelEventsQueue.add(modelEvent);
 	}
 
 
-	// public Collection<DataBatch> getProcessedBatches() {
-	// 	return processedBatches;
+	// public void gotModelToTest(TestModelEvent testModelEvent) {
+	// 	// TODO: put aside to wait for ticks
 	// }
 
 
 	/**
 	 * @return The amount of vRAM the GPU has
 	 */
-	public byte getVRAM() {
-		return vRAM;
-	}
+	// public byte getVRAM() {
+	// 	return vRAM;
+	// }
 
+
 	/**
 	 * @return Number of processed batches currently in training
 	 */
-	public int getProcessedBatchesNum() {
-		return processedBatchesNum;
-	}
+	// public int getProcessedBatchesNum() {
+	// 	return storedProcessedBatchesNumber;
+	// }
 
 
 	/**
 	 * @inv processedBatchesNum >= 0
 	 */
 	public void finishTrainingBatch() {
-		--processedBatchesNum;
+		--storedProcessedBatchesNumber;
 	}
-
-
-	// region for serialization from json
-	public GPU(String _type) {
-		this.type = typeFromString(_type);
-		// this.service = new GPUService(_name, this);
-
-		processedBatchesNum = 0;
-
-		switch(type) {
-			case GTX1080:
-				vRAM = 8;
-				break;
-			case RTX2080:
-				vRAM = 16;
-				break;
-			default : // RTX3090:
-				vRAM = 32;
-		}
-	}
-	// endregion for serialization from json
-
-}
+}
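Taken together, the new fields turn GPU into a small tick-driven state machine: the constructor fixes vRAM and trainingDelay per GPU type, gotTick() counts ticks down while training, and finishTrainingBatch() consumes one stored processed batch whenever the countdown reaches zero. A minimal, self-contained sketch of just that countdown (field names and the per-type constants follow the diff; the messaging, Cluster, and testing branches are left out, and the initial training state is an assumption, since this commit leaves that wiring as a TODO):

    // Illustrative, standalone countdown extracted from GPU.gotTick(); not project code.
    class GpuTickCountdownSketch {
        enum Type { RTX3090, RTX2080, GTX1080 }

        private final int trainingDelay;          // ticks needed per processed batch, by GPU type
        private int ticksToTrainBatch;            // remaining ticks for the current batch
        private int storedProcessedBatchesNumber; // batches already processed and waiting to be trained
        private boolean training;

        GpuTickCountdownSketch(Type type, int storedBatches) {
            switch (type) {                        // same constants as the GPU(String) constructor
                case GTX1080: trainingDelay = 4; break;
                case RTX2080: trainingDelay = 2; break;
                default:      trainingDelay = 1;   // RTX3090
            }
            ticksToTrainBatch = trainingDelay;
            storedProcessedBatchesNumber = storedBatches;
            training = storedBatches > 0;          // assumption: start training if batches are available
        }

        void gotTick() {
            if (!training) return;
            if (--ticksToTrainBatch == 0) {        // one batch finished training
                ticksToTrainBatch = trainingDelay; // reset the counter
                --storedProcessedBatchesNumber;    // finishTrainingBatch()
                if (storedProcessedBatchesNumber == 0) {
                    training = false;              // idle until more batches arrive
                }
            }
        }
    }

On an RTX2080, for instance, each batch costs two ticks, so a GPU holding three stored batches stops training after six calls to gotTick().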
