模型转换[yolov3模型在keras与darknet之间转换]



首先借助qqwweee/keras-yolo3中的convert.py,重新编写了代码,实现将darknet格式的yolov3的yolov3.cfg和yolov3.weights转换成keras(tensorflow)的h5格式。

1 将darknet格式的yolov3.cfg和yolov3.weights转换成keras(tf)的h5格式

# -*- coding: utf-8 -*-

import os
import io
import argparse
import configparser


import numpy as np
from keras import backend as K
from keras.layers import (Conv2D, Input, ZeroPadding2D, Add,
                          UpSampling2D, MaxPooling2D, Concatenate)
from keras.layers.advanced_activations import LeakyReLU
from keras.layers.normalization import BatchNormalization
from keras.models import Model
from keras.regularizers import l2
from keras.utils.vis_utils import plot_model as plot



def parser():
    """Parse the command-line options of the Darknet -> Keras converter.

    The three path options are mandatory: without them the script would
    only fail later, when ``None`` is handed to ``open``.

    Returns:
        argparse.Namespace: with attributes ``cfg_path``, ``weights_path``,
        ``output_path`` (strings) and ``weights_only`` (bool, False unless
        the flag is given).
    """
    arg_parser = argparse.ArgumentParser(
        description="Darknet's yolov3.cfg and yolov3.weights "
                    "converted into Keras's yolov3.h5!")
    arg_parser.add_argument('-cfg_path', required=True, help='yolov3.cfg')
    arg_parser.add_argument('-weights_path', required=True,
                            help='yolov3.weights')
    arg_parser.add_argument('-output_path', required=True, help='yolov3.h5')
    arg_parser.add_argument('-weights_only', action='store_true',
                            help='only save weights in yolov3.h5')
    return arg_parser.parse_args()


class WeightLoader(object):
    """Sequential reader for a Darknet ``.weights`` binary file.

    The file is consumed strictly front to back: first the header (see
    ``head``), then raw arrays requested one at a time through
    ``parser_buffer``. ``read_bytes`` counts the bytes consumed so far.
    The object can also be used as a context manager, which closes the
    file on exit.
    """

    def __init__(self, weight_path):
        self.fhandle = open(weight_path, 'rb')
        self.read_bytes = 0

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def parser_buffer(self, shape, dtype='int32', buffer_size=None):
        """Read the next chunk of the file and view it as an array.

        Args:
            shape: shape of the array to return.
            dtype: numpy dtype of the stored values.
            buffer_size: number of bytes to consume. When omitted it is
                derived from ``shape`` and ``dtype``.

        Returns:
            numpy.ndarray backed by the bytes just read (read-only).

        Raises:
            EOFError: if the file ends before ``buffer_size`` bytes could
                be read, i.e. the weights file is shorter than the
                network description requires.
        """
        if buffer_size is None:
            buffer_size = int(np.prod(shape)) * np.dtype(dtype).itemsize

        raw = self.fhandle.read(buffer_size)
        if len(raw) != buffer_size:
            raise EOFError(
                'weights file ended early: wanted {} bytes at offset {}, '
                'got {}'.format(buffer_size, self.read_bytes, len(raw)))

        self.read_bytes += buffer_size
        return np.ndarray(shape=shape, dtype=dtype, buffer=raw)

    def head(self):
        """Read the file header.

        The header is three int32 values (major, minor, revision) followed
        by the ``seen`` counter, which is stored as int64 when
        ``major*10 + minor >= 2`` (and both are below 1000) and as int32
        otherwise.

        Returns:
            tuple: ``(major, minor, revision, seen)`` where ``seen`` is a
            one-element array.
        """
        major, minor, revision = self.parser_buffer(
            shape=(3,),
            dtype='int32',
            buffer_size=12)

        wide_counter = major * 10 + minor >= 2 and major < 1000 and minor < 1000
        if wide_counter:
            seen = self.parser_buffer(shape=(1,), dtype='int64', buffer_size=8)
        else:
            seen = self.parser_buffer(shape=(1,), dtype='int32', buffer_size=4)

        return major, minor, revision, seen

    def close(self):
        """Close the underlying file handle."""
        self.fhandle.close()

