Skip to content

Commit 8d26290

Browse files
authored
Merge pull request thingsboard#10255 from thingsboard/fix/actor-system
Fix actor msg broadcast
2 parents 57b5aa5 + 10c13bf commit 8d26290

File tree

10 files changed

+54
-12
lines changed

10 files changed

+54
-12
lines changed

application/src/main/java/org/thingsboard/server/actors/app/AppActor.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import lombok.extern.slf4j.Slf4j;
1919
import org.thingsboard.server.actors.ActorSystemContext;
20+
import org.thingsboard.server.actors.ProcessFailureStrategy;
2021
import org.thingsboard.server.actors.TbActor;
2122
import org.thingsboard.server.actors.TbActorCtx;
2223
import org.thingsboard.server.actors.TbActorException;
@@ -88,7 +89,7 @@ protected boolean doProcess(TbActorMsg msg) {
8889
case APP_INIT_MSG:
8990
break;
9091
case PARTITION_CHANGE_MSG:
91-
ctx.broadcastToChildren(msg);
92+
ctx.broadcastToChildren(msg, true);
9293
break;
9394
case COMPONENT_LIFE_CYCLE_MSG:
9495
onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
@@ -219,6 +220,12 @@ private void onToEdgeSessionMsg(EdgeSessionMsg msg) {
219220
}
220221
}
221222

