Skip to main content

让 Mindspore 训练更容易

项目描述

心灵孢子训练师

派皮 PyPI - Python 版本 GitHub 工作流状态 问题 执照

MindsporeTrainer是基于升思MindSpore的训练框架。让Mindspore的算法研究更容易一些。

Mindspore上手不易,希望能帮助各位炼丹师的升级更容易一些。

主页

目录

主要特性

主要的几个出发点是:

  • 采用纯python实现,方便多卡训练过程中的调试
  • 扩展,对新任务采用插件式访问
  • 便于实现模型的训练、评估、预测等

安装

点子

pip install MindsporeTrainer

python setup.py

使用方法

DeBERTa 预训练任务示例

1.定义任务

MindsporeTrainer/apps/tasks/deberta.py

from collections import OrderedDict
import numpy as np
import os
import random
from shutil import copyfile
from loguru import logger

from mindspore.communication import get_rank, get_group_size

from MindsporeTrainer.data import ExampleInstance, ExampleSet
from MindsporeTrainer.data.example import *
from MindsporeTrainer.apps.tasks import EvalData, TransformerTask
from MindsporeTrainer.apps.tasks import register_task
from MindsporeTrainer.utils.metrics import *
from MindsporeTrainer.utils.metrics import BertMetric
from MindsporeTrainer.utils.masker import NGramMaskGenerator
from MindsporeTrainer.data.dynamic_dataset import create_dynamic_dataset
from MindsporeTrainer.modeling.tokenizers import BertTokenizer

@register_task(name="DEBERTA", desc="Basic DEBERTA task")
class DEBERTATask(TransformerTask):
    def __init__(self, data_dir, args, **kwargs):
        super().__init__(args, **kwargs)
        self.max_seq_length = 512
        self.model_config = 'data/pretrained_models/deberta-base-v2/model_config.json'
        self.vocab_type = 'BERT'
        self.vocab_path = 'data/pretrained_models/deberta-base-v2/vocab.txt'
        self.data_dir = data_dir
        self.args = args
        self.metric = 'bert'
        self.main_metric = 'perplexity'
        self.optimizer_name = 'Lamb'
        self.tokenizer = BertTokenizer(self.vocab_path)
        if args.distribute:
            self.rank_id = get_rank()
            self.rank_size = get_group_size()
        else:
            self.rank_id = 0
            self.rank_size = 1

    def train_data(self, max_seq_len=512, batch_size=32, **kwargs):
        data_path = os.path.join(self.data_dir, 'daizhige.pkl')
        data = self.load_data(data_path, 'GW', max_seq_len)
        # data = ExampleSet(data)
        output_columns = ["input_ids", "input_mask", "token_type_id", "next_sentence_labels",
                                    "masked_lm_positions", "masked_lm_ids", "masked_lm_weights"]
        return create_dynamic_dataset(data, self.get_feature_fn(), 
                                      batch_size=batch_size,
                                      output_columns=output_columns, 
                                      repeat=self.args.num_train_epochs,
                                      num_workers=self.args.data_workers,
                                      num_shards=self.rank_size, 
                                      shard_id=self.rank_id)

    def eval_data(self, max_seq_len=512, batch_size=32, **kwargs):
        ...

    def get_metrics(self, **kwargs):
        """Calcuate metrics based on prediction results"""
        return OrderedDict(
            bert = BertMetric(self.args.eval_batch_size),
            )

    def get_eval_fn(self, **kwargs):
        data = kwargs.get('data')
        if data is None:
            data = self.eval_data(**kwargs)
        def run_eval(model, name, prefix):
            '''
            args: 
                model: Model instance
                name: evaluate name
                prefix: prefix of file
            return:
                float, main metric of this task, used to save best metric model
            '''
            res = model.eval(data, dataset_sink_mode=False)
            res = res['bert']
            main_metric = res[self.main_metric]
            if self.rank_id == 0:
                output=os.path.join(self.args.output_dir, 'submit-{}-{}.tsv'.format(name, prefix))
                metric_str = '\n'.join([f'{k}: {v:.4f}' for k, v in res.items()])
                metric_str = metric_str + '\n'
                logger.info("====================================")
                logger.info("evaluate result:\n")
                logger.info(metric_str)
                logger.info("====================================")

                with open(output, 'w', encoding='utf-8') as fs:
                    fs.write(f"metrics:\n{metric_str}\n")
            return main_metric
        return run_eval


    def get_feature_fn(self, max_seq_len=512, ext_params=None, rng=None, **kwargs):
        tokenizer = self.tokenizer
        mask_generator = NGramMaskGenerator(tokenizer)
        def example_to_feature(*example):
            '''
            sample: text, label
            return: ["input_ids", "input_mask", "token_type_id", "next_sentence_labels",
                    masked_lm_positions", "masked_lm_ids", "masked_lm_weights"]
            '''
            ......
            return tuple(features)

        return example_to_feature

    def load_data(self, path, type=None, max_seq_len=512):
        examples = []
        ......
        return ExampleSet(examples)


    def get_model(self):
        from MindsporeTrainer.modeling.models import BertPreTraining, DebertaPreTraining
        from MindsporeTrainer import build_transformer_model
        if self.args.fp16:
            compute_type = mstype.float16
        else:
            compute_type = mstype.float32
        model, config = build_transformer_model(self.model_config, 
                                                model=DebertaPreTraining, 
                                                compute_type=compute_type, 
                                                padding_idx=self.tokenizer._convert_token_to_id(self.tokenizer.pad_token))
        with open(os.path.join(self.args.output_dir, 'config.json'), 'w', encoding='utf-8') as f:
            f.write(config.to_json_string())
        copyfile(self.vocab_path, os.path.join(self.args.output_dir, 'vocab.txt'))
        return model
        # return partial_class

    def get_loss(self, *args, **kwargs):
        from MindsporeTrainer.modeling.loss import BertPretrainingLoss
        return BertPretrainingLoss(self.tokenizer.vocab_size)

    def get_eval_head(self, *args, **kwargs):
        from MindsporeTrainer.modeling.models import BertEvalHead
        return BertEvalHead(self.tokenizer.vocab_size)

    def get_opt_fn(self, *args, **kwargs):
        return None

