Skip to content

Commit

Permalink
Add tests for IcalHttpRequestHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
agateau committed Aug 12, 2024
1 parent 56c6a9f commit 08dfe0e
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 11 deletions.
51 changes: 50 additions & 1 deletion yokadi/tests/icaltestcase.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@


import unittest
from datetime import datetime
from datetime import datetime, timedelta

import icalendar

from yokadi.ycli import tui
from yokadi.ycli.projectcmd import getProjectFromName
Expand Down Expand Up @@ -147,3 +149,50 @@ def testGenerateCal(self):
expected = sorted(["p2", f"t1 ({t1.id})", f"t2new ({t2new.id})", f"t2started ({t2started.id})"])

self.assertEqual(summaries, expected)

def testHandlerProcessVTodoModifyTask(self):
# Create a task
task = dbutils.addTask("p1", "t1", interactive=False)
self.session.commit()

# Create a vTodo to modify the task
modified = datetime.now()
created = modified + timedelta(hours=-1)
vTodo = icalendar.Todo()
vTodo["UID"] = yical.TASK_UID % str(task.id)
vTodo.add("CREATED", created)
vTodo.add("LAST-MODIFIED", modified)
vTodo.add("summary", "new title")

# Process the vTodo
newTaskDict = {}
yical.IcalHttpRequestHandler.processVTodo(newTaskDict, vTodo)

# The task title must have changed
task = dbutils.getTaskFromId(task.id)
self.assertEqual(task.title, "new title")

# newTaskDict must not have changed
self.assertEqual(newTaskDict, {})

def testHandlerProcessVTodoCreateTask(self):
# Create a vTodo to add a new task
modified = datetime.now()
created = modified + timedelta(hours=-1)
vTodo = icalendar.Todo()
vTodo["UID"] = "zogzog"
vTodo.add("summary", "new task")

# Process the vTodo
newTaskDict = {}
yical.IcalHttpRequestHandler.processVTodo(newTaskDict, vTodo)

# The task should be in newTaskDict
newTaskList = list(newTaskDict.items())
self.assertEqual(len(newTaskList), 1)

(uid, taskId) = newTaskList[0]

# And the task can be retried
task = dbutils.getTaskFromId(taskId)
self.assertEqual(task.title, "new task")
24 changes: 14 additions & 10 deletions yokadi/yical/yical.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,39 +163,43 @@ def do_PUT(self):
for vTodo in cal.walk():
if "UID" in vTodo:
try:
self._processVTodo(vTodo)
IcalHttpRequestHandler.processVTodo(self.newTask, vTodo)
except YokadiException as e:
self.send_response(503, e)

# Tell caller everything is ok
self.send_response(200)
self.end_headers()

def _processVTodo(self, vTodo):
# This is static method to make it easier to test
@staticmethod
def processVTodo(newTaskDict, vTodo):
session = db.getSession()
if vTodo["UID"] in self.newTask:
uid = vTodo["UID"]
if uid in newTaskDict:
# This is a recent new task but remote ical calendar tool is not
# aware of new Yokadi UID. Update it here to avoid duplicate new tasks
print("update UID to avoid duplicate task")
vTodo["UID"] = TASK_UID % self.newTask[vTodo["UID"]]
uid = TASK_UID % newTaskDict[uid]
vTodo["UID"] = uid

if vTodo["UID"].startswith(UID_PREFIX):
if uid.startswith(UID_PREFIX):
# This is a yokadi Task.
if vTodo["LAST-MODIFIED"].dt > vTodo["CREATED"].dt:
# Task has been modified
print("Modified task: %s" % vTodo["UID"])
result = TASK_RE.match(vTodo["UID"])
print("Modified task: %s" % uid)
result = TASK_RE.match(uid)
if result:
id = result.group(1)
task = dbutils.getTaskFromId(id)
print("Task found in yokadi db: %s" % task.title)
updateTaskFromVTodo(task, vTodo)
session.commit()
else:
raise YokadiException("Task %s does exist in yokadi db " % id)
raise YokadiException("Task %s does exist in yokadi db " % uid)
else:
# This is a new task
print("New task %s (%s)" % (vTodo["summary"], vTodo["UID"]))
print("New task %s (%s)" % (vTodo["summary"], uid))
keywordDict = {}
task = dbutils.addTask(INBOX_PROJECT, vTodo["summary"],
keywordDict, interactive=False)
Expand All @@ -205,7 +209,7 @@ def _processVTodo(self, vTodo):
# if user update it right after creation without reloading the
# yokadi UID
# TODO: add purge for old UID
self.newTask[vTodo["UID"]] = task.id
newTaskDict[uid] = task.id


class YokadiIcalServer(Thread):
Expand Down

0 comments on commit 08dfe0e

Please sign in to comment.