Skip to content

Commit

Permalink
provisional fix for multiple requests from same code in pool problem
Browse files Browse the repository at this point in the history
  • Loading branch information
ipelupessy committed Jan 15, 2019
1 parent 292f3de commit e6d26bd
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 6 deletions.
26 changes: 21 additions & 5 deletions src/amuse/rfi/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def result(self):
def results(self):
return [self.result()]

def add_result_handler(self, function, args = ()):
def add_result_handler(self, function, args = ()):
self.result_handlers.append([function,args])

def is_mpi_request(self):
Expand Down Expand Up @@ -613,7 +613,7 @@ def wait(self):

if len(requests) > 0:
for index, x in zip(indices, requests):
x.wait_for().waitone()
x.waits_for().waitone()

request_and_handler = self.requests_and_handlers[index]
if request_and_handler.async_request.is_result_available():
Expand All @@ -624,8 +624,16 @@ def wait(self):
request_and_handler.run()
break

requests = [x.async_request.waits_for().request for x in self.requests_and_handlers if x.async_request.is_mpi_request()]
indices = [i for i, x in enumerate(self.requests_and_handlers) if x.async_request.is_mpi_request()]
requests_ = [x.async_request.waits_for().request for x in self.requests_and_handlers if x.async_request.is_mpi_request()]
indices_ = [i for i, x in enumerate(self.requests_and_handlers) if x.async_request.is_mpi_request()]

requests=[]
indices=[]
for r,i in zip(requests_, indices_):
if r not in requests:
requests.append(r)
indices.append(i)

if len(requests) > 0:
index = MPI.Request.Waitany(requests)

Expand All @@ -643,8 +651,16 @@ def wait(self):
request_and_handler.run()
break

sockets = [x.async_request.waits_for().socket for x in self.requests_and_handlers if x.async_request.is_socket_request()]
sockets_ = [x.async_request.waits_for().socket for x in self.requests_and_handlers if x.async_request.is_socket_request()]
indices = [i for i, x in enumerate(self.requests_and_handlers) if x.async_request.is_socket_request()]

sockets=[]
indices=[]
for r,i in zip(sockets_, indices_):
if r not in sockets:
sockets.append(r)
indices.append(i)

if len(sockets) > 0:
readable, _, _ = select.select(sockets, [], [])
indices_to_delete = []
Expand Down
2 changes: 1 addition & 1 deletion test/compile_tests/test_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ def test18(self):
instance2.stop()
instance3.stop()

def xtest18b(self):
def test18b(self):
""" test pool as depedency 2 """
instance1 = ForTesting(self.exefile)
instance2 = ForTesting(self.exefile)
Expand Down

0 comments on commit e6d26bd

Please sign in to comment.