class DarkNetParser(object):
    """Build a Keras model from a Darknet cfg while streaming its weights.

    ``block_gen`` yields one dict per cfg section; the caller dispatches
    each dict to the method named after the section type (``conv``,
    ``route``, ...). Every handler appends its output tensor to
    ``all_layers`` and updates ``prev_layer``, so ``route``/``shortcut``
    can refer back to earlier layers by index. ``out_index`` collects the
    indices of the tensors that become model outputs.
    """

    # Fallback L2 weight decay for cfg files without a ``decay`` entry in
    # [net]; 5e-4 is the fallback used by keras-yolo3's convert.py, which
    # this script is derived from.
    DEFAULT_WEIGHT_DECAY = 5e-4

    def __init__(self, cfg_path, weights_path):
        """Open both input files and read the weights header.

        Args:
            cfg_path: path of the Darknet ``.cfg`` network description.
            weights_path: path of the Darknet ``.weights`` file.
        """
        self.block_gen = self._get_block(cfg_path)
        self.weight_loader = WeightLoader(weights_path)

        major, minor, revision, seen = self.weight_loader.head()
        print('weights header: ',major, minor, revision, seen)

        self.input_layer = Input(shape=(None, None, 3))
        self.out_index = []
        self.prev_layer = self.input_layer
        self.all_layers = []
        self.count = [0,0]
        # Overwritten by ``net`` when the cfg provides a decay value.
        self.weight_decay = self.DEFAULT_WEIGHT_DECAY

    def _get_block(self, cfg_path):
        """Yield the cfg file section by section.

        Each yielded dict maps option names to their raw string values and
        carries the section name (without brackets) under ``'type'``.
        Text after a ``#`` is treated as a comment and ignored, so an
        option followed by a trailing comment is still parsed. All spaces
        inside an option line are removed.

        Args:
            cfg_path: path of the Darknet ``.cfg`` file (read as UTF-8).

        Yields:
            dict: one per section, in file order. Nothing is yielded for a
            file without any content.
        """
        block = {}
        with open(cfg_path, 'r', encoding='utf-8') as fr:
            for raw_line in fr:
                line = raw_line.split('#', 1)[0].strip()
                if not line:
                    continue
                if line.startswith('[') and line.endswith(']'):
                    if block:
                        yield block
                    block = {'type': line.strip(' []')}
                else:
                    # Split on the first '=' only so values may contain '='.
                    key, val = line.replace(' ', '').split('=', 1)
                    block[key.strip()] = val.strip()

        if block:
            yield block

    def conv(self, block):
        """Handle a ``[convolutional]`` section.

        Darknet serializes the parameters of a convolution in this order:
          1 - bias (used as BN beta when batch normalization is on);
          2 - if batch normalization is on: scale, mean, variance;
          3 - the convolution kernel.

        Args:
            block: section dict with ``filters``, ``size``, ``stride``,
                ``pad``, ``activation`` and optionally ``batch_normalize``.

        Raises:
            ValueError: if the activation is neither ``leaky`` nor
                ``linear``.
        """
        self.count[0] += 1

        filters = int(block['filters'])
        size = int(block['size'])
        stride = int(block['stride'])
        pad = int(block['pad'])
        activation = block['activation']
        if activation not in ('leaky', 'linear'):
            raise ValueError(
                'Unsupported activation `{}` in convolutional block {}; '
                'only leaky and linear are handled.'.format(
                    activation, self.count[0]))

        padding = 'same' if pad == 1 and stride == 1 else 'valid'
        batch_normalize = 'batch_normalize' in block

        prev_layer_shape = K.int_shape(self.prev_layer)
        weights_shape = (size, size, prev_layer_shape[-1], filters)
        darknet_w_shape = (filters, weights_shape[2], size, size)
        weights_size = int(np.prod(weights_shape))

        print('+',self.count[0],'conv2d',
              'bn' if batch_normalize else ' ',
              activation,
              weights_shape)

        # One float32 bias per filter.
        conv_bias = self.weight_loader.parser_buffer(
            shape=(filters,),
            dtype='float32',
            buffer_size=filters * 4)

        # With batch normalization, scale/mean/variance follow the bias.
        if batch_normalize:
            bn_weight_list = self.bn(filters, conv_bias)

        conv_weights = self.weight_loader.parser_buffer(
            shape=darknet_w_shape,
            dtype='float32',
            buffer_size=weights_size * 4)

        # Darknet stores kernels Caffe-style (out_dim, in_dim, height, width);
        # Keras/TensorFlow expects (height, width, in_dim, out_dim).
        conv_weights = np.transpose(conv_weights, [2, 3, 1, 0])
        # With BN the conv layer has no bias of its own (it became BN beta).
        if batch_normalize:
            layer_weights = [conv_weights]
        else:
            layer_weights = [conv_weights, conv_bias]

        if stride > 1:
            # Pad one row on top and one column on the left before the
            # strided 'valid' convolution.
            self.prev_layer = ZeroPadding2D(((1,0),(1,0)))(self.prev_layer)

        conv_layer = Conv2D(
            filters, (size, size),
            strides=(stride, stride),
            kernel_regularizer=l2(self.weight_decay),
            use_bias=not batch_normalize,
            weights=layer_weights,
            activation=None,
            padding=padding)(self.prev_layer)

        if batch_normalize:
            conv_layer = BatchNormalization(weights=bn_weight_list)(conv_layer)
        self.prev_layer = conv_layer

        if activation == 'leaky':
            self.prev_layer = LeakyReLU(alpha=0.1)(self.prev_layer)
        self.all_layers.append(self.prev_layer)

    def bn(self, filters, conv_bias):
        """Read the remaining batch-normalization parameters.

        BN has four parameters: bias, scale, mean and variance. The bias
        has already been read by the caller; this reads the other three.

        Args:
            filters: number of channels of the layer.
            conv_bias: the bias array already read for this layer.

        Returns:
            list: ``[scale, bias, mean, variance]``, the order passed to
            ``BatchNormalization(weights=...)``.
        """
        bn_weights = self.weight_loader.parser_buffer(
            shape=(3, filters),
            dtype='float32',
            buffer_size=(filters * 3) * 4)
        scale, mean, var = bn_weights[0], bn_weights[1], bn_weights[2]
        return [scale, conv_bias, mean, var]

    def maxpool(self, block):
        """Handle a ``[maxpool]`` section (``size``, ``stride``)."""
        size = int(block['size'])
        stride = int(block['stride'])
        maxpool_layer = MaxPooling2D(pool_size=(size,size),
                                     strides=(stride,stride),
                                     padding='same')(self.prev_layer)
        self.all_layers.append(maxpool_layer)
        self.prev_layer = maxpool_layer

    def shortcut(self, block):
        """Handle a ``[shortcut]`` section: add layer ``from`` to the previous one."""
        index = int(block['from'])
        activation = block['activation']
        assert activation == 'linear', 'Only linear activation supported.'
        shortcut_layer = Add()([self.all_layers[index],self.prev_layer])
        self.all_layers.append(shortcut_layer)
        self.prev_layer = shortcut_layer

    def route(self, block):
        """Handle a ``[route]`` section.

        ``layers`` is a comma-separated list of indices into
        ``all_layers``. Several indices are concatenated; a single index
        simply re-exposes that earlier layer.
        """
        layers_ids = block['layers']
        ids = [int(i) for i in layers_ids.split(',')]
        layers = [self.all_layers[i] for i in ids]
        if len(layers) > 1:
            print('Concatenating route layers:', layers)
            routed_layer = Concatenate()(layers)
        else:
            routed_layer = layers[0]
        self.all_layers.append(routed_layer)
        self.prev_layer = routed_layer

    def upsample(self, block):
        """Handle an ``[upsample]`` section; only ``stride=2`` is accepted."""
        stride = int(block['stride'])
        assert stride == 2, 'Only stride=2 supported.'
        upsample_layer = UpSampling2D(stride)(self.prev_layer)
        self.all_layers.append(upsample_layer)
        self.prev_layer = self.all_layers[-1]

    def yolo(self, block):
        """Handle a ``[yolo]`` section.

        The layer just before it is recorded as a model output. A ``None``
        entry is appended to ``all_layers`` in its place, presumably so
        that later indices stay aligned with the cfg sections.
        """
        self.out_index.append(len(self.all_layers)-1)
        self.all_layers.append(None)
        self.prev_layer = self.all_layers[-1]

    def net(self, block):
        """Handle the ``[net]`` section: pick up the weight decay.

        The value is converted to float; when the section has no ``decay``
        entry the class default is used.
        """
        self.weight_decay = float(
            block.get('decay', self.DEFAULT_WEIGHT_DECAY))

    def create_and_save(self, weights_only, output_path):
        """Assemble the Keras model and write it to ``output_path``.

        Args:
            weights_only: when true, save with ``save_weights`` (no
                architecture); otherwise save the full model.
            output_path: destination ``.h5`` file.
        """
        # Without any [yolo] section, the last layer is the only output.
        if len(self.out_index) == 0:
            self.out_index.append( len(self.all_layers)-1 )

        output_layers = [self.all_layers[i] for i in self.out_index]
        model = Model(inputs=self.input_layer,
                      outputs=output_layers)
        # summary() prints by itself and returns None.
        model.summary()

        if weights_only:
            model.save_weights(output_path)
            print('Saved Keras weights to {}'.format(output_path))
        else:
            model.save(output_path)
            print('Saved Keras model to {}'.format(output_path))

    def close(self):
        """Close the weights file."""
        self.weight_loader.close()
        

