
Commit 1f08a8b

drop debug
1 parent 90f3f07 commit 1f08a8b

File tree

3 files changed: 1 addition (+1), 68 deletions (−68)


python/cog/server/connection.py

Lines changed: 1 addition & 9 deletions

@@ -18,9 +18,6 @@ class AsyncConnection(Generic[X]):
     def __init__(self, conn: Connection) -> None:
         self.wrapped_conn = conn
         self.started = False
-        print("conn __init__")
-        # perhaps the lock should be here
-        # self.sync_lock = threading.Lock()

     async def async_init(self) -> None:
         if self.started:
@@ -29,10 +26,10 @@ async def async_init(self) -> None:
         # mp may have handled something already but let's dup so exit is clean
         dup_fd = os.dup(fd)
         sock = socket.socket(fileno=dup_fd)
-        # sock = socket.socket(fileno=fd)
         # we don't want to see EAGAIN, we'd rather wait
         # however, perhaps this is wrong and in some cases this could still block terribly
         # sock.setblocking(False)
+        sock.setblocking(True)
         # TODO: use /proc/sys/net/core/rmem_max, but special-case language models
         sz = 65536
         sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, sz)
@@ -94,11 +91,6 @@ def _send_bytes(self, buf: bytes) -> None:
     def send(self, obj: Any) -> None:
         self._send_bytes(_ForkingPickler.dumps(obj, protocol=5))

-    # # perhaps we could do it like this
-    # def send_sync(self, obj: Any) -> None:
-    #     with self.sync_lock:
-    #         self.wrapped_conn.send(obj)
-
     # we could implement async def drain() but it's not really necessary for our purposes

     def close(self) -> None:
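For readers following the surviving code: async_init dup's the pipe's file descriptor, adopts it into a fresh socket object, forces blocking mode, and enlarges the receive buffer. Below is a standalone sketch of that descriptor-adoption trick, using an ordinary socketpair rather than this module's Connection; it is illustrative only and not code from this commit.

import os
import socket

left, right = socket.socketpair()

# dup so that closing `sock` later does not also close `right`
dup_fd = os.dup(right.fileno())
# family/type are auto-detected from the descriptor
sock = socket.socket(fileno=dup_fd)
# prefer waiting over surfacing EAGAIN to callers
sock.setblocking(True)
# enlarge the kernel receive buffer, mirroring the 65536 used above
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 65536)

left.sendall(b"hello")
print(sock.recv(5))  # b'hello'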

python/cog/server/helpers.py

Lines changed: 0 additions & 25 deletions

@@ -7,10 +7,6 @@
 from typing import Callable, Optional, Sequence, TextIO


-def debug(*args: str, f: io.IOBase = open("/tmp/debug", "a")) -> None: # noqa
-    print(*args, file=f, flush=True)
-
-
 async def async_fdopen(fd: int) -> asyncio.StreamReader:
     loop = asyncio.get_running_loop()
     reader = asyncio.StreamReader()
@@ -102,31 +98,23 @@ def __init__(

     def drain(self) -> None:
         if self.is_async:
-            debug("ignoring drain")
             # if we're async, we assume that logs will be processed promptly,
             # and we don't want to block the event loop
             return
         self.drain_event.clear()
         for stream in self._streams:
-            debug(repr(stream), stream.name)
             stream.write(self.drain_token + "\n")
-            debug(repr(stream), "flush")
             stream.flush()
-        debug("wait drain")
         if not self.drain_event.wait(timeout=1):
-            debug("drain timed out")
             raise RuntimeError("output streams failed to drain")
-        debug("drain done")

     def shutdown(self) -> None:
         if not self.is_alive():
-            debug("skipping shutdown because not alive")
             return
         for stream in self._streams:
             stream.write(self.terminate_token + "\n")
             stream.flush()
             break # only need to write one terminate token
-        debug("joining")
         self.join()

     async def shutdown_async(self) -> None:
@@ -149,13 +137,10 @@ def run(self) -> None:
             drain_tokens_needed += 1

         while not should_exit:
-            debug("selector.select")
             for key, _ in selector.select():
-                debug("selector key")
                 stream = key.data

                 for line in stream.wrapped:
-                    debug("redirector saw", line)
                     if not line.endswith("\n"):
                         # TODO: limit how much we're prepared to buffer on a
                         # single line
@@ -180,12 +165,9 @@ def run(self) -> None:
                     # thing in the line was a drain token (or a terminate
                     # token).
                     if full_line:
-                        debug("write hook")
                         self._write_hook(stream.name, stream.original, full_line + "\n")
-                        debug("write hook done")

                     if drain_tokens_seen >= drain_tokens_needed:
-                        debug("drain event set")
                         self.drain_event.set()
                         drain_tokens_seen = 0

@@ -203,22 +185,18 @@ async def switch_to_async(self) -> None:

         We must not call write_hook twice for the same data during the switch.
         """
-        debug("switch async, drain")
         # Drain the streams to ensure all buffered data is processed
         try:
             self.drain()
         except RuntimeError:
-            debug("drain failed")
             raise
-        debug("drain done, shutdown")

         # Shut down the thread
         # we do this before starting a coroutine that will also read from the same fd
         # so that shutdown can find the terminate tokens correctly
         self.shutdown()
         self.stream_tasks = []
         self.is_async = True
-        debug("set is async")

         for stream in self._streams:
             # Open each stream as a StreamReader
@@ -235,7 +213,6 @@ async def switch_to_async(self) -> None:
     async def process_stream(
         self, stream: WrappedStream, reader: asyncio.StreamReader
     ) -> None:
-        debug("process_stream", stream.name)
         buffer = io.StringIO()
         drain_tokens_seen = 0
         should_exit = False
@@ -246,7 +223,6 @@ async def process_stream(
                 break

             line = line.decode()
-            debug("redirector saw", line)

             if not line.endswith("\n"):
                 buffer.write(line)
@@ -270,7 +246,6 @@ async def process_stream(
                 self._write_hook(stream.name, stream.original, full_line + "\n")

             if drain_tokens_seen >= len(self._streams):
-                debug("drain event set")
                 self.drain_event.set()
                 drain_tokens_seen = 0
             if should_exit:
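The removed debug() helper kept an open handle to /tmp/debug as a mutable default argument. As a point of comparison only (not code from this repository), the same append-to-a-side-file tracing can be had from the standard logging module without that trick:

import logging

_log = logging.getLogger("cog.debug")
_log.addHandler(logging.FileHandler("/tmp/debug"))  # FileHandler appends by default
_log.setLevel(logging.DEBUG)

def debug(*args: str) -> None:
    # join the positional arguments the same way print() would
    _log.debug(" ".join(args))

debug("drain", "done")  # appends "drain done" to /tmp/debug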

python/cog/server/worker.py

Lines changed: 0 additions & 34 deletions

@@ -119,7 +119,6 @@ def __init__(
         super().__init__()

     def run(self) -> None:
-        debug("run")
         self._sync_events_lock = threading.Lock()
         # If we're running at a shell, SIGINT will be sent to every process in
         # the process group. We ignore it in the child process and require that
@@ -146,33 +145,24 @@ def run(self) -> None:
         self._stream_redirector.start()
         # </could be moved into StreamRedirector>

-        debug("setup")
         self._setup()
-        debug("loop")
         self._loop() # shuts down stream redirector the correct way
-        debug("loop done")
         self._events.close()

     async def _async_init(self) -> None:
-        debug("async_init start")
         if self._events_async:
-            debug("async_init finished")
             return
         # if AsyncConnection is created before switch_to_async, a race condition can cause drain to fail
         # and write, seemingly, to block
         # maybe because we're trying to call StreamWriter.write when no event loop is running?
         await self._stream_redirector.switch_to_async()
         self._events_async = AsyncConnection(self._events)
         await self._events_async.async_init()
-        debug("async_init done")

     def _setup(self) -> None:
-        debug("_setup start")
         with self._handle_setup_error():
             # we need to load the predictor to know if setup is async
-            debug("'about to load")
             self._predictor = load_predictor_from_ref(self._predictor_ref)
-            debug("loaded ref")
             self._predictor.log = self._log
             # if users want to access the same event loop from setup and predict,
             # both have to be async. if setup isn't async, it doesn't matter if we
@@ -181,36 +171,24 @@ def _setup(self) -> None:
             # otherwise, if setup is sync and the user does new_event_loop to use a ClientSession,
             # then tries to use the same session from async predict, they would get an error.
             # that's significant if connections are open and would need to be discarded
-            debug("async predictor")
             if is_async_predictor(self._predictor):
-                debug("getting loop")
                 self.loop = get_loop()
-                debug("got loop")
-            debug("getattr")
             # Could be a function or a class
             if hasattr(self._predictor, "setup"):
-                debug("inspect")
                 if inspect.iscoroutinefunction(self._predictor.setup):
                     # we should probably handle Shutdown during this process?
                     # possibly we prefer to not stop-start the event loop
                     # between these calls
                     self.loop.run_until_complete(self._async_init())
                     self.loop.run_until_complete(run_setup_async(self._predictor))
-                    debug("run_setup_async done")
                 else:
-                    debug("sync setup")
                     run_setup(self._predictor)
-            debug("_setup done inside ctx mgr")
-        debug("_setup done")

     @contextlib.contextmanager
     def _handle_setup_error(self) -> Iterator[None]:
         done = Done()
-        debug("done")
         try:
-            debug("yield")
             yield
-            debug("yield done")
         except Exception as e:
             traceback.print_exc()
             done.error = True
@@ -226,15 +204,11 @@ def _handle_setup_error(self) -> Iterator[None]:
             # we can arrive here if there was an error setting up stream_redirector
             # for example, because drain failed
             # in this case this drain could block or fail
-            debug("setup done, calling drain")
             try:
                 self._stream_redirector.drain()
             except Exception as e:
-                debug("exc", str(e))
                 raise
-            debug("sending setup done")
             self.send(("SETUP", done))
-            debug("sent setup done")

     def _loop_sync(self) -> None:
         while True:
@@ -253,7 +227,6 @@ def _loop_sync(self) -> None:
         self._stream_redirector.shutdown()

     async def _loop_async(self) -> None:
-        debug("loop async")
         await self._async_init()
         assert self._events_async
         tasks: dict[str, asyncio.Task[None]] = {}
@@ -277,14 +250,11 @@ async def _loop_async(self) -> None:
                 print(f"Got unexpected cancellation: {ev}", file=sys.stderr)
             else:
                 print(f"Got unexpected event: {ev}", file=sys.stderr)
-        debug("shutdown_async")
         await self._stream_redirector.shutdown_async()
         self._events_async.close()

     def _loop(self) -> None:
-        debug("in loop")
         if is_async(get_predict(self._predictor)):
-            debug("async loop")
             self.loop.run_until_complete(self._loop_async())
         else:
             self._loop_sync()
@@ -320,14 +290,10 @@ def _emit_metric(self, name: str, value: "int | float") -> None:

     def send(self, obj: Any) -> None:
         if self._events_async:
-            debug("sending on async")
             self._events_async.send(obj)
-            debug("sent on async")
         else:
-            debug("send lock")
             with self._sync_events_lock:
                 self._events.send(obj)
-            debug("finished sync send")

     def _mk_send(self, id: str) -> Callable[[PublicEventType], None]:
         def send(event: PublicEventType) -> None:
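_handle_setup_error above is a contextmanager that captures any exception raised during setup into a Done event and reports it instead of letting the worker crash. Below is a self-contained sketch of that pattern with hypothetical names; Done and the results list here are stand-ins, not this repository's types.

import contextlib
import traceback
from dataclasses import dataclass
from typing import Iterator

@dataclass
class Done:
    error: bool = False
    error_detail: str = ""

results: list[Done] = []

@contextlib.contextmanager
def handle_setup_error() -> Iterator[None]:
    done = Done()
    try:
        yield
    except Exception as e:
        traceback.print_exc()
        done.error = True
        done.error_detail = str(e)
    finally:
        # stand-in for self.send(("SETUP", done)) in the real worker
        results.append(done)

with handle_setup_error():
    raise RuntimeError("setup failed")

print(results[0])  # Done(error=True, error_detail='setup failed')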
