Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/actions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ jobs:
run: make check-format
- name: Run linter
run: make lint
- name: Type check
run: make typecheck
- name: Security audit
run: make audit

Expand All @@ -53,5 +55,7 @@ jobs:
run: poetry run ruff format --check leakix/ tests/ example/ executable/
- name: Run linter
run: poetry run ruff check leakix/ tests/ example/ executable/
- name: Type check
run: poetry run mypy leakix/
- name: Security audit
run: poetry run pip-audit
19 changes: 8 additions & 11 deletions leakix/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ class Scope(Enum):


class HostResult(Model):
Services: fields.Optional(fields.List(fields.Nested(l9format.L9Event)))
Leaks: fields.Optional(fields.List(fields.Nested(l9format.L9Event)))
Services: fields.Optional(fields.List(fields.Nested(l9format.L9Event))) # type: ignore[valid-type]
Leaks: fields.Optional(fields.List(fields.Nested(l9format.L9Event))) # type: ignore[valid-type]


DEFAULT_URL = "https://leakix.net"
Expand Down Expand Up @@ -97,9 +97,8 @@ def get(
if queries is None or len(queries) == 0:
serialized_query = EmptyQuery().serialize()
else:
serialized_query = [q.serialize() for q in queries]
serialized_query = " ".join(serialized_query)
serialized_query = f"{serialized_query}"
parts = [q.serialize() for q in queries]
serialized_query = " ".join(parts)
url = f"{self.base_url}/search"
r = self.__get(
url=url,
Expand Down Expand Up @@ -188,9 +187,8 @@ def bulk_export(self, queries: list[Query] | None = None) -> AbstractResponse:
if queries is None or len(queries) == 0:
serialized_query = EmptyQuery().serialize()
else:
serialized_query = [q.serialize() for q in queries]
serialized_query = " ".join(serialized_query)
serialized_query = f"{serialized_query}"
parts = [q.serialize() for q in queries]
serialized_query = " ".join(parts)
params = {"q": serialized_query}
r = requests.get(url, params=params, headers=self.headers, stream=True)
if r.status_code == 200:
Expand Down Expand Up @@ -226,9 +224,8 @@ def bulk_service(self, queries: list[Query] | None = None) -> AbstractResponse:
if queries is None or len(queries) == 0:
serialized_query = EmptyQuery().serialize()
else:
serialized_query = [q.serialize() for q in queries]
serialized_query = " ".join(serialized_query)
serialized_query = f"{serialized_query}"
parts = [q.serialize() for q in queries]
serialized_query = " ".join(parts)
params = {"q": serialized_query}
r = requests.get(url, params=params, headers=self.headers, stream=True)
if r.status_code == 200:
Expand Down
6 changes: 3 additions & 3 deletions leakix/domain.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@


class L9Subdomain(Model):
subdomain: fields.Str()
distinct_ips: fields.Int()
last_seen: fields.DateTime()
subdomain: fields.Str() # type: ignore[valid-type]
distinct_ips: fields.Int() # type: ignore[valid-type]
last_seen: fields.DateTime() # type: ignore[valid-type]
4 changes: 2 additions & 2 deletions leakix/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@


class APIResult(Model):
name: fields.Str()
description: fields.Str()
name: fields.Str() # type: ignore[valid-type]
description: fields.Str() # type: ignore[valid-type]


class Plugin(Enum):
Expand Down
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ mypy = "*"
requests-mock = "*"
ruff = "*"
pip-audit = "*"
types-requests = "^2.32.4.20260107"

[tool.ruff]
line-length = 88
Expand All @@ -40,6 +41,7 @@ python_version = "3.13"
warn_return_any = true
warn_unused_configs = true
ignore_missing_imports = false
mypy_path = "stubs"

[tool.pytest.ini_options]
testpaths = ["tests"]
Expand Down
3 changes: 3 additions & 0 deletions stubs/l9format/__init__.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from l9format import l9format as l9format

__version__: str
132 changes: 132 additions & 0 deletions stubs/l9format/l9format.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
from datetime import datetime
from typing import Any

from serde.model import Model

class L9HttpEvent(Model):
root: str
url: str
status: int
length: int
header: dict[str, str] | None
title: str
favicon_hash: str

class ServiceCredentials(Model):
noauth: bool
username: str
password: str
key: str
raw: str | None

class SoftwareModule(Model):
name: str
version: str
fingerprint: str

class Software(Model):
name: str
version: str
os: str
modules: list[SoftwareModule] | None
fingerprint: str

class Certificate(Model):
cn: str
domain: list[str] | None
fingerprint: str
key_algo: str
key_size: int
issuer_name: str
not_before: datetime
not_after: datetime
valid: bool

class GeoPoint(Model):
lat: Any
lon: Any

class GeoLocation(Model):
continent_name: str | None
region_iso_code: str | None
city_name: str | None
country_iso_code: str | None
country_name: str | None
region_name: str | None
location: GeoPoint | None

class L9SSHEvent(Model):
fingerprint: str
version: int
banner: str
motd: str

class DatasetSummary(Model):
rows: int
files: int
size: int
collections: int
infected: bool
ransom_notes: list[str] | None

class L9LeakEvent(Model):
stage: str
type: str
severity: str
dataset: DatasetSummary

class L9SSLEvent(Model):
detected: bool
enabled: bool
jarm: str
cypher_suite: str
version: str
certificate: Certificate

class L9ServiceEvent(Model):
credentials: ServiceCredentials
software: Software

class Network(Model):
organization_name: str
asn: int
network: str

class L9Event(Model):
event_type: str
event_source: str
event_pipeline: list[str] | None
event_fingerprint: str | None
ip: str
port: str
host: str
reverse: str
mac: str | None
vendor: str | None
transport: list[str] | None
protocol: str
http: L9HttpEvent
summary: str
time: datetime
ssl: L9SSLEvent | None
ssh: L9SSHEvent
service: L9ServiceEvent
leak: L9LeakEvent | None
tags: list[str] | None
geoip: GeoLocation
network: Network

class L9Aggregation(Model):
summary: str | None
ip: str
resource_id: str
open_ports: list[str]
leak_count: int
leak_event_count: int
events: list[L9Event]
plugins: list[str]
geoip: GeoLocation
network: Network
creation_date: datetime
update_date: datetime
fresh: bool
5 changes: 5 additions & 0 deletions stubs/serde/__init__.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from serde import fields as fields
from serde.exceptions import ValidationError as ValidationError
from serde.model import Model as Model

__all__: list[str]
6 changes: 6 additions & 0 deletions stubs/serde/exceptions.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from typing import Any

class ValidationError(Exception):
def __init__(self, message: str, **kwargs: Any) -> None: ...

class ContextError(Exception): ...
62 changes: 62 additions & 0 deletions stubs/serde/fields.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
from typing import Any

class Field:
def __init__(self, **kwargs: Any) -> None: ...

class Instance(Field):
def __init__(self, ty: type, **kwargs: Any) -> None: ...

class Str(Instance):
def __init__(self, **kwargs: Any) -> None: ...

class Int(Instance):
def __init__(self, **kwargs: Any) -> None: ...

class Bool(Instance):
def __init__(self, **kwargs: Any) -> None: ...

class Float(Instance):
def __init__(self, **kwargs: Any) -> None: ...

class Bytes(Instance):
def __init__(self, **kwargs: Any) -> None: ...

class DateTime(Instance):
def __init__(self, format: str = ..., **kwargs: Any) -> None: ...

class Date(DateTime):
def __init__(self, format: str = ..., **kwargs: Any) -> None: ...

class Time(DateTime):
def __init__(self, format: str = ..., **kwargs: Any) -> None: ...

class Nested(Instance):
def __init__(self, model_cls: type, **kwargs: Any) -> None: ...

class Optional(Field):
def __init__(self, inner: Any = ..., **kwargs: Any) -> None: ...

class List(Field):
def __init__(self, element: Any = ..., **kwargs: Any) -> None: ...

class Set(Field):
def __init__(self, element: Any = ..., **kwargs: Any) -> None: ...

class Tuple(Field):
def __init__(self, *elements: Any, **kwargs: Any) -> None: ...

class Dict(Field):
def __init__(
self,
key: Any = ...,
value: Any = ...,
**kwargs: Any,
) -> None: ...

class Flatten(Nested): ...

class Literal(Field):
def __init__(self, value: Any, **kwargs: Any) -> None: ...

class Choice(Field):
def __init__(self, choices: Any, **kwargs: Any) -> None: ...
13 changes: 13 additions & 0 deletions stubs/serde/model.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from collections import OrderedDict
from typing import Any, Self

class Model:
def __init__(self, *args: Any, **kwargs: Any) -> None: ...
def __eq__(self, other: object) -> bool: ...
def __hash__(self) -> int: ...
def to_dict(self) -> OrderedDict[str, Any]: ...
def to_json(self, **kwargs: Any) -> str: ...
@classmethod
def from_dict(cls, d: dict[str, Any]) -> Self: ...
@classmethod
def from_json(cls, s: str, **kwargs: Any) -> Self: ...