Skip to content

Commit df7edbd

Browse files
authored
Merge pull request spulec#59 from spulec/fix-checking-ppid
Change orphan child checking of parent id.
2 parents 2b117e6 + 65b60dc commit df7edbd

File tree

4 files changed

+38
-28
lines changed

4 files changed

+38
-28
lines changed

development.txt

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
coverage==4.4.1
22
mock==1.0.1
3-
moto==1.3.4
3+
moto==1.3.8
44
nose==1.3.0
55
pre-commit==0.7.6
66
sure==1.2.2

pyqs/worker.py

+7-1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ def get_conn(region=None, access_key_id=None, secret_access_key=None):
3838

3939
class BaseWorker(Process):
4040
def __init__(self, *args, **kwargs):
41+
self.parent_id = kwargs.pop('parent_id')
4142
super(BaseWorker, self).__init__(*args, **kwargs)
4243
self.should_exit = Event()
4344

@@ -48,7 +49,7 @@ def shutdown(self):
4849
self.should_exit.set()
4950

5051
def parent_is_alive(self):
51-
if os.getppid() == 1:
52+
if os.getppid() != self.parent_id:
5253
logger.info(
5354
"Parent process has gone away, exiting process {}!".format(
5455
os.getpid()))
@@ -255,6 +256,7 @@ def __init__(self, queue_prefixes, worker_concurrency, interval, batchsize,
255256
self.setup_internal_queue(worker_concurrency)
256257
self.reader_children = []
257258
self.worker_children = []
259+
self._pid = os.getpid()
258260
self._initialize_reader_children()
259261
self._initialize_worker_children(worker_concurrency)
260262
self._running = True
@@ -271,6 +273,7 @@ def _initialize_reader_children(self):
271273
ReadWorker(
272274
queue_url, self.internal_queue, self.batchsize,
273275
connection_args=self.connection_args,
276+
parent_id=self._pid,
274277
)
275278
)
276279

@@ -280,6 +283,7 @@ def _initialize_worker_children(self, number):
280283
ProcessWorker(
281284
self.internal_queue, self.interval,
282285
connection_args=self.connection_args,
286+
parent_id=self._pid,
283287
)
284288
)
285289

@@ -367,6 +371,7 @@ def _replace_reader_children(self):
367371
worker = ReadWorker(
368372
queue_url, self.internal_queue, self.batchsize,
369373
connection_args=self.connection_args,
374+
parent_id=self._pid,
370375
)
371376
worker.start()
372377
self.reader_children.append(worker)
@@ -381,6 +386,7 @@ def _replace_worker_children(self):
381386
worker = ProcessWorker(
382387
self.internal_queue, self.interval,
383388
connection_args=self.connection_args,
389+
parent_id=self._pid,
384390
)
385391
worker.start()
386392
self.worker_children.append(worker)

tests/test_manager_worker.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ def test_main_method(ManagerWorker):
100100

101101
ManagerWorker.assert_called_once_with(
102102
['email1', 'email2'], 2, 1, 10, prefetch_multiplier=2,
103-
region='us-east-1', secret_access_key=None, access_key_id=None,
103+
region=None, secret_access_key=None, access_key_id=None,
104104
)
105105
ManagerWorker.return_value.start.assert_called_once_with()
106106

tests/test_worker.py

+29-25
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ def test_worker_fills_internal_queue():
4343
conn.send_message(QueueUrl=queue_url, MessageBody=message)
4444

4545
internal_queue = Queue()
46-
worker = ReadWorker(queue_url, internal_queue, BATCHSIZE)
46+
worker = ReadWorker(queue_url, internal_queue, BATCHSIZE, parent_id=1)
4747
worker.read_message()
4848

4949
packed_message = internal_queue.get(timeout=1)
@@ -78,7 +78,7 @@ def test_worker_fills_internal_queue_only_until_maximum_queue_size():
7878
conn.send_message(QueueUrl=queue_url, MessageBody=message)
7979

8080
internal_queue = Queue(maxsize=2)
81-
worker = ReadWorker(queue_url, internal_queue, BATCHSIZE)
81+
worker = ReadWorker(queue_url, internal_queue, BATCHSIZE, parent_id=1)
8282
worker.read_message()
8383

8484
# The internal queue should only have two messages on it
@@ -110,7 +110,7 @@ def test_worker_fills_internal_queue_from_celery_task():
110110
conn.send_message(QueueUrl=queue_url, MessageBody=message)
111111

