Per_FeAVG源码分析——根目录下:

KarhouTam的Per_FedAVG.源码链接:请使用到的点个star

utils.py

函数:get_args()

功能:用于加载参数:使用ArgumentParser()输入了联邦参数模型参数其他参数三类参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import torch
import random
import numpy as np
from typing import Iterator, Tuple, Union
from argparse import ArgumentParser


def get_args():
parser = ArgumentParser()
##‘--alpha’表示参数名称,type代表参数类型,default代表默认值设置,help则是对alpha的描述性解释。
parser.add_argument("--alpha", type=float, default=1e-2)
parser.add_argument("--beta", type=float, default=1e-3)
parser.add_argument("--global_epochs", type=int, default=200)
parser.add_argument("--local_epochs", type=int, default=4)
parser.add_argument(
"--pers_epochs",
type=int,
default=1,
help="Indicate how many data batches would be used for personalization. Negatives means that equal to train phase.",
)
parser.add_argument(
"--hf",
type=int,
default=0,
help="0 for performing Per-FedAvg(FO), others for Per-FedAvg(HF)",
)
parser.add_argument("--batch_size", type=int, default=40)
parser.add_argument(
"--valset_ratio",
type=float,
default=0.1,
help="Proportion of val set in the entire client local dataset",
)
parser.add_argument(
"--dataset", type=str, choices=["mnist", "cifar"], default="mnist"
)
parser.add_argument("--client_num_per_round", type=int, default=10)
parser.add_argument("--seed", type=int, default=17)
parser.add_argument(
"--gpu",
type=int,
default=1,
help="Non-zero value for using gpu, 0 for using cpu",
)
parser.add_argument(
"--eval_while_training",
type=int,
default=1,
help="Non-zero value for performing local evaluation before and after local training",
)
parser.add_argument("--log", type=int, default=1)
return parser.parse_args() #解析了命令行参数,并将解析结果作为函数的返回值,以便在其他地方可以使用参数

函数:eval()

功能:用于在PyTorch中评估给定模型的性能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@torch.no_grad() #这个装饰器确保在评估模型时不会计算梯度,从而节省内存和计算资源。
def eval(
model: torch.nn.Module,#评价的模型
dataloader: torch.utils.data.DataLoader,#数据集加载器
criterion: Union[torch.nn.MSELoss, torch.nn.CrossEntropyLoss],#损失函数,可以是均方误差(MSE)或交叉熵损失。
device=torch.device("cpu"),#用于运行模型的设备(默认为cpu)
) -> Tuple[torch.Tensor, torch.Tensor]:
#将模型设置为评估模式,确保如Dropout或BatchNorm这样的层在评估时以不同的方式工作。
model.eval()
total_loss = 0
num_samples = 0
acc = 0
#对于数据加载器中的每一批数据,计算损失和准确率。
for x, y in dataloader:
x, y = x.to(device), y.to(device)
logit = model(x)
# total_loss += criterion(logit, y) / y.size(-1)
total_loss += criterion(logit, y) #使用给定的损失函数计算预测(logit)和真实标签(y)之间的损失。
pred = torch.softmax(logit, -1).argmax(-1)#使用 torch.softmax 和 argmax 获取预测的类别索引,然后与真实标签比较,统计正确的预测。
acc += torch.eq(pred, y).int().sum()
num_samples += y.size(-1)
model.train()#在评估结束后将模型重置为训练模式,尽管这通常不是必需的,因为下一个使用模型的操作可能会自动设置它。
return total_loss, acc / num_samples

函数:fix_random_seed(seed: int)

作用:设置随机种子以确保结果的可复现性

1
2
3
4
5
6
7
8
def fix_random_seed(seed: int):
torch.cuda.empty_cache()#这个函数会清空CUDA缓存。
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)#这分别设置CPU和GPU上的随机种子,以确保PyTorch操作的随机性是可复现的。这是正确的。
random.seed(seed)
np.random.seed(seed)#这两个函数分别设置Python标准库中的random模块和NumPy库中的随机数生成器的种子。这也是为了确保其他库中的随机操作是可复现的。
torch.backends.cudnn.deterministic = True#当你设置deterministic=True时,你告诉cuDNN(CUDA Deep Neural Network library)在卷积操作中使用确定性的算法,而不是可能更快但不太确定的算法。这有助于确保即使在GPU上,结果也是可复现的。
torch.backends.cudnn.benchmark = True#这个设置告诉cuDNN为特定的配置自动寻找最快的卷积算法。但是,当benchmark=True时,cuDNN会尝试不同的算法并保留最佳的一个,这可能会导致结果不可复现,因为每次运行都可能选择不同的算法。

model.py

函数:elu(nn.Module)

作用:它实现了指数线性单元(Exponential Linear Unit, ELU)激活函数。

1
2
3
4
5
6
7
class elu(nn.Module):#继承自nn.Module的类elu。
#调用了父类nn.Module的__init__方法来确保基类的初始化。
def __init__(self) -> None:
super(elu, self).__init__()
#实现了ELU激活函数。对于输入x,如果x大于或等于0,则返回x本身;否则,返回0.2 * (torch.exp(x) - 1)。
def forward(self, x):
return torch.where(x >= 0, x, 0.2 * (torch.exp(x) - 1))
类:linear(nn.Module)

