centralized_simulation_main.py
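"""Entry point for the centralized simulation: spawns one master process and
``num_workers`` worker processes on the local machine, which coordinate over
torch.distributed (see --init_method) to train a LeNet model on FEMNIST
data partitions.

Example invocation (a sketch, not verified: flag names other than
--sample_size and --init_method are assumed to be registered by
utils.flags.add_base_flags, inferred from the attributes this script
reads from ``args``):

    python centralized_simulation_main.py \
        --num_workers 4 --epochs 20 --local_epochs 1 --batch_size 32 \
        --lr 0.001 --seed 1 --dataset_dir /path/to/femnist \
        --log_every_n_steps 10
"""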
from __future__ import absolute_import, division, print_function

import argparse
import random
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.optim as optim
import torch.nn.functional as F
import time
from torchvision import transforms

from nets.lenet import LeNet
from centralized.master import Master
from centralized.worker import Worker
import datasets.femnist as femnist
from utils.train import TrainArguments, train_model
import utils.flags as flags


DEFAULT_ARGS = {
    'init_method': 'tcp://127.0.0.1:23456',
}


def run_master(device, model, args):
    # TODO: support multiple datasets
    test_dataset = femnist.FEMNISTDataset(args.dataset_dir, train=False,
                                          transform=transforms.ToTensor(),
                                          only_digits=True)

    if args.validation_period:
        client_ids = list(test_dataset.client_ids())
        if args.max_num_users_per_worker:
            rng = random.Random()
            rng.seed(args.seed)
            rng.shuffle(client_ids)
            del client_ids[args.max_num_users_per_worker:]
        test_datasets = [test_dataset.get_client_dataset(client_id)
                         for client_id in client_ids]
        test_data_loader = torch.utils.data.DataLoader(
            torch.utils.data.ConcatDataset(test_datasets),
            batch_size=args.batch_size)
        test_args = test_data_loader, args.validation_period
    else:
        test_args = None

    master = Master(model, device, args.num_workers, args.init_method,
                    sample_size=args.sample_size, seed=args.seed)
    master.run(args.epochs, test_args)


def run_worker(rank, device, model, args):
    # TODO: support multiple datasets
    dataset = femnist.get_partition(rank=rank-1,
                                    world_size=args.num_workers,
                                    **vars(args))
    data_loader = torch.utils.data.DataLoader(
        dataset, batch_size=args.batch_size, shuffle=True)

    worker = Worker(model, device, rank, args.num_workers, args.init_method,
                    sample_size=args.sample_size, seed=args.seed)
    train_args = TrainArguments(
        data_loader=data_loader,
        device=torch.device('cpu'),
        model=model,
        optimizer=optim.Adam(model.parameters(), lr=args.lr),
        loss_fn=F.nll_loss,
        log_every_n_steps=args.log_every_n_steps,
        train_fn=train_model,
    )
    worker.run(args.epochs, args.local_epochs, train_args)

def main():
    parser = argparse.ArgumentParser()
    flags.add_base_flags(parser)
    parser.add_argument(
        '--sample_size', type=int,
        help='size of worker samples in each epoch. '
             'If not set, all workers participate in each epoch.')
    parser.add_argument(
        '--init_method', default=DEFAULT_ARGS['init_method'],
        help='init method to use for torch.distributed (default={})'.format(
            DEFAULT_ARGS['init_method']))
    args = parser.parse_args()

    # TODO: support GPU
    device = torch.device('cpu')
    torch.manual_seed(args.seed)
    model = LeNet()

    master_process = mp.Process(target=run_master, args=(device, model, args))
    master_process.start()
    time.sleep(0.1)

    worker_processes = []
    for rank in range(1, args.num_workers + 1):
        p = mp.Process(target=run_worker,
                       args=(rank, device, model, args))
        p.start()
        worker_processes.append(p)

    master_process.join()
    for p in worker_processes:
        if p.is_alive():
            p.terminate()
        else:
            p.join()


if __name__ == "__main__":
    main()