From acc3a3b080a459148494122d93950be003dcea65 Mon Sep 17 00:00:00 2001 From: Superskyyy Date: Tue, 27 Jun 2023 18:22:25 -0400 Subject: [PATCH] Dockerize r3 (#5) --- demo/Endpoint200_hard.txt | 8 +- .../docker-compose.yaml | 67 +++++++++ models/uri_drain/uri_drain.py | 9 +- poetry.lock | 132 +++++++++++++++++- pyproject.toml | 3 + servers/simple/server.py | 55 +++++++- servers/simple/worker.py | 1 + servers/tests/dataset_constructor.py | 6 +- 8 files changed, 270 insertions(+), 11 deletions(-) create mode 100644 demo/crazy_endpoint_generator/docker-compose.yaml diff --git a/demo/Endpoint200_hard.txt b/demo/Endpoint200_hard.txt index 4c745d9..7aaf143 100644 --- a/demo/Endpoint200_hard.txt +++ b/demo/Endpoint200_hard.txt @@ -167,4 +167,10 @@ top1.abc.example.com.net.cn/api/v1/users/this-url-is-special-since-domain-hasdig GET:/api/v2/users/1222222223/similar-to-these-single-occurrence-is-not-handled-correctly-will-be-handled-by-nltk!!!!! /haha1 /haha2 -/jdoia1 \ No newline at end of file +/jdoia1 +HikariCP/Connection/getConnection +HikariCP/Connection/disconnect +HikariCP/Connection/close +ABC/123 +ABC/456 +ABC/789 \ No newline at end of file diff --git a/demo/crazy_endpoint_generator/docker-compose.yaml b/demo/crazy_endpoint_generator/docker-compose.yaml new file mode 100644 index 0000000..6d35e31 --- /dev/null +++ b/demo/crazy_endpoint_generator/docker-compose.yaml @@ -0,0 +1,67 @@ +# Copyright 2023 SkyAPM org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +services: + oap: + container_name: oap + image: ghcr.io/apache/skywalking/oap + expose: + - 11800 # gRPC + - 12800 # HTTP + networks: + - manual + environment: + SW_CORE_ENABLE_ENDPOINT_NAME_GROUPING_BY_OPENAPI: "false" + SW_CORE_MAX_HTTP_URIS_NUMBER_PER_SVR: "3000" + SW_AI_PIPELINE_URI_RECOGNITION_SERVER_ADDR: "r3" + SW_AI_PIPELINE_URI_RECOGNITION_SERVER_PORT: "17128" + healthcheck: + test: [ "CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/11800" ] + interval: 5s + timeout: 60s + retries: 120 + ports: + - "12800:12800" + - "11800:11800" + depends_on: + r3: + condition: service_healthy + + ui: + image: ghcr.io/apache/skywalking/ui + container_name: ui + depends_on: + oap: + condition: service_healthy + networks: + - manual + ports: + - "8080:8080" + environment: + SW_OAP_ADDRESS: "http://oap:12800" + + r3: + build: + context: ../.. + dockerfile: Dockerfile + image: r3:latest # Build from docker build . --tag r3 > docker run -d --name r3 -p 17128:17128 r3 + container_name: r3 + healthcheck: + test: [ "CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/17128" ] + networks: + - manual + ports: + - "17128:17128" +networks: + manual: \ No newline at end of file diff --git a/models/uri_drain/uri_drain.py b/models/uri_drain/uri_drain.py index c26bb06..d48a2ae 100644 --- a/models/uri_drain/uri_drain.py +++ b/models/uri_drain/uri_drain.py @@ -29,8 +29,9 @@ def get_template(self): # or http(s)://user:password@www.domain.top_level_domain # REASONING:: domain can only appear in the first two tokens, so whenever a dot appears, it must be a domain? # Another part of domain handling is done in create_template when they are hiding behind http(s):// - if ':' in self.log_template_tokens[0]: # It's a URI scheme! - scheme = self.log_template_tokens[0] + first_token = self.log_template_tokens[0] + if ':' in first_token: # It's a URI scheme! + scheme = first_token path = '/'.join(self.log_template_tokens[1:]) # TODO: put this into the config file http_methods = ('OPTIONS', 'GET', 'HEAD', 'PUT', 'POST', 'DELETE', 'PATCH', 'TRACE', 'CONNECT') @@ -40,9 +41,11 @@ def get_template(self): template = f"{scheme}//{path}" return template + elif first_token[0].isupper() or '.' in first_token: # It's something like HikariCP/Connection/getConnection + return '/'.join(self.log_template_tokens) else: template = '/'.join(self.log_template_tokens) - return f'{template}' + return f'/{template}' def __str__(self): # return f"ID={str(self.cluster_id).ljust(5)} : size={str(self.size).ljust(10)}: {self.get_template()}" diff --git a/poetry.lock b/poetry.lock index 787c54e..69a085c 100644 --- a/poetry.lock +++ b/poetry.lock @@ -177,6 +177,29 @@ doc = ["Sphinx (>=6.1.0)", "packaging", "sphinx-autodoc-typehints (>=1.2.0)", "s test = ["anyio[trio]", "coverage[toml] (>=4.5)", "hypothesis (>=4.0)", "mock (>=4)", "psutil (>=5.9)", "pytest (>=7.0)", "pytest-mock (>=3.6.1)", "trustme", "uvloop (>=0.17)"] trio = ["trio (<0.22)"] +[[package]] +name = "apache-skywalking" +version = "1.0.1" +description = "The Python Agent for Apache SkyWalking, which provides the native tracing/metrics/logging/profiling abilities for Python projects." +optional = false +python-versions = ">=3.7,<3.12" +files = [ + {file = "apache_skywalking-1.0.1-py3-none-any.whl", hash = "sha256:c6f1cbd2ecd2f1effe517323aa0ba78f1eafe49e3eeaf81b63d6eca6e479934a"}, + {file = "apache_skywalking-1.0.1.tar.gz", hash = "sha256:18fbe07f6e12daa6b502385de1586fafe8b9c1dbef240bb1896bb8a4a8c277a7"}, +] + +[package.dependencies] +grpcio = "*" +grpcio-tools = "*" +packaging = "*" +psutil = "*" +wrapt = "*" + +[package.extras] +all = ["kafka-python", "requests (>=2.26.0)"] +http = ["requests (>=2.26.0)"] +kafka = ["kafka-python"] + [[package]] name = "astor" version = "0.8.1" @@ -234,6 +257,17 @@ docs = ["furo", "myst-parser", "sphinx", "sphinx-notfound-page", "sphinxcontrib- tests = ["attrs[tests-no-zope]", "zope-interface"] tests-no-zope = ["cloudpickle", "hypothesis", "mypy (>=1.1.1)", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "pytest-xdist[psutil]"] +[[package]] +name = "blinker" +version = "1.6.2" +description = "Fast, simple object-to-object and broadcast signaling" +optional = false +python-versions = ">=3.7" +files = [ + {file = "blinker-1.6.2-py3-none-any.whl", hash = "sha256:c3d739772abb7bc2860abf5f2ec284223d9ad5c76da018234f6f50d6f31ab1f0"}, + {file = "blinker-1.6.2.tar.gz", hash = "sha256:4afd3de66ef3a9f8067559fb7a1cbe555c17dcbe15971b05d1b625c3e7abe213"}, +] + [[package]] name = "cachetools" version = "5.3.1" @@ -638,6 +672,29 @@ ci = ["coverage (==4.*)", "coveralls", "flake8-builtins", "flake8-commas", "flak dev = ["coverage (==4.*)", "flake8-builtins", "flake8-commas", "flake8-fixme", "flake8-print", "flake8-quotes", "flake8-todo", "pytest (>=4)", "pytest-cov (>=2)"] test = ["coverage (==4.*)", "flake8-builtins", "flake8-commas", "flake8-fixme", "flake8-print", "flake8-quotes", "flake8-todo", "pytest (>=4)", "pytest-cov (>=2)"] +[[package]] +name = "flask" +version = "2.3.2" +description = "A simple framework for building complex web applications." +optional = false +python-versions = ">=3.8" +files = [ + {file = "Flask-2.3.2-py3-none-any.whl", hash = "sha256:77fd4e1249d8c9923de34907236b747ced06e5467ecac1a7bb7115ae0e9670b0"}, + {file = "Flask-2.3.2.tar.gz", hash = "sha256:8c2f9abd47a9e8df7f0c3f091ce9497d011dc3b31effcf4c85a6e2b50f4114ef"}, +] + +[package.dependencies] +blinker = ">=1.6.2" +click = ">=8.1.3" +importlib-metadata = {version = ">=3.6.0", markers = "python_version < \"3.10\""} +itsdangerous = ">=2.1.2" +Jinja2 = ">=3.1.2" +Werkzeug = ">=2.3.3" + +[package.extras] +async = ["asgiref (>=3.2)"] +dotenv = ["python-dotenv"] + [[package]] name = "flynt" version = "0.76" @@ -1100,6 +1157,25 @@ files = [ {file = "idna-3.4.tar.gz", hash = "sha256:814f528e8dead7d329833b91c5faa87d60bf71824cd12a7530b5526063d02cb4"}, ] +[[package]] +name = "importlib-metadata" +version = "6.7.0" +description = "Read metadata from Python packages" +optional = false +python-versions = ">=3.7" +files = [ + {file = "importlib_metadata-6.7.0-py3-none-any.whl", hash = "sha256:cb52082e659e97afc5dac71e79de97d8681de3aa07ff18578330904a9d18e5b5"}, + {file = "importlib_metadata-6.7.0.tar.gz", hash = "sha256:1aaf550d4f73e5d6783e7acb77aec43d49da8017410afae93822cc9cca98c4d4"}, +] + +[package.dependencies] +zipp = ">=0.5" + +[package.extras] +docs = ["furo", "jaraco.packaging (>=9)", "jaraco.tidelift (>=1.4)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-lint"] +perf = ["ipython"] +testing = ["flufl.flake8", "importlib-resources (>=1.3)", "packaging", "pyfakefs", "pytest (>=6)", "pytest-black (>=0.3.7)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=1.3)", "pytest-mypy (>=0.9.1)", "pytest-perf (>=0.9.2)", "pytest-ruff"] + [[package]] name = "importlib-resources" version = "5.12.0" @@ -1164,6 +1240,17 @@ pipfile-deprecated-finder = ["pip-shims (>=0.5.2)", "pipreqs", "requirementslib" plugins = ["setuptools"] requirements-deprecated-finder = ["pip-api", "pipreqs"] +[[package]] +name = "itsdangerous" +version = "2.1.2" +description = "Safely pass data to untrusted environments and back." +optional = false +python-versions = ">=3.7" +files = [ + {file = "itsdangerous-2.1.2-py3-none-any.whl", hash = "sha256:2c2349112351b88699d8d4b6b075022c0808887cb7ad10069318a8b0bc88db44"}, + {file = "itsdangerous-2.1.2.tar.gz", hash = "sha256:5dbbc68b317e5e42f327f9021763545dc3fc3bfe22e6deb96aaf1fc38874156a"}, +] + [[package]] name = "jinja2" version = "3.1.2" @@ -1992,6 +2079,32 @@ files = [ {file = "protobuf-4.23.3.tar.gz", hash = "sha256:7a92beb30600332a52cdadbedb40d33fd7c8a0d7f549c440347bc606fb3fe34b"}, ] +[[package]] +name = "psutil" +version = "5.9.5" +description = "Cross-platform lib for process and system monitoring in Python." +optional = false +python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" +files = [ + {file = "psutil-5.9.5-cp27-cp27m-macosx_10_9_x86_64.whl", hash = "sha256:be8929ce4313f9f8146caad4272f6abb8bf99fc6cf59344a3167ecd74f4f203f"}, + {file = "psutil-5.9.5-cp27-cp27m-manylinux2010_i686.whl", hash = "sha256:ab8ed1a1d77c95453db1ae00a3f9c50227ebd955437bcf2a574ba8adbf6a74d5"}, + {file = "psutil-5.9.5-cp27-cp27m-manylinux2010_x86_64.whl", hash = "sha256:4aef137f3345082a3d3232187aeb4ac4ef959ba3d7c10c33dd73763fbc063da4"}, + {file = "psutil-5.9.5-cp27-cp27mu-manylinux2010_i686.whl", hash = "sha256:ea8518d152174e1249c4f2a1c89e3e6065941df2fa13a1ab45327716a23c2b48"}, + {file = "psutil-5.9.5-cp27-cp27mu-manylinux2010_x86_64.whl", hash = "sha256:acf2aef9391710afded549ff602b5887d7a2349831ae4c26be7c807c0a39fac4"}, + {file = "psutil-5.9.5-cp27-none-win32.whl", hash = "sha256:5b9b8cb93f507e8dbaf22af6a2fd0ccbe8244bf30b1baad6b3954e935157ae3f"}, + {file = "psutil-5.9.5-cp27-none-win_amd64.whl", hash = "sha256:8c5f7c5a052d1d567db4ddd231a9d27a74e8e4a9c3f44b1032762bd7b9fdcd42"}, + {file = "psutil-5.9.5-cp36-abi3-macosx_10_9_x86_64.whl", hash = "sha256:3c6f686f4225553615612f6d9bc21f1c0e305f75d7d8454f9b46e901778e7217"}, + {file = "psutil-5.9.5-cp36-abi3-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:7a7dd9997128a0d928ed4fb2c2d57e5102bb6089027939f3b722f3a210f9a8da"}, + {file = "psutil-5.9.5-cp36-abi3-manylinux_2_12_x86_64.manylinux2010_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:89518112647f1276b03ca97b65cc7f64ca587b1eb0278383017c2a0dcc26cbe4"}, + {file = "psutil-5.9.5-cp36-abi3-win32.whl", hash = "sha256:104a5cc0e31baa2bcf67900be36acde157756b9c44017b86b2c049f11957887d"}, + {file = "psutil-5.9.5-cp36-abi3-win_amd64.whl", hash = "sha256:b258c0c1c9d145a1d5ceffab1134441c4c5113b2417fafff7315a917a026c3c9"}, + {file = "psutil-5.9.5-cp38-abi3-macosx_11_0_arm64.whl", hash = "sha256:c607bb3b57dc779d55e1554846352b4e358c10fff3abf3514a7a6601beebdb30"}, + {file = "psutil-5.9.5.tar.gz", hash = "sha256:5410638e4df39c54d957fc51ce03048acd8e6d60abc0f5107af51e5fb566eb3c"}, +] + +[package.extras] +test = ["enum34", "ipaddress", "mock", "pywin32", "wmi"] + [[package]] name = "pycodestyle" version = "2.9.1" @@ -2707,6 +2820,23 @@ files = [ {file = "websockets-11.0.3.tar.gz", hash = "sha256:88fc51d9a26b10fc331be344f1781224a375b78488fc343620184e95a4b27016"}, ] +[[package]] +name = "werkzeug" +version = "2.3.6" +description = "The comprehensive WSGI web application library." +optional = false +python-versions = ">=3.8" +files = [ + {file = "Werkzeug-2.3.6-py3-none-any.whl", hash = "sha256:935539fa1413afbb9195b24880778422ed620c0fc09670945185cce4d91a8890"}, + {file = "Werkzeug-2.3.6.tar.gz", hash = "sha256:98c774df2f91b05550078891dee5f0eb0cb797a522c757a2452b9cee5b202330"}, +] + +[package.dependencies] +MarkupSafe = ">=2.1.1" + +[package.extras] +watchdog = ["watchdog (>=2.3)"] + [[package]] name = "wrapt" version = "1.15.0" @@ -2896,4 +3026,4 @@ testing = ["big-O", "flake8 (<5)", "jaraco.functools", "jaraco.itertools", "more [metadata] lock-version = "2.0" python-versions = ">=3.8,<3.12" -content-hash = "a43e684e6f8e61f5716720da7393e81424c0676fd6323ca37d5d3713edac9d2a" +content-hash = "ea439395e3b0076ddc0bcae98b2db221854d40383d7cc32a884a221b4d0db242" diff --git a/pyproject.toml b/pyproject.toml index acdda2e..fe90fb9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -61,6 +61,9 @@ cachetools = "^5.3.1" nltk = "^3.8.1" inflect = "^6.0.4" pytest = "^7.3.2" +apache-skywalking = "^1.0.1" +flask = "^2.3.2" + diff --git a/servers/simple/server.py b/servers/simple/server.py index 28a8189..d71fedb 100644 --- a/servers/simple/server.py +++ b/servers/simple/server.py @@ -13,6 +13,7 @@ # limitations under the License. import asyncio +import sys from concurrent import futures import grpc @@ -36,6 +37,8 @@ def __init__(self, uri_main_queue, shared_results_object): self.uri_main_queue = uri_main_queue async def fetchAllPatterns(self, request, context): + # TODO OAP SIDE OR THIS SIDE must save the version, e.g. oap should check if version is > got version, since + # this is a stateful service and it may crash and restart print('-================-') print( f'> Received fetchAllPatterns request for service <{request.service}>, ' @@ -43,7 +46,7 @@ async def fetchAllPatterns(self, request, context): version = str(self.shared_results_object.get_version(service=request.service)) if version == '0': - version = 'NULL' # Expected on OAP side, temp fix + version = 'NULL' # OAP side is NULL, so we must not return NULL otherwise it will always be NULL # https://github.com/apache/skywalking/blob/master/oap-server/ai-pipeline/src/main/java/org # /apache/skywalking/oap/server/ai/pipeline/services/HttpUriRecognitionService.java#LL39C32-L39C32 @@ -53,7 +56,13 @@ async def fetchAllPatterns(self, request, context): print(f'Version do not match, local:{version} vs oap:{request.version}') - patterns = [Pattern(pattern=cluster) for cluster in self.shared_results_object.get_dict_field(request.service)] + cluster_candidates = self.shared_results_object.get_dict_field(request.service) + patterns = [] + for cluster in cluster_candidates: + if '{var}' in cluster: + patterns.append(Pattern(pattern=cluster)) + else: # TODO this is for post processing feature to be added + print("Skipping pattern without {var}, OAP won't need this") print(f'Returning {len(patterns)} patterns') print('-================-') @@ -63,8 +72,13 @@ async def fetchAllPatterns(self, request, context): async def feedRawData(self, request, context): """ Offload CPU intensive work to a separate process via a queue + + There will always be a User service, its in topology, but it will not call fetchAllPatterns """ print(f'> Received feedRawData request for service {request.service}') + if request.service == 'User': + # It should not be called + return Empty() uris = [str(uri.name) for uri in request.unrecognizedUris] service = str(request.service) self.uri_main_queue.put((uris, service)) @@ -87,7 +101,42 @@ async def serve(uri_main_queue, shared_results_object): def run_server(uri_main_queue, shared_results_object): - asyncio.run(serve(uri_main_queue=uri_main_queue, shared_results_object=shared_results_object)) + loop = asyncio.get_event_loop() + try: + # Here `amain(loop)` is the core coroutine that may spawn any + # number of tasks + sys.exit(loop.run_until_complete(serve(uri_main_queue, shared_results_object))) + except KeyboardInterrupt: + # Optionally show a message if the shutdown may take a while + print("Attempting graceful shutdown, press Ctrl+C again to exit…", flush=True) + + quit() + # TODO Handle interrupt and gracefully shutdown + """ + Learn from this + https://stackoverflow.com/questions/30765606/whats-the-correct-way-to-clean-up-after-an-interrupted-event-loop + """ + # Do not show `asyncio.CancelledError` exceptions during shutdown + # (a lot of these may be generated, skip this if you prefer to see them) + def shutdown_exception_handler(loop, context): + if "exception" not in context \ + or not isinstance(context["exception"], asyncio.CancelledError): + loop.default_exception_handler(context) + + loop.set_exception_handler(shutdown_exception_handler) + + # Handle shutdown gracefully by waiting for all tasks to be cancelled + tasks = asyncio.gather(*asyncio.all_tasks(loop=loop), return_exceptions=True) + tasks.add_done_callback(lambda t: loop.stop()) + tasks.cancel() + + # Keep the event loop running until it is either destroyed or all + # tasks have really terminated + while not tasks.done() and not loop.is_closed(): + loop.run_forever() + finally: + loop.run_until_complete(loop.shutdown_asyncgens()) + loop.close() if __name__ == '__main__': diff --git a/servers/simple/worker.py b/servers/simple/worker.py index 581cc86..da7361a 100644 --- a/servers/simple/worker.py +++ b/servers/simple/worker.py @@ -35,6 +35,7 @@ def run_worker(uri_main_queue, shared_results_object): uri_package = uri_main_queue.get() print('====================') print(f'currently have drain instances for {len(drain_instances)} services') + print(f'drain_instances.keys() = {drain_instances.keys()}') print('-================-') uris, service = uri_package[0], uri_package[1] # print(uri_main_queue.get(timeout=1)) diff --git a/servers/tests/dataset_constructor.py b/servers/tests/dataset_constructor.py index b9b2c93..93a7dfe 100644 --- a/servers/tests/dataset_constructor.py +++ b/servers/tests/dataset_constructor.py @@ -22,10 +22,10 @@ def get_mock_data(): in_file = 'Endpoint100_trivial_3k_repeat.txt' with open(os.path.join(script_dir, in_file)) as f: endpoints = f.read().splitlines() - #counter = Counter(endpoints) + # counter = Counter(endpoints) - #counter_list = list(counter.keys()) - #print(f'count of counter_list: {len(counter_list)}') + # counter_list = list(counter.keys()) + # print(f'count of counter_list: {len(counter_list)}') return endpoints