作用:__init__方法用于初始化权重(w)和偏置(b),而forward方法定义了数据通过网络层的前向传播过程。

1
2
3
4
5
6
7
8
9
10
11
class linear(nn.Module):
def __init__(self, in_c, out_c) -> None:
super(linear, self).__init__()
#使用了torch.randn(out_c, in_c) * torch.sqrt(torch.tensor(2 / in_c))来初始化权重,这是He初始化(也称为Kaiming初始化)的一个变种,它通常用于ReLU或其变种激活函数。
self.w = nn.Parameter(
torch.randn(out_c, in_c) * torch.sqrt(torch.tensor(2 / in_c))
)
self.b = nn.Parameter(torch.randn(out_c))

def forward(self, x):
return F.linear(x, self.w, self.b)
类:MLP_MNIST

作用:构建不同类型的神经网络模型,分别是MLP(多层感知机)、CNNMnist(用于MNIST手写数字数据集的卷积神经网络)和CNNCifar(用于CIFAR-10数据集的卷积神经网络)。实现了神经网络的前向传播过程,并用于分类任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class MLP_MNIST(nn.Module):
def __init__(self) -> None:
super(MLP_MNIST, self).__init__()
self.fc1 = linear(28 * 28, 80)
self.fc2 = linear(80, 60)
self.fc3 = linear(60, 10)
self.flatten = nn.Flatten()
self.activation = elu()

def forward(self, x):
x = self.flatten(x)

x = self.fc1(x)
x = self.activation(x)

x = self.fc2(x)
x = self.activation(x)

x = self.fc3(x)
x = self.activation(x)

return x


class MLP_CIFAR10(nn.Module):
def __init__(self) -> None:
super(MLP_CIFAR10, self).__init__()
self.fc1 = linear(32 * 32 * 3, 80)
self.fc2 = linear(80, 60)
self.fc3 = linear(60, 10)
self.flatten = nn.Flatten()
self.activation = elu()

def forward(self, x):
x = self.flatten(x)

x = self.fc1(x)
x = self.activation(x)

x = self.fc2(x)
x = self.activation(x)

x = self.fc3(x)
x = self.activation(x)

return x
类:MLP_CIFAR10

作用:构建不同类型的神经网络模型,分别是MLP(多层感知机)、CNNMnist(用于MNIST手写数字数据集的卷积神经网络)和CNNCifar(用于CIFAR-10数据集的卷积神经网络)。实现了神经网络的前向传播过程,并用于分类任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class MLP_CIFAR10(nn.Module):
def __init__(self) -> None:
super(MLP_CIFAR10, self).__init__()
self.fc1 = linear(32 * 32 * 3, 80)
self.fc2 = linear(80, 60)
self.fc3 = linear(60, 10)
self.flatten = nn.Flatten()
self.activation = elu()

def forward(self, x):
x = self.flatten(x)

x = self.fc1(x)
x = self.activation(x)

x = self.fc2(x)
x = self.activation(x)

x = self.fc3(x)
x = self.activation(x)

return x
字典:MODEL_DICT

作用:关联键"mnist""cifar"到它们各自的多层感知机(MLP)模型类MLP_MNISTMLP_CIFAR10

1
MODEL_DICT = {"mnist": MLP_MNIST, "cifar": MLP_CIFAR10}
函数:get_model(dataset, device)

作用:根据数据集名称从MODEL_DICT字典中获取相应的模型类,并实例化模型,然后将模型移动到指定的设备上(CPU或GPU)

1
2
def get_model(dataset, device):
return MODEL_DICT[dataset]().to(device)

perfedavg.py

函数:init()

作用:类的初始化方法,用于配置和初始化类的实例变量。该方法接收多个参数,包括客户端ID、学习率参数(alpha和beta,可能是某种优化算法中的参数,如Momentum或Adam中的beta1和beta2)、全局模型、损失函数、批量大小、数据集名称、本地训练轮数、验证集比例、日志记录器和GPU设备ID。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def __init__(
self,
client_id: int,
alpha: float,
beta: float,
global_model: torch.nn.Module,
criterion: Union[torch.nn.CrossEntropyLoss, torch.nn.MSELoss],
batch_size: int,
dataset: str,
local_epochs: int,
valset_ratio: float,
logger: rich.console.Console,
gpu: int,
):
#设备选择:根据传入的gpu参数和torch.cuda.is_available()的结果,选择使用CPU还是GPU进行计算。
if gpu and torch.cuda.is_available():
self.device = torch.device("cuda")
else:
self.device = torch.device("cpu")
#日志记录器:将传入的logger实例保存在类的实例变量中,以便在类的其他方法中使用。
self.logger = logger
#本地训练参数:保存了本地训练轮数(local_epochs)和损失函数(criterion)。
self.local_epochs = local_epochs
self.criterion = criterion
#客户端ID和模型:保存了客户端ID(client_id)和全局模型的深拷贝(global_model)。
self.id = client_id
self.model = deepcopy(global_model)
#学习率参数:保存了alpha和beta参数,这些参数可能是优化算法的一部分。
self.alpha = alpha
self.beta = beta
#数据加载器:调用get_dataloader函数来获取训练和验证的数据加载器(trainloader和valloader)。这个函数根据数据集名称、客户端ID、批量大小和验证集比例来返回相应的数据加载器。
self.trainloader, self.valloader = get_dataloader(
dataset, client_id, batch_size, valset_ratio
)
#迭代训练加载器:将训练数据加载器转换为迭代器并保存在iter_trainloader中。这样做可能是为了在类的其他方法中方便地从训练集中获取批量数据。
self.iter_trainloader = iter(self.trainloader)
函数:get_data_batch(self)

