
Issue: Potential Deadlock in SyncedEnforcer with Redis Watcher #408

@feymanpaper

Problem Description

I've identified a potential deadlock in the SyncedEnforcer implementation when a Redis watcher is used to keep policies consistent across multiple nodes. The deadlock is a lock-order inversion: the enforcer and the watcher acquire the same two locks (the enforcer's write lock and the watcher's mutex) in opposite orders.

Details

SyncedEnforcer

class SyncedEnforcer:
    def __init__(self, model=None, adapter=None):
        self._e = Enforcer(model, adapter)
        self._rwlock = RWLockWrite()
        self._rl = self._rwlock.gen_rlock()
        self._wl = self._rwlock.gen_wlock()
        self._auto_loading = AtomicBool(False)
        self._auto_loading_thread = None

    def load_policy(self):
        with self._wl:
            return self._e.load_policy()
    
    def add_policy(self, *params):
        with self._wl:
            return self._e.add_policy(*params)

InternalEnforcer

class InternalEnforcer(CoreEnforcer):
    def _add_policy(self, sec, ptype, rule):
        """adds a rule to the current policy."""
        if self.model.has_policy(sec, ptype, rule):
            return False

        if self.adapter and self.auto_save:
            if self.adapter.add_policy(sec, ptype, rule) is False:
                return False

            if self.watcher and self.auto_notify_watcher:
                if callable(getattr(self.watcher, "update_for_add_policy", None)):
                    self.watcher.update_for_add_policy(sec, ptype, rule)
                else:
                    self.watcher.update()

        rule_added = self.model.add_policy(sec, ptype, rule)

Redis Watcher (https://github.com/officialpycasbin/redis-watcher/tree/master)

Watcher Update Method:

def update(self):
    def func():
        with self.mutex:  # Acquires watcher mutex
            msg = MSG("Update", self.options.local_ID, "", "", "")
            return self.pub_client.publish(self.options.channel, msg.marshal_binary())

    return self.log_record(func)

Redis Watcher Subscribe Method:

def subscribe(self):
    time.sleep(self.sleep)
    try:
        if not self.sub_client:
            rds = self._get_redis_conn()
            self.sub_client = rds.client().pubsub()
        self.sub_client.subscribe(self.options.channel)
        print(f"Waiting for casbin updates... in the worker: {self.options.local_ID}")
        if self.execute_update:
            self.update()
        try:
            for item in self.sub_client.listen():
                if not self.subscribe_event.is_set():
                    self.subscribe_event.set()
                if item is not None and item["type"] == "message":
                    try:
                        with self.mutex:  # Acquires watcher mutex for callback
                            self.callback(str(item))  # Typically calls load_policy()
                    except Exception as listen_exc:
                        print(
                            "Casbin Redis watcher failed sending update to the callback function "
                            " process due to: {}".format(str(listen_exc))
                        )
                        if self.sub_client:
                            self.sub_client.close()
                        break
        except Exception as sub_exc:
            print("Casbin Redis watcher failed to get message from redis due to {}".format(str(sub_exc)))
            if self.sub_client:
                self.sub_client.close()
    except Exception as redis_exc:
        print("Casbin Redis watcher failed to subscribe due to: {}".format(str(redis_exc)))
    finally:
        if self.sub_client:
            self.sub_client.close()

Deadlock Scenario

Thread A (Subscribe thread):

  • Acquires mutex in subscribe() method (line: with self.mutex:)
  • Calls callback (typically load_policy())
  • Attempts to acquire _wl (write lock) in load_policy() (line: with self._wl:)

Thread B (Policy modification thread):

  • Acquires _wl (write lock) in add_policy() (line: with self._wl:)
  • Calls _e.add_policy(), which reaches _add_policy() and, when a watcher is set and auto-notify is enabled, calls the watcher's update() method
  • Attempts to acquire mutex in update() (line: with self.mutex:)

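Circular Dependency:

  • Thread A holds mutex and waits for _wl
  • Thread B holds _wl and waits for mutex

Neither thread can release the lock it holds until it obtains the other one, so both block indefinitely.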
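
The inversion can be reproduced in isolation with a minimal sketch. It does not use casbin or the Redis watcher at all: `watcher_mutex` and `enforcer_wl` are hypothetical stand-ins (plain `threading.Lock` objects) for the watcher's `self.mutex` and the enforcer's `_wl`, and the barriers exist only to force the bad interleaving deterministically. Timeouts are used on the second acquisition so the script reports the problem instead of hanging.

```python
import threading

TIMEOUT = 0.5  # seconds; without a timeout both threads would block forever


def run_demo():
    # Hypothetical stand-ins for the two locks described in the issue.
    watcher_mutex = threading.Lock()  # plays the role of the watcher's self.mutex
    enforcer_wl = threading.Lock()    # plays the role of SyncedEnforcer._wl

    # First barrier: both threads hold their first lock before trying the second.
    both_hold_first = threading.Barrier(2)
    # Second barrier: neither thread releases its first lock until both attempts ended.
    both_tried_second = threading.Barrier(2)

    results = {}

    def acquire_in_order(name, first, second):
        with first:
            both_hold_first.wait()
            got_second = second.acquire(timeout=TIMEOUT)
            if got_second:
                second.release()
            results[name] = got_second
            both_tried_second.wait()

    threads = [
        # Like subscribe(): mutex first, then the write lock via load_policy().
        threading.Thread(
            target=acquire_in_order,
            args=("subscribe", watcher_mutex, enforcer_wl),
        ),
        # Like add_policy(): write lock first, then the mutex via update().
        threading.Thread(
            target=acquire_in_order,
            args=("add_policy", enforcer_wl, watcher_mutex),
        ),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


if __name__ == "__main__":
    for name, acquired in sorted(run_demo().items()):
        print(f"{name}: acquired second lock = {acquired}")
```

In this sketch neither thread obtains its second lock before the timeout, which is the same circular wait as above. With the real locks there is no timeout, so both threads would wait forever.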

Labels: bug (Something isn't working)
