Skip to content

Commit 55e67a7

Browse files
Merge pull request openwallet-foundation#206 from nrempel/remove-old-exchange-records
Remove old exchange records
2 parents 76a38d1 + e70cd8d commit 55e67a7

File tree

5 files changed

+1163
-26
lines changed

5 files changed

+1163
-26
lines changed

aries_cloudagent/messaging/credentials/manager.py

Lines changed: 138 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
"""Classes to manage credentials."""
22

33
import asyncio
4+
import datetime
45
import json
56
import logging
67
import time
8+
import random
9+
10+
from ..util import str_to_datetime, datetime_now
711

812
from ...config.injection_context import InjectionContext
913
from ...error import BaseError
@@ -93,16 +97,27 @@ async def prepare_send(
9397
source_credential_exchange = None
9498

9599
if source_credential_exchange_id:
96-
97100
# The cached credential exchange ID may not have an associated credential
98101
# request yet. Wait up to 30 seconds for that to be populated, then
99102
# move on and replace it as the cached credential exchange
100-
101103
lookup_start = time.perf_counter()
102104
while True:
103-
source_credential_exchange = await CredentialExchange.retrieve_by_id(
104-
self._context, source_credential_exchange_id
105-
)
105+
try:
106+
(
107+
source_credential_exchange
108+
) = await CredentialExchange.retrieve_by_id(
109+
self._context, source_credential_exchange_id
110+
)
111+
except StorageNotFoundError:
112+
# It's possible that the cached credential expired
113+
# and was deleted while we are waiting. In this case,
114+
# it is time to issue a new credential offer.
115+
self._logger.debug(
116+
"Credential exchange deleted while"
117+
+ " waiting for credential request"
118+
)
119+
break
120+
106121
if source_credential_exchange.credential_request:
107122
break
108123
if lookup_start + 30 < time.perf_counter():
@@ -453,8 +468,12 @@ async def receive_credential(self, credential_message: CredentialIssue):
453468
},
454469
)
455470

471+
# Copy values from parent but create new record on save (no id)
456472
credential_exchange_record._id = None
457473
credential_exchange_record.thread_id = credential_message._thread_id
474+
credential_exchange_record.parent_thread_id = (
475+
credential_message._thread.pthid
476+
)
458477
credential_exchange_record.credential_id = None
459478
credential_exchange_record.credential = None
460479

