Skip to content

Commit

Permalink
refactor(event_buffer): A trial to add type hints for `EventBuffer.py…
Browse files Browse the repository at this point in the history
…` and `Utils.py`
  • Loading branch information
ARCJ137442 committed Jul 25, 2024
1 parent 7209145 commit 8ac6ec7
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 53 deletions.
108 changes: 69 additions & 39 deletions pynars/NARS/DataStructures/MC/EventBuffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,33 @@
from pynars.NARS.DataStructures.MC import Utils
from pynars.NARS.DataStructures.MC.OutputBuffer import Reaction
from pynars.NARS.DataStructures.MC.Utils import PriorityQueue, BufferTask, satisfaction_level, preprocessing
from pynars.NARS.DataStructures import Memory
from pynars.Narsese import Compound, Judgement, Task, Interval, parser, Term, Truth, Copula, Statement
from typing import Iterable, List, Optional, Tuple


class Anticipation:

def __init__(self, task, prediction):
matched: bool
task: Task
prediction: 'PredictiveImplication'

def __init__(self, task: Task, prediction: 'PredictiveImplication') -> None:
self.matched = False
self.task = task
self.prediction = prediction


class PredictiveImplication:

def __init__(self, condition, interval, conclusion, task):
condition: Term
interval: Interval
conclusion: Term
to_memory_cooldown: int
expiration: int
task: Task

def __init__(self, condition: Term, interval: Interval, conclusion: Term, task: Task) -> None:
self.condition = condition
"""
As explained the conceptual design, "+1, +2" cannot be used in buffers, thus the interval is only kept in the
Expand All @@ -34,7 +47,7 @@ def __init__(self, condition, interval, conclusion, task):
self.expiration = 0
self.task = task

def get_conclusion(self, condition_task):
def get_conclusion(self, condition_task: 'BufferTask') -> Tuple[Optional[Interval], Optional[Task]]:
# when "A" is matched with "A =/> B", return B with truth deduction
truth = Truth_deduction(self.task.truth, condition_task.task.truth)
if truth.c < 0.3:
Expand All @@ -50,26 +63,41 @@ class Slot:
It contains 3 parts: 1) events observed, 2) anticipations made, 3) operations to do.
"""

def __init__(self, num_events, num_anticipations, num_operations):
events: PriorityQueue[BufferTask]
anticipations: List[Anticipation]
num_anticipations: int
operations: list # TODO: operation types
num_operations: int

def __init__(self, num_events: int, num_anticipations: int, num_operations: int) -> None:
self.events = PriorityQueue(num_events)
self.anticipations = []
self.num_anticipations = num_anticipations
self.operations = []
self.num_operations = num_operations

def push(self, item, value):
def push(self, item: BufferTask, value: float):
self.events.push(item, value)

def pop(self):
def pop(self) -> Tuple[BufferTask, float]:
return self.events.pop()

def random_pop(self):
def random_pop(self) -> Optional[BufferTask]:
return self.events.random_pop()


class EventBuffer:

def __init__(self, num_slot, num_events, num_anticipations, num_operations, num_predictive_implications, N=1):
num_events: int
num_anticipations: int
num_operations: int
slots: List[Slot]
curr: int
predictive_implications: PriorityQueue[PredictiveImplication]
reactions: PriorityQueue[Reaction]
N: int

