Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions api/cluster/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,10 @@ func (c *controller) waitInferenceServiceReady(service *kservev1beta1.InferenceS
err = fmt.Errorf("%w\n\nPod container status:\n%s", err, podContainerTable)
}

if podLastTerminationReason != "" {
err = fmt.Errorf("%w\n\nPod last termination reason: %s", err, podLastTerminationReason)
}

if podLastTerminationMessage != "" {
err = fmt.Errorf("%w\n\nPod last termination message:\n%s", err, podLastTerminationMessage)
}
Expand Down
3 changes: 3 additions & 0 deletions api/queue/work/model_service_deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ func (depl *ModelServiceDeployment) Deploy(job *queue.Job) error {
prevEndpoint.Status = models.EndpointFailed
}

// Propagate Kubernetes error details to user via endpoint message
prevEndpoint.Message = deployment.Error

// record the version endpoint result of the deployment
if err := depl.Storage.Save(prevEndpoint); err != nil {
log.Errorf("unable to update endpoint status for model: %s, version: %s, reason: %v", model.Name, version.ID, err)
Expand Down
3 changes: 3 additions & 0 deletions api/queue/work/model_service_deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,7 @@ func TestExecuteDeployment(t *testing.T) {
ResourceRequest: env.DefaultResourceRequest,
VersionID: version.ID,
Namespace: project.Name,
Message: "Failed to deploy",
}, nil)
return mockStorage
},
Expand Down Expand Up @@ -745,6 +746,7 @@ func TestExecuteDeployment(t *testing.T) {
ResourceRequest: env.DefaultResourceRequest,
VersionID: version.ID,
Namespace: project.Name,
Message: "Failed to build image",
}, nil)
return mockStorage
},
Expand Down Expand Up @@ -1189,6 +1191,7 @@ func TestExecuteRedeployment(t *testing.T) {
RevisionID: models.ID(1),
InferenceServiceName: fmt.Sprintf("%s-%d-1", model.Name, version.ID),
Status: models.EndpointRunning,
Message: "Failed to deploy",
}).Return(nil)
return mockStorage
},
Expand Down
10 changes: 8 additions & 2 deletions python/pyfunc-server/pyfuncserver/protocol/upi/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def start(self):
# multiprocessing based on https://github.com/grpc/grpc/tree/master/examples/python/multiprocessing
workers = []
for _ in range(self._config.workers - 1):
worker = multiprocessing.Process(target=self._run_server)
worker = multiprocessing.Process(target=self._run_server_sync)
worker.start()
workers.append(worker)

Expand All @@ -67,7 +67,13 @@ def start(self):
publisher = Publisher(kafka_producer, sampler)
self._predict_service.set_publisher(publisher)

asyncio.get_event_loop().run_until_complete(self._run_server())
self._run_server_sync()

def _run_server_sync(self):
"""Synchronous wrapper to run the async server in a new event loop."""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(self._run_server())

async def _run_server(self):
"""
Expand Down
Loading