Skip to content

Commit

Permalink
Implement split crossmatch for assumed + summary antibody + display h…
Browse files Browse the repository at this point in the history
…la code.
  • Loading branch information
abragtim committed Apr 19, 2023
1 parent ec974a4 commit d4029a2
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 53 deletions.
33 changes: 33 additions & 0 deletions tests/web/test_do_crossmatch_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,35 @@ def test_do_crossmatch_for_assumed_hla_type(self):
self.assertCountEqual(expected_assumed_hla_typing,
res_assumed_hla_typing)

# assumed hla type without matched antibodies
json = {
"assumed_donor_hla_typing": [['DPA1*01:03', 'DPA1*01:04', 'DPA1*01:06'],
['DPA1*02:01'],
['DPA1*01:04']],
"recipient_antibodies": [{'mfi': 2100,
'name': 'DPA1*01:07',
'cutoff': 2000
},
{'mfi': 2100,
'name': 'DPA1*02:01',
'cutoff': 2000
}],
}
with self.app.test_client() as client:
res = client.post(f'{API_VERSION}/{CROSSMATCH_NAMESPACE}/do-crossmatch', json=json,
headers=self.auth_headers)
self.assertEqual(200, res.status_code)

self.assertEqual(3, len(res.json['hla_to_antibody']))
res_assumed_hla_typing = \
[res.json['hla_to_antibody'][i]['hla_type']
for i in range(len(res.json['hla_to_antibody']))]
expected_assumed_hla_typing = [[asdict(create_hla_type('DPA1'))],
[asdict(create_hla_type('DPA1*01:04'))],
[asdict(create_hla_type('DPA1*02:01'))]]
self.assertCountEqual(expected_assumed_hla_typing,
res_assumed_hla_typing)

# hla type is assumed in split/broad
json = {
"assumed_donor_hla_typing": [['DPA1*01:03', 'DPA1*01:04', 'DPA1*02:06'],
Expand Down Expand Up @@ -212,3 +241,7 @@ def test_do_crossmatch_for_assumed_hla_type(self):
self.assertEqual(400, res.status_code) # ValueError
self.assertEqual('Assumed HLA type is available only for HLA types in high resolution.',
res.json['message'])

# TODO: test display_code

# TODO: test summary antibody
7 changes: 3 additions & 4 deletions txmatching/data_transfer_objects/crossmatch/crossmatch_dto.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@ class CrossmatchDTOIn:
assumed_donor_hla_typing: List[List[str]]
recipient_antibodies: List[HLAAntibodiesUploadDTO]

def __post_init__(self):
# TODO:
self.maximum_donor_hla_typing = [hla_type for hla_typing in self.assumed_donor_hla_typing
for hla_type in hla_typing]
def get_maximum_donor_hla_typing(self):
return [hla_type for hla_typing in self.assumed_donor_hla_typing
for hla_type in hla_typing]


@dataclass
Expand Down
1 change: 0 additions & 1 deletion txmatching/patients/hla_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ class HLABase:

@dataclass
class HLAType(HLABase, PersistentlyHashable):
# TODO:
display_code: Optional[str] = None

def __post_init__(self):
Expand Down
21 changes: 19 additions & 2 deletions txmatching/utils/hla_system/hla_crossmatch.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from copy import copy
from dataclasses import dataclass, field
from typing import Callable, List, Set, Optional

Expand All @@ -9,6 +10,7 @@
HLAPerGroup, HLAType, HLATyping)
from txmatching.utils.enums import (AntibodyMatchTypes, HLAAntibodyType,
HLACrossmatchLevel, HLAGroup)
from txmatching.utils.hla_system.hla_preparation_utils import create_hla_type
from txmatching.utils.hla_system.rel_dna_ser_exceptions import \
MULTIPLE_SERO_CODES_LIST

Expand All @@ -35,7 +37,7 @@ class AntibodyMatchForHLAType:
# HLATypes is correct, so we return several at once
hla_type: List[HLAType]
antibody_matches: List[AntibodyMatch] = field(default_factory=list)
summary_antibody: Optional[AntibodyMatch] = None # TODO: get summary antibody
summary_antibody: Optional[AntibodyMatch] = None

def __post_init__(self):
if self.is_hla_type_assumed() and not self.__is_hla_type_in_high_res():
Expand All @@ -44,9 +46,24 @@ def __post_init__(self):
if self.__is_hla_type_assumed_in_low_res():
raise ValueError("HLA Type can be assumed just in high resolution.")

self.summary_antibody = \
max(self.antibody_matches,
key=lambda match: match.hla_antibody.mfi) if self.antibody_matches else None

def is_hla_type_assumed(self):
return len(self.hla_type) > 1

def get_low_res_code_from_assumed(self):
return self.hla_type[0].code.get_low_res_code()

def convert_assumed_to_low_res(self):
self.hla_type = [create_hla_type(raw_code=self.get_low_res_code_from_assumed())]

def get_copy_with_converted_assumed_to_low_res(self):
res = copy(self)
res.convert_assumed_to_low_res()
return res

def __is_hla_type_in_high_res(self):
return len([hla_type for hla_type in self.hla_type
if not hla_type.code.is_in_high_res()]) == 0
Expand All @@ -55,7 +72,7 @@ def __is_hla_type_assumed_in_low_res(self):
return len({hla_type.code.get_low_res_code() for hla_type in self.hla_type}) > 1

def __hash__(self):
return hash(tuple(self.hla_type))
return hash((tuple(self.hla_type), tuple(self.antibody_matches)))

def __eq__(self, other):
return hash(self) == hash(other)
Expand Down
119 changes: 73 additions & 46 deletions txmatching/web/api/crossmatch_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,68 +45,95 @@ def get_hla_antibodies_and_parsing_issues(antibodies) \
hla_antibodies_raw_list=antibodies_raw_list,
hla_antibodies_per_groups=antibodies_dto.hla_antibodies_per_groups), parsing_issues

