Updating documentation; Fixing test bugs
rafaelvfalc committed Jan 29, 2019
1 parent 8c62cb5 commit c909447
Showing 16 changed files with 683 additions and 53 deletions.
34 changes: 22 additions & 12 deletions README.md
@@ -8,28 +8,28 @@ The **Broker** is the framework entry point for the user. It is responsible for

**EUBra-BIGSEA** is committed to making a significant contribution to the **cooperation between Europe and Brazil** in the *area of advanced cloud services for Big Data applications*. See more on the [EUBra-BIGSEA website](http://www.eubra-bigsea.eu/).

-For more info about the **Broker** and how it works in the **BIGSEA Asperathos environment**, see [details.md](https://github.com/bigsea-ufcg/bigsea-manager/tree/master/docs/details.md) and [asperathos-workflow.md](https://github.com/bigsea-ufcg/bigsea-manager/tree/master/docs/asperathos-workflow.md).
+For more info about the **Broker** and how it works in the **BIGSEA Asperathos environment**, see [details.md](docs/details.md) and [asperathos-workflow.md](docs/asperathos-workflow.md).

## How does it work?
The Broker is implemented following a **plugin architecture**, providing the flexibility to customize your deployment using only the plugins you need and avoiding the inclusion of unnecessary dependencies (from other plugins) in your deployment environment.
All the integrations with different infrastructures and components are made by specific plugins.
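As a rough illustration of that plugin lookup, a registry can map plugin names to factories so that only the requested plugin is ever instantiated. This is a minimal sketch with made-up names, not the Broker's actual classes:

```python
# Minimal sketch of a plugin registry; illustrative names only,
# not the Broker's real API.
class PluginRegistry(object):
    def __init__(self):
        self._factories = {}

    def register(self, name, factory):
        # Registering stores a factory; nothing is instantiated yet.
        self._factories[name] = factory

    def get(self, name, *args, **kwargs):
        # Only the requested plugin is instantiated, so unused
        # plugins never pull their dependencies into the deployment.
        return self._factories[name](*args, **kwargs)


registry = PluginRegistry()
registry.register("kubejobs", lambda app_id: {"plugin": "kubejobs", "app": app_id})
execution = registry.get("kubejobs", "app-1")
assert execution["app"] == "app-1"
```

The point of the pattern is that the registry itself has no dependency on any particular infrastructure; each factory carries its own.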

## How to develop a plugin?
-See [plugin-development.md](https://github.com/bigsea-ufcg/bigsea-manager/tree/master/docs/plugin-development.md).
+See [plugin-development.md](docs/plugin-development.md).

## Requirements
* Python 2.7
* Linux packages: python-dev and python-pip
* Python packages: setuptools, tox and flake8

-On **apt**-based distros, you can use [pre-install.sh](https://github.com/bigsea-ufcg/bigsea-manager/tree/master/pre-install.sh) to install the requirements.
+On **apt**-based distros, you can use [pre-install.sh](pre-install.sh) to install the requirements.

## Install
-Clone the [Broker repository](https://github.com/bigsea-ufcg/bigsea-manager.git) on your machine.
+Clone the [Broker repository](https://github.com/ufcg-lsd/asperathos-manager) on your machine.

### Configuration
A configuration file is required to run the Broker. **Edit and fill in your broker.cfg in the root of the Broker directory.** Make sure you have filled in all fields before running.
-You can find a template in [config-example.md](https://github.com/bigsea-ufcg/bigsea-manager/tree/master/docs/config-example.md).
+You can find a template in [config-example.md](docs/config-example.md).
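As a rough illustration, a broker.cfg groups settings into INI-style sections. The section and field names below are placeholders only; the authoritative list lives in config-example.md:

```ini
# Illustrative placeholder only -- consult docs/config-example.md
# for the real section and field names.
[general]
host = 0.0.0.0
port = 1500

[services]
plugins = kubejobs
```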

### Run
In the Broker root directory, start the service using the run script:
@@ -41,13 +41,23 @@ Or using the tox command:
```
$ tox -e venv -- broker
```

### Run Unit Tests
In order to execute the unit tests of a specific class, run the following command:
```
$ pytest broker/tests/unit/path/to/test/test_class.py
```
Or run all test cases using the tox command:
```
$ tox
```
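The layout of such a test class follows standard pytest conventions: a `test_*.py` file whose `test_*` functions make plain assertions. A self-contained sketch of the shape (hypothetical example, not an actual Broker test):

```python
# Hypothetical test_class.py sketch -- pytest collects any function
# whose name starts with "test_".
class WorkQueue(object):
    def __init__(self):
        self.items = []

    def push(self, item):
        self.items.append(item)

    def pop(self):
        # Removes and returns the oldest item (FIFO order).
        return self.items.pop(0)


def test_queue_is_fifo():
    queue = WorkQueue()
    queue.push("job-1")
    queue.push("job-2")
    assert queue.pop() == "job-1"


# pytest discovers and runs this automatically; calling it
# directly works too:
test_queue_is_fifo()
```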
## Broker REST API
-Endpoints are available in the [restapi-endpoints.md](https://github.com/bigsea-ufcg/bigsea-manager/tree/master/docs/restapi-endpoints.md) documentation.
+Endpoints are available in the [restapi-endpoints.md](docs/restapi-endpoints.md) documentation.

## Available plugins
-* [Spark Sahara](https://github.com/bigsea-ufcg/bigsea-manager/tree/master/docs/plugins/spark_sahara.md)
-* [Spark Mesos](https://github.com/bigsea-ufcg/bigsea-manager/tree/master/docs/plugins/spark_mesos.md)
-* [Spark Generic](https://github.com/bigsea-ufcg/bigsea-manager/tree/master/docs/plugins/spark_generic.md)
-* [Openstack Generic](https://github.com/bigsea-ufcg/bigsea-manager/tree/master/docs/plugins/openstack_generic.md)
-* [Chronos](https://github.com/bigsea-ufcg/bigsea-manager/tree/master/docs/plugins/chronos.md)
+* [KubeJobs](docs/plugins/kubejobs.md)
+* [Spark Sahara](docs/plugins/spark_sahara.md)
+* [Spark Mesos](docs/plugins/spark_mesos.md)
+* [Spark Generic](docs/plugins/spark_generic.md)
+* [Openstack Generic](docs/plugins/openstack_generic.md)
+* [Chronos](docs/plugins/chronos.md)
18 changes: 11 additions & 7 deletions broker/plugins/kubejobs/plugin.py
@@ -41,8 +41,9 @@ def __init__(self, app_id):
         self.rds = None
         self.status = "created"
         self.job_completed = False
         self.terminated = False
         self.visualizer_url = "URL not generated!"
+        self.k8s = k8s

     def start_application(self, data):
         try:
@@ -55,7 +56,7 @@ def start_application(self, data):
             # Provision a redis database for the job. Die in case of error.
             # TODO(clenimar): configure ``timeout`` via a request param,
             # e.g. api.redis_creation_timeout.
-            redis_ip, redis_port = k8s.provision_redis_or_die(self.app_id)
+            redis_ip, redis_port = self.k8s.provision_redis_or_die(self.app_id)
             # agent_port = k8s.create_cpu_agent(self.app_id)

             # inject REDIS_HOST in the environment
@@ -66,7 +67,9 @@ def start_application(self, data):
             data['env_vars']['SCONE_CONFIG_ID'] = data['config_id']

             # create a new Redis client and fill the work queue
-            self.rds = redis.StrictRedis(host=redis_ip, port=redis_port)
+            if self.rds is None:
+                self.rds = redis.StrictRedis(host=redis_ip, port=redis_port)

             queue_size = len(jobs)

             # Check if a visualizer will be created
@@ -109,7 +112,7 @@ def start_application(self, data):

             print "Creating Job"

-            k8s.create_job(self.app_id,
+            self.k8s.create_job(self.app_id,
                            data['cmd'], data['img'],
                            data['init_size'], data['env_vars'],
                            config_id=data["config_id"])
@@ -137,7 +140,7 @@ def start_application(self, data):

             while not self.job_completed and not self.terminated:
                 self.update_application_state("ongoing")
-                self.job_completed = k8s.completed(self.app_id)
+                self.job_completed = self.k8s.completed(self.app_id)
                 time.sleep(1)

             # Stop monitor, controller and visualizer
@@ -158,8 +161,9 @@ def start_application(self, data):

             print "Stopped services"

             # delete redis resources
+            time.sleep(30)
             if not self.get_application_state() == 'terminated':
-                k8s.delete_redis_resources(self.app_id)
+                self.k8s.delete_redis_resources(self.app_id)

         except Exception as ex:
             self.update_application_state("error")
@@ -174,7 +178,7 @@ def update_application_state(self, state):
         self.status = state

     def terminate_job(self):
-        k8s.terminate_job(self.app_id)
+        self.k8s.terminate_job(self.app_id)
         self.update_application_state("terminated")
         self.terminated = True
Empty file.
34 changes: 34 additions & 0 deletions broker/tests/unit/mocks/body_request.json
@@ -0,0 +1,34 @@
{
    "username": "user",
    "password": "psswrd",
    "cmd": ["python", "app.py"],
    "img": "ip:port/kubejobsapp:demo",
    "init_size": 1,
    "redis_workload": "http://test.test",
    "monitor_plugin": "kubejobs",
    "monitor_info": {
        "expected_time": 500
    },
    "env_vars": {
        "S_USERNAME": "user",
        "PASSWORD": "psswrd",
        "AUTH_URL": "https://auth.com",
        "PROJECT_ID": "00001",
        "PROJECT_NAME": "project",
        "PROJECT_DOMAIN_NAME": "domain",
        "USER_DOMAIN_NAME": "domain",
        "CONTAINER_NAME": "container"
    },
    "enable_visualizer": true,
    "visualizer_plugin": "k8s-grafana",
    "visualizer_info": {
        "datasource_type": "influxdb"
    },
    "config_id": 1,
    "enable_auth": false
}
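This mock body mirrors the payload a client would POST to the Broker when submitting a KubeJobs application. A sketch of building and serializing such a payload follows; the endpoint path and port are assumptions here, and the real routes are in docs/restapi-endpoints.md:

```python
import json

# Assumed values -- check broker.cfg and docs/restapi-endpoints.md
# for the real host, port, and route.
BROKER_URL = "http://broker-host:1500/submissions"

# A trimmed version of the body_request.json fixture above.
payload = {
    "plugin": "kubejobs",
    "username": "user",
    "password": "psswrd",
    "cmd": ["python", "app.py"],
    "img": "ip:port/kubejobsapp:demo",
    "init_size": 1,
    "enable_auth": False,
}

body = json.dumps(payload)
# An HTTP client would then POST `body` with a JSON content type, e.g.:
# requests.post(BROKER_URL, data=body,
#               headers={"Content-Type": "application/json"})
assert json.loads(body)["img"] == "ip:port/kubejobsapp:demo"
```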
146 changes: 146 additions & 0 deletions broker/tests/unit/mocks/k8s_mock.py
@@ -0,0 +1,146 @@
# Copyright (c) 2017 UFCG-LSD.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This entire module has the object of simulate a
Kubernetes (k8s) Object with tests purposes
"""

"""
Class that represents a mock of the Job object
"""
class Job():

""" Constructor of the mock of a Job object
Returns:
Job: The simulation of a Job object
"""
def __init__(self, active):
self.status = Status(active)

"""
Class that represents a mock of the Status object
"""
class Status():

""" Constructor of the mock of a Status object
Args:
active (string): Representing status of the object.
Returns:
Status: The simulation of a Status object
"""
def __init__(self, active):
self.active = active

"""
Class that represents a mock of the k8s object
"""
class MockKube():

""" Constructor of the mock of a k8s object
Args:
app_id (string): Representing id of the application.
replicas (int): Representing number of replicas of the application
namespace (string): Representing the namespace of the application
Returns:
MockKube: A mock of a k8s object
"""
def __init__(self, app_id, replicas=1, namespace="default"):
self.jobs = {namespace: {app_id: replicas}}

""" Function that simulates the read of a namespaced job
Args:
name (string): Representing name of the job.
namespace (string): Representing the namespace of the job
Returns:
Job: The simulation of the Job object searched
"""
def read_namespaced_job(self, name, namespace="default"):
out = Job(self.jobs[namespace][name])
return out


""" Function that simulates the creation of a job
Args:
app_id (string): Representing id of the application
cmd (string): Representing the command executed
img (string): Representing the image of the application
init_size (string): Representing the size of the application
env_vars (string): Representing the environmental variables of
the application
config_id (string): Representing the config id of the application
Returns:
None
"""
def create_job(self, app_id, cmd, img, init_size,
env_vars, config_id=""):
pass

""" Function that simulates the provision of death of
the redis
Args:
app_id (string): Representing id of the application
Returns:
tuple: Representing the redis_ip and node_port
"""
def provision_redis_or_die(self, app_id):
return (None, None)

""" Function that gets the status of completion of
a job.
Args:
app_id (string): Representing id of the application
Returns:
bool: Representing the status of completion
of the job
"""
def completed(self, app_id):
return True

""" Function that simulates a deletion of the
redis resources
Args:
app_id (string): Representing id of the application
Returns:
None
"""
def delete_redis_resources(self, app_id):
pass

""" Function that simulates a termination
of the job.
Args:
app_id (string): Representing id of the application
Returns:
None
"""
def terminate_job(self, app_id):
self.jobs["default"].pop(app_id)
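A quick usage sketch of the mock above; the classes are re-declared here so the snippet runs standalone:

```python
# Standalone copies of the mock classes above.
class Status(object):
    def __init__(self, active):
        self.active = active


class Job(object):
    def __init__(self, active):
        self.status = Status(active)


class MockKube(object):
    def __init__(self, app_id, replicas=1, namespace="default"):
        # Maps namespace -> {job name -> replica count}.
        self.jobs = {namespace: {app_id: replicas}}

    def read_namespaced_job(self, name, namespace="default"):
        return Job(self.jobs[namespace][name])

    def terminate_job(self, app_id):
        self.jobs["default"].pop(app_id)


kube = MockKube("app-42", replicas=3)
assert kube.read_namespaced_job("app-42").status.active == 3
kube.terminate_job("app-42")
assert "app-42" not in kube.jobs["default"]
```

Because the plugin only touches the client through a handful of methods, this small surface is all a unit test needs to fake.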
73 changes: 73 additions & 0 deletions broker/tests/unit/mocks/redis_mock.py
@@ -0,0 +1,73 @@
# Copyright (c) 2017 UFCG-LSD.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Class that represents a mock of the redis object
"""
class MockRedis():

""" Constructor of the mock of a redis object
Returns:
MockRedis: The simulation of a redis object
"""
def __init__(self):
self.map = {}

""" Function the simulates the push of a job in the
redis queue
Args:
metric_queue (string): Representing the metric queue
metric (Object): Representing the metric to be pushed in the
queue.
Returns:
None
"""
def rpush(self, metric_queue, metric):
if self.map.get(metric_queue) == None:
self.map[metric_queue] = []

self.map[metric_queue].append(metric)

""" Function the simulates the pop of a job from the
redis queue
Args:
metric_queue (string): Representing the metric queue
Returns:
Object: Representing the metric pop from the queue
"""
def rpop(self, metric_queue):
try:
return self.map.get(metric_queue).pop(0)
except Exception as e:
print e

""" Function the simulates the deletion of a
redis queue
Args:
queue_name (string): Representing the name of the queue to
be deleted.
Returns:
None
"""
def delete(self, queue_name):
self.map.pop(queue_name)
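Usage sketch of the mock above, re-declared so it runs standalone; note that `rpush` plus head-of-list `rpop` makes the mock behave as a FIFO queue:

```python
# Standalone copy of the MockRedis mock above.
class MockRedis(object):
    def __init__(self):
        self.map = {}

    def rpush(self, metric_queue, metric):
        # Appends to the tail, creating the queue on first use.
        self.map.setdefault(metric_queue, []).append(metric)

    def rpop(self, metric_queue):
        # Pops from the head of the list, so rpush + rpop act as FIFO.
        try:
            return self.map.get(metric_queue).pop(0)
        except Exception:
            return None

    def delete(self, queue_name):
        self.map.pop(queue_name)


rds = MockRedis()
rds.rpush("jobs", "job-1")
rds.rpush("jobs", "job-2")
assert rds.rpop("jobs") == "job-1"
rds.delete("jobs")
assert "jobs" not in rds.map
```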

Empty file.
Empty file.