2.写作启动

运行.py

import MindsporeTrainer as mst
mst.launch()

3. 运行任务

python run.py --task_name=RESNETTask --do_train --do_eval --data_dir=data --num_train_epochs=10 --learning_rate=1e-2 --train_batch_size=64 --save_eval_steps=1000 --output_dir=output

多卡训练

官方推荐编写bash脚本,利用mpi进行,这里采用了纯python的实现。

定义必须的环境变量

重击

export RANK_SIZE = 8
export RANK_TABLE_FILE = /path/hccl.json

vscode 调试环境

"env": {
    "RANK_SIZE": "8",
    "RANK_TABLE_FILE": "/path/hccl.json"
}

设置参数,开始训练

python run.py ...... --device_num=8 --device_id=0,1,2,3,4,5,6,7

创建模型

自定义模型

Mindspore相对和PyTorch,有其自身的特点,建模习惯也有不同
。中,构造函数返回都为元组,即使只有一个对象,返回也应该采用(obj,)的形式。

骨干

模型的主体部分定义。

预测头部

模型的附属部分,根据模型的用途,通常可以定义为损失层,或者在评估过程中定义的评估头,或者其他用途的头。

使用官方model_zoo中的模型

其代码结构如下

models
├── official                                    # 官方支持模型
│   └── XXX                                     # 模型名
│       ├── README.md                           # 模型说明文档
│       ├── requirements.txt                    # 依赖说明文件
│       ├── eval.py                             # 精度验证脚本
│       ├── export.py                           # 推理模型导出脚本
│       ├── scripts                             # 脚本文件
│       │   ├── run_distributed_train.sh        # 分布式训练脚本
│       │   ├── run_eval.sh                     # 验证脚本
│       │   └── run_standalone_train.sh         # 单机训练脚本
│       ├── src                                 # 模型定义源码目录
│       │   ├── XXXNet.py                       # 模型结构定义
│       │   ├── callback.py                     # 回调函数定义
│       │   ├── config.py                       # 模型配置参数文件
│       │   └── dataset.py                      # 数据集处理定义
│       ├── ascend_infer                        # (可选)用于在Ascend推理设备上进行离线推理的脚本
│       ├── third_party                         # (可选)第三方代码
│       │   └── XXXrepo                         # (可选)完整克隆自第三方仓库的代码
│       └── train.py                            # 训练脚本
├── research                                    # 非官方研究脚本
├── community                                   # 合作方脚本链接
└── utils                                       # 模型通用工具

