Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 106 additions & 67 deletions vulnerabilities/api_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ def get_fixing_vulnerabilities(self, obj):
return [vuln.vulnerability_id for vuln in obj.fixing_vulnerabilities.all()]


class AdvisoryPackageV2Serializer(serializers.ModelSerializer):
class PackageV3Serializer(serializers.ModelSerializer):
purl = serializers.CharField(source="package_url")
risk_score = serializers.FloatField(read_only=True)
affected_by_vulnerabilities = serializers.SerializerMethodField()
Expand All @@ -353,26 +353,38 @@ class Meta:

def get_affected_by_vulnerabilities(self, package):
"""Return a dictionary with advisory as keys and their details, including fixed_by_packages."""
impacts = package.affected_in_impacts.select_related("advisory").prefetch_related(
"fixed_by_packages"
)

avids = {impact.advisory.avid for impact in impacts if impact.advisory_id}

latest_advisories = AdvisoryV2.objects.latest_for_avids(avids)
advisory_by_avid = {adv.avid: adv for adv in latest_advisories}

result = {}
request = self.context.get("request")
for impact in package.affected_in_impacts.all():
advisory = impact.advisory

for impact in impacts:
avid = impact.advisory.avid
advisory = advisory_by_avid.get(avid)
if not advisory:
continue
fixed_by_packages = [pkg.purl for pkg in impact.fixed_by_packages.all()]
code_fixes = CodeFixV2.objects.filter(advisory=advisory).distinct()
code_fix_urls = [
reverse("advisory-codefix-detail", args=[code_fix.id], request=request)
for code_fix in code_fixes
]
result[advisory.avid] = {
"advisory_id": advisory.avid,
"fixed_by_packages": fixed_by_packages,
"code_fixes": code_fix_urls,
}

return result

def get_fixing_vulnerabilities(self, package):
return [impact.advisory.avid for impact in package.fixed_in_impacts.all()]
impacts = package.fixed_in_impacts.select_related("advisory")

avids = {impact.advisory.avid for impact in impacts if impact.advisory_id}

latest_advisories = AdvisoryV2.objects.latest_for_avids(avids)

return [adv.avid for adv in latest_advisories]

def get_next_non_vulnerable_version(self, package):
if next_non_vulnerable := package.get_non_vulnerable_versions()[0]:
Expand Down Expand Up @@ -1013,9 +1025,9 @@ def get_view_name(self):
return "Pipeline Jobs"


class AdvisoriesPackageV2ViewSet(viewsets.ReadOnlyModelViewSet):
class PackageV3ViewSet(viewsets.ReadOnlyModelViewSet):
queryset = PackageV2.objects.all()
serializer_class = AdvisoryPackageV2Serializer
serializer_class = PackageV3Serializer
filter_backends = [filters.DjangoFilterBackend]
filterset_class = AdvisoryPackageV2FilterSet

Expand All @@ -1039,35 +1051,42 @@ def get_queryset(self):
)

def list(self, request, *args, **kwargs):
filtered_queryset = self.filter_queryset(self.get_queryset())
page = self.paginate_queryset(filtered_queryset)
queryset = self.filter_queryset(self.get_queryset())
page = self.paginate_queryset(queryset)

advisories = set()
if page is not None:
for package in page:
advisories.update({impact.advisory for impact in package.affected_in_impacts.all()})
advisories.update({impact.advisory for impact in package.fixed_in_impacts.all()})
packages = page if page is not None else queryset

# Serialize the vulnerabilities with advisory_id and advisory label as keys
advisory_data = {f"{adv.avid}": AdvisoryV2Serializer(adv).data for adv in advisories}
avids = set()

# Serialize the current page of packages
serializer = self.get_serializer(page, many=True)
data = serializer.data
for package in packages:
for impact in package.affected_in_impacts.all():
if impact.advisory_id:
avids.add(impact.advisory.avid)

# Use 'self.get_paginated_response' to include pagination data
return self.get_paginated_response({"advisories": advisory_data, "packages": data})
for impact in package.fixed_in_impacts.all():
if impact.advisory_id:
avids.add(impact.advisory.avid)

# If pagination is not applied, collect vulnerabilities for all packages
for package in queryset:
advisories.update({impact.advisory for impact in package.affected_in_impacts.all()})
advisories.update({impact.advisory for impact in package.fixed_in_impacts.all()})
latest_advisories = AdvisoryV2.objects.latest_for_avids(avids)

advisory_data = {f"{adv.avid}": AdvisoryV2Serializer(adv).data for adv in advisories}
advisory_data = {adv.avid: AdvisoryV2Serializer(adv).data for adv in latest_advisories}

serializer = self.get_serializer(queryset, many=True)
data = serializer.data
return Response({"advisories": advisory_data, "packages": data})
serializer = self.get_serializer(packages, many=True)

if page is not None:
return self.get_paginated_response(
{
"packages": serializer.data,
"advisories": advisory_data,
}
)

return Response(
{
"packages": serializer.data,
"advisories": advisory_data,
}
)

@extend_schema(
request=PackageurlListSerializer,
Expand All @@ -1093,17 +1112,16 @@ def bulk_lookup(self, request):
"message": "A non-empty 'purls' list of PURLs is required.",
},
)
validated_data = serializer.validated_data
purls = validated_data.get("purls")

# Fetch packages matching the provided purls
purls = serializer.validated_data.get("purls")

