Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: add integration test for opentsdb put api #1043

Merged
merged 5 commits into from
Jul 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,10 @@ jobs:
working-directory: integration_tests
run: |
make run-prom
- name: Run OpenTSDB tests
working-directory: integration_tests
run: |
make run-opentsdb
- name: Upload Logs
if: always()
uses: actions/upload-artifact@v3
Expand Down
3 changes: 3 additions & 0 deletions integration_tests/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -70,5 +70,8 @@ run-mysql:
run-prom:
cd prom && ./run-tests.sh

run-opentsdb:
cd opentsdb && ./run-tests.sh

run-recovery: clean build-ceresdb kill-old-process
cd recovery && ./run.sh && ./run.sh shard_based
3 changes: 3 additions & 0 deletions integration_tests/opentsdb/run-tests.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/usr/bin/env bash

python ./test-put.py
350 changes: 350 additions & 0 deletions integration_tests/opentsdb/test-put.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,350 @@
#!/usr/bin/env python
# coding: utf-8

import requests
import time

api_root = 'http://localhost:5440'
headers = {
'Content-Type': 'application/json'
}
table_prefix = 'opentsdb_test_'
table2_prefix = 'opentsdb_test2_'


def now():
return int(time.time()) * 1000


def execute_sql(sql):
r = requests.post('{}/sql'.format(api_root), json={'query': sql}, headers=headers)
return r


def drop_table(table_name):
sql = """
DROP TABLE IF EXISTS {}
""".format(table_name)
r = execute_sql(sql)
assert r.status_code == 200, r.text


def show_create_table(table_name):
sql = """
SHOW CREATE TABLE {}
""".format(table_name)
r = execute_sql(sql)
assert r.status_code == 200
return r


def execute_sql_query(sql):
r = execute_sql(sql)
assert r.status_code == 200
return r


def execute_put(points):
r = requests.post('{}/opentsdb/api/put'.format(api_root), data=points)
return r


def execute_put_then_assert_fail(points):
r = execute_put(points)
assert r.status_code == 500


def execute_put_then_assert_success(points):
r = execute_put(points)
assert r.status_code == 204


def test_put_validate_error():
ts = now();
# empty json string
execute_put_then_assert_fail("")

# invalid json
execute_put_then_assert_fail("{xxx")

# empty metric
execute_put_then_assert_fail("""
{
"metric": "",
"timestamp": {ts},
"value": 18,
"tags": {
"host": "web01",
"dc": "lga"
}
}
""".replace('{ts}', str(ts)))

# empty tag
execute_put_then_assert_fail("""
{
"metric": "sys.cpu.nice",
"timestamp": {ts},
"value": 18,
"tags": {
}
}
""".replace('{ts}', str(ts)))
zouxiang1993 marked this conversation as resolved.
Show resolved Hide resolved

# empty tag name
execute_put_then_assert_fail("""
{
"metric": "sys.cpu.nice",
"timestamp": {ts},
"value": 18,
"tags": {
"": "web01",
"dc": "lga"
}
}
""".replace('{ts}', str(ts)))

# too small timestamp
execute_put_then_assert_fail("""
{
"metric": "sys.cpu.nice",
"timestamp": 1,
"value": 18,
"tags": {
"host": "web01",
"dc": "lga"
}
}
""".replace('{ts}', str(ts)))

# too big timestamp
execute_put_then_assert_fail("""
{
"metric": "sys.cpu.nice",
"timestamp": 10000000000000,
"value": 18,
"tags": {
"host": "web01",
"dc": "lga"
}
}
""".replace('{ts}', str(ts)))


def test_put_single_point_with_int_value():
ts = now()
table_name = table_prefix + str(ts)
drop_table(table_name)

execute_put_then_assert_success("""
{
"metric": "{metric}",
"timestamp": {ts},
"value": 9527,
"tags": {
"host": "web01",
"dc": "lga"
}
}
""".replace('{metric}', table_name).replace('{ts}', str(ts)))

r = show_create_table(table_name)
assert r.text.__contains__('`tsid` uint64 NOT NULL')
assert r.text.__contains__('`timestamp` timestamp NOT NULL')
assert r.text.__contains__('`dc` string TAG')
assert r.text.__contains__('`host` string TAG')
# value is a bigint column
assert r.text.__contains__('`value` bigint')

r = execute_sql_query("""
SELECT timestamp, dc, host, value FROM {metric}
""".replace('{metric}', table_name))
assert r.text == """{"rows":[{"timestamp":{ts},"dc":"lga","host":"web01","value":9527}]}""".strip().replace('{ts}', str(ts))


def test_put_single_point_with_float_value():
ts = now()
table_name = table_prefix + str(ts)
drop_table(table_name)

execute_put_then_assert_success("""
{
"metric": "{metric}",
"timestamp": {ts},
"value": 95.27,
"tags": {
"host": "web01",
"dc": "lga"
}
}
""".replace('{metric}', table_name).replace('{ts}', str(ts)))