@@ -499,12 +518,6 @@ async def store_credential(
499518
credential_exchange_record.credential_id = credential_id
500519
credential_exchange_record.credential = credential
501520

502-
# clear unnecessary data
503-
credential_exchange_record.credential_offer = None
504-
credential_exchange_record.credential_request = None
505-
credential_exchange_record.raw_credential = None
506-
# credential_request_metadata may be reused
507-
508521
await credential_exchange_record.save(self.context, reason="Store credential")
509522

510523
credential_stored_message = CredentialStored()
@@ -513,6 +526,49 @@ async def store_credential(
513526
credential_exchange_record.parent_thread_id,
514527
)
515528

529+
# Always delete this record if it's a child
530+
531+
# Get parent exchange record if parent id exists
532+
parent_thread_id = credential_exchange_record.parent_thread_id
533+
if parent_thread_id:
534+
# We delete the current record but only if it has a parent_id
535+
# because we don't want to delete any new parents
536+
try:
537+
await credential_exchange_record.delete_record(self.context)
538+
except StorageNotFoundError:
539+
# It's possible for another thread to have already deleted
540+
# this record
541+
self._logger.debug("Failed to delete credential exchange record")
542+
543+
# Delete old records if they are no longer used
544+
545+
# Run approx every 100 runs
546+
if random.randint(1, 100) == 1:
547+
# Query undeleted stored exchange records for possible expired parents
548+
old_credential_exchange_records = await CredentialExchange.query(
549+
self.context,
550+
tag_filter={
551+
"state": CredentialExchange.STATE_STORED,
552+
"initiator": CredentialExchange.INITIATOR_EXTERNAL,
553+
},
554+
)
555+
556+
for old_credential_exchange_record in old_credential_exchange_records:
557+
last_updated_string = old_credential_exchange_record.updated_at
558+
last_updated = str_to_datetime(last_updated_string)
559+
one_hour_ago = datetime_now() - datetime.timedelta(hours=1)
560+
561+
# delete parent exchange records more than 1 hour old
562+
if last_updated < one_hour_ago:
563+
try:
564+
await old_credential_exchange_record.delete_record(self.context)
565+
except StorageNotFoundError:
566+
# It's possible for another thread to have already deleted
567+
# this record
568+
self._logger.debug(
569+
"Failed to delete credential exchange record"
570+
)
571+
516572
return credential_exchange_record, credential_stored_message
517573

518574
async def credential_stored(self, credential_stored_message: CredentialStored):
@@ -524,6 +580,7 @@ async def credential_stored(self, credential_stored_message: CredentialStored):
524580
525581
"""
526582

583+
# Get current exchange record by thread id
527584
credential_exchange_record = await CredentialExchange.retrieve_by_tag_filter(
528585
self.context,
529586
tag_filter={
@@ -532,13 +589,76 @@ async def credential_stored(self, credential_stored_message: CredentialStored):
532589
},
533590
)
534591

535-
# clear unnecessary data
536-
credential_exchange_record.credential_offer = None
537-
credential_exchange_record.credential_request = None
538-
credential_exchange_record.credential_request_metadata = None
539-
credential_exchange_record.credential_values = None
540-
541592
credential_exchange_record.state = CredentialExchange.STATE_STORED
542593
await credential_exchange_record.save(self.context, reason="Credential stored")
543594

544-
return credential_exchange_record
595+
# Always delete this record if it's a child
596+
597+
# Get parent exchange record if parent id exists
598+
parent_thread_id = credential_exchange_record.parent_thread_id
599+
if parent_thread_id:
600+
# We delete the current record but only if it has a parent_id
601+
# because we don't want to delete any new parents
602+
try:
603+
await credential_exchange_record.delete_record(self.context)
604+
except StorageNotFoundError:
605+
# It's possible for another thread to have already deleted
606+
# this record
607+
self._logger.debug("Failed to delete credential exchange record")
608+
609+
# Delete old records if they are no longer used
610+
611+
# Run approx every 100 runs
612+
if random.randint(1, 100) == 1:
613+
# Query undeleted stored exchange records for possible expired parents
614+
old_credential_exchange_records = await CredentialExchange.query(
615+
self.context,
616+
tag_filter={
617+
"state": CredentialExchange.STATE_STORED,
618+
"initiator": CredentialExchange.INITIATOR_SELF,
619+
},
620+
)
621+
622+
for old_credential_exchange_record in old_credential_exchange_records:
623+
cache: BaseCache = await self._context.inject(BaseCache)
624+
625+
cached_credential_ex_id = await cache.get(
626+
"credential_exchange::"
627+
+ f"{old_credential_exchange_record.credential_definition_id}::"
628+
+ f"{old_credential_exchange_record.connection_id}"
629+
)
630+
631+
# If this old credential is still in the cache, then it's definitely
632+
# an active parent record
633+
if (
634+
old_credential_exchange_record.credential_exchange_id
635+
!= cached_credential_ex_id
636+
):
637+
# We check if any child threads are still relying on
638+
# information from this record. If not, we can delete.
639+
child_records = await CredentialExchange.query(
640+
self.context,
641+
tag_filter={
642+
"parent_thread_id": (
643+
old_credential_exchange_record.thread_id
644+
),
645+
"initiator": CredentialExchange.INITIATOR_SELF,
646+
},
647+
)
648+
649+
# If this credential isn't in the cache and there are no child
650+
# records which reference this as parent, we can delete
651+
if len(child_records) == 0:
652+
try:
653+
await old_credential_exchange_record.delete_record(
654+
self.context
655+
)
656+
self._logger.debug(
657+
"Parent credential exchange record successfully deleted"
658+
)
659+
except StorageNotFoundError:
660+
# It's possible for another thread to have already deleted
661+
# this record
662+
self._logger.debug(
663+
"Failed to delete parent credential exchange record"
664+
)

0 commit comments

Comments
 (0)