
43 experiment replication batch #47

Merged
11 commits, merged on Sep 16, 2022
3 changes: 2 additions & 1 deletion configs/example_cloud_experiment.json
@@ -27,7 +27,8 @@
"epoch_save_end_suffix": "end"
},
"reproducibility": {
"seeds": [42]
"seeds": [42],
"parallel_execution": true
}
}
}
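For reference, a minimal sketch of how the added flag could be read back from this file, assuming only the Python standard library; FLTK's own config loading (the DistributedConfig seen in the Python files below) is not reproduced here, and the keys that enclose the "reproducibility" block lie outside this hunk, so the helper searches for it recursively.

import json

def find_block(node, key):
    # Recursively locate a nested config block by key; the exact nesting of
    # "reproducibility" is not visible in the hunk above.
    if isinstance(node, dict):
        if key in node:
            return node[key]
        for value in node.values():
            found = find_block(value, key)
            if found is not None:
                return found
    return None

with open("configs/example_cloud_experiment.json") as fp:
    cfg = json.load(fp)

repro = find_block(cfg, "reproducibility")
seeds = repro["seeds"]                                # [42] in this example
parallel = repro.get("parallel_execution", False)     # flag added by this PR
print(f"Replicating over seeds={seeds}, parallel={parallel}")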
3 changes: 2 additions & 1 deletion configs/test/test_experiment.json
@@ -1,7 +1,8 @@
{
"cluster": {
"orchestrator": {
"orchestrator_type": "simulated"
"orchestrator_type": "simulated",
"parallel_execution": true
},
"client": {
"prefix": "client",
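Here the flag is added under cluster.orchestrator, matching the attribute path the orchestrator checks further below (config.cluster_config.orchestrator.parallel_execution). A hypothetical dataclass sketch of that mapping, purely illustrative since FLTK's actual config classes are not part of this diff:

from dataclasses import dataclass, field

@dataclass
class OrchestratorConfig:
    orchestrator_type: str = "simulated"
    parallel_execution: bool = False      # flag introduced by this PR

@dataclass
class ClusterConfig:
    orchestrator: OrchestratorConfig = field(default_factory=OrchestratorConfig)

# Usage mirroring the orchestrator's check:
cluster_config = ClusterConfig()
if not cluster_config.orchestrator.parallel_execution:
    print("running experiment replications sequentially")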
4 changes: 2 additions & 2 deletions fltk/core/client.py
@@ -13,7 +13,7 @@

from typing import TYPE_CHECKING
if TYPE_CHECKING:
from fltk.util.config import FedLearningConfig
from fltk.util.config import FedLearnerConfig


class Client(Node):
@@ -22,7 +22,7 @@ class Client(Node):
"""
running = False

def __init__(self, identifier: str, rank: int, world_size: int, config: FedLearningConfig):
def __init__(self, identifier: str, rank: int, world_size: int, config: FedLearnerConfig):
super().__init__(identifier, rank, world_size, config)

self.loss_function = self.config.get_loss_function()()
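The change here is only the renamed type-checking import, a pattern repeated across this PR. A small standalone sketch of that pattern (class and module layout assumed for illustration): with from __future__ import annotations the annotation is never evaluated at runtime, so FedLearnerConfig is imported solely for static analysis and no runtime import cycle is introduced.

from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Resolved only by static type checkers (mypy, IDEs); skipped at runtime.
    from fltk.util.config import FedLearnerConfig


class ExampleNode:
    def __init__(self, identifier: str, config: FedLearnerConfig):
        self.id = identifier
        self.config = config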
17 changes: 9 additions & 8 deletions fltk/core/distributed/client.py
@@ -19,13 +19,13 @@
from fltk.util.results import EpochData

if TYPE_CHECKING:
from fltk.util.config import DistributedConfig, DistLearningConfig
from fltk.util.config import DistributedConfig, DistLearnerConfig


class DistClient(DistNode):

def __init__(self, rank: int, task_id: str, world_size: int, config: DistributedConfig = None,
learning_params: DistLearningConfig = None):
learning_params: DistLearnerConfig = None):
"""
@param rank: PyTorch rank provided by KubeFlow setup.
@type rank: int
@@ -34,7 +34,7 @@ def __init__(self, rank: int, task_id: str, world_size: int, config: Distributed
@param config: Parsed configuration file representation to extract runtime information from.
@type config: DistributedConfig
@param learning_params: Hyper-parameter configuration to be used during the training process by the learner.
@type learning_params: DistLearningConfig
@type learning_params: DistLearnerConfig
"""
self._logger = logging.getLogger(f'Client-{rank}-{task_id}')

@@ -48,7 +48,8 @@ def __init__(self, rank: int, task_id: str, world_size: int, config: Distributed

# Create model and dataset
self.loss_function = self.learning_params.get_loss_function()()
self.dataset = get_dist_dataset(self.learning_params.dataset)(self.config, self.learning_params, self._id, self._world_size)
self.dataset = get_dist_dataset(self.learning_params.dataset)(self.config, self.learning_params, self._id,
self._world_size)
self.model = get_net(self.learning_params.model)()
self.device = self._init_device()

@@ -80,7 +81,7 @@ def prepare_learner(self, distributed: bool = False) -> None:

if self.config.execution_config.tensorboard.active and self._id == 0:
self.tb_writer = SummaryWriter(
str(self.config.get_log_path(self._task_id, self._id, self.learning_params)))
str(self.config.get_log_path(self._task_id, self._id, self.learning_params)))

def stop_learner(self):
"""
@@ -91,7 +92,7 @@ def stop_learner(self):
self._logger.info(f"Tearing down Client {self._id}")
self.tb_writer.close()

def _init_device(self, default_device: torch.device = torch.device('cpu')): # pylint: disable=no-member
def _init_device(self, default_device: torch.device = torch.device('cpu')): # pylint: disable=no-member
"""
Initialize Torch to use available devices. Either prepares the CUDA device, or disables CUDA during execution to run
with CPU-only inference/training.
@@ -102,7 +103,7 @@ def _init_device(self, default_device: torch.device = torch.device('cpu')): # py
@rtype: None
"""
if self.config.cuda_enabled() and torch.cuda.is_available():
return torch.device('cuda') # pylint: disable=no-member
return torch.device('cuda') # pylint: disable=no-member
# Force usage of CPU
torch.cuda.is_available = lambda: False
return default_device
@@ -188,7 +189,7 @@ def test(self) -> Tuple[float, float, np.array, np.array, np.array]:
outputs = self.model(images)
# Currently, the FLTK framework assumes that a classification task is performed (hence max).
# Future work may add support for non-classification training.
_, predicted = torch.max(outputs.data, 1) # pylint: disable=no-member
_, predicted = torch.max(outputs.data, 1) # pylint: disable=no-member
total += labels.size(0)
correct += (predicted == labels).sum().item()
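The comment above notes that FLTK currently assumes a classification task, hence the torch.max over the class outputs. A tiny self-contained sketch of that accuracy bookkeeping with dummy tensors (in the client these come from the model and the test loader):

import torch

outputs = torch.tensor([[0.1, 0.9], [0.8, 0.2], [0.3, 0.7]])   # per-sample class scores
labels = torch.tensor([1, 0, 0])

_, predicted = torch.max(outputs, 1)        # index of the highest score per sample
total = labels.size(0)
correct = (predicted == labels).sum().item()
print(f"accuracy: {correct / total:.2f}")   # 0.67 for this dummy batch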

1 change: 1 addition & 0 deletions fltk/core/distributed/extractor.py
@@ -10,6 +10,7 @@
from fltk.util.config import DistributedConfig


# noinspection PyUnresolvedReferences
def download_datasets(args: Namespace, config: DistributedConfig):
"""
Function to download datasets to a system. This is currently meant to be run (using the extractor mode of FLTK) to
45 changes: 21 additions & 24 deletions fltk/core/distributed/orchestrator.py
@@ -1,11 +1,13 @@
from __future__ import annotations

import abc
import collections
import logging
import time
import uuid
from queue import PriorityQueue
from typing import OrderedDict, Dict, Type, Set, Union
from typing import TYPE_CHECKING

from jinja2 import Environment, FileSystemLoader
from kubeflow.training import PyTorchJobClient
@@ -15,11 +17,8 @@

from fltk.core.distributed.dist_node import DistNode
from fltk.util.cluster.client import construct_job, ClusterManager
from fltk.util.task import get_job_arrival_class
from fltk.util.task.generator.arrival_generator import ArrivalGenerator, Arrival
from fltk.util.task.task import DistributedArrivalTask, FederatedArrivalTask, ArrivalTask

from typing import TYPE_CHECKING
from fltk.util.task import get_job_arrival_class, DistributedArrivalTask, FederatedArrivalTask, ArrivalTask
from fltk.util.task.generator import ArrivalGenerator

if TYPE_CHECKING:
from fltk.util.config import DistributedConfig
@@ -256,23 +255,21 @@ def run(self, clear: bool = False, experiment_replication: int = -1) -> None:
while not self.pending_tasks.empty():
curr_task: ArrivalTask = self.pending_tasks.get()
self._logger.info(f"Scheduling arrival of Arrival: {curr_task.id}")
# Create persistent logging information. As these will not be deleted by the Orchestrator, they allow you to
# retrieve information about experiments after the PytorchJob has been removed upon completion.
config_dict, configmap_name_dict = _prepare_experiment_maps(curr_task,
config=self._config,
u_id=curr_task.id,
replication=experiment_replication)
self._create_config_maps(config_dict)

try:
# Create persistent logging information. A these will not be deleted by the Orchestrator, as such, they
# allow you to retrieve information of experiments after removing the PytorchJob after completion.
config_dict, configmap_name_dict = _prepare_experiment_maps(curr_task,
config=self._config,
u_id=curr_task.id,
replication=experiment_replication)
self._create_config_maps(config_dict)

job_to_start = construct_job(self._config, curr_task, configmap_name_dict)
self._logger.info(f"Deploying on cluster: {curr_task.id}")
self._client.create(job_to_start, namespace=self._config.cluster_config.namespace)
self.deployed_tasks.add(curr_task)
except:
pass
job_to_start = construct_job(self._config, curr_task, configmap_name_dict)
self._logger.info(f"Deploying on cluster: {curr_task.id}")
self._client.create(job_to_start, namespace=self._config.cluster_config.namespace)
self.deployed_tasks.add(curr_task)

# TODO: Extend this logic in your real project, this is only meant for demo purposes
# For now we exit the thread after scheduling a single task.

self._logger.info("Still alive...")
# Prevent high cpu utilization by sleeping between checks.
@@ -307,13 +304,13 @@ def run(self, clear: bool = False, experiment_replication: int = 1) -> None:
self._clear_jobs()
while self._alive and time.time() - start_time < self._config.get_duration():
# 1. Check arrivals
# If new arrivals, store them in arrival list
# If new arrivals, store them in arrival PriorityQueue
while not self._arrival_generator.arrivals.empty():
arrival = self._arrival_generator.arrivals.get()
task = _generate_task(arrival)
self._logger.debug(f"Arrival of: {task}")
self.pending_tasks.put(task)

# 2. Schedule all tasks that arrived previously
while not self.pending_tasks.empty():
# Do blocking request to priority queue
curr_task: ArrivalTask = self.pending_tasks.get()
@@ -331,9 +328,9 @@ def run(self, clear: bool = False, experiment_replication: int = 1) -> None:
self._logger.info(f"Deploying on cluster: {curr_task.id}")
self._client.create(job_to_start, namespace=self._config.cluster_config.namespace)
self.deployed_tasks.add(curr_task)
if not self._config.cluster_config.orchestrator.parallel_execution:
self.wait_for_jobs_to_complete()

# TODO: Extend this logic in your real project, this is only meant for demo purposes
# For now we exit the thread after scheduling a single task.
self.stop()

self._logger.debug("Still alive...")
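A condensed sketch of the scheduling loop after this change (abridged from the diff above; the helper name is invented for this sketch, and Kubernetes and logging details are omitted). The new behaviour is the final check: when parallel_execution is disabled, the orchestrator blocks on wait_for_jobs_to_complete() after each deployment, so replications run sequentially instead of side by side.

def schedule_pending_tasks(self, experiment_replication: int) -> None:
    # 1. Drain newly generated arrivals into the pending PriorityQueue.
    while not self._arrival_generator.arrivals.empty():
        arrival = self._arrival_generator.arrivals.get()
        self.pending_tasks.put(_generate_task(arrival))

    # 2. Deploy every pending task for this replication.
    while not self.pending_tasks.empty():
        curr_task = self.pending_tasks.get()
        config_dict, configmap_name_dict = _prepare_experiment_maps(
            curr_task, config=self._config, u_id=curr_task.id,
            replication=experiment_replication)
        self._create_config_maps(config_dict)
        job_to_start = construct_job(self._config, curr_task, configmap_name_dict)
        self._client.create(job_to_start, namespace=self._config.cluster_config.namespace)
        self.deployed_tasks.add(curr_task)

        # New in this PR: block per task only when parallel execution is disabled.
        if not self._config.cluster_config.orchestrator.parallel_execution:
            self.wait_for_jobs_to_complete()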
4 changes: 2 additions & 2 deletions fltk/core/federator.py
@@ -20,7 +20,7 @@
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from fltk.util.config import FedLearningConfig
from fltk.util.config import FedLearnerConfig
NodeReference = Union[Node, str]


@@ -62,7 +62,7 @@ class Federator(Node):
num_rounds: int
exp_data: DataContainer

def __init__(self, identifier: str, rank: int, world_size: int, config: FedLearningConfig):
def __init__(self, identifier: str, rank: int, world_size: int, config: FedLearnerConfig):
super().__init__(identifier, rank, world_size, config)
self.loss_function = self.config.get_loss_function()()
self.num_rounds = config.rounds
6 changes: 3 additions & 3 deletions fltk/core/node.py
@@ -13,7 +13,7 @@
from fltk.util.log import getLogger

if TYPE_CHECKING:
from fltk.util.config import FedLearningConfig
from fltk.util.config import FedLearnerConfig

# Global dictionary to enable peer to peer communication between clients
global_vars = {}
@@ -36,7 +36,7 @@ class Node(abc.ABC):
dataset: Any
logger = getLogger(__name__)

def __init__(self, identifier: str, rank: int, world_size: int, config: FedLearningConfig):
def __init__(self, identifier: str, rank: int, world_size: int, config: FedLearnerConfig):
self.config = config
self.id = identifier # pylint: disable=invalid-name
self.rank = rank
@@ -46,7 +46,7 @@ def __init__(self, identifier: str, rank: int, world_size: int, config: FedLearn
global_vars['self'] = self
self._config(config)

def _config(self, config: FedLearningConfig):
def _config(self, config: FedLearnerConfig):
self.logger.setLevel(config.log_level.value)
self.config.rank = self.rank
self.config.world_size = self.world_size
22 changes: 1 addition & 21 deletions fltk/datasets/__init__.py
@@ -2,24 +2,4 @@
from fltk.datasets.cifar100 import CIFAR100Dataset
from fltk.datasets.fashion_mnist import FashionMNISTDataset
from fltk.datasets.mnist import MNIST
from fltk.util.config.definitions import Dataset


def get_train_loader_path(name: Dataset) -> str:
paths = {
Dataset.cifar10: 'data_loaders/cifar10/train_data_loader.pickle',
Dataset.fashion_mnist: 'data_loaders/fashion-mnist/train_data_loader.pickle',
Dataset.cifar100: 'data_loaders/cifar100/train_data_loader.pickle',
Dataset.mnist: 'data_loaders/mnist/train_data_loader.pickle',
}
return paths[name]


def get_test_loader_path(name: Dataset) -> str:
paths = {
Dataset.cifar10: 'data_loaders/cifar10/test_data_loader.pickle',
Dataset.fashion_mnist: 'data_loaders/fashion-mnist/test_data_loader.pickle',
Dataset.cifar100: 'data_loaders/cifar100/test_data_loader.pickle',
Dataset.mnist: 'data_loaders/mnist/test_data_loader.pickle',
}
return paths[name]
from fltk.datasets.dataset import Dataset
4 changes: 2 additions & 2 deletions fltk/datasets/dataset.py
@@ -10,15 +10,15 @@
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from fltk.util.config import DistLearningConfig
from fltk.util.config import DistLearnerConfig


class Dataset(abc.ABC):
"""
Dataset implementation for Distributed learning experiments.
"""

def __init__(self, config, learning_params: DistLearningConfig, rank: int, world_size: int):
def __init__(self, config, learning_params: DistLearnerConfig, rank: int, world_size: int):
self.config = config
self.learning_params = learning_params

4 changes: 2 additions & 2 deletions fltk/datasets/federated/cifar10.py
@@ -4,15 +4,15 @@

from fltk.datasets.federated.dataset import FedDataset
from fltk.samplers import get_sampler
from fltk.util.config import FedLearningConfig
from fltk.util.config import FedLearnerConfig


class FedCIFAR10Dataset(FedDataset):
"""
CIFAR10 Dataset implementation for Distributed learning experiments.
"""

def __init__(self, args: FedLearningConfig):
def __init__(self, args: FedLearnerConfig):
super(FedCIFAR10Dataset, self).__init__(args)
self.init_train_dataset()
self.init_test_dataset()
4 changes: 2 additions & 2 deletions fltk/datasets/federated/dataset.py
@@ -1,6 +1,6 @@
from abc import abstractmethod

from fltk.util.config import FedLearningConfig
from fltk.util.config import FedLearnerConfig
from fltk.util.log import getLogger


@@ -13,7 +13,7 @@ class FedDataset:
test_loader = None
logger = getLogger(__name__)

def __init__(self, args: FedLearningConfig):
def __init__(self, args: FedLearnerConfig):
self.args = args

def get_args(self):