def main():
    """Convert a Darknet cfg/weights pair into a Keras ``.h5`` file.

    Each cfg section is dispatched to the ``DarkNetParser`` method that
    handles its type. The weights file is closed even when the conversion
    fails part-way.

    Raises:
        ValueError: if the cfg contains a section type with no handler.
    """
    args = parser()
    print('loading weights...')

    cfg_parser = DarkNetParser(args.cfg_path,args.weights_path)
    try:
        print('creating keras model...')

        layers_fun = {'convolutional':cfg_parser.conv,
                      'net':cfg_parser.net,
                      'yolo':cfg_parser.yolo,
                      'route':cfg_parser.route,
                      'upsample':cfg_parser.upsample,
                      'maxpool':cfg_parser.maxpool,
                      'shortcut':cfg_parser.shortcut
                      }

        print('Parsing Darknet config.')
        for block in cfg_parser.block_gen:
            block_type = block.get('type')
            handler = layers_fun.get(block_type)
            if handler is None:
                raise ValueError(
                    'Unsupported section type in cfg: {}'.format(block_type))
            handler(block)

        cfg_parser.create_and_save(args.weights_only, args.output_path)
    finally:
        # Release the weights file handle on success and on failure alike.
        cfg_parser.close()


if __name__ == '__main__':
    main()

