3 changes: 2 additions & 1 deletion docs/connectors/sinks/tdengine-sink.md
@@ -62,7 +62,8 @@ TDengineSink accepts the following configuration parameters:
- `subtable` - Subtable name as a string, or a callable that receives the current message data and returns a string. If empty, a hash value will be generated from the data as the subtable name.
- `fields_keys` - Iterable (list) of strings used as TDengine "fields", or a callable that receives the current message data and returns an iterable of strings. If present, must not overlap with `tags_keys`. If empty, the whole record value will be used. Default: `()`.
- `tags_keys` - Iterable (list) of strings used as TDengine "tags", or a callable that receives the current message data and returns an iterable of strings. If present, must not overlap with `fields_keys`. Given keys are popped from the value dictionary since the same key cannot be both a tag and field. If empty, no tags will be sent. Default: `()`.
- `time_key` - Optional key to use as the timestamp for TDengine. If not set, the record's Kafka timestamp is used. Default: `None`.
- `time_key` - Optional key to use as the timestamp for TDengine. If not set, the record's Kafka timestamp is used. Default: `None`. The value can be an `int`, a `string` (RFC3339 format), or a `datetime`. **If `time_key` is set but the key is not present in the record, the record's Kafka timestamp will be used.**
- `time_setter` - Optional column name to use as the timestamp for TDengine. Also accepts a callable that receives the current message data and returns either the desired time or `None` (use the default). The time can be an `int`, a `string` (RFC3339 format), or a `datetime`. If the time is not a `datetime` object, it must match the `time_precision` argument; otherwise an error is raised. Default: `None`. **If `time_setter` is set as a string, it behaves like `time_key`, except that a `KeyError` is raised if the key is not present in the record.** Caution: if both `time_key` and `time_setter` are set, a `ValueError` is raised. A usage sketch illustrating `time_setter` appears after this list.
- `time_precision` - Time precision for the timestamp. One of: `"ms"`, `"ns"`, `"us"`, `"s"`. Default: `"ms"`.
- `allow_missing_fields` - If `True`, skip missing field keys instead of raising `KeyError`. Default: `False`.
- `include_metadata_tags` - If `True`, includes the record's key, topic, and partition as tags. Default: `False`.
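To make these timestamp semantics concrete, here is a minimal usage sketch. The connection arguments (`host`, `database`) and the `Application` wiring are illustrative assumptions, not a verbatim constructor signature; only the timestamp-related parameters follow the behavior described in the list above.

```python
from quixstreams import Application
from quixstreams.sinks.community.tdengine.sink import TDengineSink

# A sketch under assumed connection arguments, not a verbatim signature.
sink = TDengineSink(
    host="http://localhost:6041",   # assumed parameter name
    database="sensor_data",         # assumed parameter name
    supertable="measurements",
    subtable=lambda row: f"device_{row['device_id']}",
    tags_keys=["device_id"],
    fields_keys=["temperature", "humidity"],
    # A callable time_setter may return None to fall back to the record's
    # Kafka timestamp; a plain string setter would raise KeyError when absent.
    time_setter=lambda row: row.get("event_time"),
    time_precision="ms",
)

app = Application(broker_address="localhost:9092")
topic = app.topic("sensor-readings")
app.dataframe(topic).sink(sink)
```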
191 changes: 150 additions & 41 deletions quixstreams/sinks/community/tdengine/sink.py
@@ -4,6 +4,7 @@
import ssl
import sys
import time
from datetime import datetime, timezone
from typing import Any, Callable, Iterable, Literal, Mapping, Optional, Union, get_args
from urllib.parse import urlencode, urljoin

@@ -23,18 +24,27 @@
logger = logging.getLogger(__name__)

TimePrecision = Literal["ms", "ns", "us", "s"]
# Expected digit count of an integer epoch timestamp at each precision
TIME_PRECISION_LEN = {
"s": 10,
"ms": 13,
"us": 16,
"ns": 19,
}


InfluxDBValueMap = dict[str, Union[str, int, float, bool]]

FieldsCallable = Callable[[InfluxDBValueMap], Iterable[str]]
SupertableCallable = Callable[[InfluxDBValueMap], str]
TagsCallable = Callable[[InfluxDBValueMap], Iterable[str]]
SubtableNameCallable = Callable[[InfluxDBValueMap], str]
TimeCallable = Callable[[InfluxDBValueMap], Optional[Union[str, int, datetime]]]

FieldsSetter = Union[Iterable[str], FieldsCallable]
SupertableSetter = Union[str, SupertableCallable]
TagsSetter = Union[Iterable[str], TagsCallable]
SubtableNameSetter = Union[str, SubtableNameCallable]
TimeSetter = Union[str, TimeCallable]
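# Illustrative examples (not from the original source) of TimeSetter values:
#   "event_time"                           -> strict column lookup; raises KeyError if missing
#   lambda value: value.get("event_time")  -> callable; None falls back to the Kafka timestamp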


class TDengineSink(BatchingSink):
@@ -47,6 +57,7 @@ def __init__(
fields_keys: FieldsSetter = (),
tags_keys: TagsSetter = (),
time_key: Optional[str] = None,
time_setter: Optional[TimeSetter] = None,
time_precision: TimePrecision = "ms",
allow_missing_fields: bool = False,
include_metadata_tags: bool = False,
@@ -105,6 +116,20 @@ def __init__(
By default, the record timestamp will be used with "ms" time precision.
When using a custom key, you may need to adjust the `time_precision` setting
to match.
The time value can be an `int`, a `string` (RFC3339 format), or a `datetime`.
If the time value is not a `datetime` object, it must match the `time_precision` argument; otherwise an error is raised.
>***NOTE***:
> - If both `time_key` and `time_setter` are set, a `ValueError` is raised.
> - If `time_key` is set but the key is not present in the record, the record's Kafka timestamp will be used.
:param time_setter: an optional column name to use as "time" when converting to InfluxDB line protocol.
Also accepts a callable which receives the current message data and
returns either the desired time or `None` (use the default).
The time can be an `int`, a `string` (RFC3339 format), or a `datetime`.
If the time is not a `datetime` object, it must match the `time_precision` argument; otherwise an error is raised.
By default, a record's Kafka timestamp with "ms" time precision is used.
>***NOTE***:
> - If both `time_key` and `time_setter` are set, a `ValueError` is raised.
> - If `time_setter` is set as a string, it behaves like `time_key`, except that a `KeyError` is raised if the key is not present in the record.
:param time_precision: a time precision to use when converting to InfluxDB line protocol.
Possible values: "ms", "ns", "us", "s".
Default - `"ms"`.
@@ -135,7 +160,11 @@ def __init__(
on_client_connect_success=on_client_connect_success,
on_client_connect_failure=on_client_connect_failure,
)

# time_key and time_setter are mutually exclusive
if time_key is not None and time_setter is not None:
raise ValueError(
"Only one of 'time_key' or 'time_setter' can be set, not both."
)
if time_precision not in (time_args := get_args(TimePrecision)):
raise ValueError(
f"Invalid 'time_precision' argument {time_precision}; "
@@ -186,39 +215,21 @@ def __init__(
"database": database,
}
self._client: Optional[urllib3.PoolManager] = None
self._supertable_name = self._supertable_callable(supertable)
self._subtable_name = self._subtable_name_callable(subtable)
self._fields_keys = self._fields_callable(fields_keys)
self._tags_keys = self._tags_callable(tags_keys)
self._supertable_name = _supertable_callable(supertable)
self._subtable_name = _subtable_name_callable(subtable)
self._fields_keys = _fields_callable(fields_keys)
self._tags_keys = _tags_callable(tags_keys)
if time_key is not None:
# For compatibility with the old behavior, the record's Kafka timestamp is used when time_key is missing from the value
self._time_setter = lambda value: value[time_key] if time_key in value else None
else:
self._time_setter = _time_callable(time_setter)
self._include_metadata_tags = include_metadata_tags
self._time_key = time_key
self._write_precision = time_precision
self._batch_size = batch_size
self._allow_missing_fields = allow_missing_fields
self._convert_ints_to_floats = convert_ints_to_floats

def _supertable_callable(self, setter: SupertableSetter) -> SupertableCallable:
if callable(setter):
return setter
return lambda value: setter

def _fields_callable(self, setter: FieldsSetter) -> FieldsCallable:
if callable(setter):
return setter
return lambda value: setter

def _tags_callable(self, setter: TagsSetter) -> TagsCallable:
if callable(setter):
return setter
return lambda value: setter

def _subtable_name_callable(
self, setter: SubtableNameSetter
) -> SubtableNameCallable:
if callable(setter):
return setter
return lambda value: setter

def setup(self):
if self._client_args["verify_ssl"]:
cert_reqs = ssl.CERT_REQUIRED
@@ -288,20 +299,31 @@ def write(self, batch: SinkBatch):
subtable = self._subtable_name
fields_keys = self._fields_keys
tags_keys = self._tags_keys
time_key = self._time_key
time_setter = self._time_setter

for write_batch in batch.iter_chunks(n=self._batch_size):
records = []

min_timestamp = sys.maxsize
max_timestamp = -1
min_timestamp = None
max_timestamp = None

for item in write_batch:
value = item.value
copied_value = value.copy() # Copy to print the original value in case of errors
# Evaluate these before we alter the value
_measurement = supertable(value)
# check if _measurement is empty
if _measurement is None or not _measurement.strip():
raise ValueError(
f'Supertable name cannot be empty for record with key "{item.key}" '
f"and topic '{batch.topic}', "
f"record value: {copied_value}"
)
_tags_keys = tags_keys(value)
_fields_keys = fields_keys(value)
_subtable_name = subtable(item.value)
ts = time_setter(value)

tags = {}
for tag_key in _tags_keys:
if tag_key in value:
@@ -313,7 +335,6 @@
tags["__topic"] = batch.topic
tags["__partition"] = batch.partition

tags["__subtable"] = _subtable_name
if _fields_keys:
fields = {
f: value[f]
@@ -328,29 +349,72 @@
k: float(v) if isinstance(v, int) else v
for k, v in fields.items()
}
ts = (
value[time_key]
if time_key is not None and time_key in value
else item.timestamp
)
# check if fields is empty
if not fields:
raise ValueError(
f'No fields found in the record for supertable "{_measurement}" '
f"and subtable name '{_subtable_name}', "
f"record value: {copied_value}"
)

tags["__subtable"] = _subtable_name

# check if fields and tags overlap
fields_tags_overlap = set(fields) & set(tags)
if fields_tags_overlap:
overlap_str = ",".join(str(k) for k in fields_tags_overlap)
raise ValueError(
f'Keys {overlap_str} are present in both "fields" and "tags" '
f"for supertable '{_measurement}' and subtable '{_subtable_name}'"
)

if ts is None:
ts = item.timestamp

elif not isinstance(ts, valid := (str, int, datetime)):
raise TypeError(
f'TDengine "time" field expects: {valid}, got {type(ts)}'
)

if isinstance(ts, int):
time_len = len(str(ts))
expected = TIME_PRECISION_LEN[self._write_precision]
if time_len != expected:
raise ValueError(
f'`time_precision` of "{self._write_precision}" '
f"expects a {expected}-digit integer epoch, "
f"got {time_len} (timestamp: {ts})."
)

record = {
"measurement": _measurement,
"tags": tags,
"fields": fields,
"time": ts,
}
records.append(record)
min_timestamp = min(ts, min_timestamp)
max_timestamp = max(ts, max_timestamp)
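# Seed the running min/max with an identity value matched to the
# timestamp's type (int, str, or datetime), so comparisons stay
# type-consistent within a chunk.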
min_timestamp = min(ts, min_timestamp or _ts_min_default(ts))
max_timestamp = max(ts, max_timestamp or _ts_max_default(ts))
if not records:
logger.debug("No records to write")
logger.warning(
f"No records to write for batch with key '{item.key}' "
f"and topic '{batch.topic}', "
f"record value: {copied_value}"
)
continue
_start = time.monotonic()
l: list[bytes] = [b""] * len(records)
for i, point in enumerate(records):
p = Point.from_dict(point, self._write_precision)
l[i] = p.to_line_protocol().encode("utf-8")
body = b"\n".join(l)
body = b"\n".join([item for item in l if item])
if body == b"":
logger.warning(
f"No valid records to write for batch with key '{item.key}' "
f"and topic '{batch.topic}', "
f"record value: {copied_value}"
)
continue
timeout = urllib3.Timeout(total=self._client_args["timeout"] / 1_000)
logger.debug(f"Sending data to {self._client_args['url']} : {body}")
resp = self._client.request(
Expand All @@ -376,3 +440,48 @@ def write(self, batch: SinkBatch):
raise SinkBackpressureError(retry_after=int(retry_after)) from err
elif resp.status != 204:
raise err
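# The two helpers below return, for each supported timestamp type, the
# identity element for min()/max(): a value guaranteed to lose the
# comparison, so the first real timestamp in a chunk always wins.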

def _ts_min_default(timestamp: Union[int, str, datetime]):
if isinstance(timestamp, int):
return sys.maxsize
elif isinstance(timestamp, str):
return "~" # lexicographically largest ASCII char
elif isinstance(timestamp, datetime):
return datetime.max.replace(tzinfo=timezone.utc)


def _ts_max_default(timestamp: Union[int, str, datetime]):
if isinstance(timestamp, int):
return -1
elif isinstance(timestamp, str):
return ""
elif isinstance(timestamp, datetime):
return datetime.min.replace(tzinfo=timezone.utc)

def _subtable_name_callable(setter: SubtableNameSetter) -> SubtableNameCallable:
if callable(setter):
return setter
return lambda value: setter

def _supertable_callable(setter: SupertableSetter) -> SupertableCallable:
if callable(setter):
return setter
return lambda value: setter

def _fields_callable(setter: FieldsSetter) -> FieldsCallable:
if callable(setter):
return setter
return lambda value: setter

def _tags_callable(setter: TagsSetter) -> TagsCallable:
if callable(setter):
return setter
return lambda value: setter

def _time_callable(setter: Optional[TimeSetter]) -> TimeCallable:
if callable(setter):
return setter
if isinstance(setter, str):
# If the key is not present in the value, a KeyError is raised
return lambda value: value[setter]
return lambda value: None # the Kafka timestamp will be used
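To see the `time_key` vs. `time_setter` resolution in isolation, here is a minimal sketch exercising `_time_callable` as defined above; the record dict and column names are invented for illustration.

```python
from datetime import datetime, timezone

record = {"temperature": 21.5, "event_time": 1_717_000_000_000}  # illustrative

# String setter: strict lookup; a missing column raises KeyError.
assert _time_callable("event_time")(record) == 1_717_000_000_000
try:
    _time_callable("missing_column")(record)
except KeyError:
    pass  # expected: unlike time_key, a missing key is an error

# Callable setter: returning None defers to the record's Kafka timestamp
# (the fallback is applied later, inside write()).
assert _time_callable(lambda value: value.get("event_time"))(record) == 1_717_000_000_000
assert _time_callable(None)(record) is None

# datetime values are accepted as-is and bypass the digit-length check.
assert isinstance(_time_callable(lambda value: datetime.now(timezone.utc))(record), datetime)
```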