packages = (
PackageV2.objects.for_purls(purls)
.prefetch_related(
Prefetch(
"affected_in_impacts",
queryset=ImpactedPackage.objects.select_related("advisory").prefetch_related(
"fixed_by_packages",
"fixed_by_packages"
),
),
Prefetch(
Expand All @@ -1114,26 +1132,34 @@ def bulk_lookup(self, request):
.with_is_vulnerable()
)

# Collect vulnerabilities associated with these packages
advisories = set()
avids = set()

for package in packages:
advisories.update({impact.advisory for impact in package.affected_in_impacts.all()})
advisories.update({impact.advisory for impact in package.fixed_in_impacts.all()})
for impact in package.affected_in_impacts.all():
if impact.advisory_id:
avids.add(impact.advisory.avid)

# Serialize vulnerabilities with vulnerability_id as keys
advisory_data = {adv.avid: AdvisoryV2Serializer(adv).data for adv in advisories}
for impact in package.fixed_in_impacts.all():
if impact.advisory_id:
avids.add(impact.advisory.avid)

# Serialize packages
package_data = AdvisoryPackageV2Serializer(
latest_advisories = AdvisoryV2.objects.latest_for_avids(avids)

advisory_data = {
adv.avid: AdvisoryV2Serializer(adv, context={"request": request}).data
for adv in latest_advisories
}

package_data = PackageV3Serializer(
packages,
many=True,
context={"request": request},
).data

return Response(
{
"advisories": advisory_data,
"packages": package_data,
"advisories": advisory_data,
}
)

Expand Down Expand Up @@ -1161,6 +1187,7 @@ def bulk_search(self, request):
"message": "A non-empty 'purls' list of PURLs is required.",
},
)

validated_data = serializer.validated_data
purls = validated_data.get("purls")
purl_only = validated_data.get("purl_only", False)
Expand Down Expand Up @@ -1202,24 +1229,31 @@ def bulk_search(self, request):

packages = query

# Collect vulnerabilities associated with these packages
advisories = set()
avids = set()
for package in packages:
advisories.update({impact.advisory for impact in package.affected_in_impacts.all()})
advisories.update({impact.advisory for impact in package.fixed_in_impacts.all()})

advisory_data = {adv.avid: AdvisoryV2Serializer(adv).data for adv in advisories}
for impact in package.affected_in_impacts.all():
if impact.advisory_id:
avids.add(impact.advisory.avid)
for impact in package.fixed_in_impacts.all():
if impact.advisory_id:
avids.add(impact.advisory.avid)

latest_advisories = AdvisoryV2.objects.latest_for_avids(avids)
advisory_data = {
adv.avid: AdvisoryV2Serializer(adv, context={"request": request}).data
for adv in latest_advisories
}

if not purl_only:
package_data = AdvisoryPackageV2Serializer(
package_data = PackageV3Serializer(
packages,
many=True,
context={"request": request},
).data
return Response(
{
"advisories": advisory_data,
"packages": package_data,
"advisories": advisory_data,
}
)

Expand Down Expand Up @@ -1249,24 +1283,31 @@ def bulk_search(self, request):
)
packages = query

# Collect vulnerabilities associated with these packages
advisories = set()
avids = set()
for package in packages:
advisories.update({impact.advisory for impact in package.affected_in_impacts.all()})
advisories.update({impact.advisory for impact in package.fixed_in_impacts.all()})

advisory_data = {adv.advisory_id: AdvisoryV2Serializer(adv).data for adv in advisories}
for impact in package.affected_in_impacts.all():
if impact.advisory_id:
avids.add(impact.advisory.avid)
for impact in package.fixed_in_impacts.all():
if impact.advisory_id:
avids.add(impact.advisory.avid)

latest_advisories = AdvisoryV2.objects.latest_for_avids(avids)
advisory_data = {
adv.avid: AdvisoryV2Serializer(adv, context={"request": request}).data
for adv in latest_advisories
}

if not purl_only:
package_data = AdvisoryPackageV2Serializer(
package_data = PackageV3Serializer(
packages,
many=True,
context={"request": request},
).data
return Response(
{
"advisories": advisory_data,
"packages": package_data,
"advisories": advisory_data,
}
)

Expand Down Expand Up @@ -1316,6 +1357,4 @@ def lookup(self, request):
purl = validated_data.get("purl")

qs = self.get_queryset().for_purls([purl]).with_is_vulnerable()
return Response(
AdvisoryPackageV2Serializer(qs, many=True, context={"request": request}).data
)
return Response(PackageV3Serializer(qs, many=True, context={"request": request}).data)
17 changes: 17 additions & 0 deletions vulnerabilities/migrations/0107_remove_advisoryv2_date_imported.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Generated by Django 4.2.25 on 2025-12-29 07:48

from django.db import migrations


class Migration(migrations.Migration):

dependencies = [
("vulnerabilities", "0106_alter_advisoryreference_url_and_more"),
]

operations = [
migrations.RemoveField(
model_name="advisoryv2",
name="date_imported",
),
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Generated by Django 4.2.25 on 2025-12-29 08:17

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
("vulnerabilities", "0107_remove_advisoryv2_date_imported"),
]

operations = [
migrations.AddIndex(
model_name="advisoryv2",
index=models.Index(
fields=["avid", "-date_collected", "-id"], name="advisory_latest_by_avid_idx"
),
),
]
Loading
Loading