运行结果

python yolov3_darknet_to_keras.py -cfg_path text.cfg -weights_path yolov3.weights -output_path yolov3c_d2k.h5


2 将keras(tf)的h5格式转换成darknet格式的yolov3.weights

其中darknet(上文)与keras(下文)中BN参数名称的对应关系如下(左边为darknet中的名称,右边为keras中的名称):

bias -> beta
scale -> gamma
mean -> moving_mean
var -> moving_variance

基于此写的脚本为:

# -*- coding: utf-8 -*-
''' yolov3_keras_to_darknet.py'''
import argparse
import numpy
import numpy as np
import keras
from keras.models import load_model
from keras import backend as K

def parser():
    """Parse the command-line options of the Keras -> Darknet converter.

    All three path options are mandatory: without them the script would
    only fail later, when ``None`` is used as a file path.

    Returns:
        argparse.Namespace: with string attributes ``cfg_path``,
        ``h5_path`` and ``output_path``.
    """
    arg_parser = argparse.ArgumentParser(
        description="Keras's yolov3.h5 converted into "
                    "Darknet's yolov3.weights!")
    arg_parser.add_argument('-cfg_path', required=True, help='yolov3.cfg')
    arg_parser.add_argument('-h5_path', required=True, help='yolov3.h5')
    arg_parser.add_argument('-output_path', required=True,
                            help='yolov3.weights')
    return arg_parser.parse_args()



class WeightSaver(object):
    """Write the variables of a saved Keras model into a Darknet weights file.

    On construction the Keras model is loaded, the output file is opened
    and the Darknet header is written. Afterwards the caller fetches
    variables by layer number and appends them with ``save``. The object
    can be used as a context manager, which closes the file on exit.
    """

    # Header defaults, used by ``_write_head`` unless set per instance.
    version = (0, 2, 0)
    seen = 320000

    def __init__(self, h5_path, output_path, version=(0, 2, 0), seen=320000):
        """Load the model and start the output file.

        Args:
            h5_path: ``.h5`` file written by Keras ``model.save``.
            output_path: Darknet ``.weights`` file to create.
            version: ``(major, minor, revision)`` written to the header.
            seen: value of the header's ``seen`` counter.
        """
        self.model = load_model(h5_path)
        # To convert an h5 written by ``save_weights``, first load an h5
        # written by ``save`` (above), then uncomment the next line to
        # load the ``save_weights`` file on top of it.