def raise_not_implemented_if_theoretical_antibody(hla_antibody):
if hla_antibody.type == HLAAntibodyType.THEORETICAL or \
hla_antibody.second_raw_code:
raise TXMNotImplementedFeatureException(
'This functionality is not currently available for dual antibodies. '
'We apologize and will try to change this in future versions.')

def get_hla_types_correspond_antibody(assumed_hla_type: List[HLAType],
hla_antibody: HLAAntibody) -> List[HLAType]:
return [hla_type for hla_type in assumed_hla_type
if hla_type.code == hla_antibody.code]

def fulfill_with_common_matches(antibody_matches, crossmatched_antibodies):
for match_per_group in crossmatched_antibodies:
for antibody_group_match in match_per_group.antibody_matches:
raise_not_implemented_if_theoretical_antibody(antibody_group_match.hla_antibody)
for antibody_hla_match in antibody_matches:
common_matched_hla_types: List[HLAType] = get_hla_types_correspond_antibody(
antibody_hla_match.hla_type, antibody_group_match.hla_antibody
)
if len(common_matched_hla_types) > 0:
antibody_hla_match.hla_type = common_matched_hla_types
antibody_hla_match.antibody_matches.append(antibody_group_match)

def solve_uncrossmatched_hla_types(antibody_hla_matches: List[AntibodyMatchForHLAType]):
def convert_assumed_hla_type_to_split(antibody_hla_match):
antibody_hla_match.hla_type = [
create_hla_type(
raw_code=antibody_hla_match.hla_type[0].code.get_low_res_code())]

for antibody_hla_match in antibody_hla_matches:
if antibody_hla_matches.count(antibody_hla_match) > 1:
del antibody_hla_matches[antibody_hla_matches.index(antibody_hla_match)] # TODO: mb wrong.
if len(antibody_hla_match.hla_type) > 1 and \
antibody_hla_match.antibody_matches:
convert_assumed_hla_type_to_split(antibody_hla_match)
def get_parsing_issues_for_hla_typing(antibody_matches_for_hla_type):
maximum_hla_typing_raw = [hla_type.raw_code for antibody_match in antibody_matches_for_hla_type
for hla_type in antibody_match.hla_type]
typing_parsing_issues, _ = parse_hla_typing_raw_and_return_parsing_issue_list(
HLATypingRawDTO(
hla_types_list=[HLATypeRaw(hla_type) for hla_type in
maximum_hla_typing_raw]
))
return typing_parsing_issues

def get_unique_from_list(lst: list):
return list(dict.fromkeys(lst).keys())

crossmatch_dto = request_body(CrossmatchDTOIn)

hla_antibodies, antibodies_parsing_issues = get_hla_antibodies_and_parsing_issues(
crossmatch_dto.recipient_antibodies)