112112
internal_queue = Queue()
113-
worker = ReadWorker(queue_url, internal_queue, BATCHSIZE)
113+
worker = ReadWorker(queue_url, internal_queue, BATCHSIZE, parent_id=1)
114114
worker.read_message()
115115

116116
packed_message = internal_queue.get(timeout=1)
@@ -159,7 +159,7 @@ def test_worker_processes_tasks_from_internal_queue():
159159
)
160160

161161
# Process message
162-
worker = ProcessWorker(internal_queue, INTERVAL)
162+
worker = ProcessWorker(internal_queue, INTERVAL, parent_id=1)
163163
worker.process_message()
164164

165165
task_results.should.equal(['Test message'])
@@ -203,7 +203,7 @@ def test_worker_fills_internal_queue_and_respects_visibility_timeouts():
203203

204204
# Run Reader
205205
internal_queue = Queue(maxsize=1)
206-
worker = ReadWorker(queue_url, internal_queue, BATCHSIZE)
206+
worker = ReadWorker(queue_url, internal_queue, BATCHSIZE, parent_id=1)
207207
worker.read_message()
208208

209209
# Check log messages
@@ -251,7 +251,7 @@ def test_worker_processes_tasks_and_logs_correctly():
251251
)
252252

253253
# Process message
254-
worker = ProcessWorker(internal_queue, INTERVAL)
254+
worker = ProcessWorker(internal_queue, INTERVAL, parent_id=1)
255255
worker.process_message()
256256

257257
# Check output
@@ -301,7 +301,7 @@ def test_worker_processes_tasks_and_logs_warning_correctly():
301301
)
302302

303303
# Process message
304-
worker = ProcessWorker(internal_queue, INTERVAL)
304+
worker = ProcessWorker(internal_queue, INTERVAL, parent_id=1)
305305
worker.process_message()
306306

307307
# Check output
@@ -314,7 +314,7 @@ def test_worker_processes_tasks_and_logs_warning_correctly():
314314
logger.handlers[0].messages['error'][0].lower().should.contain(
315315
msg1.lower())
316316
msg2 = (
317-
'raise ValueError("Need to be given basestring, was given '
317+
'"Need to be given basestring, was given '
318318
'{}".format(message))\nValueError: Need to be given basestring, '
319319
'was given 23'
320320
) # noqa
@@ -329,7 +329,7 @@ def test_worker_processes_empty_queue():
329329
"""
330330
internal_queue = Queue()
331331

332-
worker = ProcessWorker(internal_queue, INTERVAL)
332+
worker = ProcessWorker(internal_queue, INTERVAL, parent_id=1)
333333
worker.process_message()
334334

335335

@@ -338,9 +338,9 @@ def test_parent_process_death(os):
338338
"""
339339
Test worker processes recognize parent process death
340340
"""
341-
os.getppid.return_value = 1
341+
os.getppid.return_value = 123
342342

343-
worker = BaseWorker()
343+
worker = BaseWorker(parent_id=1)
344344
worker.parent_is_alive().should.be.false
345345

346346

@@ -351,7 +351,7 @@ def test_parent_process_alive(os):
351351
"""
352352
os.getppid.return_value = 1234
353353

354-
worker = BaseWorker()
354+
worker = BaseWorker(parent_id=1234)
355355
worker.parent_is_alive().should.be.true
356356

357357

@@ -366,14 +366,14 @@ def test_read_worker_with_parent_process_alive_and_should_not_exit(os):
366366
queue_url = conn.create_queue(QueueName="tester")['QueueUrl']
367367

368368
# Setup PPID
369-
os.getppid.return_value = 1234
369+
os.getppid.return_value = 1
370370

371371
# Setup dummy read_message
372372
def read_message():
373373
raise Exception("Called")
374374

375375
# When I have a parent process, and shutdown is not set
376-
worker = ReadWorker(queue_url, "foo", BATCHSIZE)
376+
worker = ReadWorker(queue_url, "foo", BATCHSIZE, parent_id=1)
377377
worker.read_message = read_message
378378

379379
# Then read_message() is reached
@@ -397,7 +397,7 @@ def test_read_worker_with_parent_process_alive_and_should_exit(os):
397397
q = Queue(1)
398398

