Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run JobController as separate application #951

Merged
merged 26 commits into from
Aug 21, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
docker compose & health impl
  • Loading branch information
pyalex committed Aug 19, 2020
commit 6c01f7671d16259d1669f60c21db590be8aeed08
1 change: 1 addition & 0 deletions infra/docker-compose/.env.sample
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ COMPOSE_PROJECT_NAME=feast
FEAST_VERSION=0.6.2
GCP_SERVICE_ACCOUNT=./gcp-service-accounts/key.json
FEAST_CORE_CONFIG=./core/core.yml
FEAST_JC_CONFIG=./jc/jc.yml
FEAST_HISTORICAL_SERVING_CONFIG=./serving/historical-serving.yml
FEAST_HISTORICAL_SERVING_ENABLED=false
FEAST_ONLINE_SERVING_CONFIG=./serving/online-serving.yml
9 changes: 0 additions & 9 deletions infra/docker-compose/core/core.yml
Original file line number Diff line number Diff line change
@@ -1,13 +1,4 @@
feast:
jobs:
polling_interval_milliseconds: 20000
job_update_timeout_seconds: 240
active_runner: direct
runners:
- name: direct
type: DirectRunner
options:
tempLocation: gs://bucket/tempLocation
stream:
type: kafka
options:
Expand Down
17 changes: 17 additions & 0 deletions infra/docker-compose/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,23 @@ services:
- /opt/feast/feast-core.jar
- --spring.config.location=classpath:/application.yml,file:/etc/feast/application.yml

jc:
image: gcr.io/kf-feast/feast-jc:${FEAST_VERSION}
volumes:
- ${FEAST_JC_CONFIG}:/etc/feast/application.yml
- ${GCP_SERVICE_ACCOUNT}:/etc/gcloud/service-accounts/key.json
environment:
GOOGLE_APPLICATION_CREDENTIALS: /etc/gcloud/service-accounts/key.json
depends_on:
- kafka
ports:
- 6570:6570
command:
- java
- -jar
- /opt/feast/feast-job-coordinator.jar
- --spring.config.location=classpath:/application.yml,file:/etc/feast/application.yml

jupyter:
image: gcr.io/kf-feast/feast-jupyter:${FEAST_VERSION}
volumes:
Expand Down
15 changes: 15 additions & 0 deletions infra/docker-compose/jc/jc.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
feast:
jobs:
polling_interval_milliseconds: 20000
job_update_timeout_seconds: 240
active_runner: direct
runners:
- name: direct
type: DirectRunner
options:
tempLocation: gs://bucket/tempLocation
stream:
type: kafka
options:
topic: feast-features
bootstrapServers: "kafka:9092,localhost:9094"
7 changes: 7 additions & 0 deletions infra/scripts/test-load.sh
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ else
fi

wait_for_docker_image gcr.io/kf-feast/feast-core:"${FEAST_VERSION}"
wait_for_docker_image gcr.io/kf-feast/feast-jc:"${FEAST_VERSION}"
wait_for_docker_image gcr.io/kf-feast/feast-serving:"${FEAST_VERSION}"
wait_for_docker_image gcr.io/kf-feast/feast-jupyter:"${FEAST_VERSION}"

Expand Down Expand Up @@ -65,6 +66,12 @@ export FEAST_CORE_CONTAINER_IP_ADDRESS=$(docker inspect -f '{{range .NetworkSett
# Wait for Feast Core to be ready
"${PROJECT_ROOT_DIR}"/infra/scripts/wait-for-it.sh ${FEAST_CORE_CONTAINER_IP_ADDRESS}:6565 --timeout=120

# Get Feast Job Coordinator container IP address
export FEAST_JC_CONTAINER_IP_ADDRESS=$(docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' feast_jc_1)

# Wait for Feast Job Coordinator to be ready
"${PROJECT_ROOT_DIR}"/infra/scripts/wait-for-it.sh ${FEAST_JC_CONTAINER_IP_ADDRESS}:6570 --timeout=120

# Get Feast Online Serving container IP address
export FEAST_ONLINE_SERVING_CONTAINER_IP_ADDRESS=$(docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' feast_online_serving_1)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,9 @@ public class InMemoryJobRepository implements JobRepository {
/**
 * Creates the repository around the injected {@link JobManager}.
 *
 * <p>NOTE(review): this hunk shows {@code storage} assigned twice (an empty map, then a map built
 * from {@code jobManager.listRunningJobs()}); this looks like removed and added diff lines shown
 * together — confirm against the actual file which assignment is live.
 *
 * @param jobManager job manager kept by this repository and queried for running jobs
 */
@Autowired
public InMemoryJobRepository(JobManager jobManager) {
this.jobManager = jobManager;
this.storage = new HashMap<>();

// this.storage =
// this.jobManager.listRunningJobs().stream().collect(Collectors.toMap(Job::getId, j ->
// j));
// Index the jobs reported by the job manager by their id.
this.storage =
this.jobManager.listRunningJobs().stream().collect(Collectors.toMap(Job::getId, j -> j));
}

/**
Expand Down
54 changes: 54 additions & 0 deletions job-coordinator/src/main/java/feast/jc/grpc/HealthServiceImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2020 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.jc.grpc;

import feast.jc.dao.JobRepository;
import io.grpc.Status;
import io.grpc.health.v1.HealthGrpc.HealthImplBase;
import io.grpc.health.v1.HealthProto.HealthCheckRequest;
import io.grpc.health.v1.HealthProto.HealthCheckResponse;
import io.grpc.health.v1.HealthProto.HealthCheckResponse.ServingStatus;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.server.service.GrpcService;
import org.springframework.beans.factory.annotation.Autowired;

@Slf4j
@GrpcService
public class HealthServiceImpl extends HealthImplBase {
private final JobRepository jobRepository;

@Autowired
public HealthServiceImpl(JobRepository jobRepository) {
this.jobRepository = jobRepository;
}

@Override
public void check(
HealthCheckRequest request, StreamObserver<HealthCheckResponse> responseObserver) {
try {
jobRepository.findAll();
responseObserver.onNext(
HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVING).build());
responseObserver.onCompleted();
} catch (Exception e) {
log.error("Health Check: unable to retrieve projects.\nError: %s", e);
responseObserver.onError(
Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException());
}
}
}