Skip to content

Commit 9aff14e

Browse files
committed
Improve config
Signed-off-by: Zike Yang <zike@apache.org>
1 parent 15e402d commit 9aff14e

File tree

5 files changed

+77
-149
lines changed

5 files changed

+77
-149
lines changed

sdks/fs-python/examples/config.yaml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
# This configuration file defines the settings for the string processing function example.
1717

1818
pulsar:
19-
service_url: "pulsar://localhost:6650" # Required: URL of the Pulsar broker
19+
serviceUrl: "pulsar://localhost:6650" # Required: URL of the Pulsar broker
2020
authPlugin: "" # Optional: Authentication plugin class name
2121
authParams: "" # Optional: Authentication parameters
2222

@@ -33,8 +33,8 @@ subscriptionName: "test-sub"
3333

3434
# Optional: List of request source topics
3535
requestSource:
36-
- pulsar: # SourceSpec structure with pulsar configuration
37-
topic: "string-topic" # Topic name for request messages
36+
pulsar: # SourceSpec structure with pulsar configuration
37+
topic: "string-topic" # Topic name for request messages
3838

3939
# Optional: Output sink configuration
4040
sink:

sdks/fs-python/fs_sdk/config.py

Lines changed: 41 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,54 @@
11
import os
22
import yaml
33
from typing import Dict, Any, Optional, List
4+
from pydantic import BaseModel, Field
45

5-
class Config:
6-
def __init__(self, config_path: str = "config.yaml"):
6+
class PulsarConfig(BaseModel):
7+
service_url: str = "pulsar://localhost:6650"
8+
auth_plugin: str = ""
9+
auth_params: str = ""
10+
max_concurrent_requests: int = 10
11+
max_producer_cache_size: int = 100
12+
13+
class ModuleConfig(BaseModel):
14+
active_module: Optional[str] = None
15+
module_configs: Dict[str, Dict[str, Any]] = Field(default_factory=dict)
16+
17+
class SourceSpec(BaseModel):
18+
pulsar: Dict[str, Any] = Field(default_factory=dict)
19+
20+
class SinkSpec(BaseModel):
21+
pulsar: Dict[str, Any] = Field(default_factory=dict)
22+
23+
class Config(BaseModel):
24+
pulsar: PulsarConfig = Field(default_factory=PulsarConfig)
25+
module: Optional[str] = None
26+
sources: List[SourceSpec] = Field(default_factory=list)
27+
requestSource: Optional[SourceSpec] = None
28+
sink: Optional[SinkSpec] = None
29+
subscription_name: str = "fs-sdk-subscription"
30+
name: Optional[str] = None
31+
description: Optional[str] = None
32+
modules: ModuleConfig = Field(default_factory=ModuleConfig)
33+
config: List[Dict[str, Any]] = Field(default_factory=list)
34+
35+
@classmethod
36+
def from_yaml(cls, config_path: str = "config.yaml") -> "Config":
737
"""
838
Initialize configuration from YAML file.
939
1040
Args:
1141
config_path (str): Path to the configuration file
12-
"""
13-
self.config_path = config_path
14-
self.config = self._load_config()
15-
16-
def _load_config(self) -> Dict[str, Any]:
17-
"""
18-
Load configuration from YAML file.
19-
42+
2043
Returns:
21-
Dict[str, Any]: Configuration dictionary
44+
Config: Configuration instance
2245
"""
23-
if not os.path.exists(self.config_path):
24-
raise FileNotFoundError(f"Configuration file not found: {self.config_path}")
46+
if not os.path.exists(config_path):
47+
raise FileNotFoundError(f"Configuration file not found: {config_path}")
2548

26-
with open(self.config_path, 'r') as f:
27-
return yaml.safe_load(f)
49+
with open(config_path, 'r') as f:
50+
config_data = yaml.safe_load(f)
51+
return cls(**config_data)
2852

