Demo
The following is a code example of a user-defined fine-ranking model built on the Monolith API.
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
from absl import app, flags, logging
from typing import Dict
import tensorflow as tf
from monolith.native_training.data import PBDataset, PbType
from monolith.native_training.data import filter_by_fids
from monolith.native_training.data import parse_examples, parse_example_batch
from monolith.native_training.entry import (ZerosInitializer,
                                            RandomUniformInitializer,
                                            AdagradOptimizer, FtrlOptimizer,
                                            Fp16Compressor, Fp32Compressor)
from monolith.native_training.estimator import (EstimatorSpec, Estimator,
                                                RunConfig)
import monolith.native_training.layers as layers
from monolith.native_training.native_model import (MonolithModel,
                                                   get_sigmoid_loss_and_pred)
FLAGS = flags.FLAGS
FEATURE_NAMES = ['f_user_id', 'f_doc_id']
deep = {
    "initializer": RandomUniformInitializer(-0.015625, 0.015625),
    "optimizer": AdagradOptimizer(learning_rate=0.01,
                                  initial_accumulator_value=1.0),
    "compressor": Fp16Compressor()
}
wide = {
    "initializer": ZerosInitializer(),
    "optimizer": FtrlOptimizer(learning_rate=0.01,
                               initial_accumulator_value=0,
                               beta=1.0,
                               l1_regularization=1.0,
                               l2_regularization=1.0),
    "compressor": Fp32Compressor()
}
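# NOTE: `deep` and `wide` bundle per-embedding-table settings (initializer,
# optimizer, compressor). They are unpacked as keyword arguments of
# lookup_embedding_slice() in model_fn below, so the wide/bias slices train
# with FTRL while the deep vector slices train with Adagrad.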
class MyModel(MonolithModel):

  def __init__(self, params=None):
    super(MyModel, self).__init__(params)
    # data pipeline (used in the custom model's input_fn)
    self.batch_size = 256
    self.shuffle_size = 100
    # training (used in the custom model's model_fn)
    self.default_occurrence_threshold = 2
    self.sample_bias = True
    # training (takes effect inside the Monolith framework)
    self.set_train_clip_norm(1000.0)
    self.set_train_dense_weight_decay(0.0001)
  def input_fn(self, mode) -> 'DatasetV2':

    def parser(tensor):
      dense_features = ['__meta__actions']
      dense_feature_shapes = [1]
      dense_feature_types = [tf.int64]
      features = parse_examples(tensor,
                                sparse_features=FEATURE_NAMES,
                                dense_features=dense_features,
                                dense_feature_shapes=dense_feature_shapes,
                                dense_feature_types=dense_feature_types,
                                extra_features=['sample_rate'],
                                extra_feature_shapes=[1])
      # The label is only materialized during training/eval; at predict time
      # the 'label' key is simply absent (model_fn reads it via features.get).
      if mode != tf.estimator.ModeKeys.PREDICT:
        actions = tf.reshape(features['__meta__actions'], shape=(-1,))
        # action 2 is the positive action; everything else is a negative.
        features['label'] = tf.where(tf.math.equal(actions, 2),
                                     tf.ones_like(actions, dtype=tf.float32),
                                     tf.zeros_like(actions, dtype=tf.float32))
      features['sample_rate'] = tf.reshape(features['sample_rate'],
                                           shape=(-1,))
      return features
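    # filter_fn below runs on the raw serialized sample, before batching and
    # parsing: only samples carrying action 1 or 2 are kept, and samples are
    # additionally screened against filter_fids.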
    def filter_fn(variant):
      return filter_by_fids(variant,
                            has_actions=[1, 2],
                            filter_fids=[359045553113367103])

    # self.file_name is not set in this file; it is supplied through the
    # MonolithModel base class / run parameters.
    dataset = PBDataset(self.file_name, output_pb_type=PbType.PACKED_EXAMPLE)
    dataset = dataset.filter(filter_fn)
    dataset = dataset.shuffle(self.shuffle_size).batch(self.batch_size,
                                                       drop_remainder=True)
    dataset = dataset.map(parser)
    return dataset
  def model_fn(self, features: Dict[str, tf.Tensor],
               mode: tf.estimator.ModeKeys):
    for name in FEATURE_NAMES:
      self.create_embedding_feature_column(
          name, occurrence_threshold=self.default_occurrence_threshold)

    # Tensor(batch_size, 2)
    bias = self.lookup_embedding_slice(features=FEATURE_NAMES,
                                       slice_name='bias',
                                       slice_dim=1,
                                       out_type='concat',
                                       **wide)
    # [Tensor(batch_size, 16), Tensor(batch_size, 16)]
    embedding = self.lookup_embedding_slice(features=FEATURE_NAMES,
                                            slice_name='vec',
                                            slice_dim=16,
                                            **deep)
    # Tensor(batch_size, 1)
    bias_sum = tf.reduce_sum(bias, axis=1, keepdims=True)
    # Tensor(batch_size, 34)
    mlp_input = tf.concat(embedding + [bias], axis=1)
    # Tensor(batch_size, 1)
    output = layers.MLP(
        name='tower',
        output_dims=[1024, 512, 256, 1],
        initializers=tf.keras.initializers.HeNormal())(mlp_input)
    # Tensor(batch_size, 1)
    logits = tf.add_n([bias_sum, output])

    # Tensor(batch_size,)
    label = features.get('label', None)
    # Tensor(batch_size,)
    sample_rate = features.get('sample_rate', None)
    # loss: Tensor(), pred: Tensor(batch_size,)
    loss, pred = get_sigmoid_loss_and_pred(name='loss_and_pred',
                                           logits=logits,
                                           label=label,
                                           batch_size=self.batch_size,
                                           sample_rate=sample_rate,
                                           sample_bias=self.sample_bias,
                                           mode=mode)

    optimizer = tf.compat.v1.train.AdagradOptimizer(
        learning_rate=0.01, initial_accumulator_value=1.0)
    return EstimatorSpec(label=label,
                         pred=pred,
                         head_name='ctr',
                         loss=loss,
                         optimizer=optimizer,
                         classification=True)
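  # The tf.compat.v1 Adagrad above drives only the dense variables (the MLP
  # tower); the sparse embedding tables are updated by the per-table
  # optimizers configured in the `wide` and `deep` dicts.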
  def serving_input_receiver_fn(self):
    receiver_tensors = {}
    examples_placeholder = tf.compat.v1.placeholder(dtype=tf.string,
                                                    shape=(None,))
    receiver_tensors["example_batch"] = examples_placeholder
    parsed_results = parse_example_batch(examples_placeholder,
                                         sparse_features=FEATURE_NAMES)
    return tf.estimator.export.ServingInputReceiver(parsed_results,
                                                    receiver_tensors)
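  # At serving time the model receives batches of serialized examples under
  # the key "example_batch"; only the sparse features are parsed, and no
  # label is required.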

def main(_):
  config = RunConfig(dense_only_save_checkpoints_secs=600,
                     enable_fused_layout=True)
  model = MyModel()
  estimator = Estimator(model, config)
  estimator.train()


if __name__ == "__main__":
  tf.compat.v1.disable_eager_execution()
  tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.INFO)
  app.run(main)
All custom models must inherit from MonolithModel:

- __init__ sets some commonly used parameters;
- input_fn defines the data-processing logic for the training phase;
- model_fn defines the model structure;
- serving_input_receiver_fn defines the data-processing logic for the inference phase;
- main defines the entry point of the whole training job; over time it will be frozen and no longer open to user editing.
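To make this contract concrete, here is a minimal skeleton distilled from the demo above (the class name CustomModel is illustrative and the method bodies are placeholders, not a working model):

from monolith.native_training.estimator import Estimator, RunConfig
from monolith.native_training.native_model import MonolithModel

class CustomModel(MonolithModel):

  def __init__(self, params=None):
    super().__init__(params)
    # set commonly used hyper-parameters here (batch size, thresholds, ...)

  def input_fn(self, mode) -> 'DatasetV2':
    # build and return the training-phase tf.data pipeline
    raise NotImplementedError

  def model_fn(self, features, mode):
    # look up embeddings, build the network, and return an EstimatorSpec
    raise NotImplementedError

  def serving_input_receiver_fn(self):
    # describe how inference-time requests are parsed
    raise NotImplementedError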