例如需要的目录后,在 src/xxxmodel.py 中找到相应的定义,如果有定义好的目录主干和头部,可以 直接 引入
。任务
git clone https://gitee.com/mindspore/models
models/official/nlp/bert/src

    from bert.src.bert_for_pre_training import BertPreTraining, BertPretrainingLoss
    ......
    def get_model(self):
        from MindsporeTrainer import build_transformer_model
        if self.args.fp16:
            compute_type = mstype.float16
        else:
            compute_type = mstype.float32
        model, config = build_transformer_model(self.model_config, 
                                                model=BertPreTraining, 
                                                compute_type=compute_type, 
                                                padding_idx=self.tokenizer._convert_token_to_id(self.tokenizer.pad_token))
        with open(os.path.join(self.output_dir, 'config.json'), 'w', encoding='utf-8') as f:
            f.write(config.to_json_string())
        copyfile(self.vocab_path, os.path.join(self.output_dir, 'vocab.txt'))
        return model

    def get_loss(self, *args, **kwargs):
        return BertPretrainingLoss(self.tokenizer.vocab_size)

    def get_eval_head(self, *args, **kwargs):
        from MindsporeTrainer.modeling.models import BertEvalHead
        return BertEvalHead(self.tokenizer.vocab_size)

参数介绍

超训练参数基本通过运行参数来控制的,另外一些则可以在任务定义中控制。