2953
def get_config_value(self, config_name: str) -> Any:
3054
"""
@@ -36,93 +60,11 @@ def get_config_value(self, config_name: str) -> Any:
3660
Returns:
3761
Any: The configuration value, or None if not found
3862
"""
39-
config_section = self.config.get('config', [])
40-
for item in config_section:
63+
for item in self.config:
4164
if config_name in item:
4265
return item[config_name]
4366
return None
44-
45-
@property
46-
def service_url(self) -> str:
47-
"""Get Pulsar service URL."""
48-
return self.config.get('pulsar', {}).get('service_url', 'pulsar://localhost:6650')
49-
50-
@property
51-
def auth_plugin(self) -> str:
52-
"""Get Pulsar auth plugin."""
53-
return self.config.get('pulsar', {}).get('authPlugin', '')
54-
55-
@property
56-
def auth_params(self) -> str:
57-
"""Get Pulsar auth parameters."""
58-
return self.config.get('pulsar', {}).get('authParams', '')
59-
60-
@property
61-
def module(self) -> str:
62-
"""Get the module name."""
63-
return self.config.get('module')
64-
65-
@property
66-
def sources(self) -> List[Dict[str, Any]]:
67-
"""Get the sources configuration."""
68-
return self.config.get('sources', [])
69-
70-
@property
71-
def request_sources(self) -> List[Dict[str, Any]]:
72-
"""Get the request sources configuration."""
73-
return self.config.get('requestSource', [])
74-
75-
@property
76-
def sinks(self) -> List[Dict[str, Any]]:
77-
"""Get the sinks configuration."""
78-
return self.config.get('sink', [])
79-
80-
def get_pulsar_source_config(self, source: Dict[str, Any]) -> Dict[str, Any]:
81-
"""Get Pulsar source configuration from a source spec."""
82-
return source.get('pulsar', {})
83-
84-
def get_pulsar_sink_config(self, sink: Dict[str, Any]) -> Dict[str, Any]:
85-
"""Get Pulsar sink configuration from a sink spec."""
86-
return sink.get('pulsar', {})
87-
88-
@property
89-
def request_topic(self) -> str:
90-
"""Get request topic for the active module."""
91-
topic = self.config.get('request_topic')
92-
if not topic:
93-
raise ValueError("request_topic is not set in config.yaml")
94-
return topic
95-
96-
@property
97-
def subscription_name(self) -> str:
98-
"""Get subscription name for the active module."""
99-
return self.config.get('subscription_name', 'fs-sdk-subscription')
100-
101-
@property
102-
def name(self) -> str:
103-
"""Get the function name."""
104-
return self.config.get('name')
105-
106-
@property
107-
def description(self) -> str:
108-
"""Get the function description."""
109-
return self.config.get('description')
110-
111-
@property
112-
def max_concurrent_requests(self) -> int:
113-
"""Get maximum number of concurrent requests."""
114-
return self.config.get('pulsar', {}).get('max_concurrent_requests', 10)
115-
116-
@property
117-
def max_producer_cache_size(self) -> int:
118-
"""Get maximum number of producers to cache."""
119-
return self.config.get('pulsar', {}).get('max_producer_cache_size', 100)
120-
121-
@property
122-
def active_module(self) -> Optional[str]:
123-
"""Get the name of the active module."""
124-
return self.config.get('modules', {}).get('active_module')
12567

12668
def get_module_config(self, module_name: str) -> Dict[str, Any]:
12769
"""Get configuration for a specific module."""
128-
return self.config.get('modules', {}).get(module_name, {})
70+
return self.modules.module_configs.get(module_name, {})

sdks/fs-python/fs_sdk/function.py