crossmatched_antibodies_per_group = get_crossmatched_antibodies_per_group(
donor_hla_typing=create_hla_typing(crossmatch_dto.maximum_donor_hla_typing),
donor_hla_typing=create_hla_typing(crossmatch_dto.get_maximum_donor_hla_typing()),
recipient_antibodies=hla_antibodies,
use_high_resolution=True)
antibody_matches_for_hla_type = [AntibodyMatchForHLAType(
hla_type=[create_hla_type(raw_code=hla) for hla in hla_typing],
antibody_matches=[]) for hla_typing in crossmatch_dto.assumed_donor_hla_typing]

fulfill_with_common_matches(antibody_matches_for_hla_type,
self.__fulfill_with_common_matches(antibody_matches_for_hla_type,
crossmatched_antibodies_per_group)
solve_uncrossmatched_hla_types(antibody_matches_for_hla_type) # TODO: rename?

typing_parsing_issues, _ = parse_hla_typing_raw_and_return_parsing_issue_list(
HLATypingRawDTO(
hla_types_list=[HLATypeRaw(hla_type) for hla_type in
crossmatch_dto.maximum_donor_hla_typing]
# TODO: change it! not maximum, ale only used
))
antibody_matches_for_hla_type = get_unique_from_list(antibody_matches_for_hla_type)
antibody_matches_for_hla_type = self.__solve_uncrossmatched_assumed_hla_types(
antibody_matches_for_hla_type, hla_antibodies)

typing_parsing_issues = get_parsing_issues_for_hla_typing(antibody_matches_for_hla_type)

return response_ok(CrossmatchDTOOut(
hla_to_antibody=antibody_matches_for_hla_type,
parsing_issues=antibodies_parsing_issues + typing_parsing_issues
))

@staticmethod
def __fulfill_with_common_matches(antibody_matches, crossmatched_antibodies):

def raise_not_implemented_if_theoretical_antibody(hla_antibody):
if hla_antibody.type == HLAAntibodyType.THEORETICAL or \
hla_antibody.second_raw_code:
raise TXMNotImplementedFeatureException(
'This functionality is not currently available for dual antibodies. '
'We apologize and will try to change this in future versions.')

def get_hla_types_correspond_antibody(assumed_hla_type: List[HLAType],
hla_antibody: HLAAntibody) -> List[HLAType]:
return [hla_type for hla_type in assumed_hla_type
if hla_type.code == hla_antibody.code]

for match_per_group in crossmatched_antibodies:
for antibody_group_match in match_per_group.antibody_matches:
raise_not_implemented_if_theoretical_antibody(antibody_group_match.hla_antibody)
for antibody_hla_match in antibody_matches:
common_matched_hla_types: List[HLAType] = get_hla_types_correspond_antibody(
antibody_hla_match.hla_type, antibody_group_match.hla_antibody
)
if len(common_matched_hla_types) > 0:
antibody_hla_match.hla_type = common_matched_hla_types
antibody_hla_match.antibody_matches.append(antibody_group_match)

def __solve_uncrossmatched_assumed_hla_types(self, antibody_matches, hla_antibodies):

def distribute_antibody_matches_solved_unsolved(matches):
solved_antibody_matches = []
unsolved_antibody_matches = []
for antibody_match in matches:
if not antibody_match.antibody_matches and antibody_match.is_hla_type_assumed():
unsolved_antibody_matches.append(
antibody_match.get_copy_with_converted_assumed_to_low_res())
else:
solved_antibody_matches.append(antibody_match)
return solved_antibody_matches, unsolved_antibody_matches

solved_antibody_matches, unsolved_assumed_antibody_matches = \
distribute_antibody_matches_solved_unsolved(antibody_matches)
crossmatched_antibodies_for_unsolved = \
get_crossmatched_antibodies_per_group(
donor_hla_typing=create_hla_typing([match.get_low_res_code_from_assumed()
for match in unsolved_assumed_antibody_matches]),
recipient_antibodies=hla_antibodies,
use_high_resolution=True
)
self.__fulfill_with_common_matches(unsolved_assumed_antibody_matches,
crossmatched_antibodies_for_unsolved)

solved_antibody_matches.extend(unsolved_assumed_antibody_matches)
return solved_antibody_matches

0 comments on commit d4029a2

Please sign in to comment.