- 
                Notifications
    You must be signed in to change notification settings 
- Fork 8
feat: Allow agent to handle signals for graceful termination #176
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
e3d8a7d    to
    2af430c      
    Compare
  
    2af430c    to
    e77e535      
    Compare
  
    |  | ||
| self._shutting_down = True | ||
| print("Shutting down, canceling all tasks...") | ||
| self.cancel_tasks() | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This cancels in sequence, and not parallel but it should be ok because we only run one at a time AFAIK?
| # There seems to be a weird bug with grpcio that makes subsequent requests fail with | ||
| # concurrent rpc limit exceeded if we set maximum_current_rpcs to 1. Setting it to 2 | ||
| # fixes it, even though in practice, we only run one request at a time. | ||
| server = aio.server( | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the main change, so all requests run in the main thread.
| print("Terminating the agent process...") | ||
| process.terminate() | ||
| process.wait(timeout=PROCESS_SHUTDOWN_TIMEOUT) | ||
| print("Agent process shutdown gracefully") | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might be excessive logging, I can take it out. But it was useful for debugging.
| """An internal problem caused by (most probably) the agent.""" | ||
|  | ||
|  | ||
| PROCESS_SHUTDOWN_TIMEOUT = 5 # seconds | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's a good default?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can default to 60?
The question becomes how can we configure this from outside
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I could add an optional field here to allow callers to override the default:
isolate/src/isolate/server/definitions/server.proto
Lines 24 to 29 in 64144e9
| message BoundFunction { | |
| repeated EnvironmentDefinition environments = 1; | |
| SerializedObject function = 2; | |
| optional SerializedObject setup_func = 3; | |
| bool stream_logs = 4; | |
| } | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Defaulted to 60 and can be overridden via env var, I think this should work for now. Realized we can't add it to the request because it's possible for the agent to be re-used across functions.
| """An internal problem caused by (most probably) the agent.""" | ||
|  | ||
|  | ||
| PROCESS_SHUTDOWN_TIMEOUT = 5 # seconds | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can default to 60?
The question becomes how can we configure this from outside
a126bfe    to
    4961a58      
    Compare
  
    b0a4f93    to
    015cf6c      
    Compare
  
    015cf6c    to
    201d4b9      
    Compare
  
    There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is great!
|  | ||
| PROCESS_SHUTDOWN_TIMEOUT = 5 # seconds | ||
| PROCESS_SHUTDOWN_TIMEOUT_SECONDS = float( | ||
| os.getenv("ISOLATE_SHUTDOWN_GRACE_PERIOD", "60") | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
very good solution
| if self.future and not self.future.running(): | ||
| self.future.cancel() | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But if we dont cancel it, then what happens?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think in almost all cases, nothing. But there could be rare race conditions where the future that hasn't started yet starts executing after this leading to an orphaned agent process (more likely to happen when server is handling multiple tasks).
The chances are quite low, but it's more correct to always cancel imo.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess we can just do a log about this scenario then
|  | ||
|  | ||
| @pytest.fixture | ||
| def isolate_server_subprocess(monkeypatch): | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice
a6885d6    to
    201d4b9      
    Compare
  
    8acb8b4    to
    42d04ec      
    Compare
  
    This reverts commit 42d04ec.
The main issue with implementing sig term handling was with with the default gprc implementation, the requests would run in the child threads and in python, only the main thread could register signal handlers.
Initially looked at spawning a subprocess per request from the agent or switching the grpc server to use the ProcessPoolExecutor but both had issues. However, there's a cleaner way: switch to the asyncio grpc server. This runs all requests in the main thread.
Tested manually and added integration tests.
See previous attempt here https://github.com/fal-ai/isolate/pull/174/files. The shutdown test is quite good, so re-used and extended it.