Lines changed: 16 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -57,23 +57,23 @@ def __init__(
5757
ValueError: If no module is specified in config or if the specified module
5858
doesn't have a corresponding process function.
5959
"""
60-
self.config = Config(config_path)
60+
self.config = Config.from_yaml(config_path)
6161
self.process_funcs = process_funcs
6262

6363
# Create authentication if specified
6464
auth = None
65-
if self.config.auth_plugin:
65+
if self.config.pulsar.auth_plugin:
6666
auth = pulsar.Authentication(
67-
self.config.auth_plugin,
68-
self.config.auth_params
67+
self.config.pulsar.auth_plugin,
68+
self.config.pulsar.auth_params
6969
)
7070

7171
self.client = Client(
72-
self.config.service_url,
72+
self.config.pulsar.service_url,
7373
authentication=auth,
7474
operation_timeout_seconds=30
7575
)
76-
self.semaphore = asyncio.Semaphore(10) # Default max concurrent requests
76+
self.semaphore = asyncio.Semaphore(self.config.pulsar.max_concurrent_requests)
7777
self.metrics = Metrics()
7878
self.metrics_server = MetricsServer(self.metrics)
7979
self._shutdown_event = asyncio.Event()
@@ -111,21 +111,18 @@ def _setup_consumer(self):
111111

112112
# Collect topics from sources
113113
for source in self.config.sources:
114-
pulsar_config = self.config.get_pulsar_source_config(source)
115-
if pulsar_config:
116-
topic = pulsar_config.get('topic')
114+
if 'pulsar' in source:
115+
topic = source['pulsar'].get('topic')
117116
if topic:
118117
topics.append(topic)
119118
logger.info(f"Added source topic: {topic}")
120119

121120
# Collect topics from request sources
122-
for source in self.config.request_sources:
123-
pulsar_config = self.config.get_pulsar_source_config(source)
124-
if pulsar_config:
125-
topic = pulsar_config.get('topic')
126-
if topic:
127-
topics.append(topic)
128-
logger.info(f"Added request source topic: {topic}")
121+
if self.config.requestSource and self.config.requestSource.pulsar:
122+
topic = self.config.requestSource.pulsar.get('topic')
123+
if topic:
124+
topics.append(topic)
125+
logger.info(f"Added request source topic: {topic}")
129126

130127
if not topics:
131128
raise ValueError("No valid sources or request sources found in config")
@@ -255,10 +252,9 @@ async def process_request(self, message):
255252
response_topic = message.properties().get('response_topic')
256253

257254
# If no response_topic is provided, use the sink topic as default
258-
if not response_topic and self.config.sinks:
259-
sink_config = self.config.get_pulsar_sink_config(self.config.sinks[0])
260-
if sink_config and 'topic' in sink_config:
261-
response_topic = sink_config['topic']
255+
if not response_topic and self.config.sink:
256+
if self.config.sink.pulsar and 'topic' in self.config.sink.pulsar:
257+
response_topic = self.config.sink.pulsar['topic']
262258
logger.info(f"Using sink topic as default response topic: {response_topic}")
263259

264260
if not response_topic:

sdks/fs-python/requirements.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
pulsar-client>=3.0.0
2-
aiohttp>=3.8.0
2+
aiohttp>=3.8.0
3+
pydantic>=2.0.0

sdks/fs-python/tests/test_function.py

Lines changed: 15 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import asyncio
88
from unittest.mock import Mock, patch, AsyncMock
99
from fs_sdk.function import FSFunction
10-
from fs_sdk.config import Config
10+
from fs_sdk.config import Config, PulsarConfig, ModuleConfig, SinkSpec, SourceSpec
1111
from fs_sdk.metrics import Metrics, MetricsServer
1212

1313
class TestFSFunction:
@@ -19,28 +19,17 @@ def mock_config(self):
1919
config = Mock(spec=Config)
2020
config.module = "test_module"
2121
config.subscription_name = "test_subscription"
22-
config.service_url = "pulsar://localhost:6650"
23-
config.auth_plugin = None
24-
config.auth_params = None
25-
config.sources = ["source1"]
26-
config.request_sources = ["request_source1"]
27-
config.sinks = ["sink1"]
28-
# Add the .config dict so property methods work
29-
config.config = {
30-
"pulsar": {
31-
"authPlugin": "",
32-
"authParams": "",
33-
"serviceUrl": "pulsar://localhost:6650"
34-
},
35-
"module": "test_module",
36-
"subscriptionName": "test_subscription",
37-
"sources": [],
38-
"requestSources": [],
39-
"sinks": []
40-
}
41-
# Mock Pulsar config methods
42-
config.get_pulsar_source_config.return_value = {"topic": "test_topic"}
43-
config.get_pulsar_sink_config.return_value = {"topic": "response_topic"}
22+
config.pulsar = PulsarConfig(
23+
service_url="pulsar://localhost:6650",
24+
auth_plugin="",
25+
auth_params="",
26+
max_concurrent_requests=10,
27+
max_producer_cache_size=100
28+
)
29+
config.sources = [SourceSpec(pulsar={"topic": "test_topic"})]
30+
config.requestSource = SourceSpec(pulsar={"topic": "request_topic"})
31+
config.sink = SinkSpec(pulsar={"topic": "response_topic"})
32+
config.modules = ModuleConfig(active_module="test_module")
4433
return config
4534

4635
@pytest.fixture
@@ -86,7 +75,7 @@ def mock_metrics_server(self):
8675
def function(self, mock_config, mock_process_func, mock_client, mock_consumer,
8776
mock_producer, mock_metrics, mock_metrics_server):
8877
"""Create a FSFunction instance with mocks, patching Config to avoid file IO."""
89-
with patch('fs_sdk.function.Config', return_value=mock_config), \
78+
with patch('fs_sdk.function.Config.from_yaml', return_value=mock_config), \
9079
patch('fs_sdk.function.Client', return_value=mock_client), \
9180
patch('fs_sdk.function.Metrics', return_value=mock_metrics), \
9281
patch('fs_sdk.function.MetricsServer', return_value=mock_metrics_server):
@@ -103,7 +92,7 @@ def function(self, mock_config, mock_process_func, mock_client, mock_consumer,
10392
@pytest.mark.asyncio
10493
async def test_init(self, mock_config, mock_process_func):
10594
"""Test FSFunction initialization."""
106-
with patch('fs_sdk.function.Config', return_value=mock_config), \
95+
with patch('fs_sdk.function.Config.from_yaml', return_value=mock_config), \
10796
patch('fs_sdk.function.Client'), \
10897
patch('fs_sdk.function.Metrics'), \
10998
patch('fs_sdk.function.MetricsServer'):
@@ -153,7 +142,7 @@ async def test_process_request_no_response_topic(self, function, mock_metrics):
153142
message = Mock()
154143
message.data.return_value = json.dumps({"test": "data"}).encode('utf-8')
155144
message.properties.return_value = {"request_id": "test_id"}
156-
function.config.sinks = []
145+
function.config.sink = None
157146

158147
# Execute
159148
await function.process_request(message)

0 commit comments

Comments
 (0)