Skip to content

Commit

Permalink
Prefer len(os.sched_getaffinity(0)) over os.cpu_count() (#1866)
Browse files Browse the repository at this point in the history
Closes #30

## By Submitting this PR I confirm:
- I am familiar with the [Contributing Guidelines](https://github.com/nv-morpheus/Morpheus/blob/main/docs/source/developer_guide/contributing.md).
- When the PR is ready for review, new or existing tests cover these changes.
- When the PR is ready for review, the documentation is up to date with these changes.

Authors:
  - Christopher Harris (https://github.com/cwharris)

Approvers:
  - Anuradha Karuppiah (https://github.com/AnuradhaKaruppiah)

URL: #1866
  • Loading branch information
cwharris authored Aug 31, 2024
1 parent d36ec8e commit 583149c
Show file tree
Hide file tree
Showing 33 changed files with 37 additions and 37 deletions.
4 changes: 2 additions & 2 deletions docs/source/developer_guide/guides/2_real_world_phishing.md
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ To start, we will need to instantiate and set a few attributes of the `Config` c
config = Config()
config.mode = PipelineModes.NLP

config.num_threads = os.cpu_count()
config.num_threads = len(os.sched_getaffinity(0))
config.feature_length = model_fea_length

with open(labels_file, encoding='UTF-8') as fh:
Expand Down Expand Up @@ -563,7 +563,7 @@ def run_pipeline(use_stage_function: bool,
config.mode = PipelineModes.NLP

# Set the thread count to match our cpu count
config.num_threads = os.cpu_count()
config.num_threads = len(os.sched_getaffinity(0))
config.feature_length = model_fea_length

with open(labels_file, encoding='UTF-8') as fh:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ from morpheus.cli.utils import load_labels_file
CppConfig.set_should_use_cpp(False)

config = Config()
config.num_threads = os.cpu_count()
config.num_threads = len(os.sched_getaffinity(0))
config.ae = ConfigAutoEncoder()
config.ae.feature_columns = load_labels_file(get_package_relative_file("data/columns_ae_azure.txt"))
```
Expand Down
2 changes: 1 addition & 1 deletion examples/abp_pcap_detection/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
@click.command()
@click.option(
"--num_threads",
default=os.cpu_count(),
default=len(os.sched_getaffinity(0)),
type=click.IntRange(min=1),
help="Number of internal pipeline threads to use.",
)
Expand Down
2 changes: 1 addition & 1 deletion examples/developer_guide/2_1_real_world_phishing/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def run_pipeline(use_stage_function: bool,
config.mode = PipelineModes.NLP

# Set the thread count to match our cpu count
config.num_threads = os.cpu_count()
config.num_threads = len(os.sched_getaffinity(0))
config.feature_length = model_fea_length

with open(labels_file, encoding='UTF-8') as fh:
Expand Down
2 changes: 1 addition & 1 deletion examples/developer_guide/2_2_rabbitmq/read_simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def run_pipeline(use_source_function: bool):
configure_logging(log_level=logging.DEBUG)

config = Config()
config.num_threads = os.cpu_count()
config.num_threads = len(os.sched_getaffinity(0))

# Create a linear pipeline object
pipeline = LinearPipeline(config)
Expand Down
2 changes: 1 addition & 1 deletion examples/developer_guide/2_2_rabbitmq/write_simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def run_pipeline():
input_file = os.path.join(root_dir, 'examples/data/email.jsonlines')

config = Config()
config.num_threads = os.cpu_count()
config.num_threads = len(os.sched_getaffinity(0))

# Create a linear pipeline object
pipeline = LinearPipeline(config)
Expand Down
2 changes: 1 addition & 1 deletion examples/developer_guide/4_rabbitmq_cpp_stage/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ limitations under the License.
# Example RabbitMQ stages
This example builds upon the `examples/developer_guide/2_2_rabbitmq` example adding a C++ implementation for the `RabbitMQSourceStage` along with adding package install scripts.

This example adds two flags to the `read_simple.py` script. A `--use_cpp` flag which defaults to `True` and a `--num_threads` flag which defaults to the number of cores on the system as returned by `os.cpu_count()`.
This example adds two flags to the `read_simple.py` script. A `--use_cpp` flag which defaults to `True` and a `--num_threads` flag which defaults to the number of cores on the system as returned by `len(os.sched_getaffinity(0))`.

## Supported Environments
| Environment | Supported | Notes |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
@click.option('--use_cpp', default=True)
@click.option(
"--num_threads",
default=os.cpu_count(),
default=len(os.sched_getaffinity(0)),
type=click.IntRange(min=1),
help="Number of internal pipeline threads to use",
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def run_pipeline():
input_file = os.path.join(root_dir, 'examples/data/email.jsonlines')

config = Config()
config.num_threads = os.cpu_count()
config.num_threads = len(os.sched_getaffinity(0))

# Create a linear pipeline object
pipeline = LinearPipeline(config)
Expand Down
2 changes: 1 addition & 1 deletion examples/digital_fingerprinting/production/grafana/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ def run_pipeline(train_users,

CppConfig.set_should_use_cpp(False)

config.num_threads = os.cpu_count()
config.num_threads = len(os.sched_getaffinity(0))

config.ae = ConfigAutoEncoder()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ def generate_ae_config(source: str,
pipeline_batch_size: int = 0,
edge_buffer_size: int = 0,
use_cpp: bool = False,
num_threads: int = os.cpu_count()):
num_threads: int = len(os.sched_getaffinity(0))):
config = Config()

CppConfig.set_should_use_cpp(use_cpp)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ def run_pipeline(train_users,

CppConfig.set_should_use_cpp(False)

config.num_threads = os.cpu_count()
config.num_threads = len(os.sched_getaffinity(0))

config.ae = ConfigAutoEncoder()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ def run_pipeline(train_users,

CppConfig.set_should_use_cpp(False)

config.num_threads = os.cpu_count()
config.num_threads = len(os.sched_getaffinity(0))

config.ae = ConfigAutoEncoder()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@
"\n",
"CppConfig.set_should_use_cpp(False)\n",
"\n",
"config.num_threads = os.cpu_count()\n",
"config.num_threads = len(os.sched_getaffinity(0))\n",
"\n",
"config.ae = ConfigAutoEncoder()\n",
"\n",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@
"\n",
"CppConfig.set_should_use_cpp(False)\n",
"\n",
"config.num_threads = os.cpu_count()\n",
"config.num_threads = len(os.sched_getaffinity(0))\n",
"\n",
"config.ae = ConfigAutoEncoder()\n",
"\n",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@
"\n",
"CppConfig.set_should_use_cpp(False)\n",
"\n",
"config.num_threads = os.cpu_count()\n",
"config.num_threads = len(os.sched_getaffinity(0))\n",
"\n",
"config.ae = ConfigAutoEncoder()\n",
"\n",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@
"\n",
"CppConfig.set_should_use_cpp(False)\n",
"\n",
"config.num_threads = os.cpu_count()\n",
"config.num_threads = len(os.sched_getaffinity(0))\n",
"\n",
"config.ae = ConfigAutoEncoder()\n",
"\n",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
@click.command()
@click.option(
"--num_threads",
default=os.cpu_count(),
default=len(os.sched_getaffinity(0)),
type=click.IntRange(min=1),
help="Number of internal pipeline threads to use",
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ def run_pipeline(train_users,

CppConfig.set_should_use_cpp(False)

config.num_threads = os.cpu_count()
config.num_threads = len(os.sched_getaffinity(0))

config.ae = ConfigAutoEncoder()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ def run_pipeline(train_users,

CppConfig.set_should_use_cpp(False)

config.num_threads = os.cpu_count()
config.num_threads = len(os.sched_getaffinity(0))

config.ae = ConfigAutoEncoder()

Expand Down
2 changes: 1 addition & 1 deletion examples/doca/run_udp_convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
)
@click.option(
"--num_threads",
default=os.cpu_count(),
default=len(os.sched_getaffinity(0)),
type=click.IntRange(min=1),
show_default=True,
help="Number of internal pipeline threads to use.",
Expand Down
2 changes: 1 addition & 1 deletion examples/gnn_fraud_detection_pipeline/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
@click.command()
@click.option(
"--num_threads",
default=os.cpu_count(),
default=len(os.sched_getaffinity(0)),
type=click.IntRange(min=1),
help="Number of internal pipeline threads to use.",
)
Expand Down
4 changes: 2 additions & 2 deletions examples/llm/agents/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def run():
@run.command(help="Runs a simple finite pipeline with a single execution of a LangChain agent from a fixed input")
@click.option(
"--num_threads",
default=os.cpu_count(),
default=len(os.sched_getaffinity(0)),
type=click.IntRange(min=1),
help="Number of internal pipeline threads to use",
)
Expand Down Expand Up @@ -67,7 +67,7 @@ def simple(**kwargs):
@run.command(help="Runs a pipeline LangChain agents which pulls inputs from a Kafka message bus")
@click.option(
"--num_threads",
default=os.cpu_count(),
default=len(os.sched_getaffinity(0)),
type=click.IntRange(min=1),
help="Number of internal pipeline threads to use",
)
Expand Down
2 changes: 1 addition & 1 deletion examples/llm/completion/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def run():
@run.command()
@click.option(
"--num_threads",
default=os.cpu_count(),
default=len(os.sched_getaffinity(0)),
type=click.IntRange(min=1),
help="Number of internal pipeline threads to use",
)
Expand Down
2 changes: 1 addition & 1 deletion examples/llm/rag/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def run():
@run.command()
@click.option(
"--num_threads",
default=os.cpu_count(),
default=len(os.sched_getaffinity(0)),
type=click.IntRange(min=1),
help="Number of internal pipeline threads to use",
)
Expand Down
2 changes: 1 addition & 1 deletion examples/llm/vdb_upload/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def run():
)
@click.option(
"--num_threads",
default=os.cpu_count(),
default=len(os.sched_getaffinity(0)),
type=click.IntRange(min=1),
help="Number of internal pipeline threads to use",
)
Expand Down
2 changes: 1 addition & 1 deletion examples/log_parsing/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
@click.command()
@click.option(
"--num_threads",
default=os.cpu_count(),
default=len(os.sched_getaffinity(0)),
type=click.IntRange(min=1),
help="Number of internal pipeline threads to use.",
)
Expand Down
2 changes: 1 addition & 1 deletion examples/ransomware_detection/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
@click.option('--use_cpp', default=False, help="Enable C++ execution for this pipeline, currently this is unsupported.")
@click.option(
"--num_threads",
default=os.cpu_count(),
default=len(os.sched_getaffinity(0)),
type=click.IntRange(min=1),
help="Number of internal pipeline threads to use.",
)
Expand Down
2 changes: 1 addition & 1 deletion examples/sid_visualization/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def _generate_frames(self):
@click.option('--use_cpp', default=True)
@click.option(
"--num_threads",
default=os.cpu_count(),
default=len(os.sched_getaffinity(0)),
type=click.IntRange(min=1),
help="Number of internal pipeline threads to use.",
)
Expand Down
2 changes: 1 addition & 1 deletion python/morpheus/morpheus/cli/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ def install(**kwargs):

@cli.group(short_help="Run one of the available pipelines", no_args_is_help=True, cls=AliasedGroup)
@click.option('--num_threads',
default=os.cpu_count(),
default=len(os.sched_getaffinity(0)),
type=click.IntRange(min=1),
help="Number of internal pipeline threads to use")
@click.option('--pipeline_batch_size',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class HttpServerSourceStage(PreallocatorMixin, SingleOutputSource):
Maximum number of requests to queue before rejecting requests. If `None` then `config.edge_buffer_size` will be
used.
num_server_threads : int, default None
Number of threads to use for the HTTP server. If `None` then `os.cpu_count()` will be used.
Number of threads to use for the HTTP server. If `None` then `len(os.sched_getaffinity(0))` will be used.
max_payload_size : int, default 10
The maximum size in megabytes of the payload that the server will accept in a single request.
request_timeout_secs : int, default 30
Expand Down Expand Up @@ -117,7 +117,7 @@ def __init__(self,
self._sleep_time = sleep_time
self._queue_timeout = queue_timeout
self._max_queue_size = max_queue_size or config.edge_buffer_size
self._num_server_threads = num_server_threads or os.cpu_count()
self._num_server_threads = num_server_threads or len(os.sched_getaffinity(0))
self._max_payload_size_bytes = max_payload_size * 1024 * 1024
self._request_timeout_secs = request_timeout_secs
self._lines = lines
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class HttpServerSinkStage(PassThruTypeMixin, SinglePortStage):
Maximum number of requests to queue before rejecting requests. If `None` then `config.edge_buffer_size` will be
used. Once the queue is full, the incoming edge buffer will begin to fill up.
num_server_threads : int, default None
Number of threads to use for the HTTP server. If `None` then `os.cpu_count()` will be used.
Number of threads to use for the HTTP server. If `None` then `len(os.sched_getaffinity(0))` will be used.
max_rows_per_response : int, optional
Maximum number of rows to include in a single response, by default 10000.
overflow_pct: float, optional
Expand Down Expand Up @@ -103,7 +103,7 @@ def __init__(self,
self._port = port
self._endpoint = endpoint
self._method = method
self._num_server_threads = num_server_threads or os.cpu_count()
self._num_server_threads = num_server_threads or len(os.sched_getaffinity(0))
self._max_rows_per_response = max_rows_per_response
self._overflow_pct = overflow_pct
self._request_timeout_secs = request_timeout_secs
Expand Down
2 changes: 1 addition & 1 deletion tests/common/test_http_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def make_parse_fn(status: HTTPStatus = HTTPStatus.OK,
@pytest.mark.parametrize("method", ["GET", "POST", "PUT"])
@pytest.mark.parametrize("use_callback", [True, False])
@pytest.mark.parametrize("use_context_mgr", [True, False])
@pytest.mark.parametrize("num_threads", [1, 2, min(8, os.cpu_count())])
@pytest.mark.parametrize("num_threads", [1, 2, min(8, len(os.sched_getaffinity(0)))])
@pytest.mark.parametrize("status,content_type,content",
[(HTTPStatus.OK, MimeTypes.TEXT.value, "OK"),
(HTTPStatus.OK, MimeTypes.JSON.value, '{"test": "OK"}'),
Expand Down

0 comments on commit 583149c

Please sign in to comment.