|
- # Copyright 2021 Huawei Technologies Co., Ltd
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- # ============================================================================
- """Learning rate scheduler."""
- import math
- from collections import Counter
-
- import numpy as np
-
-
- def linear_warmup_lr(current_step, warmup_steps, base_lr, init_lr):
- """Linear learning rate."""
- lr_inc = (float(base_lr) - float(init_lr)) / float(warmup_steps)
- lr = float(init_lr) + lr_inc * current_step
- return lr
-
-
- def warmup_step_lr(
- lr,
- lr_epochs,
- steps_per_epoch,
- warmup_epochs,
- max_epoch,
- gamma=0.1):
- """Warmup step learning rate."""
- base_lr = lr
- warmup_init_lr = 0
- total_steps = int(max_epoch * steps_per_epoch)
- warmup_steps = int(warmup_epochs * steps_per_epoch)
- milestones = lr_epochs
- milestones_steps = []
- for milestone in milestones:
- milestones_step = milestone * steps_per_epoch
- milestones_steps.append(milestones_step)
-
- lr_each_step = []
- lr = base_lr
- milestones_steps_counter = Counter(milestones_steps)
- for i in range(total_steps):
- if i < warmup_steps:
- lr = linear_warmup_lr(i + 1, warmup_steps, base_lr, warmup_init_lr)
- else:
- lr = lr * gamma ** milestones_steps_counter[i]
- lr_each_step.append(lr)
-
- return np.array(lr_each_step).astype(np.float32)
-
-
- def multi_step_lr(lr, milestones, steps_per_epoch, max_epoch, gamma=0.1):
- return warmup_step_lr(
- lr,
- milestones,
- steps_per_epoch,
- 0,
- max_epoch,
- gamma=gamma)
-
-
- def step_lr(lr, epoch_size, steps_per_epoch, max_epoch, gamma=0.1):
- lr_epochs = []
- for i in range(1, max_epoch):
- if i % epoch_size == 0:
- lr_epochs.append(i)
- return multi_step_lr(
- lr,
- lr_epochs,
- steps_per_epoch,
- max_epoch,
- gamma=gamma)
-
-
- def warmup_cosine_annealing_lr(
- lr,
- steps_per_epoch,
- warmup_epochs,
- max_epoch,
- t_max,
- eta_min=0):
- """Cosine annealing learning rate."""
- base_lr = lr
- warmup_init_lr = 0
- total_steps = int(max_epoch * steps_per_epoch)
- warmup_steps = int(warmup_epochs * steps_per_epoch)
-
- lr_each_step = []
- for i in range(total_steps):
- last_epoch = i // steps_per_epoch
- if i < warmup_steps:
- lr = linear_warmup_lr(i + 1, warmup_steps, base_lr, warmup_init_lr)
- else:
- lr = eta_min + (base_lr - eta_min) * \
- (1. + math.cos(math.pi * last_epoch / t_max)) / 2
- lr_each_step.append(lr)
-
- return np.array(lr_each_step).astype(np.float32)
-
-
- def warmup_cosine_annealing_lr_v2(
- lr,
- steps_per_epoch,
- warmup_epochs,
- max_epoch,
- t_max,
- eta_min=0):
- """Cosine annealing learning rate V2."""
- base_lr = lr
- warmup_init_lr = 0
- total_steps = int(max_epoch * steps_per_epoch)
- warmup_steps = int(warmup_epochs * steps_per_epoch)
-
- last_lr = 0
- last_epoch_v1 = 0
-
- t_max_v2 = int(max_epoch * 1 / 3)
-
- lr_each_step = []
- for i in range(total_steps):
- last_epoch = i // steps_per_epoch
- if i < warmup_steps:
- lr = linear_warmup_lr(i + 1, warmup_steps, base_lr, warmup_init_lr)
- else:
- if i < total_steps * 2 / 3:
- lr = eta_min + (base_lr - eta_min) * (1. + \
- math.cos(math.pi * last_epoch / t_max)) / 2
- last_lr = lr
- last_epoch_v1 = last_epoch
- else:
- base_lr = last_lr
- last_epoch = last_epoch - last_epoch_v1
- lr = eta_min + (base_lr - eta_min) * (1. + \
- math.cos(math.pi * last_epoch / t_max_v2)) / 2
-
- lr_each_step.append(lr)
- return np.array(lr_each_step).astype(np.float32)
-
-
- def warmup_cosine_annealing_lr_sample(
- lr,
- steps_per_epoch,
- warmup_epochs,
- max_epoch,
- t_max,
- eta_min=0):
- """Warmup cosine annealing learning rate."""
- start_sample_epoch = 60
- step_sample = 2
- tobe_sampled_epoch = 60
- end_sampled_epoch = start_sample_epoch + step_sample * tobe_sampled_epoch
- max_sampled_epoch = max_epoch + tobe_sampled_epoch
- t_max = max_sampled_epoch
-
- base_lr = lr
- warmup_init_lr = 0
- total_steps = int(max_epoch * steps_per_epoch)
- total_sampled_steps = int(max_sampled_epoch * steps_per_epoch)
- warmup_steps = int(warmup_epochs * steps_per_epoch)
-
- lr_each_step = []
-
- for i in range(total_sampled_steps):
- last_epoch = i // steps_per_epoch
- if last_epoch in range(
- start_sample_epoch,
- end_sampled_epoch,
- step_sample):
- continue
- if i < warmup_steps:
- lr = linear_warmup_lr(i + 1, warmup_steps, base_lr, warmup_init_lr)
- else:
- lr = eta_min + (base_lr - eta_min) * \
- (1. + math.cos(math.pi * last_epoch / t_max)) / 2
- lr_each_step.append(lr)
-
- assert total_steps == len(lr_each_step)
- return np.array(lr_each_step).astype(np.float32)
-
-
- def my_lr(max_epoch, steps_per_epoch, per_step=2, gamma=0.1, lr=0.001):
- lr_each_step = []
- total_steps = steps_per_epoch * max_epoch
- n = total_steps // per_step
- for i in range(max_epoch * steps_per_epoch):
- if i % n == 0 and i != 0:
- lr = lr * gamma
- lr_each_step.append(lr)
-
- assert total_steps == len(lr_each_step)
- return np.array(lr_each_step).astype(np.float32)
-
-
- def get_lr(args):
- """generate learning rate."""
- if args.lr_scheduler == 'exponential':
- lr = warmup_step_lr(args.lr,
- args.lr_epochs,
- args.steps_per_epoch,
- args.warmup_epochs,
- args.max_epoch,
- gamma=args.lr_gamma,
- )
- elif args.lr_scheduler == 'cosine_annealing':
- lr = warmup_cosine_annealing_lr(args.lr,
- args.steps_per_epoch,
- args.warmup_epochs,
- args.max_epoch,
- args.t_max,
- args.eta_min)
- elif args.lr_scheduler == 'cosine_annealing_V2':
- lr = warmup_cosine_annealing_lr_v2(args.lr,
- args.steps_per_epoch,
- args.warmup_epochs,
- args.max_epoch,
- args.t_max,
- args.eta_min)
- elif args.lr_scheduler == 'cosine_annealing_sample':
- lr = warmup_cosine_annealing_lr_sample(args.lr,
- args.steps_per_epoch,
- args.warmup_epochs,
- args.max_epoch,
- args.t_max,
- args.eta_min)
- elif args.lr_scheduler == 'my_lr':
- lr = my_lr(
- args.max_epoch,
- args.steps_per_epoch,
- args.per_step,
- args.lr_gamma,
- args.lr)
- else:
- raise NotImplementedError(args.lr_scheduler)
- return lr
|