作用:用于从训练数据加载器中获取下一批数据,并处理StopIteration异常(当迭代器耗尽时触发)。当iter_trainloader中的数据被完全迭代一遍后,该方法会重新初始化迭代器并获取新的数据批次。

1
2
3
4
5
6
7
8
9
10
def get_data_batch(self):
#尝试获取数据:try块尝试从self.iter_trainloader(一个迭代器)中获取下一批数据(x为输入数据,y为标签)
try:
x, y = next(self.iter_trainloader)
#异常处理:
except StopIteration:
self.iter_trainloader = iter(self.trainloader)#重新初始化iter_trainloader,通过调用iter(self.trainloader)来创建一个新的迭代器。
x, y = next(self.iter_trainloader)#再次尝试从新的迭代器中获取下一批数据。
#无论数据是直接从原始迭代器中获取,还是通过重新初始化迭代器后获取,都会将数据(x和y)移动到self.device(即CPU或GPU)上,并返回它们。
return x.to(self.device), y.to(self.device)
函数:train()

作用:是一个用于在本地客户端上训练模型的函数。该方法接收全局模型、一个布尔值hessian_free(用于指示是否使用Hessian-free优化)和一个布尔值eval_while_training(用于指示是否在训练前后评估模型性能)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def train(
self,
global_model: torch.nn.Module,
hessian_free=False,
eval_while_training=False,
):
self.model.load_state_dict(global_model.state_dict())
#训练前评估(可选):如果eval_while_training为True,则在训练开始前使用utils.eval函数评估模型在验证集上的性能,并保存损失和准确率。
if eval_while_training:
loss_before, acc_before = utils.eval(
self.model, self.valloader, self.criterion, self.device
)
#执行训练:调用_train方法
self._train(hessian_free)

#训练后评估(可选):如果eval_while_training为True,则在训练结束后再次使用utils.eval函数评估模型在验证集上的性能,并保存损失和准确率。
if eval_while_training:
loss_after, acc_after = utils.eval(
self.model, self.valloader, self.criterion, self.device
)
#记录并返回模型:使用self.logger记录训练前后的损失和准确率变化。然后,使用SerializationTool.serialize_model方法将训练后的模型序列化为某种格式,并返回该序列化模型。
self.logger.log(
"client [{}] [red]loss: {:.4f} -> {:.4f} [blue]acc: {:.2f}% -> {:.2f}%".format(
self.id,
loss_before,
loss_after,
acc_before * 100.0,
acc_after * 100.0,
)
)
return SerializationTool.serialize_model(self.model)
函数:_train()

作用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def _train(self, hessian_free=False):
#使用Hessian-free方法的Per-FedAvg(HF)
#当hessian_free为True时,该方法将执行Hessian-free的Per-FedAvg训练过程。这通常涉及计算二阶导数(Hessian)的近似,以优化模型参数。
#
if hessian_free: # Per-FedAvg(HF)
#对于每个本地训练周期(self.local_epochs),首先复制当前模型(self.model)到一个临时模型(temp_model)中。
#使用get_data_batch方法获取第一批数据(data_batch_1),并计算关于临时模型的一阶梯度(grads)。
#使用这些梯度来更新临时模型的参数(这里使用了简单的SGD更新,但学习率self.alpha可能需要根据实际情况调整)。
#接着,获取第二批数据(data_batch_2),并再次计算关于临时模型的一阶梯度(grads_1st)。
#然后,获取第三批数据(data_batch_3),但这次计算的是关于原始模型(self.model)的二阶梯度(Hessian向量积,即grads_2nd)。注意,这里的计算可能需要特定的函数或库,因为直接计算完整的Hessian矩阵是计算密集且不可行的。
#最后,使用这些一阶梯度和二阶梯度来更新原始模型的参数。更新规则似乎结合了梯度下降和二阶优化方法(具体是哪种方法取决于self.beta和self.alpha的值)。
for _ in range(self.local_epochs):
temp_model = deepcopy(self.model)
data_batch_1 = self.get_data_batch()
grads = self.compute_grad(temp_model, data_batch_1)
for param, grad in zip(temp_model.parameters(), grads):
param.data.sub_(self.alpha * grad)