399399
# When I have a parent process, and shutdown is set
400-
worker = ReadWorker(queue_url, q, BATCHSIZE)
400+
worker = ReadWorker(queue_url, q, BATCHSIZE, parent_id=1)
401401
worker.read_message = Mock()
402402
worker.shutdown()
403403

@@ -416,13 +416,13 @@ def test_read_worker_with_parent_process_dead_and_should_not_exit(os):
416416
queue_url = conn.create_queue(QueueName="tester")['QueueUrl']
417417

418418
# Setup PPID
419-
os.getppid.return_value = 1
419+
os.getppid.return_value = 123
420420

421421
# Setup internal queue
422422
q = Queue(1)
423423

424424
# When I have no parent process, and shutdown is not set
425-
worker = ReadWorker(queue_url, q, BATCHSIZE)
425+
worker = ReadWorker(queue_url, q, BATCHSIZE, parent_id=1)
426426
worker.read_message = Mock()
427427

428428
# Then I return from run()
@@ -437,14 +437,14 @@ def test_process_worker_with_parent_process_alive_and_should_not_exit(os):
437437
is not set
438438
"""
439439
# Setup PPID
440-
os.getppid.return_value = 1234
440+
os.getppid.return_value = 1
441441

442442
# Setup dummy read_message
443443
def process_message():
444444
raise Exception("Called")
445445

446446
# When I have a parent process, and shutdown is not set
447-
worker = ProcessWorker("foo", INTERVAL)
447+
worker = ProcessWorker("foo", INTERVAL, parent_id=1)
448448
worker.process_message = process_message
449449

450450
# Then process_message() is reached
@@ -461,7 +461,7 @@ def test_process_worker_with_parent_process_dead_and_should_not_exit(os):
461461
os.getppid.return_value = 1
462462

463463
# When I have no parent process, and shutdown is not set
464-
worker = ProcessWorker("foo", INTERVAL)
464+
worker = ProcessWorker("foo", INTERVAL, parent_id=1)
465465
worker.process_message = Mock()
466466

467467
# Then I return from run()
@@ -478,7 +478,7 @@ def test_process_worker_with_parent_process_alive_and_should_exit(os):
478478
os.getppid.return_value = 1234
479479

480480
# When I have a parent process, and shutdown is set
481-
worker = ProcessWorker("foo", INTERVAL)
481+
worker = ProcessWorker("foo", INTERVAL, parent_id=1)
482482
worker.process_message = Mock()
483483
worker.shutdown()
484484

@@ -487,10 +487,14 @@ def test_process_worker_with_parent_process_alive_and_should_exit(os):
487487

488488

489489
@mock_sqs
490-
def test_worker_processes_shuts_down_after_processing_its_max_number_of_msgs():
490+
@patch("pyqs.worker.os")
491+
def test_worker_processes_shuts_down_after_processing_its_max_number_of_msgs(
492+
os):
491493
"""
492494
Test worker processes shutdown after processing maximum number of messages
493495
"""
496+
os.getppid.return_value = 1
497+
494498
# Setup SQS Queue
495499
conn = boto3.client('sqs', region_name='us-east-1')
496500
queue_url = conn.create_queue(QueueName="tester")['QueueUrl']
@@ -535,7 +539,7 @@ def test_worker_processes_shuts_down_after_processing_its_max_number_of_msgs():
535539
)
536540

537541
# When I Process messages
538-
worker = ProcessWorker(internal_queue, INTERVAL)
542+
worker = ProcessWorker(internal_queue, INTERVAL, parent_id=1)
539543
worker._messages_to_process_before_shutdown = 2
540544

541545
# Then I return from run()
@@ -584,7 +588,7 @@ def test_worker_processes_discard_tasks_that_exceed_their_visibility_timeout():
584588
)
585589

586590
# When I process the message
587-
worker = ProcessWorker(internal_queue, INTERVAL)
591+
worker = ProcessWorker(internal_queue, INTERVAL, parent_id=1)
588592
worker.process_message()
589593

590594
# Then I get an error about exceeding the visibility timeout
@@ -648,7 +652,7 @@ def sleep_and_queue(internal_queue):
648652
thread.start()
649653

650654
# When I Process messages
651-
worker = ProcessWorker(internal_queue, INTERVAL)
655+
worker = ProcessWorker(internal_queue, INTERVAL, parent_id=1)
652656
worker._messages_to_process_before_shutdown = 2
653657

654658
# Then I return from run() after processing 2 messages

0 commit comments

Comments
 (0)