Skip to content

Commit b8fa116

Browse files
tjholmjyecusch
andauthored
fix: add on method to BucketRef
--------- Co-authored-by: Jye Cusch <jye.cusch@nitric.io>
1 parent 6b20506 commit b8fa116

File tree

8 files changed

+130
-108
lines changed

8 files changed

+130
-108
lines changed

nitric/application.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ def run(cls) -> None:
7575

7676
loop.run_until_complete(asyncio.gather(*[wkr.start() for wkr in cls._workers]))
7777
except KeyboardInterrupt:
78+
7879
print("\nexiting")
7980
except ConnectionRefusedError:
8081
raise NitricUnavailableException(

nitric/context.py

Lines changed: 1 addition & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,10 @@
2828
from opentelemetry import propagate
2929

3030
from nitric.proto.schedules.v1 import ServerMessage as ScheduleServerMessage
31-
from nitric.proto.storage.v1 import BlobEventRequest, BlobEventType
3231
from nitric.proto.topics.v1 import ClientMessage as TopicClientMessage
3332
from nitric.proto.topics.v1 import MessageResponse as TopicResponse
3433
from nitric.proto.topics.v1 import ServerMessage as TopicServerMessage
3534

36-
# from nitric.proto.websockets.v1 import ServerMessage as WebsocketServerMessage
37-
# from nitric.proto.websockets.v1 import WebsocketEventResponse
38-
3935
Record = Dict[str, Union[str, List[str]]]
4036
PROPAGATOR = propagate.get_global_textmap()
4137

@@ -255,73 +251,6 @@ def __init__(self, request: AnyWebsocketRequest, response: Optional[AnyWebsocket
255251
self.res = WebsocketResponse()
256252

257253

258-
class BucketNotifyRequest:
259-
"""Represents a translated Event, from a subscribed bucket notification, forwarded from the Nitric Membrane."""
260-
261-
bucket_name: str
262-
key: str
263-
notification_type: BlobEventType
264-
265-
def __init__(self, bucket_name: str, key: str, notification_type: BlobEventType):
266-
"""Construct a new EventRequest."""
267-
self.bucket_name = bucket_name
268-
self.key = key
269-
self.notification_type = notification_type
270-
271-
272-
class BucketNotifyResponse:
273-
"""Represents the response to a trigger from a Bucket."""
274-
275-
def __init__(self, success: bool = True):
276-
"""Construct a new BucketNotificationResponse."""
277-
self.success = success
278-
279-
280-
class BucketNotificationContext:
281-
"""Represents the full request/response context for a bucket notification trigger."""
282-
283-
def __init__(self, request: BucketNotifyRequest, response: Optional[BucketNotifyResponse] = None):
284-
"""Construct a new BucketNotificationContext."""
285-
self.req = request
286-
self.res = response if response else BucketNotifyResponse()
287-
288-
289-
class FileNotifyRequest(BucketNotifyRequest):
290-
"""Represents a translated Event, from a subscribed bucket notification, forwarded from the Nitric Membrane."""
291-
292-
def __init__(
293-
self,
294-
bucket_name: str,
295-
bucket_ref: Any, # can't import BucketRef due to circular dependency problems
296-
key: str,
297-
notification_type: BlobEventType,
298-
):
299-
"""Construct a new FileNotificationRequest."""
300-
super().__init__(bucket_name=bucket_name, key=key, notification_type=notification_type)
301-
self.file = bucket_ref.file(key)
302-
303-
304-
class FileNotificationContext(BucketNotificationContext):
305-
"""Represents the full request/response context for a bucket notification trigger."""
306-
307-
def __init__(self, request: FileNotifyRequest, response: Optional[BucketNotifyResponse] = None):
308-
"""Construct a new FileNotificationContext."""
309-
super().__init__(request=request, response=response)
310-
self.req = request
311-
312-
@staticmethod
313-
def _from_client_message_with_bucket(msg: BlobEventRequest, bucket_ref) -> FileNotificationContext:
314-
"""Construct a new FileNotificationTrigger from a Bucket Notification trigger from the Nitric Membrane."""
315-
return FileNotificationContext(
316-
request=FileNotifyRequest(
317-
bucket_name=msg.bucket_name,
318-
key=msg.blob_event.key,
319-
bucket_ref=bucket_ref,
320-
notification_type=msg.blob_event.type,
321-
)
322-
)
323-
324-
325254
# == Schedules ==
326255

327256

@@ -352,16 +281,7 @@ def __init__(self, msg: ScheduleServerMessage):
352281
self.res = IntervalResponse(msg.id)
353282

354283

355-
C = TypeVar(
356-
"C",
357-
TriggerContext,
358-
HttpContext,
359-
MessageContext,
360-
FileNotificationContext,
361-
BucketNotificationContext,
362-
WebsocketContext,
363-
IntervalContext,
364-
)
284+
C = TypeVar("C")
365285

366286

367287
class Middleware(Protocol, Generic[C]):
@@ -383,15 +303,11 @@ async def __call__(self, ctx: C) -> C | None:
383303
HttpMiddleware = Middleware[HttpContext]
384304
EventMiddleware = Middleware[MessageContext]
385305
IntervalMiddleware = Middleware[IntervalContext]
386-
BucketNotificationMiddleware = Middleware[BucketNotificationContext]
387-
FileNotificationMiddleware = Middleware[FileNotificationContext]
388306
WebsocketMiddleware = Middleware[WebsocketContext]
389307

390308
HttpHandler = Handler[HttpContext]
391309
EventHandler = Handler[MessageContext]
392310
IntervalHandler = Handler[IntervalContext]
393-
BucketNotificationHandler = Handler[BucketNotificationContext]
394-
FileNotificationHandler = Handler[FileNotificationContext]
395311
WebsocketHandler = Handler[WebsocketContext]
396312

397313

nitric/resources/apis.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ def __init__(
118118
if middleware is None:
119119
middleware = []
120120
if security is None:
121-
security = {}
121+
security = []
122122
self.middleware = middleware
123123
self.security = security
124124
self.path = path
@@ -202,7 +202,7 @@ def _route(self, match: str, opts: Optional[RouteOptions] = None) -> Route:
202202
opts = RouteOptions()
203203

204204
if self.middleware is not None:
205-
opts.middleware = self.middleware + opts.middleware
205+
opts.middleware = (self.middleware + opts.middleware) if opts.middleware is not None else self.middleware
206206

207207
r = Route(self, match, opts)
208208
self.routes.append(r)

nitric/resources/buckets.py

Lines changed: 121 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,11 @@
3232

3333
from nitric.application import Nitric
3434
from nitric.bidi import AsyncNotifierList
35-
from nitric.context import BucketNotificationContext, BucketNotificationHandler, BucketNotifyRequest, FunctionServer
35+
from nitric.context import FunctionServer, Handler, Middleware
3636
from nitric.exception import InvalidArgumentException, exception_from_grpc_error
3737
from nitric.proto.resources.v1 import Action, ResourceDeclareRequest, ResourceIdentifier, ResourceType
3838
from nitric.proto.storage.v1 import (
39+
BlobEventRequest,
3940
BlobEventResponse,
4041
BlobEventType,
4142
ClientMessage,
@@ -54,6 +55,84 @@
5455
from nitric.utils import new_default_channel
5556

5657

58+
class BucketNotifyRequest:
59+
"""Represents a translated Event, from a subscribed bucket notification, forwarded from the Nitric Membrane."""
60+
61+
bucket_name: str
62+
key: str
63+
notification_type: BlobEventType
64+
bucket: BucketRef
65+
file: FileRef
66+
67+
def __init__(self, bucket_name: str, key: str, notification_type: BlobEventType):
68+
"""Construct a new BucketNotifyRequest."""
69+
self.bucket_name = bucket_name
70+
self.key = key
71+
self.notification_type = notification_type
72+
self.bucket = BucketRef(bucket_name)
73+
self.file = self.bucket.file(key)
74+
75+
76+
class BucketNotifyResponse:
77+
"""Represents the response to a trigger from a Bucket."""
78+
79+
def __init__(self, success: bool = True):
80+
"""Construct a new BucketNotificationResponse."""
81+
self.success = success
82+
83+
84+
class BucketNotificationContext:
85+
"""Represents the full request/response context for a bucket notification trigger."""
86+
87+
def __init__(self, request: BucketNotifyRequest, response: Optional[BucketNotifyResponse] = None):
88+
"""Construct a new BucketNotificationContext."""
89+
self.req = request
90+
self.res = response if response else BucketNotifyResponse()
91+
92+
93+
class FileNotifyRequest(BucketNotifyRequest):
94+
"""Represents a translated Event, from a subscribed bucket notification, forwarded from the Nitric Membrane."""
95+
96+
def __init__(
97+
self,
98+
bucket_name: str,
99+
bucket_ref: BucketRef,
100+
key: str,
101+
notification_type: BlobEventType,
102+
):
103+
"""Construct a new FileNotificationRequest."""
104+
super().__init__(bucket_name=bucket_name, key=key, notification_type=notification_type)
105+
self.file = bucket_ref.file(key)
106+
107+
108+
class FileNotificationContext(BucketNotificationContext):
109+
"""Represents the full request/response context for a bucket notification trigger."""
110+
111+
def __init__(self, request: FileNotifyRequest, response: Optional[BucketNotifyResponse] = None):
112+
"""Construct a new FileNotificationContext."""
113+
super().__init__(request=request, response=response)
114+
self.req = request
115+
116+
@staticmethod
117+
def _from_client_message_with_bucket(msg: BlobEventRequest, bucket_ref) -> FileNotificationContext:
118+
"""Construct a new FileNotificationTrigger from a Bucket Notification trigger from the Nitric Membrane."""
119+
return FileNotificationContext(
120+
request=FileNotifyRequest(
121+
bucket_name=msg.bucket_name,
122+
key=msg.blob_event.key,
123+
bucket_ref=bucket_ref,
124+
notification_type=msg.blob_event.type,
125+
)
126+
)
127+
128+
129+
BucketNotificationMiddleware = Middleware[BucketNotificationContext]
130+
BucketNotificationHandler = Handler[BucketNotificationContext]
131+
132+
FileNotificationMiddleware = Middleware[FileNotificationContext]
133+
FileNotificationHandler = Handler[FileNotificationContext]
134+
135+
57136
class BucketRef(object):
58137
"""A reference to a deployed storage bucket, used to interact with the bucket at runtime."""
59138

@@ -90,6 +169,21 @@ async def exists(self, key: str) -> bool:
90169
)
91170
return resp.exists
92171

172+
def on(
173+
self, notification_type: str, notification_prefix_filter: str
174+
) -> Callable[[BucketNotificationHandler], None]:
175+
"""Create and return a bucket notification decorator for this bucket."""
176+
177+
def decorator(func: BucketNotificationHandler) -> None:
178+
Listener(
179+
bucket_name=self.name,
180+
notification_type=notification_type,
181+
notification_prefix_filter=notification_prefix_filter,
182+
handler=func,
183+
)
184+
185+
return decorator
186+
93187

94188
class FileMode(Enum):
95189
"""Definition of available operation modes for file signed URLs."""
@@ -121,7 +215,7 @@ async def write(self, body: bytes):
121215
Will create the file if it doesn't already exist.
122216
"""
123217
try:
124-
await self._bucket._storage_stub.write( # type: ignore pylint: disable=protected-access
218+
await self._bucket._storage_stub.write(
125219
storage_write_request=StorageWriteRequest(bucket_name=self._bucket.name, key=self.key, body=body)
126220
)
127221
except GRPCError as grpc_err:
@@ -130,7 +224,7 @@ async def write(self, body: bytes):
130224
async def read(self) -> bytes:
131225
"""Read this files contents from the bucket."""
132226
try:
133-
response = await self._bucket._storage_stub.read( # type: ignore pylint: disable=protected-access
227+
response = await self._bucket._storage_stub.read(
134228
storage_read_request=StorageReadRequest(bucket_name=self._bucket.name, key=self.key)
135229
)
136230
return response.body
@@ -140,7 +234,7 @@ async def read(self) -> bytes:
140234
async def delete(self):
141235
"""Delete this file from the bucket."""
142236
try:
143-
await self._bucket._storage_stub.delete( # type: ignore pylint: disable=protected-access
237+
await self._bucket._storage_stub.delete(
144238
storage_delete_request=StorageDeleteRequest(bucket_name=self._bucket.name, key=self.key)
145239
)
146240
except GRPCError as grpc_err:
@@ -150,27 +244,33 @@ async def upload_url(self, expiry: Optional[Union[timedelta, int]] = None):
150244
"""
151245
Get a temporary writable URL to this file.
152246
153-
Parameters:
154-
155-
expiry (timedelta or int, optional): The expiry time for the signed URL.
156-
If an integer is provided, it is treated as seconds. Default is 600 seconds.
247+
Parameters
248+
----------
249+
expiry : int, timedelta, optional
250+
The expiry time for the signed URL.
251+
If an integer is provided, it is treated as seconds. Default is 600 seconds.
157252
158-
Returns:
253+
Returns
254+
-------
159255
str: The signed URL.
256+
160257
"""
161258
return await self._sign_url(mode=FileMode.WRITE, expiry=expiry)
162259

163260
async def download_url(self, expiry: Optional[Union[timedelta, int]] = None):
164261
"""
165262
Get a temporary readable URL to this file.
166263
167-
Parameters:
168-
169-
expiry (timedelta or int, optional): The expiry time for the signed URL.
170-
If an integer is provided, it is treated as seconds. Default is 600 seconds.
264+
Parameters
265+
----------
266+
expiry : int, timedelta, optional
267+
The expiry time for the signed URL.
268+
If an integer is provided, it is treated as seconds. Default is 600 seconds.
171269
172-
Returns:
270+
Returns
271+
-------
173272
str: The signed URL.
273+
174274
"""
175275
return await self._sign_url(mode=FileMode.READ, expiry=expiry)
176276

@@ -182,7 +282,7 @@ async def _sign_url(self, mode: FileMode = FileMode.READ, expiry: Optional[Union
182282
expiry = timedelta(seconds=expiry)
183283

184284
try:
185-
response = await self._bucket._storage_stub.pre_sign_url( # type: ignore pylint: disable=protected-access
285+
response = await self._bucket._storage_stub.pre_sign_url(
186286
storage_pre_sign_url_request=StoragePreSignUrlRequest(
187287
bucket_name=self._bucket.name, key=self.key, operation=mode.to_request_operation(), expiry=expiry
188288
)
@@ -257,7 +357,7 @@ def _perms_to_actions(self, *args: BucketPermission) -> List[Action]:
257357
return [action for perm in args for action in permission_actions_map[perm]]
258358

259359
def _to_resource_id(self) -> ResourceIdentifier:
260-
return ResourceIdentifier(name=self.name, type=ResourceType.Bucket) # type:ignore
360+
return ResourceIdentifier(name=self.name, type=ResourceType.Bucket)
261361

262362
def allow(
263363
self,
@@ -316,6 +416,7 @@ def __init__(
316416
key_prefix_filter=notification_prefix_filter,
317417
)
318418

419+
# noinspection PyProtectedMember
319420
Nitric._register_worker(self)
320421

321422
async def _listener_request_iterator(self):
@@ -359,9 +460,12 @@ async def start(self) -> None:
359460
print(f"Stream terminated: {e.message}")
360461
except grpclib.exceptions.StreamTerminatedError:
361462
print("Stream from membrane closed.")
463+
except KeyboardInterrupt:
464+
print("Keyboard interrupt")
362465
finally:
363466
print("Closing client stream")
364467
channel.close()
468+
print("Listener stopped")
365469

366470

367471
def bucket(name: str) -> Bucket:
@@ -370,4 +474,4 @@ def bucket(name: str) -> Bucket:
370474
371475
If a bucket has already been registered with the same name, the original reference will be reused.
372476
"""
373-
return Nitric._create_resource(Bucket, name) # type: ignore pylint: disable=protected-access
477+
return Nitric._create_resource(Bucket, name)

nitric/resources/queues.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ def __init__(self, name: str):
177177
super().__init__(name)
178178

179179
def _to_resource_id(self) -> ResourceIdentifier:
180-
return ResourceIdentifier(name=self.name, type=ResourceType.Queue) # type:ignore
180+
return ResourceIdentifier(name=self.name, type=ResourceType.Queue)
181181

182182
def _perms_to_actions(self, *args: QueuePermission) -> List[Action]:
183183
permission_actions_map: dict[QueuePermission, List[Action]] = {

0 commit comments

Comments
 (0)