data_batch_2 = self.get_data_batch()
grads_1st = self.compute_grad(temp_model, data_batch_2)

data_batch_3 = self.get_data_batch()

grads_2nd = self.compute_grad(
self.model, data_batch_3, v=grads_1st, second_order_grads=True
)
# NOTE: Go check https://github.com/KarhouTam/Per-FedAvg/issues/2 if you confuse about the model update.
for param, grad1, grad2 in zip(
self.model.parameters(), grads_1st, grads_2nd
):
param.data.sub_(self.beta * grad1 - self.beta * self.alpha * grad2)

else: # Per-FedAvg(FO)
#只使用一阶梯度的Per-FedAvg(FO)
#首先在一个临时模型(temp_model)上计算第一个数据批次(data_batch_1)的梯度,并更新临时模型的参数。
#然后,它获取第二个数据批次(data_batch_2)并计算梯度,但这次它直接在原始模型(self.model)上应用这些梯度的更新,而不是临时模型。
for _ in range(self.local_epochs):
# ========================== FedAvg ==========================
# NOTE: You can uncomment those codes for running FedAvg.
# When you're trying to run FedAvg, comment other codes in this branch.

# data_batch = self.get_data_batch()
# grads = self.compute_grad(self.model, data_batch)
# for param, grad in zip(self.model.parameters(), grads):
# param.data.sub_(self.beta * grad)

# ============================================================

temp_model = deepcopy(self.model)
data_batch_1 = self.get_data_batch()
grads = self.compute_grad(temp_model, data_batch_1)

for param, grad in zip(temp_model.parameters(), grads):
param.data.sub_(self.alpha * grad)

data_batch_2 = self.get_data_batch()
grads = self.compute_grad(temp_model, data_batch_2)

for param, grad in zip(self.model.parameters(), grads):
param.data.sub_(self.beta * grad)
函数:compute_grad()

作用:根据给定的数据批次data_batch计算模型model的梯度。如果second_order_gradsTrue,它将计算二阶梯度(Hessian-vector积的一个近似),否则,它将计算标准的一阶梯度。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
def compute_grad(
self,
model: torch.nn.Module,
data_batch: Tuple[torch.Tensor, torch.Tensor],
v: Union[Tuple[torch.Tensor, ...], None] = None,
second_order_grads=False,
):
x, y = data_batch
if second_order_grads:
frz_model_params = deepcopy(model.state_dict())
delta = 1e-3
dummy_model_params_1 = OrderedDict()
dummy_model_params_2 = OrderedDict()
with torch.no_grad():
for (layer_name, param), grad in zip(model.named_parameters(), v):
dummy_model_params_1.update({layer_name: param + delta * grad})
dummy_model_params_2.update({layer_name: param - delta * grad})

model.load_state_dict(dummy_model_params_1, strict=False)
logit_1 = model(x)
loss_1 = self.criterion(logit_1, y)
grads_1 = torch.autograd.grad(loss_1, model.parameters())

model.load_state_dict(dummy_model_params_2, strict=False)
logit_2 = model(x)
loss_2 = self.criterion(logit_2, y)
grads_2 = torch.autograd.grad(loss_2, model.parameters())

model.load_state_dict(frz_model_params)

grads = []
with torch.no_grad():
for g1, g2 in zip(grads_1, grads_2):
grads.append((g1 - g2) / (2 * delta))
return grads

else:
logit = model(x)
loss = self.criterion(logit, y)
grads = torch.autograd.grad(loss, model.parameters())
return grads
函数:pers_N_eval()

作用:在给定全局模型(global_model)和个性化训练轮次(pers_epochs)之后,该函数首先加载全局模型的参数到客户端的本地模型(self.model),然后在本地数据集上进行训练和评估。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
def pers_N_eval(self, global_model: torch.nn.Module, pers_epochs: int):
#加载全局模型参数:
self.model.load_state_dict(global_model.state_dict())

#评估初始模型性能:
loss_before, acc_before = utils.eval(
self.model, self.valloader, self.criterion, self.device
)
#定义优化器:
optimizer = torch.optim.SGD(self.model.parameters(), lr=self.alpha)
#个性化训练:这部分代码执行了 pers_epochs 轮次的个性化训练。在每次迭代中,它首先从 self.get_data_batch() 获取一个数据批次,然后使用这个数据批次来更新模型的参数。
for _ in range(pers_epochs):
x, y = self.get_data_batch()
logit = self.model(x)
loss = self.criterion(logit, y)
optimizer.zero_grad()
loss.backward()
optimizer.step()
#评估训练后的模型性能:在个性化训练完成后,再次评估模型的性能。
loss_after, acc_after = utils.eval(
self.model, self.valloader, self.criterion, self.device
)
#记录并输出日志:使用日志记录器(self.logger)来记录训练前后的损失和准确率。这里还使用了颜色代码(如 [red] 和 [blue]),但这些可能不会在纯文本日志中显示,除非日志记录器进
self.logger.log(
"client [{}] [red]loss: {:.4f} -> {:.4f} [blue]acc: {:.2f}% -> {:.2f}%".format(
self.id, loss_before, loss_after, acc_before * 100.0, acc_after * 100.0
)
)
#返回评估结果:
return {
"loss_before": loss_before,
"acc_before": acc_before,
"loss_after": loss_after,
"acc_after": acc_after,
}