r = show_create_table(table_name)
assert r.text.__contains__('`tsid` uint64 NOT NULL')
assert r.text.__contains__('`timestamp` timestamp NOT NULL')
assert r.text.__contains__('`dc` string TAG')
assert r.text.__contains__('`host` string TAG')
# value is a double column
assert r.text.__contains__('`value` double')

r = execute_sql_query("""
SELECT timestamp, dc, host, value FROM {metric}
""".replace('{metric}', table_name))
assert r.text == """
{"rows":[{"timestamp":{ts},"dc":"lga","host":"web01","value":95.27}]}
""".strip().replace('{ts}', str(ts))


def test_put_single_point_with_second_timestamp():
ts = now()
ts_in_seconds = ts // 1000;
table_name = table_prefix + str(ts)
drop_table(table_name)

execute_put_then_assert_success("""
{
"metric": "{metric}",
"timestamp": {ts},
"value": 95.27,
"tags": {
"host": "web01",
"dc": "lga"
}
}
""".replace('{metric}', table_name).replace('{ts}', str(ts_in_seconds)))

r = execute_sql_query("""
SELECT timestamp, dc, host, value FROM {metric}
""".replace('{metric}', table_name))
assert r.text == """
{"rows":[{"timestamp":{ts},"dc":"lga","host":"web01","value":95.27}]}
""".strip().replace('{ts}', str(ts))


def test_put_multi_points_with_different_tags_in_one_table():
ts = now()
table_name = table_prefix + str(ts)
drop_table(table_name)

execute_put_then_assert_success("""
[
{
"metric": "{metric}",
"timestamp": {ts},
"value": 18,
"tags": {
"host": "web01"
}
},
{
"metric": "{metric}",
"timestamp": {ts},
"value": 9,
"tags": {
"dc": "lga"
}
}
]
""".replace('{metric}', table_name).replace('{ts}', str(ts)))

r = execute_sql_query("""
SELECT timestamp, dc, host, value FROM {metric} ORDER BY value desc
""".replace('{metric}', table_name))
assert r.text == """
{"rows":[{"timestamp":{ts},"dc":null,"host":"web01","value":18},{"timestamp":{ts},"dc":"lga","host":null,"value":9}]}
""".strip().replace('{ts}', str(ts))


# CeresDB internal error: "Column: value in table: ??? data type is not same, expected: bigint, actual: double"
def test_put_multi_points_with_different_datatype_in_one_table():
ts = now()
table_name = table_prefix + str(ts)
drop_table(table_name)

execute_put_then_assert_fail("""
[
{
"metric": "{metric}",
"timestamp": {ts},
"value": 18,
"tags": {
"host": "web01",
"dc": "lga"
}
},
{
"metric": "{metric}",
"timestamp": {ts},
"value": 9.999,
"tags": {
"host": "web02",
"dc": "lga"
}
}
]
""".replace('{metric}', table_name).replace('{ts}', str(ts)))


def test_put_multi_points_in_multi_table():
ts = now()
table_name = table_prefix + str(ts)
table2_name = table2_prefix + str(ts)
drop_table(table_name)
drop_table(table2_name)

execute_put_then_assert_success("""
[
{
"metric": "{metric}",
"timestamp": {ts},
"value": 18,
"tags": {
"host": "web01",
"dc": "lga"
}
},
{
"metric": "{metric2}",
"timestamp": {ts},
"value": 9,
"tags": {
"host": "web02",
"dc": "lga"
}
}
]
""".replace('{metric}', table_name).replace('{metric2}', table2_name).replace('{ts}', str(ts)))

r = execute_sql_query("""
SELECT timestamp, dc, host, value FROM {metric}
""".replace('{metric}', table_name))
assert r.text == """
{"rows":[{"timestamp":{ts},"dc":"lga","host":"web01","value":18}]}
""".strip().replace('{ts}', str(ts))

r = execute_sql_query("""
SELECT timestamp, dc, host, value FROM {metric}
""".replace('{metric}', table2_name))
assert r.text == """
{"rows":[{"timestamp":{ts},"dc":"lga","host":"web02","value":9}]}
""".strip().replace('{ts}', str(ts))


def main():
print("OpenTSDB test start.")

test_put_validate_error()

test_put_single_point_with_int_value()
test_put_single_point_with_float_value()
test_put_single_point_with_second_timestamp()

test_put_multi_points_with_different_tags_in_one_table()
test_put_multi_points_with_different_datatype_in_one_table()
test_put_multi_points_in_multi_table()

print("OpenTSDB test finished.")


if __name__ == '__main__':
main()
1 change: 1 addition & 0 deletions server/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@ impl<Q: QueryExecutor + 'static> Service<Q> {
warp::path!("influxdb" / "v1" / ..).and(write_api.or(query_api))
}

// POST /opentsdb/api/put
fn opentsdb_api(
&self,
) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
Expand Down
Loading