常用参数

  • --accumulation_steps
    type=int
    default=1
    在权重更新前累积梯度N次,默认为1。

  • --allreduce_post_accumulation
    type=str
    default= truechoices
    =[true, false]
    是N步累加后还是每步后allreduce,默认为true。

  • --data_dir
    default=None
    type=str
    required=False
    输入数据目录。应包含任务的 .tsv 文件(或其他数据文件)。

  • --data_sink_steps
    type=int
    default=1
    每个 epoch 的 Sink 步数,默认为 1。

  • --do_train
    default=False
    action='store_true'
    是否运行训练。

  • --do_eval
    default=False
    action='store_true'
    是否在开发集上运行 eval。

  • --do_predict
    default=False
    action='store_true'
    是否在测试集上运行预测。

  • --debug
    default=False
    action='store_true'
    是否缓存熟二进制特征

  • --device_target
    type=str
    default='Ascend'choices
    =['Ascend 'GPU']
    将执行代码的设备。(默认:上升)

  • --distribute
    default=False
    action='store_true'
    运行分发,默认为 false。

  • --device_id
    type=str
    default=0
    设备id,默认为0。

  • --device_num
    type=int
    default=1
    使用设备编号,默认为 1。

  • --enable_data_sink
    default=False
    action='store_true'
    启用数据接收器,默认为 false。

  • --load_checkpoint_path
    type=str
    default=''
    加载检查点文件路径

  • --num_train_epochs
    default=1
    type=int
    要执行的训练 epoch 总数。

  • --output_dir
    default=None
    type=str
    required=True
    将写入模型检查点的输出目录。

  • --run_mode
    type=str
    default='GRAPH'
    选择=['GRAPH'PY']
    0: GRAPH_MODE, 1: PY_NATIVE_MODE

  • --save_eval_steps
    type=int
    default=1000
    保存检查点和评估步骤,默认为 1000。

  • --save_checkpoint_num
    type=int
    default=1
    保存检查点编号,默认为1。

  • --tag
    type=str
    default='final'
    当前预测/运行的标签名称。

  • --task_dir
    default=None
    type=str
    required=False
    加载自定义任务的目录。

  • --task_name
    default=None
    type=str
    action=LoadTaskAction
    required=True
    要训练的任务的名称。

训练参数

  • --data_workers
    type=int
    default=4
    加载数据的worker。
  • --enable_graph_kernel
    type=str
    default=auto
    选择=[auto, true, false]
    由图形内核加速,默认为自动。
  • --eval_batch_size
    default=32
    type=int
    eval 的总批大小。
  • --enable_global_norm
    type=bool
    default=False
    启用全局规范
  • --predict_batch_size
    default=32
    type=int
    预测的总批大小。
  • --report_interval
    default=1
    type=int
    状态报告的间隔步骤。
  • --save_graphs
    default=False
    action='store_true'
    是否保存图表
  • --seed
    type=int
    default=1234
    用于初始化的随机种子
  • --thor
    default=False
    action='store_true' 是否将模型转换为thor优化器
  • --train_batch_size
    default=64
    type=int
    训练的总批大小。
  • --train_steps
    type=int
    default=-1
    Training Steps,默认为-1,表示按照epoch number运行所有steps。

优化器参数

  • --fp16
    default=False
    type=boolean_string
    是否使用 16 位浮点精度而不是 32 位

  • --learning_rate
    default=5e-5
    type=float
    Adam 的初始学习率。

  • --loss_scale_value
    type=int
    default=1024
    初始损失比例值

  • --resume_opt_path
    type=str.lower
    default=''
    要恢复的优化器。

  • --scale_factor
    type=int
    default=4
    损失比例因子

  • --scale_window
    type=int
    default=1000
    损失窗口

  • --warmup
    default=0.1
    type=float
    执行线性学习率预热的训练比例。例如,0.1 = 10% 的训练。

任务中增加自定义参数

    @classmethod
    def add_arguments(cls, parser):
        """Add task specific arguments
            e.g. parser.add_argument('--data_dir', type=str, help='The path of data directory.')
        """
        parser.add_argument('--task_example_arg', type=str, default=None, help='An example task specific argument')

        return parser

任务

所有的任务都应该继承于MindsporeTrainer.apps.tasks.Task
为变压器定义的MindsporeTrainer.apps.tasks.TransformerTask也继承自任务。
任务类的定义为:

class Task():
    _meta={}

    def __init__(self, args, **kwargs):
        self.args = args
    
    def eval_data(self, **kwargs):
        """
        Get eval dataset object.
        """
        return None

    def train_data(self, **kwargs):
        """
        Get train dataset object.
        """
        return None

    def test_data(self, **kwargs):
        return None

    def get_labels(self):
        """Gets the list of labels for this data set."""
        return None

    def get_eval_fn(self, *args, **kwargs):
        """
        Get the evaluation function
        """
        return None

    def get_eval_head(self, *args, **kwargs):
        """
        Get the evaluate head, the head replace loss function head when evaluation process
        """
        return None

    def get_pred_fn(self, *args, **kwargs):
        """
        Get the predict function
        """
        return None

    def get_loss(self, *args, **kwargs):
        """
        Get the loss function
        """
        return None

    def get_opt_fn(self, *args, **kwargs):
        """
        Get a function wich return the opimizer
        """
        def get_optimizer(*args, **kwargs):
            pass
        return get_optimizer

    def get_metrics(self):
        """Calcuate metrics based on prediction results"""
        return None

    def get_predict_fn(self):
        """Calcuate metrics based on prediction results"""
        def predict_fn(logits, output_dir, name, prefix):
            pass
        return None

    def get_feature_fn(self, **kwargs):
        """
        get the featurize function
        """
        def _example_to_feature(**kwargs):
             return feature
        return _example_to_feature
    
    def get_model(self):
        """
        Get a model instance
        """
        raise NotImplementedError('method not implemented yet.')

    @classmethod
    def add_arguments(cls, parser):
        """Add task specific arguments
        """
        pass

API

心灵孢子训练师

MindsporeTrainer.launch()
启动器,可支持分布式启动
MindsporeTrainer.build_transformer_model(
                                        config_path=None,
                                        model='bert',
                                        application='encoder',
                                        **kwargs
                                        )
创建transformer模型  
args:   
    config_path model config 路径  
    model 为str的话,从预定义模型中获取模型类,为类名的话,直接进行实例化  
    application 用途,默认为'encoder',TODO:实现decoder等  
    其他参数

MindsporeTrainer.modeling

建模模块,并提供若干预定义的模型,目前包括BERT和DeBERTa。

MindsporeTrainer.modeling.models

提供若干预定义的模型,目前包括BERT和DeBERTa。

MindsporeTrainer.modeling.loss

提供预定义的loss

MindsporeTrainer.modeling.tokenizers

提供预定义的tokenizers,目前仅支持BertTokenizer

MindsporeTrainer.data

数据相关

MindsporeTrainer.data.ExampleInstance

样本实例

MindsporeTrainer.data.ExampleSet

样本集

MindsporeTrainer.data.dynamic_dataset

创建动态数据集

MindsporeTrainer.utils

各种实用组件

MindsporeTrainer.utils.metrics

提供多种自定义metric

MindsporeTrainer.utils.masker

用于生成MLM的mask

MindsporeTrainer.apps.tasks

任务相关

MindsporeTrainer.apps.tasks.Task

任务基类

MindsporeTrainer.apps.tasks.TransformerTask

Transformer任务类,继承自Task

MindsporeTrainer.optims

优化器、学习率调度等

楷模

作者实现的模型,实验实验中......

德贝尔塔

原文论文:DeBERTa: Decoding-enhanced BERT with Disentangled Attention
原文仓库:https://github.com/microsoft/DeBERTa
实现是DeBERTa v2,查看DeBERTa任务

作者

周波

DMAC Group@浙江大学人工智能研究所

项目详情


下载文件

下载适用于您平台的文件。如果您不确定要选择哪个,请了解有关安装包的更多信息。

源分布

MindsporeTrainer-0.1.5.tar.gz (143.1 kB 查看哈希)

已上传 source

内置分布

MindsporeTrainer-0.1.5-py3-none-any.whl (170.7 kB 查看哈希

已上传 py3