main.py

用于启动分布式或联邦学习中的客户端或服务器进程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
if __name__ == "__main__":

args = get_args()#使用get_args()从命令行获取参数,并将这些参数存储在一个对象中
fix_random_seed(args.seed)#用于设置随机种子,确保实验的可重复性。
if os.path.isdir("./log") == False:#这行代码检查当前目录下是否存在一个名为log的目录。如果不存在,则执行下一行代码。
os.mkdir("./log")#如果log目录不存在,这行代码会创建它。os.mkdir用于创建新目录。
#首先,检查args对象中是否有gpu参数且其值为True;其次,检查是否有可用的CUDA设备(即是否有NVIDIA GPU并安装了适当的CUDA和PyTorch版本)。
if args.gpu and torch.cuda.is_available():
device = torch.device("cuda")
else:
device = torch.device("cpu")

global_model = get_model(args.dataset, device)#创建了一个日志记录器对象logger
logger = Console(record=args.log)
logger.log(f"Arguments:", dict(args._get_kwargs()))
clients_4_training, clients_4_eval, client_num_in_total = get_client_id_indices(
args.dataset
)


# init clients
#初始化一个客户端列表,每个客户端都是PerFedAvgClient类的实例。
clients = [
PerFedAvgClient(
client_id=client_id,
alpha=args.alpha,
beta=args.beta,
global_model=global_model,
criterion=torch.nn.CrossEntropyLoss(),
batch_size=args.batch_size,
dataset=args.dataset,
local_epochs=args.local_epochs,
valset_ratio=args.valset_ratio,
logger=logger,
gpu=args.gpu,
)
for client_id in range(client_num_in_total)
]
# training
#开始训练过程的,并且它使用了日志记录器(logger)来输出到log。
logger.log("=" * 20, "TRAINING", "=" * 20, style="bold red")
for _ in track( #全局训练循环:
range(args.global_epochs), "Training...", console=logger, disable=args.log
):
# select clients
#选择客户端:在每次全局迭代中,代码从clients_4_training中随机选择args.client_num_per_round个客户端进行本地训练。
selected_clients = random.sample(clients_4_training, args.client_num_per_round)

model_params_cache = []
# client local training 客户端本地训练
#对于选定的每个客户端,代码执行train方法。该方法以当前的全局模型作为起点,并可能在本地数据集上进行训练。train方法返回序列化后的模型参数,这些参数被添加到model_params_cache列表中。
for client_id in selected_clients:
serialized_model_params = clients[client_id].train(
global_model=global_model,
hessian_free=args.hf,
eval_while_training=args.eval_while_training,
)
model_params_cache.append(serialized_model_params)

# aggregate model parameters聚合模型参数:
#在所有选定的客户端完成本地训练后,代码使用fedavg_aggregate函数(来聚合模型参数。这个函数将model_params_cache列表中的模型参数进行聚合(通常使用FedAvg算法,即加权平均)。然后,使用deserialize_model函数将聚合后的模型参数反序列化并应用到global_model上,从而更新全局模型。
aggregated_model_params = Aggregators.fedavg_aggregate(model_params_cache)
SerializationTool.deserialize_model(global_model, aggregated_model_params)
#分隔符日志:
#最后,代码使用logger对象输出一个由60个等号字符组成的分隔符,可能用于在日志中分隔不同的全局迭代轮次
logger.log("=" * 60)

#描述了联邦学习中的评估过程,并且它记录了模型在评估前后的性能。
# eval
pers_epochs = args.local_epochs if args.pers_epochs == -1 else args.pers_epochs #确定持久化轮次(Persistent Epochs)

#开始评估日志
#初始化评估结果列表
#这四个列表用于存储每个客户端在评估前后的损失和准确率。
logger.log("=" * 20, "EVALUATION", "=" * 20, style="bold blue")
loss_before = []
loss_after = []
acc_before = []
acc_after = []

#客户端评估循环
#代码遍历clients_4_eval列表中的每个客户端ID,并对每个客户端执行pers_N_eval方法。这个方法在评估前对模型进行本地训练(使用pers_epochs指定的轮次),然后评估模型的性能,并返回一个包含损失和准确率的字典。这些值随后被添加到相应的列表中。
for client_id in track(
clients_4_eval, "Evaluating...", console=logger, disable=args.log
):
stats = clients[client_id].pers_N_eval(
global_model=global_model, pers_epochs=pers_epochs,
)
loss_before.append(stats["loss_before"])
loss_after.append(stats["loss_after"])
acc_before.append(stats["acc_before"])
acc_after.append(stats["acc_after"])