def __init__(self, num_slot: int, num_events: int, num_anticipations: int, num_operations: int, num_predictive_implications: int, N=1) -> None:
# num slot is the number of slots on one side. If num_slot is 2, there are 1 (in the middle) + 2*2=5 slots
self.num_events = num_events
self.num_anticipations = num_anticipations
Expand All @@ -80,13 +108,13 @@ def __init__(self, num_slot, num_events, num_anticipations, num_operations, num_
self.reactions = PriorityQueue(num_predictive_implications * 5)
self.N = N

def push(self, tasks, memory):
def push(self, tasks: Iterable[Task], memory: Memory) -> None:
for task in tasks:
buffer_task = BufferTask(task)
buffer_task.preprocess_effect = Utils.preprocessing(task, memory)
self.slots[self.curr].push(buffer_task, buffer_task.priority)

def pop(self):
def pop(self) -> List[Task]:
ret = []
for _ in range(self.N):
if len(self.slots[self.curr].events) != 0:
Expand All @@ -95,7 +123,7 @@ def pop(self):
return ret

@staticmethod
def contemporary_composition(events):
def contemporary_composition(events: List[Task]) -> Task:
# according to the conceptual design, currently only 2-compounds are allowed,
# though in the future, we may have compounds with many components,

Expand All @@ -120,13 +148,13 @@ def contemporary_composition(events):
budget = Budget_merge(budget, each.budget)

# sentence
sentence = Judgement(term, stamp, truth)
sentence = Judgement(term, stamp, truth) # ! Argument 1 to "Judgement" has incompatible type "Type[Compound]"; expected "Term" (Mypy arg-type)

# task
return Task(sentence, budget)

@staticmethod
def sequential_composition(event_1, interval, event_2):
def sequential_composition(event_1: Task, interval: Interval, event_2: Task) -> Task:
# according to the conceptual design, we currently only have "event_1, interval, event_2" schema,
# though in the future this may also change, but it is too early to decide here

Expand All @@ -145,7 +173,7 @@ def sequential_composition(event_1, interval, event_2):
return Task(sentence, budget)

@staticmethod
def generate_prediction_util(event_1, interval, event_2):
def generate_prediction_util(event_1: Task, interval: Interval, event_2: Task) -> PredictiveImplication:
if interval != 0:
copula = Copula.PredictiveImplication # =/>
else:
Expand All @@ -169,7 +197,7 @@ def generate_prediction_util(event_1, interval, event_2):
# predictive implication
return PredictiveImplication(event_1.term, interval, event_2.term, task)

def compound_composition(self, memory):
def compound_composition(self, memory: Memory) -> None:
"""
After the initial composition, pick the one with the highest priority in the current slot.
Compose it with all other events in the current slot and the previous max events.
Expand All @@ -184,15 +212,15 @@ def compound_composition(self, memory):
curr_max.is_component = 1
curr_composition.append(self.contemporary_composition([curr_max.task, curr_remaining[-1].task]))

previous_max = []
previous_max: List[Optional[BufferTask]] = []
previous_composition = []
for i in range(self.curr):
if len(self.slots[i].events) != 0:
tmp, _ = self.slots[i].pop()
previous_max.append(tmp)
# don't change previous max's "is_component"
curr_max.is_component = 1
previous_composition.append(self.sequential_composition(previous_max[-1].task,
previous_composition.append(self.sequential_composition(previous_max[-1].task, # ? should change this `previous_max[-1]` to `tmp` to assume the non-none type?
Interval(self.curr - i), curr_max.task))
else:
previous_max.append(None)
Expand All @@ -208,12 +236,12 @@ def compound_composition(self, memory):
# add all compositions to the current slot
self.push(curr_composition + previous_composition, memory)

def check_anticipation(self, memory):
def check_anticipation(self, memory: Memory) -> None:
"""
Check all anticipations, award or punish the corresponding predictive implications.
If an anticipation does not even exist, apply the lowest satisfaction.
"""
prediction_award_penalty = []
prediction_award_penalty: List[Tuple[PredictiveImplication, float]] = []
checked_buffer_tasks = []
while len(self.slots[self.curr].events) != 0:
buffer_task, _ = self.slots[self.curr].pop()
Expand All @@ -224,27 +252,27 @@ def check_anticipation(self, memory):
each_anticipation.matched = True
buffer_task.task = revision(each_anticipation.task, buffer_task.task)
satisfaction = 1 - satisfaction_level(each_anticipation.task.truth, buffer_task.task.truth)
prediction_award_penalty.append([each_anticipation.prediction, satisfaction])
prediction_award_penalty.append((each_anticipation.prediction, satisfaction))
checked_buffer_tasks.append(buffer_task)

# if there are some unmatched anticipations, apply the lowest satisfaction
for each_anticipation in self.slots[self.curr].anticipations:
if not each_anticipation.matched:
prediction_award_penalty.append([each_anticipation.prediction, 0])
prediction_award_penalty.append((each_anticipation.prediction, 0))

print("prediction_award_penalty", prediction_award_penalty)

# put all buffer tasks back, some evaluations may change
for each in checked_buffer_tasks:
self.slots[self.curr].push(each, each.priority)
for each_task in checked_buffer_tasks: # ! have to use different variable names to avoid type conflicts
self.slots[self.curr].push(each_task, each_task.priority)

# update the predictive implications
for each in prediction_award_penalty:
each[0].task = revision(each[0].task, parser.parse(each[0].task.term.word + ". %" + str(each[1]) + ";0.9%"))
self.predictive_implications.edit(each[0], each[0].task.truth.e * preprocessing(each[0].task, memory),
lambda x: x.task.term)

def predictive_implication_application(self, memory):
def predictive_implication_application(self, memory: Memory) -> None:
"""
Check all predictive implications, whether some of them can fire.
If so, calculate the corresponding task of the conclusion and create it as an anticipation in the corresponding
Expand All @@ -256,9 +284,9 @@ def predictive_implication_application(self, memory):
implication, _ = self.predictive_implications.pop()
applied = False
for each_event in self.slots[self.curr].events.pq:
if implication.condition == each_event[1].task.term:
if implication.condition == each_event[1].task.term: # ! Unsupported operand types for == ("Term" and "Term") (Mypy operator)
interval, conclusion = implication.get_conclusion(each_event[1])
if interval is None:
if interval is None or conclusion is None: # ! if the interval isn't `None`, same to the conclusion
break
applied = True
implication.expiration = max(0, implication.expiration - 1)
Expand All @@ -273,7 +301,7 @@ def predictive_implication_application(self, memory):
self.predictive_implications.push(each, each.task.truth.e * preprocessing(each.task, memory) *
(1 / (1 + each.expiration)))

def to_memory_predictive_implication(self, memory, threshold_f=0.9, threshold_c=0.8, default_cooldown=100):
def to_memory_predictive_implication(self, memory: Memory, threshold_f: float=0.9, threshold_c: float=0.8, default_cooldown: int=100) -> List[Reaction]:
# when a predictive implication reaches a relatively high truth value, it will be forwarded to the memory
# (not the next level)
# this does not mean it is removed from the predictive implication pq
Expand All @@ -295,6 +323,7 @@ def to_memory_predictive_implication(self, memory, threshold_f=0.9, threshold_c=
# I have to cheat.
# ==================================================================================================

# ? for `each[1].task.term.predicate`, should assume the `each[1]`'s term is a statement?
if each[1].task.term.predicate.word == "<{SELF}-->[good]>":

if (each[1].task.term.subject.is_compound and each[1].task.term.subject.components[-1].word[0]
Expand Down Expand Up @@ -325,7 +354,7 @@ def to_memory_predictive_implication(self, memory, threshold_f=0.9, threshold_c=
return reactions
# ==============================================================================================================

def local_evaluation(self, memory, threshold_f=0.8, threshold_c=0.9, default_cooldown=100):
def local_evaluation(self, memory: Memory, threshold_f: float=0.8, threshold_c: float=0.9, default_cooldown: int=100) -> List[Reaction]:
self.check_anticipation(memory)
self.predictive_implication_application(memory)
# cheating
Expand All @@ -337,7 +366,7 @@ def local_evaluation(self, memory, threshold_f=0.8, threshold_c=0.9, default_coo
# original
# self.to_memory_predictive_implication(memory, threshold_f, threshold_c, default_cooldown)

def memory_based_evaluation(self, memory):
def memory_based_evaluation(self, memory: Memory) -> None:
evaluated_buffer_tasks = []
while len(self.slots[self.curr].events) != 0:
buffer_task, _ = self.slots[self.curr].pop()
Expand All @@ -348,12 +377,12 @@ def memory_based_evaluation(self, memory):
self.slots[self.curr].push(each, each.priority)

@staticmethod
def prediction_revision(existed_prediction, new_prediction):
def prediction_revision(existed_prediction: PredictiveImplication, new_prediction: PredictiveImplication) -> PredictiveImplication:
existed_prediction.task = revision(existed_prediction.task, new_prediction.task)
existed_prediction.expiration = max(0, existed_prediction.expiration - 1)
return existed_prediction

def prediction_generation(self, max_events_per_slot, memory):
def prediction_generation(self, max_events_per_slot: int, memory: Memory) -> None:
"""
For each slot, randomly pop "max events per slot" buffer tasks to generate predictions.
Currently, concurrent predictive implications (==>) are not supported.
Expand All @@ -376,31 +405,32 @@ def prediction_generation(self, max_events_per_slot, memory):
for each_curr_event in selected_buffer_tasks[-1]:
for each_previous_event in selected_buffer_tasks[i]:
if each_curr_event is not None and each_previous_event is not None:
tmp = self.generate_prediction_util(each_previous_event.task, Interval(self.curr - i),
tmp2 = self.generate_prediction_util(each_previous_event.task, Interval(self.curr - i), # ! for type stability, it may need to be renamed to `tmp2`
each_curr_event.task)
# if tmp.task.truth.e * preprocessing(tmp.task, memory) <= 0.05:
# if tmp2.task.truth.e * preprocessing(tmp2.task, memory) <= 0.05:
# continue
existed = None
for j in range(len(self.predictive_implications)):
if self.predictive_implications.pq[j][1].task.term == tmp.task.term:
if self.predictive_implications.pq[j][1].task.term == tmp2.task.term:
existed = self.predictive_implications.pq.pop(j)
break
if existed is not None:
tmp = self.prediction_revision(existed[1], tmp)
tmp2 = self.prediction_revision(existed[1], tmp2)

self.predictive_implications.push(tmp, tmp.task.truth.e * preprocessing(tmp.task, memory))
self.predictive_implications.push(tmp2, tmp2.task.truth.e * preprocessing(tmp2.task, memory))

# after the prediction generation, put the randomly selected buffer tasks back
for i in range(self.curr + 1):
for each in selected_buffer_tasks[i]:
if each is not None:
self.slots[i].push(each, each.priority)

def slots_cycle(self):
def slots_cycle(self) -> None:
self.slots = self.slots[1:] + [Slot(self.num_events, self.num_anticipations, self.num_operations)]

def buffer_cycle(self, tasks, memory, max_events_per_slot=5, threshold_f=0.8, threshold_c=0.9,
default_cooldown=10):
def buffer_cycle(self, tasks: Iterable[Task], memory: Memory,
max_events_per_slot: int=5, threshold_f: float=0.8, threshold_c: float=0.9,
default_cooldown: int=10) -> Tuple[List[Reaction], List[Task]]:
# put all tasks to the current slot
self.push(tasks, memory)
self.compound_composition(memory)
Expand Down
Loading

0 comments on commit 8ac6ec7

Please sign in to comment.