|
| 1 | +use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; |
| 2 | + |
| 3 | +use crate::exceptions::PyValueError; |
| 4 | +use crate::sync::GILOnceCell; |
| 5 | +use crate::types::PyType; |
| 6 | +use crate::{intern, FromPyObject, IntoPy, Py, PyAny, PyObject, PyResult, Python, ToPyObject}; |
| 7 | + |
| 8 | +impl FromPyObject<'_> for IpAddr { |
| 9 | + fn extract(obj: &PyAny) -> PyResult<Self> { |
| 10 | + match obj.getattr(intern!(obj.py(), "packed")) { |
| 11 | + Ok(packed) => { |
| 12 | + if let Ok(packed) = packed.extract::<[u8; 4]>() { |
| 13 | + Ok(IpAddr::V4(Ipv4Addr::from(packed))) |
| 14 | + } else if let Ok(packed) = packed.extract::<[u8; 16]>() { |
| 15 | + Ok(IpAddr::V6(Ipv6Addr::from(packed))) |
| 16 | + } else { |
| 17 | + Err(PyValueError::new_err("invalid packed length")) |
| 18 | + } |
| 19 | + } |
| 20 | + Err(_) => { |
| 21 | + // We don't have a .packed attribute, so we try to construct an IP from str(). |
| 22 | + obj.str()?.to_str()?.parse().map_err(PyValueError::new_err) |
| 23 | + } |
| 24 | + } |
| 25 | + } |
| 26 | +} |
| 27 | + |
| 28 | +impl ToPyObject for Ipv4Addr { |
| 29 | + fn to_object(&self, py: Python<'_>) -> PyObject { |
| 30 | + static IPV4_ADDRESS: GILOnceCell<Py<PyType>> = GILOnceCell::new(); |
| 31 | + IPV4_ADDRESS |
| 32 | + .get_or_try_init_type_ref(py, "ipaddress", "IPv4Address") |
| 33 | + .expect("failed to load ipaddress.IPv4Address") |
| 34 | + .call1((u32::from_be_bytes(self.octets()),)) |
| 35 | + .expect("failed to construct ipaddress.IPv4Address") |
| 36 | + .to_object(py) |
| 37 | + } |
| 38 | +} |
| 39 | + |
| 40 | +impl ToPyObject for Ipv6Addr { |
| 41 | + fn to_object(&self, py: Python<'_>) -> PyObject { |
| 42 | + static IPV6_ADDRESS: GILOnceCell<Py<PyType>> = GILOnceCell::new(); |
| 43 | + IPV6_ADDRESS |
| 44 | + .get_or_try_init_type_ref(py, "ipaddress", "IPv6Address") |
| 45 | + .expect("failed to load ipaddress.IPv6Address") |
| 46 | + .call1((u128::from_be_bytes(self.octets()),)) |
| 47 | + .expect("failed to construct ipaddress.IPv6Address") |
| 48 | + .to_object(py) |
| 49 | + } |
| 50 | +} |
| 51 | + |
| 52 | +impl ToPyObject for IpAddr { |
| 53 | + fn to_object(&self, py: Python<'_>) -> PyObject { |
| 54 | + match self { |
| 55 | + IpAddr::V4(ip) => ip.to_object(py), |
| 56 | + IpAddr::V6(ip) => ip.to_object(py), |
| 57 | + } |
| 58 | + } |
| 59 | +} |
| 60 | + |
| 61 | +impl IntoPy<PyObject> for IpAddr { |
| 62 | + fn into_py(self, py: Python<'_>) -> PyObject { |
| 63 | + self.to_object(py) |
| 64 | + } |
| 65 | +} |
| 66 | + |
| 67 | +#[cfg(test)] |
| 68 | +mod test_ipaddr { |
| 69 | + use std::str::FromStr; |
| 70 | + |
| 71 | + use crate::types::PyString; |
| 72 | + |
| 73 | + use super::*; |
| 74 | + |
| 75 | + #[test] |
| 76 | + fn test_roundtrip() { |
| 77 | + Python::with_gil(|py| { |
| 78 | + fn roundtrip(py: Python<'_>, ip: &str) { |
| 79 | + let ip = IpAddr::from_str(ip).unwrap(); |
| 80 | + let py_cls = if ip.is_ipv4() { |
| 81 | + "IPv4Address" |
| 82 | + } else { |
| 83 | + "IPv6Address" |
| 84 | + }; |
| 85 | + |
| 86 | + let pyobj = ip.into_py(py); |
| 87 | + let repr = pyobj.as_ref(py).repr().unwrap().to_string_lossy(); |
| 88 | + assert_eq!(repr, format!("{}('{}')", py_cls, ip)); |
| 89 | + |
| 90 | + let ip2: IpAddr = pyobj.extract(py).unwrap(); |
| 91 | + assert_eq!(ip, ip2); |
| 92 | + } |
| 93 | + roundtrip(py, "127.0.0.1"); |
| 94 | + roundtrip(py, "::1"); |
| 95 | + roundtrip(py, "0.0.0.0"); |
| 96 | + }); |
| 97 | + } |
| 98 | + |
| 99 | + #[test] |
| 100 | + fn test_from_pystring() { |
| 101 | + Python::with_gil(|py| { |
| 102 | + let py_str = PyString::new(py, "0:0:0:0:0:0:0:1"); |
| 103 | + let ip: IpAddr = py_str.to_object(py).extract(py).unwrap(); |
| 104 | + assert_eq!(ip, IpAddr::from_str("::1").unwrap()); |
| 105 | + |
| 106 | + let py_str = PyString::new(py, "invalid"); |
| 107 | + assert!(py_str.to_object(py).extract::<IpAddr>(py).is_err()); |
| 108 | + }); |
| 109 | + } |
| 110 | +} |
0 commit comments