Skip to content
This repository was archived by the owner on Oct 29, 2024. It is now read-only.

Make batched writing support all iterables #746

Merged
merged 5 commits into from
Apr 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions influxdb/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@
import socket
import struct
import time
from itertools import chain, islice

import msgpack
import requests
import requests.exceptions
from six.moves import xrange
from six.moves.urllib.parse import urlparse

from influxdb.line_protocol import make_lines, quote_ident, quote_literal
Expand Down Expand Up @@ -565,8 +565,17 @@ def ping(self):

@staticmethod
def _batches(iterable, size):
    """Lazily split ``iterable`` into consecutive batches of ``size`` items.

    Works with any iterable (lists, generators, other iterators), not
    only sequences that support ``len()`` and slicing. Based on:
    http://code.activestate.com/recipes/303279-getting-items-in-batches/

    :param iterable: the items to split into batches
    :param size: maximum number of items per batch; must be at least 1
    :returns: a generator yielding one lazy iterable per batch; the final
        batch may hold fewer than ``size`` items, and an empty input
        yields no batches at all
    :raises ValueError: if ``size`` is smaller than 1 (raised when the
        generator is first advanced)

    Every batch reads from the same underlying iterator, so a batch must
    be fully consumed before the next one is requested; items left
    unread would otherwise show up in the following batch.
    """
    if size < 1:
        raise ValueError("batch size must be at least 1, got %r" % (size,))
    iterator = iter(iterable)
    while True:
        try:
            # Pull the first element eagerly: it is the only way to find
            # out whether the iterator still has anything to offer.
            head = (next(iterator),)
        except StopIteration:
            return  # input exhausted, so there are no more batches
        # The rest of the batch is sliced lazily from the shared iterator.
        rest = islice(iterator, size - 1)
        yield chain(head, rest)

def _write_points(self,
points,
Expand Down
30 changes: 30 additions & 0 deletions influxdb/tests/client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,36 @@ def test_write_points_batch(self):
self.assertEqual(expected_last_body,
m.last_request.body.decode('utf-8'))

def test_write_points_batch_generator(self):
    """Check that write_points batches points supplied by a generator.

    Three points are written with ``batch_size=2``; two POST requests are
    expected, and the body of the last one must hold only the third point.
    """
    stamp = "2009-11-10T23:00:00Z"
    samples = [
        {"measurement": "cpu_usage", "tags": {"unit": "percent"},
         "time": stamp, "fields": {"value": 12.34}},
        {"measurement": "network", "tags": {"direction": "in"},
         "time": stamp, "fields": {"value": 123.00}},
        {"measurement": "network", "tags": {"direction": "out"},
         "time": stamp, "fields": {"value": 12.00}}
    ]
    extra_tags = {"host": "server01", "region": "us-west"}
    expected_last_body = (
        "network,direction=out,host=server01,region=us-west "
        "value=12.0 1257894000000000000\n"
    )

    with requests_mock.Mocker() as mocker:
        mocker.register_uri(requests_mock.POST,
                            "http://localhost:8086/write",
                            status_code=204)
        client = InfluxDBClient(database='db')
        # Hand over a generator rather than the list itself.
        client.write_points(points=(entry for entry in samples),
                            database='db',
                            tags=extra_tags,
                            batch_size=2)
        self.assertEqual(mocker.call_count, 2)
        sent_body = mocker.last_request.body.decode('utf-8')
        self.assertEqual(expected_last_body, sent_body)

def test_write_points_udp(self):
"""Test write points UDP for TestInfluxDBClient object."""
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
Expand Down
27 changes: 27 additions & 0 deletions influxdb/tests/server_tests/client_test_with_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,33 @@ def test_write_points_batch(self):
self.assertIn(12, net_out['series'][0]['values'][0])
self.assertIn(12.34, cpu['series'][0]['values'][0])

def test_write_points_batch_generator(self):
    """Write three points from a generator in batches of two, then query them.

    Each written value is expected in the first result row of its
    measurement/direction query.
    """
    stamp = "2009-11-10T23:00:00Z"
    samples = [
        {"measurement": "cpu_usage", "tags": {"unit": "percent"},
         "time": stamp, "fields": {"value": 12.34}},
        {"measurement": "network", "tags": {"direction": "in"},
         "time": stamp, "fields": {"value": 123.00}},
        {"measurement": "network", "tags": {"direction": "out"},
         "time": stamp, "fields": {"value": 12.00}}
    ]
    extra_tags = {"host": "server01", "region": "us-west"}
    # Hand over a generator rather than the list itself.
    self.cli.write_points(points=(entry for entry in samples),
                          tags=extra_tags,
                          batch_size=2)
    # NOTE(review): presumably gives the server time to make the writes
    # queryable - confirm whether the full 5 seconds are needed.
    time.sleep(5)

    inbound = self.cli.query("SELECT value FROM network "
                             "WHERE direction=$dir",
                             bind_params={'dir': 'in'}).raw
    outbound = self.cli.query("SELECT value FROM network "
                              "WHERE direction='out'").raw
    cpu_usage = self.cli.query("SELECT value FROM cpu_usage").raw

    checks = ((123, inbound), (12, outbound), (12.34, cpu_usage))
    for wanted, raw_result in checks:
        first_row = raw_result['series'][0]['values'][0]
        self.assertIn(wanted, first_row)

def test_query(self):
"""Test querying data back from server."""
self.assertIs(True, self.cli.write_points(dummy_point))
Expand Down