#输出评估结果
#代码使用logger对象输出评估结果。它计算了所有客户端的平均损失和准确率,并将它们以格式化的方式输出。
logger.log("=" * 20, "RESULTS", "=" * 20, style="bold green")
logger.log(f"loss_before_pers: {(sum(loss_before) / len(loss_before)):.4f}")
logger.log(f"acc_before_pers: {(sum(acc_before) * 100.0 / len(acc_before)):.2f}%")
logger.log(f"loss_after_pers: {(sum(loss_after) / len(loss_after)):.4f}")
logger.log(f"acc_after_pers: {(sum(acc_after) * 100.0 / len(acc_after)):.2f}%")

#保存评估结果(如果启用日志)
if args.log:
algo = "HF" if args.hf else "FO"
logger.save_html(
f"./log/{args.dataset}_{args.client_num_per_round}_{args.global_epochs}_{pers_epochs}_{algo}.html"
)

Per_FeAVG源码分析——data目录下:

init.py

不做分析

utils.py

字典:DATASET_DICT

作用:它将字符串键(如 "mnist""cifar")映射到对应的类(MNISTDatasetCIFARDataset

1
2
3
4
DATASET_DICT = {
"mnist": MNISTDataset,
"cifar": CIFARDataset,
}
函数:CURRENT_DIR

作用:CURRENT_DIR 被设置为当前 Python 脚本文件的父目录的绝对路径

1
CURRENT_DIR = Path(__file__).parent.abspath()
函数:get_dataloader

作用:从一个预处理好的 pickle 文件中加载数据集,并根据给定的 client_id 分割为训练集和验证集。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def get_dataloader(dataset: str, client_id: int, batch_size=20, valset_ratio=0.1):
pickles_dir = CURRENT_DIR / dataset / "pickles"
if os.path.isdir(pickles_dir) is False:
raise RuntimeError("Please preprocess and create pickles first.")

with open(pickles_dir / str(client_id) + ".pkl", "rb") as f:
client_dataset: DATASET_DICT[dataset] = pickle.load(f)

val_num_samples = int(valset_ratio * len(client_dataset))
train_num_samples = len(client_dataset) - val_num_samples

trainset, valset = random_split(
client_dataset, [train_num_samples, val_num_samples]
)
trainloader = DataLoader(trainset, batch_size, drop_last=True)
valloader = DataLoader(valset, batch_size)

return trainloader, valloader

函数:get_client_id_indices(dataset)

作用:从一个特定的 pickle 文件中加载并返回关于数据集分割的信息。从一个 seperation.pkl 文件中读取训练集、测试集以及总数目的索引或标识符。

1
2
3
4
5
def get_client_id_indices(dataset):
dataset_pickles_path = CURRENT_DIR / dataset / "pickles"
with open(dataset_pickles_path / "seperation.pkl", "rb") as f:
seperation = pickle.load(f)
return (seperation["train"], seperation["test"], seperation["total"])

preprocess.py

函数:CURRENT_DIR

作用:CURRENT_DIR 被设置为当前 Python 脚本文件的父目录的绝对路径

1
CURRENT_DIR = Path(__file__).parent.abspath()
字典:DATASET

作用:数据集名称映射到了两个元组,可以基于数据集名称来动态地加载和实例化相应的数据集

1
2
3
4
DATASET = {
"mnist": (MNIST, MNISTDataset),
"cifar": (CIFAR10, CIFARDataset),
}
字典:MEAN

作用:用来存储不同数据集的像素均值。这些均值通常用于数据归一化,只包含一个灰度通道,因此其均值是一个单元素元组 (0.1307,)。这意味着当你对 MNIST 数据集进行归一化时,你会从每个像素值中减去 0.1307。

1
2
3
4
MEAN = {
"mnist": (0.1307,),
"cifar": (0.4914, 0.4822, 0.4465),
}
字典:STD

作用:存储不同数据集的像素标准差。使用归一化时,标准差通常与均值一起使用,以确保数据的每个特征(在这个案例中是像素值)都有相似的尺度。它只包含一个灰度通道,因此其标准差是一个单元素元组 (0.3015,)。这意味着在归一化 MNIST 数据时,每个像素值都会根据其灰度通道的标准差进行缩放。

1
2
3
4
STD = {
"mnist": (0.3015,),
"cifar": (0.2023, 0.1994, 0.2010),
}
函数:preprocess()

作用:用于预处理数据集,在联邦学习或分布式学习的场景中,数据需要在多个客户端(或节点)之间分配。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
def preprocess(args: Namespace) -> None:
#参数和目录设置:
#设置数据集目录(dataset_dir)和pickle文件目录(pickles_dir)。
dataset_dir = CURRENT_DIR / args.dataset
pickles_dir = CURRENT_DIR / args.dataset / "pickles"

#设置随机数生成器的种子,以确保结果的可重复性。
np.random.seed(args.seed)
random.seed(args.seed)
torch.manual_seed(args.seed)
num_train_clients = int(args.client_num_in_total * args.fraction)
num_test_clients = args.client_num_in_total - num_train_clients

#数据转换
#定义了一个转换transform,它只包含标准化(假设MEAN和STD是预定义的字典,包含了每个数据集的均值和标准差),初始化了训练集和测试集的统计信息字典
transform = transforms.Compose(
[transforms.Normalize(MEAN[args.dataset], STD[args.dataset]),]
)
target_transform = None
trainset_stats = {}
testset_stats = {}

#目录和文件处理:
#检查数据集目录是否存在,如果不存在则创建它。
#如果pickle目录已经存在,则删除它(可能是为了确保没有旧的pickle文件干扰)。
#创建新的pickle目录。
if not os.path.isdir(CURRENT_DIR / args.dataset):
os.mkdir(CURRENT_DIR / args.dataset)
if os.path.isdir(pickles_dir):
os.system(f"rm -rf {pickles_dir}")
os.mkdir(f"{pickles_dir}")

#加载数据集
#从预定义的DATASET字典中获取原始和目标数据集。
#使用ori_dataset类创建训练集和测试集。注意,训练集在加载时还指定了download=True(用于自动下载数据集),而测试集没有。两者都使用了transforms.ToTensor()进行初步的数据转换。
ori_dataset, target_dataset = DATASET[args.dataset]
trainset = ori_dataset(
dataset_dir, train=True, download=True, transform=transforms.ToTensor()
)
testset = ori_dataset(dataset_dir, train=False, transform=transforms.ToTensor())

#分配类别到客户端
#根据args.classes确定每个客户端应有的类别数量(默认为10类)
#使用randomly_alloc_classes函数将类别随机分配给训练集和测试集的客户端
#randomly_alloc_classes函数还返回每个子集的统计信息(
num_classes = 10 if args.classes <= 0 else args.classes
all_trainsets, trainset_stats = randomly_alloc_classes(
ori_dataset=trainset,
target_dataset=target_dataset,
num_clients=num_train_clients,
num_classes=num_classes,
transform=transform,
target_transform=target_transform,
)
all_testsets, testset_stats = randomly_alloc_classes(
ori_dataset=testset,
target_dataset=target_dataset,
num_clients=num_test_clients,
num_classes=num_classes,
transform=transform,
target_transform=target_transform,
)

#将所有训练集和测试集组合到一个列表all_datasets中
all_datasets = all_trainsets + all_testsets

#保存客户端数据集为pickle文件
#通过enumerate(all_datasets)遍历all_datasets列表中的每个数据集和对应的client_id(即客户端的ID)。
#使用pathlib的/操作符(如果pickles_dir是pathlib.Path对象)来构建pickle文件的路径。
#使用pickle.dump()函数将每个数据集保存到对应的pickle文件中。
for client_id, dataset in enumerate(all_datasets):
with open(pickles_dir / str(client_id) + ".pkl", "wb") as f:
pickle.dump(dataset, f)

#保存客户端索引
#创建一个字典,其中包含三个键:“train”、“test”和“total”,“total”的值就是总的客户端数量。
#“train”和“test”的值是客户端ID的列表,分别代表训练集和测试集的客户端。这里假设num_train_clients表示训练集客户端的数量,而args.client_num_in_total表示总的客户端数量。
#使用pickle.dump()函数将这个字典保存到名为“seperation.pkl”的文件中。这个文件用于在后续的训练和测试过程中区分哪些客户端是训练集,哪些是测试集
with open(pickles_dir / "seperation.pkl", "wb") as f:
pickle.dump(
{
"train": [i for i in range(num_train_clients)],
"test": [i for i in range(num_train_clients, args.client_num_in_total)],
"total": args.client_num_in_total,
},
f,
)

#保存数据集统计信息
#trainset_stats和testset_stats是在之前的预处理步骤中收集的训练集和测试集的统计信息。
#使用json.dump()函数将这些统计信息保存为JSON格式的文件“all_stats.json”。这个文件用于在后续的模型训练和评估过程中提供数据集的相关信息
with open(dataset_dir / "all_stats.json", "w") as f:
json.dump({"train": trainset_stats, "test": testset_stats}, f)
函数:randomly_alloc_classes

作用:将原始数据集(ori_dataset)中的样本随机分配给多个客户端(或用户),同时确保每个客户端获得指定数量的不同类别的样本。函数还返回了分配给每个客户端的数据集列表和相应的统计信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def randomly_alloc_classes(
ori_dataset: Dataset,
target_dataset: Dataset,
num_clients: int,
num_classes: int,
transform=None,
target_transform=None,
) -> Tuple[List[Dataset], Dict[str, Dict[str, int]]]:

#分配样本
#使用noniid_slicing函数来将ori_dataset中的样本分配给num_clients个客户端。这个函数应该返回一个字典,其中键是客户端ID,值是分配给该客户端的样本索引列表。
dict_users = noniid_slicing(ori_dataset, num_clients, num_clients * num_classes)
stats = {}
#收集统计信息
#对于每个客户端,从ori_dataset中提取标签(ori_dataset.targets),然后根据分配给该客户端的样本索引列表计算标签的类别分布。这些统计信息被存储在stats字典中。
for i, indices in dict_users.items():
targets_numpy = np.array(ori_dataset.targets)
stats[f"client {i}"] = {"x": 0, "y": {}}
stats[f"client {i}"]["x"] = len(indices)
stats[f"client {i}"]["y"] = Counter(targets_numpy[indices].tolist())
datasets = []
#创建数据集
#使用target_dataset类从ori_dataset中创建子集,每个子集对应于一个客户端。这是通过从ori_dataset中提取分配给该客户端的样本,并传递给target_dataset的构造函数来实现的。
for indices in dict_users.values():
datasets.append(
target_dataset(
[ori_dataset[i] for i in indices],
transform=transform,
target_transform=target_transform,
)
)
return datasets, stats
函数:__name__==“main”

作用:基本的命令行参数解析设置,它使用argparse库来从命令行获取参数。这些参数包括数据集类型、客户端总数、训练客户端的比例、每个客户端数据所属的类别数量以及随机种子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
if __name__ == "__main__":
parser = ArgumentParser()
parser.add_argument(
"--dataset", type=str, choices=["mnist", "cifar"], default="mnist",
)
parser.add_argument("--client_num_in_total", type=int, default=200)
parser.add_argument(
"--fraction", type=float, default=0.9, help="Propotion of train clients"
)
parser.add_argument(
"--classes",
type=int,
default=2,
help="Num of classes that one client's data belong to.",
)
parser.add_argument("--seed", type=int, default=0)
args = parser.parse_args()
preprocess(args)

dataset.py

类:MNISTDataset(Dataset)
函数:init

作用:用于初始化一个对象的状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
def __init__(
#参数
self,
subset=None,
data=None,
targets=None,
transform=None,
target_transform=None,
) -> None:
#处理
self.transform = transform
self.target_transform = target_transform
#如果data和targets都非空,它将data增加一个新的维度(使用unsqueeze(1)),并将data和targets设置为对象的属性。
if (data is not None) and (targets is not None):
self.data = data.unsqueeze(1)
self.targets = targets
# 如果subset非空,它将遍历subset,检查元组中的每个元素是否为张量。如果不是,它将使用torch.tensor将其转换为张量。然后,它使用torch.stack将数据和标签分别堆叠成张量,并设置为对象的属性。
elif subset is not None:
self.data = torch.stack(
list(
map(
lambda tup: tup[0]
if isinstance(tup[0], torch.Tensor)
else torch.tensor(tup[0]),
subset,
)
)
)
self.targets = torch.stack(
list(
map(
lambda tup: tup[1]
if isinstance(tup[1], torch.Tensor)
else torch.tensor(tup[1]),
subset,
)
)
)
#如果data和targets以及subset都为空,则抛出一个ValueError,说明需要提供数据格式。
else:
raise ValueError(
"Data Format: subset: Tuple(data: Tensor / Image / np.ndarray, targets: Tensor) OR data: List[Tensor] targets: List[Tensor]"
)
函数:getitem

作用:允许类的实例像列表、元组或其他可迭代对象那样进行索引访问。在你提供的上下文中,这个方法通常用于数据加载器(如PyTorch的DataLoader),以便在训练或评估模型时能够按索引访问数据集中的单个样本。

1
2
3
4
5
6
7
8
9
10
def __getitem__(self, index):
data, targets = self.data[index], self.targets[index]

if self.transform is not None:
data = self.transform(self.data[index])

if self.target_transform is not None:
targets = self.target_transform(self.targets[index])

return data, targets
函数:len

作用:确定self.data的长度

1
2
def __len__(self):
return len(self.targets)
类:CIFARDataset(Dataset)
函数:init

作用:用于初始化一个对象的状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def __init__(
self,
subset=None,
data=None,
targets=None,
transform=None,
target_transform=None,
) -> None:
self.transform = transform
self.target_transform = target_transform
if (data is not None) and (targets is not None):
self.data = data.unsqueeze(1)
self.targets = targets
elif subset is not None:
self.data = torch.stack(
list(
map(
lambda tup: tup[0]
if isinstance(tup[0], torch.Tensor)
else torch.tensor(tup[0]),
subset,
)
)
)
self.targets = torch.stack(
list(
map(
lambda tup: tup[1]
if isinstance(tup[1], torch.Tensor)
else torch.tensor(tup[1]),
subset,
)
)
)
else:
raise ValueError(
"Data Format: subset: Tuple(data: Tensor / Image / np.ndarray, targets: Tensor) OR data: List[Tensor] targets: List[Tensor]"
)
函数:getitem

作用:允许类的实例像列表、元组或其他可迭代对象那样进行索引访问。在你提供的上下文中,这个方法通常用于数据加载器(如PyTorch的DataLoader),以便在训练或评估模型时能够按索引访问数据集中的单个样本。

1
2
3
4
5
6
7
8
9
10
def __getitem__(self, index):
img, targets = self.data[index], self.targets[index]

if self.transform is not None:
img = self.transform(self.data[index])

if self.target_transform is not None:
targets = self.target_transform(self.targets[index])

return img, targets
函数:len

作用:返回长度

1
2
def __len__(self):
return len(self.targets)