Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A trial to add type hints for EventBuffer.py and Utils.py #111

Open
wants to merge 3 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 76 additions & 46 deletions pynars/NARS/DataStructures/MC/EventBuffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,21 @@
from pynars.NARS.DataStructures.MC import Utils
from pynars.NARS.DataStructures.MC.OutputBuffer import Reaction
from pynars.NARS.DataStructures.MC.Utils import PriorityQueue, BufferTask, satisfaction_level, preprocessing
from pynars.NARS.DataStructures import Memory
from pynars.Narsese import Compound, Judgement, Task, Interval, parser, Term, Truth, Copula, Statement


class Anticipation:

def __init__(self, task, prediction):
self.matched = False
self.task = task
self.prediction = prediction
from typing import Iterable, List, Optional, Tuple


class PredictiveImplication:

def __init__(self, condition, interval, conclusion, task):
condition: Term
interval: Interval
conclusion: Term
to_memory_cooldown: int
expiration: int
task: Task

def __init__(self, condition: Term, interval: Interval, conclusion: Term, task: Task) -> None:
self.condition = condition
"""
As explained the conceptual design, "+1, +2" cannot be used in buffers, thus the interval is only kept in the
Expand All @@ -34,7 +35,7 @@ def __init__(self, condition, interval, conclusion, task):
self.expiration = 0
self.task = task

def get_conclusion(self, condition_task):
def get_conclusion(self, condition_task: BufferTask) -> Tuple[Optional[Interval], Optional[Task]]:
# when "A" is matched with "A =/> B", return B with truth deduction
truth = Truth_deduction(self.task.truth, condition_task.task.truth)
if truth.c < 0.3:
Expand All @@ -45,31 +46,58 @@ def get_conclusion(self, condition_task):
return self.interval, task


class Anticipation:

matched: bool
task: Task
prediction: PredictiveImplication

def __init__(self, task: Task, prediction: PredictiveImplication) -> None:
self.matched = False
self.task = task
self.prediction = prediction


class Slot:
"""
It contains 3 parts: 1) events observed, 2) anticipations made, 3) operations to do.
"""

def __init__(self, num_events, num_anticipations, num_operations):
events: PriorityQueue[BufferTask]
anticipations: List[Anticipation]
num_anticipations: int
operations: list # TODO: operation types
num_operations: int

def __init__(self, num_events: int, num_anticipations: int, num_operations: int) -> None:
self.events = PriorityQueue(num_events)
self.anticipations = []
self.num_anticipations = num_anticipations
self.operations = []
self.num_operations = num_operations

def push(self, item, value):
def push(self, item: BufferTask, value: float):
self.events.push(item, value)

def pop(self):
def pop(self) -> Tuple[BufferTask, float]:
return self.events.pop()

def random_pop(self):
def random_pop(self) -> Optional[BufferTask]:
return self.events.random_pop()


class EventBuffer:

def __init__(self, num_slot, num_events, num_anticipations, num_operations, num_predictive_implications, N=1):
num_events: int
num_anticipations: int
num_operations: int
slots: List[Slot]
curr: int
predictive_implications: PriorityQueue[PredictiveImplication]
reactions: PriorityQueue[Reaction]
N: int

def __init__(self, num_slot: int, num_events: int, num_anticipations: int, num_operations: int, num_predictive_implications: int, N=1) -> None:
# num slot is the number of slots on one side. If num_slot is 2, there are 1 (in the middle) + 2*2=5 slots
self.num_events = num_events
self.num_anticipations = num_anticipations
Expand All @@ -80,13 +108,13 @@ def __init__(self, num_slot, num_events, num_anticipations, num_operations, num_
self.reactions = PriorityQueue(num_predictive_implications * 5)
self.N = N

def push(self, tasks, memory):
def push(self, tasks: Iterable[Task], memory: Memory) -> None:
for task in tasks:
buffer_task = BufferTask(task)
buffer_task.preprocess_effect = Utils.preprocessing(task, memory)
self.slots[self.curr].push(buffer_task, buffer_task.priority)

def pop(self):
def pop(self) -> List[Task]:
ret = []
for _ in range(self.N):
if len(self.slots[self.curr].events) != 0:
Expand All @@ -95,7 +123,7 @@ def pop(self):
return ret

@staticmethod
def contemporary_composition(events):
def contemporary_composition(events: List[Task]) -> Task:
# according to the conceptual design, currently only 2-compounds are allowed,
# though in the future, we may have compounds with many components,

Expand All @@ -120,13 +148,13 @@ def contemporary_composition(events):
budget = Budget_merge(budget, each.budget)

# sentence
sentence = Judgement(term, stamp, truth)
sentence = Judgement(term, stamp, truth) # ! Argument 1 to "Judgement" has incompatible type "Type[Compound]"; expected "Term" (Mypy arg-type)

# task
return Task(sentence, budget)

@staticmethod
def sequential_composition(event_1, interval, event_2):
def sequential_composition(event_1: Task, interval: Interval, event_2: Task) -> Task:
# according to the conceptual design, we currently only have "event_1, interval, event_2" schema,
# though in the future this may also change, but it is too early to decide here

Expand All @@ -145,7 +173,7 @@ def sequential_composition(event_1, interval, event_2):
return Task(sentence, budget)

@staticmethod
def generate_prediction_util(event_1, interval, event_2):
def generate_prediction_util(event_1: Task, interval: Interval, event_2: Task) -> PredictiveImplication:
if interval != 0:
copula = Copula.PredictiveImplication # =/>
else:
Expand All @@ -169,7 +197,7 @@ def generate_prediction_util(event_1, interval, event_2):
# predictive implication
return PredictiveImplication(event_1.term, interval, event_2.term, task)

def compound_composition(self, memory):
def compound_composition(self, memory: Memory) -> None:
"""
After the initial composition, pick the one with the highest priority in the current slot.
Compose it with all other events in the current slot and the previous max events.
Expand All @@ -184,15 +212,15 @@ def compound_composition(self, memory):
curr_max.is_component = 1
curr_composition.append(self.contemporary_composition([curr_max.task, curr_remaining[-1].task]))

