Skip to content

Commit

Permalink
Development (#361)
Browse files Browse the repository at this point in the history
* Add group associations to roles (#319)

* Sync up the generated code from openapi generator with what we have currently (#331)

* applied formatting (#341)

* pre-commit setup and dev reqs (#342)

* Refactor config handling with pydantic (#332)

* Make sdk config backwards compatible. (#355)

* Fix merge conflicts between development and main branch (#353)

* optimizer compatibility with tensorflow and example for medmnist keras/pytorch (#320)

Tensorflow compatibility for new optimizers was added, which included fedavg, fedadam, fedadagrad, and fedyogi.

A shell script for tesing all 8 possible combinations of optimizers and frameworks is included.
This allows the medmnist example to be run with keras (the folder structure was refactored to include a trainer and aggregator for keras).

The typo in fedavg.py has now been fixed.

* feat+fix: grpc support for hierarchical fl (#321)

Hierarchical fl didn't work with grpc as backend. This is because
groupby field was not considered in metaserver service and p2p
backend.

In addition, a middle aggregator hangs even after a job is
completed. This deadlock occurs because p2p backend cleanup code is
called as a part of a channel cleanup. However, in a middle
aggregator, p2p backend is responsible for tasks across all
channnels. The p2p cleanup code couldn't finish cleanup because
a broadcast task for in the other channel can't finish. This bug is
fixed here by getting the p2p backend cleanup code out side of channel
cleanup code.

* documenation for metaserver/mqtt local (#322)

Documentation for using metaserver will allow users to run examples with a local broker.
It also allows for mqtt local brokers.
This decreases the chances of any job ID collisions.

Modifications to the config.json for the mnist example were made in order to make it easier to switch to a local broker.
The readme does indicate how to do this for other examples now.

Co-authored-by: vboxuser <[email protected]>

* feat: asynchronous fl (#323)

Asynchronous FL is implemented for two-tier topology and three-tier
hierarchical topology.

The main algorithm is based on the following two papers:
- https://arxiv.org/pdf/2111.04877.pdf
- https://arxiv.org/pdf/2106.06639.pdf

Two examples for asynchronous fl are also added. One is for a two-tier
topology and the other for a three-tier hierarchical topology.

This implementation includes the core algorithm but  doesn't include
SecAgg algorithm (presented in the papers), which is not the scope of
this change.

* fix+refactor: asyncfl loss divergence (#330)

For asyncfl, a client (trainer) should send delta by subtracting local
weights from original global weights after training. In the current
implementation, the whole local weights were sent to a
server (aggregator). This causes loss divergence.

Supporting delta update requires refactoring of aggregators of
synchronous fl (horizontal/{top_aggregator.py, middle_aggregator.py})
as well as optimizers' do() function.

The changes here support delta update universally across all types of
modes (horizontal synchronous, asynchronous, and hybrid).

* fix: conflict bewtween integer tensor and float tensor (#335)

Model architectures can have integer tensors. Applying aggregation on
those tensors results in type mistmatch and throws a runtime error:
"RuntimeError: result type Float can't be cast to the desired output
type Long"

Integer tensors don't matter in back propagation. So, as a workaround
to the issue, we typecast to the original dtype when the original type
is different from the dtype of weighted tensors for aggregation. In
this way, we can keep the model architecture as is.

* refactor: config for hybrid example in library (#334)

To enable library-only execution for hybrid example, its configuration
files are updated accordingly. The revised configuration has local
mqtt and p2p broker config and p2p broker is selected.

* misc: asynchronous hierarchical fl example (#340)

Since the Flame SDK supports asynchronous FL, we add an example of an
asynchronous hierarchical FL for control plane.

* chore: clean up examples folder (#336)

The examples folder at the top level directory has some outdated and
irrelevant files. Those are now removed from the folder.

* fix: workaround for hybrid mode with two p2p backends (#345)

Due to grpc/grpc#25364, when two p2p
backends (which rely on grpc and asyncio) are defined, the hybrid mode
example throws an execption: 'BlockingIOError: [Errno 35] Resource
temporarily unavailable'. The issue still appears unresolved. As a
temporary workaround, we use two different types of backends: mqtt for
one and p2p for the other. This means that when this example is
executed, both metaserver and a mqtt broker (e.g., mosquitto) must be
running in the local machine.

* fix: distributed mode (#344)

Distributed mode has a bug: before 'weights' is not defined as member
variable, deepcopy(self.weights) in _update_weights() is called.
To address this issue, self.weights is initialized in __init__().

Also, to run a distributed example locally, configuration files are
revised.

* example/implementation for fedprox (#339)

This example is similar to the ones seen in the fedprox paper, although it currently does not simmulate stragglers and uses another dataset/architecture.

A few things were changed in order for there to be a simple process for modifying trainers.
This includes a function in util.py and another class variable in the trainer containing information on the client side regularizer.

Additionally, tests are automated (mu=1,0.1,0.01,0.001,0) so running the example generates or modifies existing files in order to provide the propper configuration for an experiment.

* Create diagnose script (#348)

* Create diagnose script

* Make the script executable

---------

Co-authored-by: Alex Ungurean <[email protected]>

* refactor+fix: configurable deployer / lib regularizer fix (#351)

deployer's job template file is hard-coded, which makes it hard to use
different template file at deployment time. Using different different
template file is useful when underlying infrastructure is
different (e.g., k8s vs knative). To support that, template folder and
file is fed as config variables.

Also, deployer's config info is fed as command argument, which is
cumbersome. So, the config parsing part is refactored such that the
info is fed as a configuration file.

During the testing of deployer change, a bug in the library
is identified. The fix for it is added here too.

Finally, the local dns configuration in flame.sh is updated so that it
can be done correctly across different linux distributions (e.g.,
archlinux and ubuntu). The tests for flame.sh are under archlinux and
ubuntu.

* Add missing merge fix

* Make sdk config backwards compatible. (#355)

---------

Co-authored-by: GustavBaumgart <[email protected]>
Co-authored-by: Myungjin Lee <[email protected]>
Co-authored-by: vboxuser <[email protected]>
Co-authored-by: alexandruuBytex <[email protected]>
Co-authored-by: Alex Ungurean <[email protected]>
Co-authored-by: elqurio <[email protected]>

* refactor: end-to-end refactoring (#360)

* refactor: end-to-end refactoring

The development branch is yet fully tested. Hence, it contains several
incompatibility and bugs. The following issues are handled:

(1) func tag parsing in config.py (sdk): The config module has a small
bug, which the parsed func tags are not populated in Channel class
instance.

(2) design and schema creation failure (control plane): "name" field
in design and "version" field in schema are not used all the time. But
they are specified as "required" fields, which causes error during
assertion check on these field in openapi code.

(3) hyperparameter update failure in mlflow (sdk): hyperparameter is
no longer a dictionary, which is an expected format from mlflow.

(4) library update for new examples - asyncfl and fedprox (sdk):
asyncfl and fedprox algorithms and examples were introduced outside
the development branch, which caused compatibility issues.

(5) control plane example update (control plane): all the example code
in the control plane is outdated because of configuration parsing
module changes.

(6) README file update in adult and mnist_non_orchestration_mode
examples (doc): these two examples are for non-orchestration
mode. They will be deprecated. So, a note is added to their README
file.

* Update lib/python/flame/registry/mlflow.py

Co-authored-by: elqurio <[email protected]>

---------

Co-authored-by: openwithcode <[email protected]>
Co-authored-by: elqurio <[email protected]>

---------

Co-authored-by: elqurio <[email protected]>
Co-authored-by: GustavBaumgart <[email protected]>
Co-authored-by: Myungjin Lee <[email protected]>
Co-authored-by: vboxuser <[email protected]>
Co-authored-by: alexandruuBytex <[email protected]>
Co-authored-by: Alex Ungurean <[email protected]>
  • Loading branch information
7 people authored Mar 6, 2023
1 parent c609448 commit 8332efd
Show file tree
Hide file tree
Showing 115 changed files with 3,079 additions and 2,242 deletions.
35 changes: 35 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.4.0
hooks:
- id: check-yaml
- id: check-json
- id: end-of-file-fixer
- id: trailing-whitespace
- id: pretty-format-json
args: [--no-sort-keys, --autofix, --indent=4]
- id: check-added-large-files

- repo: https://github.com/psf/black
rev: 23.1.0
hooks:
- id: black
args: [--line-length=80, --skip-string-normalization]

- repo: https://github.com/pycqa/isort
rev: 5.12.0
hooks:
- id: isort
args: [--profile=black, --line-length=80]

- repo: https://github.com/pycqa/flake8
rev: 6.0.0
hooks:
- id: flake8
args:
[
"--ignore=E203,W503",
--max-complexity=10,
--max-line-length=80,
"--select=B,C,E,F,W,T4,B9",
]
12 changes: 7 additions & 5 deletions api/design_api.partials.yml
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,13 @@
schema:
type: string
style: simple
- in: header
name: X-API-KEY
schema:
type: string
style: simple
explode: false
required: true
responses:
'200':
description: Deleted
Expand Down Expand Up @@ -390,11 +397,6 @@
style: simple
explode: false
required: true
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/DesignSchema'
responses:
"200":
description: Null response
Expand Down
17 changes: 15 additions & 2 deletions api/design_components.partials.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ DesignInfo:
type: string
required:
- id
- name
example:
name: diabete predict
description: Helps in quick diagnosis and prediction of diabetes among patients.
Expand Down Expand Up @@ -80,7 +79,6 @@ DesignSchema:
items:
$ref: '#/components/schemas/Connector'
required:
- version
- name
- roles
- channels
Expand Down Expand Up @@ -127,6 +125,10 @@ Role:
replica:
format: int32
type: integer
groupAssociation:
type: array
items:
$ref: '#/components/schemas/GroupAssociation'
required:
- name
example:
Expand All @@ -137,6 +139,17 @@ Role:
description: These are responsible to aggregate the updates from trainer nodes.
replica: 2

#########################
# GroupAssociation
#########################
GroupAssociation:
type: object
additionalProperties:
type: string
example:
"param-channel": "red"
"global-channel": "black"

#########################
# Channel between roles
#########################
Expand Down
98 changes: 84 additions & 14 deletions cmd/controller/app/database/db_interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,62 +35,132 @@ type DBService interface {

// DatasetService is an interface that defines a collection of APIs related to dataset
type DatasetService interface {
CreateDataset(string, openapi.DatasetInfo) (string, error)
GetDatasets(string, int32) ([]openapi.DatasetInfo, error)
// CreateDataset creates a new dataset in the db
CreateDataset(userId string, info openapi.DatasetInfo) (string, error)

// GetDatasets returns a list of datasets associated with a user
GetDatasets(userId string, limit int32) ([]openapi.DatasetInfo, error)

// GetDatasetById returns the details of a particular dataset
GetDatasetById(string) (openapi.DatasetInfo, error)
}

// DesignService is an interface that defines a collection of APIs related to design
type DesignService interface {
// CreateDesign adds a design to the db
CreateDesign(userId string, info openapi.Design) error

// GetDesign returns a design associated with the given user and design ids
GetDesign(userId string, designId string) (openapi.Design, error)

// DeleteDesign deletes the design from the db
DeleteDesign(userId string, designId string) error

// GetDesigns returns a list of designs associated with a user
GetDesigns(userId string, limit int32) ([]openapi.DesignInfo, error)

// CreateDesignSchema adds a schema for a design to the db
CreateDesignSchema(userId string, designId string, info openapi.DesignSchema) error

// GetDesignSchema returns the schema of a design from the db
GetDesignSchema(userId string, designId string, version string) (openapi.DesignSchema, error)

// GetDesignSchemas returns all the schemas associated with the given designId
GetDesignSchemas(userId string, designId string) ([]openapi.DesignSchema, error)

// UpdateDesignSchema updates a schema for a design in the db
UpdateDesignSchema(userId string, designId string, version string, info openapi.DesignSchema) error

// DeleteDesignSchema deletes the schema of a design from the db
DeleteDesignSchema(userId string, designId string, version string) error

// CreateDesignCode adds the code of a design to the db
CreateDesignCode(userId string, designId string, fileName string, fileVer string, fileData *os.File) error

// GetDesignCode retrieves the code of a design from the db
GetDesignCode(userId string, designId string, version string) ([]byte, error)

// DeleteDesignCode deletes the code of a design from the db
DeleteDesignCode(userId string, designId string, version string) error
}

// JobService is an interface that defines a collection of APIs related to job
type JobService interface {
CreateJob(string, openapi.JobSpec) (openapi.JobStatus, error)
DeleteJob(string, string) error
GetJob(string, string) (openapi.JobSpec, error)
GetJobById(string) (openapi.JobSpec, error)
GetJobStatus(string, string) (openapi.JobStatus, error)
GetJobs(string, int32) ([]openapi.JobStatus, error)
UpdateJob(string, string, openapi.JobSpec) error
UpdateJobStatus(string, string, openapi.JobStatus) error
// CreateJob creates a new job
CreateJob(userId string, spec openapi.JobSpec) (openapi.JobStatus, error)

// DeleteJob deletes a given job
DeleteJob(userId string, jobId string) error

// GetJob gets the job associated with the provided jobId
GetJob(userId string, jobId string) (openapi.JobSpec, error)

// GetJobById gets the job associated with the provided jobId
GetJobById(jobId string) (openapi.JobSpec, error)

// GetJobStatus get the status of a job
GetJobStatus(userId string, jobId string) (openapi.JobStatus, error)

// GetJobs returns the list of jobs associated with a user
GetJobs(userId string, limit int32) ([]openapi.JobStatus, error)

// UpdateJob updates the job with the given jobId
UpdateJob(userId string, jobId string, spec openapi.JobSpec) error

// UpdateJobStatus updates the status of a job given the user Id, job Id and the openapi.JobStatus
UpdateJobStatus(userId string, jobId string, status openapi.JobStatus) error

// GetTaskInfo gets the information of a task given the user Id, job Id and task Id
GetTaskInfo(string, string, string) (openapi.TaskInfo, error)

// GetTasksInfo gets the information of tasks given the user Id, job Id and a limit
GetTasksInfo(string, string, int32, bool) ([]openapi.TaskInfo, error)

// GetTasksInfoGeneric gets the information of tasks given the user Id, job Id, limit and an option to include completed tasks
GetTasksInfoGeneric(string, string, int32, bool, bool) ([]openapi.TaskInfo, error)
}

// TaskService is an interface that defines a collection of APIs related to task
type TaskService interface {
// CreateTasks creates tasks given a set of objects.Task and a flag
CreateTasks([]objects.Task, bool) error

// DeleteTasks deletes tasks given the job Id and a flag
DeleteTasks(string, bool) error

// GetTask gets the task given the user Id, job Id and task Id
GetTask(string, string, string) (map[string][]byte, error)

// IsOneTaskInState evaluates if one of the task is in a certain state given the job Id
IsOneTaskInState(string, openapi.JobState) bool

// IsOneTaskInStateWithRole evaluates if one of the tasks is in a certain state and with a specific role given the job Id
IsOneTaskInStateWithRole(string, openapi.JobState, string) bool

// MonitorTasks monitors the tasks and returns a TaskInfo channel
MonitorTasks(string) (chan openapi.TaskInfo, chan error, context.CancelFunc, error)
SetTaskDirtyFlag(string, bool) error
UpdateTaskStateByFilter(string, openapi.JobState, map[string]interface{}) error
UpdateTaskStatus(string, string, openapi.TaskStatus) error

// SetTaskDirtyFlag sets the dirty flag for tasks given the job Id and a flag
SetTaskDirtyFlag(jobId string, dirty bool) error

// UpdateTaskStateByFilter updates the state of the task using a filter
UpdateTaskStateByFilter(jobId string, newState openapi.JobState, userFilter map[string]interface{}) error

// UpdateTaskStatus updates the status of a task given the user Id, job Id, and openapi.TaskStatus
UpdateTaskStatus(jobId string, taskId string, taskStatus openapi.TaskStatus) error
}

// ComputeService is an interface that defines a collection of APIs related to computes
type ComputeService interface {
// RegisterCompute registers a compute given a openapi.ComputeSpec
RegisterCompute(openapi.ComputeSpec) (openapi.ComputeStatus, error)

// GetComputeIdsByRegion gets all the compute Ids associated with a region
GetComputeIdsByRegion(string) ([]string, error)

// GetComputeById gets the compute info given the compute Id
GetComputeById(string) (openapi.ComputeSpec, error)
// UpdateDeploymentStatus call replaces existing agent statuses with received statuses in collection.

// UpdateDeploymentStatus updates the deployment status given the compute Id, job Id and agentStatuses map
UpdateDeploymentStatus(computeId string, jobId string, agentStatuses map[string]openapi.AgentState) error
}
5 changes: 2 additions & 3 deletions cmd/controller/app/database/mongodb/compute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,8 @@ func TestMongoService_UpdateDeploymentStatus(t *testing.T) {
deploymentCollection: mt.Coll,
}
mt.AddMockResponses(mtest.CreateSuccessResponse(), mtest.CreateCursorResponse(1, "flame.deployment", mtest.FirstBatch, bson.D{}))
err := db.UpdateDeploymentStatus("test jobid","test compute id",
map[string]openapi.AgentState{"test task id": openapi.AGENT_DEPLOY_SUCCESS,
})
err := db.UpdateDeploymentStatus("test jobid", "test compute id",
map[string]openapi.AgentState{"test task id": openapi.AGENT_DEPLOY_SUCCESS})
assert.Nil(t, err)
})
mt.Run("status update failure", func(mt *mtest.T) {
Expand Down
22 changes: 11 additions & 11 deletions cmd/controller/app/job/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,19 +171,19 @@ func (b *JobBuilder) getTaskTemplates() ([]string, map[string]*taskTemplate) {

for _, role := range b.schema.Roles {
template := &taskTemplate{}
JobConfig := &template.JobConfig
jobConfig := &template.JobConfig

JobConfig.Configure(b.jobSpec, b.jobParams.Brokers, b.jobParams.Registry, role, b.schema.Channels)
jobConfig.Configure(b.jobSpec, b.jobParams.Brokers, b.jobParams.Registry, role, b.schema.Channels)

// check channels and set default group if channels don't have groupby attributes set
for i := range JobConfig.Channels {
if len(JobConfig.Channels[i].GroupBy.Value) > 0 {
// check channels and set default group if channels don't have groupBy attributes set
for i := range jobConfig.Channels {
if len(jobConfig.Channels[i].GroupBy.Value) > 0 {
continue
}

// since there is no groupby attribute, set default
JobConfig.Channels[i].GroupBy.Type = groupByTypeTag
JobConfig.Channels[i].GroupBy.Value = append(JobConfig.Channels[i].GroupBy.Value, defaultGroup)
// since there is no groupBy attribute, set default
jobConfig.Channels[i].GroupBy.Type = groupByTypeTag
jobConfig.Channels[i].GroupBy.Value = append(jobConfig.Channels[i].GroupBy.Value, defaultGroup)
}

template.isDataConsumer = role.IsDataConsumer
Expand All @@ -192,7 +192,7 @@ func (b *JobBuilder) getTaskTemplates() ([]string, map[string]*taskTemplate) {
}
template.ZippedCode = b.roleCode[role.Name]
template.Role = role.Name
template.JobId = JobConfig.Job.Id
template.JobId = jobConfig.Job.Id

templates[role.Name] = template
}
Expand All @@ -205,9 +205,9 @@ func (b *JobBuilder) preCheck(dataRoles []string, templates map[string]*taskTemp
// This function will evolve as more invariants are defined
// Before processing templates, the following invariants should be met:
// 1. At least one data consumer role should be defined.
// 2. a role shouled be associated with a code.
// 2. a role should be associated with a code.
// 3. template should be connected.
// 4. when graph traversal starts at a data role template, the depth of groupby tag
// 4. when graph traversal starts at a data role template, the depth of groupBy tag
// should strictly decrease from one channel to another.
// 5. two different data roles cannot be connected directly.

Expand Down
17 changes: 17 additions & 0 deletions cmd/controller/app/objects/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type JobConfig struct {
Job JobIdName `json:"job"`
Role string `json:"role"`
Realm string `json:"realm"`
Groups map[string]string `json:"groups"`
Channels []openapi.Channel `json:"channels"`

MaxRunTime int32 `json:"maxRunTime,omitempty"`
Expand Down Expand Up @@ -101,6 +102,22 @@ func (cfg *JobConfig) Configure(jobSpec *openapi.JobSpec, brokers []config.Broke
// Realm will be updated when datasets are handled
cfg.Realm = ""
cfg.Channels = cfg.extractChannels(role.Name, channels)

// configure the groups of the job based on the groups associated with the assigned role
cfg.Groups = cfg.extractGroups(role.GroupAssociation)
}

// extractGroups - extracts the associated groups that a given role has of a particular job
func (cfg *JobConfig) extractGroups(groupAssociation []map[string]string) map[string]string {
groups := make(map[string]string)

for _, ag := range groupAssociation {
for key, value := range ag {
groups[key] = value
}
}

return groups
}

func (cfg *JobConfig) extractChannels(role string, channels []openapi.Channel) []openapi.Channel {
Expand Down
15 changes: 8 additions & 7 deletions cmd/deployer/app/resource_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/cisco-open/flame/cmd/deployer/app/deployer"
"github.com/cisco-open/flame/cmd/deployer/config"
"github.com/cisco-open/flame/pkg/openapi"
"github.com/cisco-open/flame/pkg/openapi/constants"
pbNotify "github.com/cisco-open/flame/pkg/proto/notification"
"github.com/cisco-open/flame/pkg/restapi"
"github.com/cisco-open/flame/pkg/util"
Expand Down Expand Up @@ -279,8 +280,8 @@ func (r *resourceHandler) postDeploymentStatus(jobId string, taskStatuses map[st
taskStatuses, r.spec.ComputeId, jobId)
// construct url
uriMap := map[string]string{
"computeId": r.spec.ComputeId,
"jobId": jobId,
constants.ParamComputeID: r.spec.ComputeId,
constants.ParamJobID: jobId,
}
url := restapi.CreateURL(r.apiserverEp, restapi.PutDeploymentStatusEndpoint, uriMap)

Expand All @@ -296,8 +297,8 @@ func (r *resourceHandler) getDeploymentConfig(jobId string) (openapi.DeploymentC
zap.S().Infof("Sending request to apiserver / controller to get deployment config")
// construct url
uriMap := map[string]string{
"computeId": r.spec.ComputeId,
"jobId": jobId,
constants.ParamComputeID: r.spec.ComputeId,
constants.ParamJobID: jobId,
}
url := restapi.CreateURL(r.apiserverEp, restapi.GetDeploymentConfigEndpoint, uriMap)
code, respBody, err := restapi.HTTPGet(url)
Expand Down Expand Up @@ -355,9 +356,9 @@ func (r *resourceHandler) deployResources(deploymentConfig openapi.DeploymentCon
}

ctx := map[string]string{
"imageLoc": deploymentConfig.ImageLoc,
"taskId": taskId,
"taskKey": deploymentConfig.AgentKVs[taskId],
"imageLoc": deploymentConfig.ImageLoc,
constants.ParamTaskID: taskId,
"taskKey": deploymentConfig.AgentKVs[taskId],
}

rendered, renderErr := mustache.RenderFile(r.jobTemplatePath, &ctx)
Expand Down
Loading

0 comments on commit 8332efd

Please sign in to comment.