-
-
Notifications
You must be signed in to change notification settings - Fork 210
Open
Labels
bugSomething isn't workingSomething isn't working
Description
Problem Description
I've identified a potential deadlock scenario in the SyncedEnforcer implementation when using Redis watcher for policy consistence between multiple nodes. The deadlock occurs due to conflicting lock acquisition orders between different components.
Details
SyncedEnforcer
class SyncedEnforcer:
def __init__(self, model=None, adapter=None):
self._e = Enforcer(model, adapter)
self._rwlock = RWLockWrite()
self._rl = self._rwlock.gen_rlock()
self._wl = self._rwlock.gen_wlock()
self._auto_loading = AtomicBool(False)
self._auto_loading_thread = None
def load_policy(self):
with self._wl:
return self._e.load_policy()
def add_policy(self, *params):
with self._wl:
return self._e.add_policy(*params)class InternalEnforcer(CoreEnforcer):
def _add_policy(self, sec, ptype, rule):
"""adds a rule to the current policy."""
if self.model.has_policy(sec, ptype, rule):
return False
if self.adapter and self.auto_save:
if self.adapter.add_policy(sec, ptype, rule) is False:
return False
if self.watcher and self.auto_notify_watcher:
if callable(getattr(self.watcher, "update_for_add_policy", None)):
self.watcher.update_for_add_policy(sec, ptype, rule)
else:
self.watcher.update()
rule_added = self.model.add_policy(sec, ptype, rule)Redis Watcher(https://github.com/officialpycasbin/redis-watcher/tree/master)
Watcher Update Method:
def update(self):
def func():
with self.mutex: # Acquires watcher mutex
msg = MSG("Update", self.options.local_ID, "", "", "")
return self.pub_client.publish(self.options.channel, msg.marshal_binary())
return self.log_record(func)Redis Watcher Subscribe Method:
def subscribe(self):
time.sleep(self.sleep)
try:
if not self.sub_client:
rds = self._get_redis_conn()
self.sub_client = rds.client().pubsub()
self.sub_client.subscribe(self.options.channel)
print(f"Waiting for casbin updates... in the worker: {self.options.local_ID}")
if self.execute_update:
self.update()
try:
for item in self.sub_client.listen():
if not self.subscribe_event.is_set():
self.subscribe_event.set()
if item is not None and item["type"] == "message":
try:
with self.mutex: # Acquires watcher mutex for callback
self.callback(str(item)) # Typically calls load_policy()
except Exception as listen_exc:
print(
"Casbin Redis watcher failed sending update to teh callback function "
" process due to: {}".format(str(listen_exc))
)
if self.sub_client:
self.sub_client.close()
break
except Exception as sub_exc:
print("Casbin Redis watcher failed to get message from redis due to {}".format(str(sub_exc)))
if self.sub_client:
self.sub_client.close()
except Exception as redis_exc:
print("Casbin Redis watcher failed to subscribe due to: {}".format(str(redis_exc)))
finally:
if self.sub_client:
self.sub_client.close()Deadlock Scenario
Thread A (Subscribe thread):
- Acquires mutex in subscribe() method (line: with self.mutex:)
- Calls callback (typically load_policy())
- Attempts to acquire _wl (write lock) in load_policy() (line: with self._wl:)
Thread B (Policy modification thread):
- Acquires _wl (write lock) in add_policy() (line: with self._wl:)
- Calls _e.add_policy() which triggers watcher's update() method
- Attempts to acquire mutex in update() (line: with self.mutex:)
Circular Dependency:
Thread A holds mutex, waiting for _wl
Thread B holds _wl, waiting for mutex
Copilot
Metadata
Metadata
Assignees
Labels
bugSomething isn't workingSomething isn't working