previous_max = []
previous_max: List[Optional[BufferTask]] = []
previous_composition = []
for i in range(self.curr):
if len(self.slots[i].events) != 0:
tmp, _ = self.slots[i].pop()
previous_max.append(tmp)
# don't change previous max's "is_component"
curr_max.is_component = 1
previous_composition.append(self.sequential_composition(previous_max[-1].task,
previous_composition.append(self.sequential_composition(previous_max[-1].task, # ? should change this `previous_max[-1]` to `tmp` to assume the non-none type?
Interval(self.curr - i), curr_max.task))
else:
previous_max.append(None)
Expand All @@ -208,12 +236,12 @@ def compound_composition(self, memory):
# add all compositions to the current slot
self.push(curr_composition + previous_composition, memory)

def check_anticipation(self, memory):
def check_anticipation(self, memory: Memory) -> None:
"""
Check all anticipations, award or punish the corresponding predictive implications.
If an anticipation does not even exist, apply the lowest satisfaction.
"""
prediction_award_penalty = []
prediction_award_penalty: List[Tuple[PredictiveImplication, float]] = []
checked_buffer_tasks = []
while len(self.slots[self.curr].events) != 0:
buffer_task, _ = self.slots[self.curr].pop()
Expand All @@ -224,27 +252,27 @@ def check_anticipation(self, memory):
each_anticipation.matched = True
buffer_task.task = revision(each_anticipation.task, buffer_task.task)
satisfaction = 1 - satisfaction_level(each_anticipation.task.truth, buffer_task.task.truth)
prediction_award_penalty.append([each_anticipation.prediction, satisfaction])
prediction_award_penalty.append((each_anticipation.prediction, satisfaction))
checked_buffer_tasks.append(buffer_task)

# if there are some unmatched anticipations, apply the lowest satisfaction
for each_anticipation in self.slots[self.curr].anticipations:
if not each_anticipation.matched:
prediction_award_penalty.append([each_anticipation.prediction, 0])
prediction_award_penalty.append((each_anticipation.prediction, 0))

print("prediction_award_penalty", prediction_award_penalty)

# put all buffer tasks back, some evaluations may change
for each in checked_buffer_tasks:
self.slots[self.curr].push(each, each.priority)
for each_task in checked_buffer_tasks: # ! have to use different variable names to avoid type conflicts
self.slots[self.curr].push(each_task, each_task.priority)

# update the predictive implications
for each in prediction_award_penalty:
each[0].task = revision(each[0].task, parser.parse(each[0].task.term.word + ". %" + str(each[1]) + ";0.9%"))
self.predictive_implications.edit(each[0], each[0].task.truth.e * preprocessing(each[0].task, memory),
lambda x: x.task.term)

def predictive_implication_application(self, memory):
def predictive_implication_application(self, memory: Memory) -> None:
"""
Check all predictive implications, whether some of them can fire.
If so, calculate the corresponding task of the conclusion and create it as an anticipation in the corresponding
Expand All @@ -256,9 +284,9 @@ def predictive_implication_application(self, memory):
implication, _ = self.predictive_implications.pop()
applied = False
for each_event in self.slots[self.curr].events.pq:
if implication.condition == each_event[1].task.term:
if implication.condition == each_event[1].task.term: # ! Unsupported operand types for == ("Term" and "Term") (Mypy operator)
interval, conclusion = implication.get_conclusion(each_event[1])
if interval is None:
if interval is None or conclusion is None: # ! if the interval isn't `None`, same to the conclusion
break
applied = True
implication.expiration = max(0, implication.expiration - 1)
Expand All @@ -273,7 +301,7 @@ def predictive_implication_application(self, memory):
self.predictive_implications.push(each, each.task.truth.e * preprocessing(each.task, memory) *
(1 / (1 + each.expiration)))

def to_memory_predictive_implication(self, memory, threshold_f=0.9, threshold_c=0.8, default_cooldown=100):
def to_memory_predictive_implication(self, memory: Memory, threshold_f: float=0.9, threshold_c: float=0.8, default_cooldown: int=100) -> List[Reaction]:
# when a predictive implication reaches a relatively high truth value, it will be forwarded to the memory
# (not the next level)
# this does not mean it is removed from the predictive implication pq
Expand All @@ -295,6 +323,7 @@ def to_memory_predictive_implication(self, memory, threshold_f=0.9, threshold_c=
# I have to cheat.
# ==================================================================================================

# ? for `each[1].task.term.predicate`, should assume the `each[1]`'s term is a statement?
if each[1].task.term.predicate.word == "<{SELF}-->[good]>":

if (each[1].task.term.subject.is_compound and each[1].task.term.subject.components[-1].word[0]
Expand Down Expand Up @@ -325,7 +354,7 @@ def to_memory_predictive_implication(self, memory, threshold_f=0.9, threshold_c=
return reactions
# ==============================================================================================================

def local_evaluation(self, memory, threshold_f=0.8, threshold_c=0.9, default_cooldown=100):
def local_evaluation(self, memory: Memory, threshold_f: float=0.8, threshold_c: float=0.9, default_cooldown: int=100) -> List[Reaction]:
self.check_anticipation(memory)
self.predictive_implication_application(memory)
# cheating
Expand All @@ -337,7 +366,7 @@ def local_evaluation(self, memory, threshold_f=0.8, threshold_c=0.9, default_coo
# original
# self.to_memory_predictive_implication(memory, threshold_f, threshold_c, default_cooldown)

def memory_based_evaluation(self, memory):
def memory_based_evaluation(self, memory: Memory) -> None:
evaluated_buffer_tasks = []
while len(self.slots[self.curr].events) != 0:
buffer_task, _ = self.slots[self.curr].pop()
Expand All @@ -348,12 +377,12 @@ def memory_based_evaluation(self, memory):
self.slots[self.curr].push(each, each.priority)

@staticmethod
def prediction_revision(existed_prediction, new_prediction):
def prediction_revision(existed_prediction: PredictiveImplication, new_prediction: PredictiveImplication) -> PredictiveImplication:
existed_prediction.task = revision(existed_prediction.task, new_prediction.task)
existed_prediction.expiration = max(0, existed_prediction.expiration - 1)
return existed_prediction

def prediction_generation(self, max_events_per_slot, memory):
def prediction_generation(self, max_events_per_slot: int, memory: Memory) -> None:
"""
For each slot, randomly pop "max events per slot" buffer tasks to generate predictions.
Currently, concurrent predictive implications (==>) are not supported.
Expand All @@ -376,31 +405,32 @@ def prediction_generation(self, max_events_per_slot, memory):
for each_curr_event in selected_buffer_tasks[-1]:
for each_previous_event in selected_buffer_tasks[i]:
if each_curr_event is not None and each_previous_event is not None:
tmp = self.generate_prediction_util(each_previous_event.task, Interval(self.curr - i),
tmp2 = self.generate_prediction_util(each_previous_event.task, Interval(self.curr - i), # ! for type stability, it may need to be renamed to `tmp2`
each_curr_event.task)
# if tmp.task.truth.e * preprocessing(tmp.task, memory) <= 0.05:
# if tmp2.task.truth.e * preprocessing(tmp2.task, memory) <= 0.05:
# continue
existed = None
for j in range(len(self.predictive_implications)):
if self.predictive_implications.pq[j][1].task.term == tmp.task.term:
if self.predictive_implications.pq[j][1].task.term == tmp2.task.term:
existed = self.predictive_implications.pq.pop(j)
break
if existed is not None:
tmp = self.prediction_revision(existed[1], tmp)
tmp2 = self.prediction_revision(existed[1], tmp2)

self.predictive_implications.push(tmp, tmp.task.truth.e * preprocessing(tmp.task, memory))
self.predictive_implications.push(tmp2, tmp2.task.truth.e * preprocessing(tmp2.task, memory))

# after the prediction generation, put the randomly selected buffer tasks back
for i in range(self.curr + 1):
for each in selected_buffer_tasks[i]:
if each is not None:
self.slots[i].push(each, each.priority)

def slots_cycle(self):
def slots_cycle(self) -> None:
self.slots = self.slots[1:] + [Slot(self.num_events, self.num_anticipations, self.num_operations)]

def buffer_cycle(self, tasks, memory, max_events_per_slot=5, threshold_f=0.8, threshold_c=0.9,
default_cooldown=10):
def buffer_cycle(self, tasks: Iterable[Task], memory: Memory,
max_events_per_slot: int=5, threshold_f: float=0.8, threshold_c: float=0.9,
default_cooldown: int=10) -> Tuple[List[Reaction], List[Task]]:
# put all tasks to the current slot
self.push(tasks, memory)
self.compound_composition(memory)
Expand Down
Loading
Loading