223+
@Override
224+
public ProcessFailureStrategy onProcessFailure(TbActorMsg msg, Throwable t) {
225+
log.error("Failed to process msg: {}", msg, t);
226+
return doProcessFailure(t);
227+
}
228+
222229
public static class ActorCreator extends ContextBasedCreator {
223230

224231
public ActorCreator(ActorSystemContext context) {

application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainManagerActor.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,17 +59,19 @@ public RuleChainManagerActor(ActorSystemContext systemContext, TenantId tenantId
5959
}
6060

6161
protected void initRuleChains() {
62-
ruleChainsInitialized = true;
62+
log.debug("[{}] Initializing rule chains", tenantId);
6363
for (RuleChain ruleChain : new PageDataIterable<>(link -> ruleChainService.findTenantRuleChainsByType(tenantId, RuleChainType.CORE, link), ContextAwareActor.ENTITY_PACK_LIMIT)) {
6464
RuleChainId ruleChainId = ruleChain.getId();
6565
log.debug("[{}|{}] Creating rule chain actor", ruleChainId.getEntityType(), ruleChain.getId());
6666
TbActorRef actorRef = getOrCreateActor(ruleChainId, id -> ruleChain);
6767
visit(ruleChain, actorRef);
6868
log.debug("[{}|{}] Rule Chain actor created.", ruleChainId.getEntityType(), ruleChainId.getId());
6969
}
70+
ruleChainsInitialized = true;
7071
}
7172

7273
protected void destroyRuleChains() {
74+
log.debug("[{}] Destroying rule chains", tenantId);
7375
for (RuleChain ruleChain : new PageDataIterable<>(link -> ruleChainService.findTenantRuleChainsByType(tenantId, RuleChainType.CORE, link), ContextAwareActor.ENTITY_PACK_LIMIT)) {
7476
ctx.stop(new TbEntityActorId(ruleChain.getId()));
7577
}

application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@ public boolean process(TbActorMsg msg) {
4747
protected abstract boolean doProcess(TbActorMsg msg);
4848

4949
@Override
50-
public ProcessFailureStrategy onProcessFailure(Throwable t) {
51-
log.debug("[{}] Processing failure: ", getActorRef().getActorId(), t);
50+
public ProcessFailureStrategy onProcessFailure(TbActorMsg msg, Throwable t) {
51+
log.debug("[{}] Processing failure for msg {}", getActorRef().getActorId(), msg, t);
5252
return doProcessFailure(t);
5353
}
5454

application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
122122
@Override
123123
protected void onTbApplicationEvent(PartitionChangeEvent event) {
124124
log.info("Received partition change event.");
125-
this.appActor.tellWithHighPriority(new PartitionChangeMsg(event.getServiceType()));
125+
appActor.tellWithHighPriority(new PartitionChangeMsg(event.getServiceType()));
126126
}
127127

128128
@Override

application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import lombok.extern.slf4j.Slf4j;
1919
import org.thingsboard.server.actors.ActorSystemContext;
20+
import org.thingsboard.server.actors.ProcessFailureStrategy;
2021
import org.thingsboard.server.actors.TbActor;
2122
import org.thingsboard.server.actors.TbActorCtx;
2223
import org.thingsboard.server.actors.TbActorException;
@@ -183,7 +184,7 @@ private void onQueueToRuleEngineMsg(QueueToRuleEngineMsg msg) {
183184
return;
184185
}
185186
TbMsg tbMsg = msg.getMsg();
186-
if (getApiUsageState().isReExecEnabled() && ruleChainsInitialized) {
187+
if (getApiUsageState().isReExecEnabled()) {
187188
if (tbMsg.getRuleChainId() == null) {
188189
if (getRootChainActor() != null) {
189190
getRootChainActor().tell(msg);
@@ -207,7 +208,7 @@ private void onQueueToRuleEngineMsg(QueueToRuleEngineMsg msg) {
207208
}
208209

209210
private void onRuleChainMsg(RuleChainAwareMsg msg) {
210-
if (getApiUsageState().isReExecEnabled() && ruleChainsInitialized) {
211+
if (getApiUsageState().isReExecEnabled()) {
211212
getOrCreateActor(msg.getRuleChainId()).tell(msg);
212213
}
213214
}
@@ -319,6 +320,12 @@ private ApiUsageState getApiUsageState() {
319320
return apiUsageState;
320321
}
321322

323+
@Override
324+
public ProcessFailureStrategy onProcessFailure(TbActorMsg msg, Throwable t) {
325+
log.error("[{}] Failed to process msg: {}", tenantId, msg, t);
326+
return doProcessFailure(t);
327+
}
328+
322329
public static class ActorCreator extends ContextBasedCreator {
323330

324331
private final TenantId tenantId;

common/actor/src/main/java/org/thingsboard/server/actors/DefaultTbActorSystem.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,14 +156,29 @@ private void tell(TbActorId target, TbActorMsg actorMsg, boolean highPriority) {
156156

157157
@Override
158158
public void broadcastToChildren(TbActorId parent, TbActorMsg msg) {
159-
broadcastToChildren(parent, id -> true, msg);
159+
broadcastToChildren(parent, msg, false);
160+
}
161+
162+
@Override
163+
public void broadcastToChildren(TbActorId parent, TbActorMsg msg, boolean highPriority) {
164+
broadcastToChildren(parent, id -> true, msg, highPriority);
160165
}
161166

162167
@Override
163168
public void broadcastToChildren(TbActorId parent, Predicate<TbActorId> childFilter, TbActorMsg msg) {
169+
broadcastToChildren(parent, childFilter, msg, false);
170+
}
171+
172+
private void broadcastToChildren(TbActorId parent, Predicate<TbActorId> childFilter, TbActorMsg msg, boolean highPriority) {
164173
Set<TbActorId> children = parentChildMap.get(parent);
165174
if (children != null) {
166-
children.stream().filter(childFilter).forEach(id -> tell(id, msg));
175+
children.stream().filter(childFilter).forEach(id -> {
176+
try {
177+
tell(id, msg, highPriority);
178+
} catch (TbActorNotRegisteredException e) {
179+
log.warn("Actor is missing for {}", id);
180+
}
181+
});
167182
}
168183
}
169184

@@ -190,6 +205,8 @@ public void stop(TbActorId actorId) {
190205
stop(child);
191206
}
192207
}
208+
parentChildMap.values().forEach(parentChildren -> parentChildren.remove(actorId));
209+
193210
TbActorMailbox mailbox = actors.remove(actorId);
194211
if (mailbox != null) {
195212
mailbox.destroy(null);

common/actor/src/main/java/org/thingsboard/server/actors/TbActor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ default InitFailureStrategy onInitFailure(int attempt, Throwable t) {
3434
return InitFailureStrategy.retryWithDelay(5000L * attempt);
3535
}
3636

37-
default ProcessFailureStrategy onProcessFailure(Throwable t) {
37+
default ProcessFailureStrategy onProcessFailure(TbActorMsg msg, Throwable t) {
3838
if (t instanceof Error) {
3939
return ProcessFailureStrategy.stop();
4040
} else {

common/actor/src/main/java/org/thingsboard/server/actors/TbActorCtx.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ public interface TbActorCtx extends TbActorRef {
3636

3737
void broadcastToChildren(TbActorMsg msg);
3838

39+
void broadcastToChildren(TbActorMsg msg, boolean highPriority);
40+
3941
void broadcastToChildrenByType(TbActorMsg msg, EntityType entityType);
4042

4143
void broadcastToChildren(TbActorMsg msg, Predicate<TbActorId> childFilter);

common/actor/src/main/java/org/thingsboard/server/actors/TbActorMailbox.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ private void processMailbox() {
160160
destroy(updateException.getCause());
161161
} catch (Throwable t) {
162162
log.debug("[{}] Failed to process message: {}", selfId, msg, t);
163-
ProcessFailureStrategy strategy = actor.onProcessFailure(t);
163+
ProcessFailureStrategy strategy = actor.onProcessFailure(msg, t);
164164
if (strategy.isStop()) {
165165
system.stop(selfId);
166166
}
@@ -190,7 +190,12 @@ public void tell(TbActorId target, TbActorMsg actorMsg) {
190190

191191
@Override
192192
public void broadcastToChildren(TbActorMsg msg) {
193-
system.broadcastToChildren(selfId, msg);
193+
broadcastToChildren(msg, false);
194+
}
195+
196+
@Override
197+
public void broadcastToChildren(TbActorMsg msg, boolean highPriority) {
198+
system.broadcastToChildren(selfId, msg, highPriority);
194199
}
195200

196201
@Override

common/actor/src/main/java/org/thingsboard/server/actors/TbActorSystem.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ public interface TbActorSystem {
4848

4949
void broadcastToChildren(TbActorId parent, TbActorMsg msg);
5050

51+
void broadcastToChildren(TbActorId parent, TbActorMsg msg, boolean highPriority);
52+
5153
void broadcastToChildren(TbActorId parent, Predicate<TbActorId> childFilter, TbActorMsg msg);
5254

5355
List<TbActorId> filterChildren(TbActorId parent, Predicate<TbActorId> childFilter);

0 commit comments

Comments
 (0)