#        self.model.load_weights('text.h5')
        # Variable name (e.g. 'conv2d_1/kernel:0') -> variable object.
        self.layers = {weight.name:weight for weight in self.model.weights}
        self.sess = K.get_session()
        self.version = tuple(version)
        self.seen = seen
        self.fhandle = open(output_path,'wb')
        self._write_head()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def _write_head(self):
        """Write the header: three int32 version numbers, then ``seen``.

        ``seen`` is written as int64 when ``major*10 + minor >= 2`` (with
        both below 1000) and as int32 otherwise, so the width matches
        what a reader derives from the version numbers.
        """
        major, minor, revision = self.version
        self.save(np.array([major, minor, revision], dtype='int32'))

        wide_counter = major * 10 + minor >= 2 and major < 1000 and minor < 1000
        seen_dtype = 'int64' if wide_counter else 'int32'
        self.save(np.array([self.seen], dtype=seen_dtype))

    def get_bn_layername(self,num):
        """Return ``(bias, scale, mean, var)`` arrays of BN layer ``num``.

        The values come from the variables ``beta``, ``gamma``,
        ``moving_mean`` and ``moving_variance`` of
        ``batch_normalization_<num>``.
        """
        layer_name = 'batch_normalization_{num}'.format(num=num)
        suffixes = ('beta', 'gamma', 'moving_mean', 'moving_variance')
        bias_np, scale_np, mean_np, var_np = [
            self.get_numpy(self.layers['{0}/{1}:0'.format(layer_name, suffix)])
            for suffix in suffixes]
        return bias_np,scale_np,mean_np,var_np

    def get_convbias_layername(self,num):
        """Return the bias array of ``conv2d_<num>``."""
        layer_name = 'conv2d_{num}'.format(num=num)
        bias = self.layers['{0}/bias:0'.format(layer_name)]
        return self.get_numpy(bias)

    def get_conv_layername(self,num):
        """Return the kernel array of ``conv2d_<num>``."""
        layer_name = 'conv2d_{num}'.format(num=num)
        conv = self.layers['{0}/kernel:0'.format(layer_name)]
        return self.get_numpy(conv)

    def get_numpy(self,layer_name):
        """Evaluate a variable in the Keras session.

        Despite its name, ``layer_name`` is the variable object itself
        (an entry of ``self.layers``), not a string.
        """
        return self.sess.run(layer_name)

    def save(self,numpy_data):
        """Append the raw bytes of ``numpy_data`` to the output file."""
        self.fhandle.write(numpy_data.tobytes())
        self.fhandle.flush()

    def close(self):
        """Close the output file."""
        self.fhandle.close()

class KerasParser(object):
    """Walk a Darknet cfg and write the matching Keras weights in Darknet order.

    ``block_gen`` yields one dict per cfg section. For every convolutional
    section the caller invokes ``conv``, which pulls the layer's variables
    from the ``WeightSaver`` by running counters (``count_conv``,
    ``count_bn``) and appends them to the output file.
    """

    def __init__(self, cfg_path, h5_path, output_path):
        """Prepare the cfg reader and open the weight writer.

        Args:
            cfg_path: path of the Darknet ``.cfg`` file.
            h5_path: Keras model file to read the weights from.
            output_path: Darknet ``.weights`` file to create.
        """
        self.block_gen = self._get_block(cfg_path)
        self.weights_saver = WeightSaver(h5_path, output_path)
        self.count_conv = 0
        self.count_bn = 0

    def _get_block(self, cfg_path):
        """Yield the cfg file section by section.

        Each yielded dict maps option names to their raw string values and
        carries the section name (without brackets) under ``'type'``.
        Text after a ``#`` is treated as a comment and ignored, so an
        option followed by a trailing comment is still parsed. All spaces
        inside an option line are removed.

        Args:
            cfg_path: path of the Darknet ``.cfg`` file (read as UTF-8).

        Yields:
            dict: one per section, in file order. Nothing is yielded for a
            file without any content.
        """
        block = {}
        with open(cfg_path, 'r', encoding='utf-8') as fr:
            for raw_line in fr:
                line = raw_line.split('#', 1)[0].strip()
                if not line:
                    continue
                if line.startswith('[') and line.endswith(']'):
                    if block:
                        yield block
                    block = {'type': line.strip(' []')}
                else:
                    # Split on the first '=' only so values may contain '='.
                    key, val = line.replace(' ', '').split('=', 1)
                    block[key.strip()] = val.strip()

        if block:
            yield block

    def close(self):
        """Close the output weights file."""
        self.weights_saver.close()

    def conv(self, block):
        """Write the parameters of the next convolutional layer.

        Output order: bias first (BN beta when the section has
        ``batch_normalize``, otherwise the conv bias), then for BN layers
        scale, mean and variance, and finally the kernel.

        Args:
            block: the cfg section dict; only the presence of
                ``batch_normalize`` is inspected.
        """
        self.count_conv += 1
        batch_normalize = 'batch_normalize' in block
        print('handing.. ',self.count_conv)

        if batch_normalize:
            # BN layer: bias, then scale/mean/var stacked as three rows.
            bias, scale, mean, var = self.bn()
            self.weights_saver.save(bias)

            remain = np.concatenate(
                [scale.reshape(1, -1), mean.reshape(1, -1), var.reshape(1, -1)],
                axis=0)
            self.weights_saver.save(remain)
        else:
            # No BN: the convolution's own bias comes first.
            conv_bias = self.weights_saver.get_convbias_layername(self.count_conv)
            self.weights_saver.save(conv_bias)

        # Kernel: (height, width, in_dim, out_dim) -> (out_dim, in_dim, height, width).
        conv_weights = self.weights_saver.get_conv_layername(self.count_conv)
        conv_weights = np.transpose(conv_weights, [3, 2, 0, 1])
        self.weights_saver.save(conv_weights)

    def bn(self):
        """Return ``(bias, scale, mean, var)`` of the next BN layer."""
        self.count_bn += 1
        return self.weights_saver.get_bn_layername(self.count_bn)

        

