Tensorflow Post Training Quantization
Table of Contents

1. Tensorflow Post Training Quantization
  1.1. Test Model
  1.2. Dynamic Range Quantization
  1.3. Full Integer Quantization

1.1. Test Model
import tensorflow as tf
import numpy as np
from tensorflow.keras import layers, losses, metrics, optimizers, models

X = tf.random.uniform([100, 1], minval=1, maxval=10.0)
Y = tf.pow(X, 2)

tf.keras.backend.clear_session()
model = models.Sequential()
model.add(layers.Dense(100, input_shape=(1,), activation="relu"))
model.add(layers.Dense(100, activation="relu"))
model.add(layers.Dense(100, activation="relu"))
model.add(layers.Dense(100, activation="relu"))
model.add(layers.Dense(100, activation="relu"))
model.add(layers.Dense(1))
model.summary()

model.compile(optimizer="adam", loss="mse")
history = model.fit(X, Y, batch_size=20, epochs=2000, verbose=0)
print(model.predict([2.0, 10.0, 20]))
model.save("/tmp/pow")
Model: "sequential"
_
Layer (type) Output Shape Param #
===============================================================
dense (Dense) (None, 100) 200
_
dense_1 (Dense) (None, 100) 10100
_
dense_2 (Dense) (None, 100) 10100
_
dense_3 (Dense) (None, 100) 10100
_
dense_4 (Dense) (None, 100) 10100
_
dense_5 (Dense) (None, 1) 101
===============================================================
Total params: 40,701
Trainable params: 40,701
Non-trainable params: 0
_
[[ 3.9891944]
[ 98.814255 ]
[251.58577 ]]
1.2. Dynamic Range Quantization
import tensorflow as tf

converter = tf.lite.TFLiteConverter.from_saved_model("/tmp/pow")
converter.optimizations = [tf.lite.Optimize.DEFAULT]
# converter.optimizations = [tf.lite.Optimize.OPTIMIZE_FOR_SIZE]
tflite = converter.convert()

with open("/tmp/pow-q.tflite", "wb") as f:
    f.write(tflite)
print("size of tflite-q:", len(tflite))
size of tflite-q: 46496
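For comparison, converting the same SavedModel without converter.optimizations should give a float32 .tflite file that is roughly 4x larger, since the ~40,701 float32 weights dominate the file size. This comparison run is not from the original note; it is a quick sanity check:

import tensorflow as tf

# Convert the same SavedModel with no optimizations, keeping float32 weights.
converter = tf.lite.TFLiteConverter.from_saved_model("/tmp/pow")
tflite_float = converter.convert()
# Expect roughly 4x the dynamic-range-quantized size (~46 KB above),
# because int8 weights take 1 byte each instead of 4.
print("size of float tflite:", len(tflite_float))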
- A node's weights are quantized, but its inputs and outputs remain float32 (see the interpreter check after this list)
- Initially the weights are dequantized back to float32 at runtime so that calibration data can be collected
- Once enough data has been collected, the activations are quantized to int8 at runtime and the computation is done in integer arithmetic, instead of dequantizing the weights to float32 and computing in float32; the output is still float32
- Biases are not quantized (possibly because their range is too large)
- Nodes that do not support quantization are skipped and keep running in float32
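A quick way to confirm this is to load the converted model with tf.lite.Interpreter and look at the tensor dtypes. This is a minimal sketch reusing /tmp/pow-q.tflite from the conversion step above; the exact tensor names depend on the converter version:

import tensorflow as tf

# Load the dynamic-range-quantized model produced above.
interpreter = tf.lite.Interpreter(model_path="/tmp/pow-q.tflite")
interpreter.allocate_tensors()

# With dynamic range quantization the model input/output stay float32.
print(interpreter.get_input_details()[0]["dtype"])   # float32
print(interpreter.get_output_details()[0]["dtype"])  # float32

# Weight tensors are stored as int8, activation tensors as float32.
for t in interpreter.get_tensor_details():
    print(t["name"], t["dtype"])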
TfLiteStatus Eval(TfLiteContext* context, TfLiteNode* node)
  const TfLiteTensor* filter = GetInput(context, node, kWeightsTensor);
  switch (filter->type)
    case kTfLiteFloat32:
      // normal all-float32 computation
    case kTfLiteInt8:
      // weights are int8
      EvalQuantized<kernel_type>(context, node, params, data, input, filter,
                                 bias, output);

TfLiteStatus EvalQuantized(TfLiteContext* context, TfLiteNode* node,
                           TfLiteFullyConnectedParams* params, OpData* data,
                           const TfLiteTensor* input,
                           const TfLiteTensor* filter, const TfLiteTensor* bias,
                           TfLiteTensor* output)
  // dynamic range quantization
  if (input->type == kTfLiteFloat32)
    return EvalHybrid(context, node, params, data, input, filter, bias,
                      input_quantized, scaling_factors, accum_scratch, row_sums,
                      input_offsets, output);
  else
    // full integer
    reference_ops::FullyConnected(
        op_params, GetTensorShape(input), GetTensorData<uint8_t>(input),
        GetTensorShape(filter), GetTensorData<uint8_t>(filter),
        GetTensorShape(bias), GetTensorData<int32_t>(bias),
        GetTensorShape(output), GetTensorData<uint8_t>(output));

TfLiteStatus EvalHybrid(TfLiteContext* context, TfLiteNode* node,
                        TfLiteFullyConnectedParams* params, OpData* data,
                        const TfLiteTensor* input, const TfLiteTensor* filter,
                        const TfLiteTensor* bias, TfLiteTensor* input_quantized,
                        TfLiteTensor* scaling_factors,
                        TfLiteTensor* accum_scratch, TfLiteTensor* row_sums,
                        TfLiteTensor* input_offsets, TfLiteTensor* output)
  // Quantize input from float to uint8 + quantization params (scaling factor).
  float* scaling_factors_ptr = GetTensorData<float>(scaling_factors);
  int8_t* quant_data = GetTensorData<int8_t>(input_quantized);
  const int8_t* filter_data = GetTensorData<int8_t>(filter);
  const float* input_ptr = GetTensorData<float>(input);
  tensor_utils::BatchQuantizeFloats(
      input_ptr, batch_size, input_size, quant_data, scaling_factors_ptr,
      input_offset_ptr, params->asymmetric_quantize_inputs);
  for (int b = 0; b < batch_size; ++b) {
    // Incorporate scaling of the filter.
    // !!! the effective scale is input_scale * filter_scale
    scaling_factors_ptr[b] *= filter->params.scale;
  }
  // Compute output += weight * quantized_input
  int32_t* scratch = GetTensorData<int32_t>(accum_scratch);
  tensor_utils::MatrixBatchVectorMultiplyAccumulate(
      filter_data, num_units, input_size, quant_data, scaling_factors_ptr,
      batch_size, GetTensorData<float>(output), /*per_channel_scale=*/nullptr,
      input_offset_ptr, scratch, row_sums_ptr, &data->compute_row_sums,
      CpuBackendContext::GetFromContext(context));
  // the final output is float
  tensor_utils::ApplyActivationToVector(
      GetTensorData<float>(output), batch_size * num_units, params->activation,
      GetTensorData<float>(output));

// The matrix multiply takes two int8 operands, but the result is float
void PortableMatrixBatchVectorMultiplyAccumulate(
    const int8_t* __restrict__ matrix, const int m_rows, const int m_cols,
    const int8_t* __restrict__ vectors, const float* scaling_factors,
    int n_batch, float* __restrict__ result) {
  for (int batch = 0; batch < n_batch; ++batch, vectors += m_cols) {
    const float batch_scaling_factor = scaling_factors[batch];
    // Get the address of the first row.
    const int8_t* row_ptr = matrix;
    for (int row = 0; row < m_rows; ++row) {
      // Initialize the dot product sum for the row to 0.
      int32_t dotprod = 0;
      for (int col = 0; col < m_cols; ++col, ++row_ptr) {
        dotprod += (*row_ptr) * (vectors[col]);
      }  // for col
      *result += dotprod * batch_scaling_factor;
      ++result;
    }  // for row
  }  // for batch
}
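The arithmetic in EvalHybrid can be mimicked in a few lines of numpy. This is only an illustrative sketch with made-up weights and per-tensor symmetric scales (no zero points or row-sum corrections), not the actual TFLite kernel:

import numpy as np

def quantize_symmetric(x):
    """Quantize a float array to int8 with a per-tensor symmetric scale."""
    scale = np.abs(x).max() / 127.0
    q = np.clip(np.round(x / scale), -128, 127).astype(np.int8)
    return q, scale

# Made-up float weights and a single input row (batch_size = 1).
rng = np.random.default_rng(0)
w = rng.normal(size=(100, 1)).astype(np.float32)   # filter: num_units x input_size
x = np.array([[3.0]], dtype=np.float32)            # input:  batch x input_size

w_q, w_scale = quantize_symmetric(w)   # done once at conversion time
x_q, x_scale = quantize_symmetric(x)   # done per batch at runtime

# int8 x int8 matmul accumulated in int32, then rescaled to float:
# the effective scale is input_scale * filter_scale, as in EvalHybrid.
acc = x_q.astype(np.int32) @ w_q.astype(np.int32).T
y = acc.astype(np.float32) * (x_scale * w_scale)

y_ref = x @ w.T
print(np.abs(y - y_ref).max())  # small quantization error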
1.3. Full Integer Quantization
- Weights use symmetric quantization, i.e. their zero_point is 0 (see the sketch after this list)
- Activations use asymmetric quantization
- zero_point values are integers
- tflite's quantization spec comes from Gemmlowp Quantization
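To make the two schemes concrete, here is a small sketch (my own illustration, not TFLite code) of how scale and zero_point could be derived from a (min, max) range, following the usual real = scale * (q - zero_point) convention:

import numpy as np

def asymmetric_params(x_min, x_max, qmin=-128, qmax=127):
    """Asymmetric int8 quantization: zero_point is a (generally non-zero) integer."""
    scale = (x_max - x_min) / (qmax - qmin)
    zero_point = int(round(qmin - x_min / scale))
    return scale, zero_point

def symmetric_params(x_min, x_max, qmax=127):
    """Symmetric int8 quantization (used for weights): zero_point is fixed to 0."""
    scale = max(abs(x_min), abs(x_max)) / qmax
    return scale, 0

# Example: an activation observed in [0, 6] vs. a weight in [-0.8, 0.5].
print(asymmetric_params(0.0, 6.0))    # non-zero zero_point
print(symmetric_params(-0.8, 0.5))    # zero_point == 0
# In both cases: real_value ≈ scale * (quantized_value - zero_point)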
import tensorflow as tf

converter = tf.lite.TFLiteConverter.from_saved_model("/tmp/pow")

X = tf.random.uniform([100, 1], minval=1, maxval=10.0)

def representative_dataset_gen():
    for i in range(10):
        yield [[X[i]]]

converter.representative_dataset = representative_dataset_gen
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
tflite = converter.convert()

with open("/tmp/pow-q.tflite", "wb") as f:
    f.write(tflite)
print("size of tflite-q:", len(tflite))
size of tflite-q: 46944
- All data, including the model's input, is quantized, and every node's inputs and outputs are int8. Since quantization needs a (min, max) range, a representative_dataset_gen must be provided for the model input so that this range can be collected and the input can be quantized (the invocation sketch below shows how to feed such a model)
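A minimal sketch of invoking the full-integer model, reusing /tmp/pow-q.tflite from the step above. Note that whether the boundary tensors are themselves int8 depends on the TF version and on converter.inference_input_type / converter.inference_output_type, so the sketch checks the reported dtype before quantizing:

import numpy as np
import tensorflow as tf

interpreter = tf.lite.Interpreter(model_path="/tmp/pow-q.tflite")
interpreter.allocate_tensors()
inp = interpreter.get_input_details()[0]
out = interpreter.get_output_details()[0]

x = np.array([[3.0]], dtype=np.float32)
if inp["dtype"] == np.int8:
    # Quantize the float input with the scale/zero_point derived from the
    # representative dataset: q = round(x / scale) + zero_point.
    scale, zero_point = inp["quantization"]
    x = np.round(x / scale + zero_point).astype(np.int8)
interpreter.set_tensor(inp["index"], x)
interpreter.invoke()

y = interpreter.get_tensor(out["index"])
if out["dtype"] == np.int8:
    scale, zero_point = out["quantization"]
    y = (y.astype(np.float32) - zero_point) * scale
print(y)  # roughly 3.0 ** 2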
Full integer quantization requires that every node supports full integer computation; otherwise conversion fails. For example, floor is an op that cannot be quantized, so converting the model below with full integer quantization fails with:
RuntimeError: Quantization not yet supported for op: FLOOR
import tensorflow as tf
import numpy as np
from tensorflow import keras
from tensorflow.keras import layers

n = 100
X = tf.random.uniform([n, 1], minval=1, maxval=10.0)
Y = tf.pow(X, 2)

tf.keras.backend.clear_session()
inputs = keras.Input(shape=(1,))
dense = layers.Dense(50, activation="relu")(inputs)
floor = tf.floor(dense)
outputs = layers.Dense(1)(floor)
model = keras.Model(inputs, outputs)

model.compile(optimizer="adam", loss="mse")
history = model.fit(X, Y, batch_size=20, epochs=50, verbose=0)

converter = tf.lite.TFLiteConverter.from_keras_model(model)
X = tf.random.uniform([100, 1], minval=1, maxval=10.0)

def representative_dataset_gen():
    for i in range(10):
        yield [[X[i]]]

converter.representative_dataset = representative_dataset_gen
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
tflite = converter.convert()
tflite currently does not support quantizing floor, but it could perhaps be implemented with a lookup table.
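A rough illustration of the lookup-table idea (my own sketch, not an existing TFLite kernel): since an int8 input can only take 256 values, any elementwise op such as floor can be precomputed into a 256-entry table mapping the quantized input directly to a quantized output. The quantization parameters below are hypothetical:

import numpy as np

def build_floor_lut(in_scale, in_zero_point, out_scale, out_zero_point):
    """Precompute floor() for all 256 possible int8 inputs."""
    q_in = np.arange(-128, 128, dtype=np.int32)
    real = (q_in - in_zero_point) * in_scale          # dequantize
    real = np.floor(real)                             # the op itself, in float
    q_out = np.round(real / out_scale) + out_zero_point
    return np.clip(q_out, -128, 127).astype(np.int8)  # 256-entry table

# Hypothetical quantization params for the tensors before/after floor.
lut = build_floor_lut(0.1, -10, 0.1, -10)

# Applying the op at runtime is then a single table lookup per element.
x_q = np.array([-5, 0, 37], dtype=np.int8)
y_q = lut[x_q.astype(np.int32) + 128]
print(y_q)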