def main():
    """Convert a Keras ``.h5`` model into a Darknet ``.weights`` file.

    Only cfg sections whose type contains ``convolutional`` carry weights;
    each of them is handed to ``KerasParser.conv`` in file order. The
    output file is closed even when the conversion fails part-way.
    """
    args = parser()
    keras_loader = KerasParser(args.cfg_path, args.h5_path, args.output_path)
    try:
        for block in keras_loader.block_gen:
            if 'convolutional' in block['type']:
                keras_loader.conv(block)
    finally:
        # Release the output file handle on success and on failure alike.
        keras_loader.close()


if __name__ == "__main__":
    main()

通过读取keras保存的h5文件,并读取其权重,其如下所示,

(原始输出中各变量的具体内容在排版时丢失,这里按脚本中使用的变量名给出其结构示意,shape、dtype等信息从略。)

带BN的卷积层,每层依次有5个变量:

[tf.Variable 'conv2d_1/kernel:0',
 tf.Variable 'batch_normalization_1/gamma:0',
 tf.Variable 'batch_normalization_1/beta:0',
 tf.Variable 'batch_normalization_1/moving_mean:0',
 tf.Variable 'batch_normalization_1/moving_variance:0',

 tf.Variable 'conv2d_2/kernel:0',
 tf.Variable 'batch_normalization_2/gamma:0',
 tf.Variable 'batch_normalization_2/beta:0',
 tf.Variable 'batch_normalization_2/moving_mean:0',
 tf.Variable 'batch_normalization_2/moving_variance:0',

 ......

不带BN的卷积层(yolo层之前的输出卷积),每层只有2个变量:

 tf.Variable 'conv2d_N/kernel:0',
 tf.Variable 'conv2d_N/bias:0']

运行结果

python yolov3_keras_to_darknet.py -cfg_path text.cfg -h5_path yolov3c_d2k.h5 -output_path yolov3c_d2k_k2d.weights


可以看出原始文件yolov3.weights转成yolov3c_d2k.h5,然后再转回来yolov3c_d2k_k2d.weights,而md5值未变,说明逆向转换成功。

3 实际案例

这里我们给出完整的操作过程:
首先,机器环境(个人觉得这里应该无所谓):
1 - python3.5.6;
2 - keras 2.2.4;
3 - tensorflow-gpu 1.12.0.

然后,从github上chineseocr项目给出的百度网盘中下载:

  • text.h5: 通过keras的save_weights方式保存的
  • text.weights:darknet生成的文件
  • text.cfg: darknet中yolov3的网络结构

如果直接执行

python yolov3_keras_to_darknet.py -cfg_path text.cfg -h5_path text.h5 -output_path test.weights

会报错。

这是因为当前h5是通过save_weights方式保存的(只包含权重,不包含网络结构),而脚本中的load_model需要通过save方式保存的完整模型。

所以我们先执行

python yolov3_darknet_to_keras.py -cfg_path text.cfg -weights_path text.weights -output_path test.h5

这一步是将darknet的网络结构和权重通过keras的save方式保存成h5。然后再执行下面的命令,就没问题了:

python yolov3_keras_to_darknet.py -cfg_path text.cfg -h5_path test.h5 -output_path test.weights