Demo

The following is a code example of a user-defined fine-ranking model built on the Monolith API.

#!/usr/bin/env python
# -*- encoding: utf-8 -*-

from absl import app, flags, logging
from typing import Dict
import tensorflow as tf
from monolith.native_training.data import PBDataset, PbType
from monolith.native_training.data import filter_by_fids
from monolith.native_training.data import parse_examples, parse_example_batch
from monolith.native_training.entry import (ZerosInitializer,
                                            RandomUniformInitializer,
                                            AdagradOptimizer, FtrlOptimizer,
                                            Fp16Compressor, Fp32Compressor)
from monolith.native_training.estimator import (EstimatorSpec, Estimator,
                                                RunConfig)
import monolith.native_training.layers as layers
from monolith.native_training.native_model import (MonolithModel,
                                                   get_sigmoid_loss_and_pred)

FLAGS = flags.FLAGS

FEATURE_NAMES = ['f_user_id', 'f_doc_id']
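
# Per-slot embedding configs used by the lookups in model_fn below: "deep"
# drives the 16-dim 'vec' slices and "wide" drives the 1-dim 'bias' slices.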

deep = {
    "initializer":
        RandomUniformInitializer(-0.015625, 0.015625),
    "optimizer":
        AdagradOptimizer(learning_rate=0.01, initial_accumulator_value=1.0),
    "compressor":
        Fp16Compressor()
}

wide = {
    "initializer":
        ZerosInitializer(),
    "optimizer":
        FtrlOptimizer(learning_rate=0.01,
                      initial_accumulator_value=0,
                      beta=1.0,
                      l1_regularization=1.0,
                      l2_regularization=1.0),
    "compressor":
        Fp32Compressor()
}


class MyModel(MonolithModel):

  def __init__(self, params=None):
    super(MyModel, self).__init__(params)
    # data pipeline (used in the custom model's input_fn)
    self.batch_size = 256
    self.shuffle_size = 100

    # training (used in the custom model's model_fn)
    self.default_occurrence_threshold = 2
    self.sample_bias = True

    # training (takes effect inside the Monolith framework)
    self.set_train_clip_norm(1000.0)
    self.set_train_dense_weight_decay(0.0001)

  def input_fn(self, mode) -> tf.data.Dataset:

    def parser(tensor):
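      # Parse a batch of serialized examples: the two sparse features, the
      # action metadata (used to derive the label), and the sample rate.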
      dense_features = ['__meta__actions']
      dense_feature_shapes = [1]
      dense_feature_types = [tf.int64]
      features = parse_examples(tensor,
                                sparse_features=FEATURE_NAMES,
                                dense_features=dense_features,
                                dense_feature_shapes=dense_feature_shapes,
                                dense_feature_types=dense_feature_types,
                                extra_features=['sample_rate'],
                                extra_feature_shapes=[1])
      # The label only exists outside of PREDICT mode; model_fn reads it
      # via features.get('label', None).
      if mode != tf.estimator.ModeKeys.PREDICT:
        actions = tf.reshape(features['__meta__actions'], shape=(-1,))
        features['label'] = tf.where(tf.math.equal(actions, 2),
                                     tf.ones_like(actions, dtype=tf.float32),
                                     tf.zeros_like(actions, dtype=tf.float32))
        features['sample_rate'] = tf.reshape(features['sample_rate'],
                                             shape=(-1,))
      return features

    def filter_fn(variant):
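      # Keep samples whose actions contain 1 or 2, and drop samples that
      # contain the listed fid.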
      return filter_by_fids(variant,
                            has_actions=[1, 2],
                            filter_fids=[359045553113367103])

    dataset = PBDataset(self.file_name, output_pb_type=PbType.PACKED_EXAMPLE)
    dataset = dataset.filter(filter_fn)
    dataset = dataset.shuffle(self.shuffle_size).batch(self.batch_size,
                                                       drop_remainder=True)
    dataset = dataset.map(parser)
    return dataset

  def model_fn(self, features: Dict[str, tf.Tensor],
               mode: tf.estimator.ModeKeys):
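    # Declare an embedding feature column for each sparse feature;
    # occurrence_threshold filters out low-frequency fids so they do not
    # enter the embedding table.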
    for name in FEATURE_NAMES:
      self.create_embedding_feature_column(
          name, occurrence_threshold=self.default_occurrence_threshold)

    # Tensor(batch_size, 2)
    bias = self.lookup_embedding_slice(features=FEATURE_NAMES,
                                       slice_name='bias',
                                       slice_dim=1,
                                       out_type='concat',
                                       **wide)
    # [Tensor(batch_size, 16), Tensor(batch_size, 16)]
    embedding = self.lookup_embedding_slice(features=FEATURE_NAMES,
                                            slice_name='vec',
                                            slice_dim=16,
                                            **deep)
    # Tensor(batch_size, 1)
    bias_sum = tf.reduce_sum(bias, axis=1, keepdims=True)
    # Tensor(batch_size, 34)
    mlp_input = tf.concat(embedding + [bias], axis=1)
    # Tensor(batch_size, 1)
    output = layers.MLP(name='tower',
                        output_dims=[1024, 512, 256, 1],
                        initializers=tf.keras.initializers.HeNormal())(mlp_input)
    # Tensor(batch_size, 1)
    logits = tf.add_n([bias_sum, output])

    # Tensor(batch_size,)
    label = features.get('label', None)
    # Tensor(batch_size,)
    sample_rate = features.get('sample_rate', None)
    # loss: Tensor(), pred: Tensor(batch_size,)
    loss, pred = get_sigmoid_loss_and_pred(name='loss_and_pred',
                                           logits=logits,
                                           label=label,
                                           batch_size=self.batch_size,
                                           sample_rate=sample_rate,
                                           sample_bias=self.sample_bias,
                                           mode=mode)

    optimizer = tf.compat.v1.train.AdagradOptimizer(
        learning_rate=0.01, initial_accumulator_value=1.0)
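    # This optimizer updates the dense variables (the MLP tower); the sparse
    # embedding tables are updated by the per-slot optimizers configured in
    # `deep` and `wide` above.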
    return EstimatorSpec(label=label,
                         pred=pred,
                         head_name='ctr',
                         loss=loss,
                         optimizer=optimizer,
                         classification=True)

  def serving_input_receiver_fn(self):
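    # At serving time the model receives a batch of serialized example
    # strings under the key "example_batch"; only the sparse features are
    # parsed from them.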

    receiver_tensors = {}
    examples_placeholder = tf.compat.v1.placeholder(dtype=tf.string,
                                                    shape=(None,))
    receiver_tensors["example_batch"] = examples_placeholder
    parsed_results = parse_example_batch(examples_placeholder,
                                         sparse_features=FEATURE_NAMES)
    return tf.estimator.export.ServingInputReceiver(parsed_results,
                                                    receiver_tensors)


def main(_):
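  # RunConfig here asks for dense-only checkpoints every 600 seconds and
  # enables the fused layout optimization.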
  config = RunConfig(dense_only_save_checkpoints_secs=600,
                     enable_fused_layout=True)
  model = MyModel()
  estimator = Estimator(model, config)
  estimator.train()


if __name__ == "__main__":
  tf.compat.v1.disable_eager_execution()
  tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.INFO)
  app.run(main)

• All custom models must inherit from MonolithModel

• __init__ sets a few commonly used parameters

• input_fn defines the data-processing logic for the training phase

• model_fn defines the model structure; a plain-TensorFlow sketch of its sigmoid loss/prediction step appears after this list

• serving_input_receiver_fn defines the data-processing logic for the inference phase

• main defines the entry point for the whole training job; it will gradually be solidified and eventually no longer be open to user editing
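
For intuition, the loss/prediction step in model_fn can be approximated with plain TensorFlow. The sketch below is illustrative only: sigmoid_loss_and_pred_sketch is a hypothetical stand-in for get_sigmoid_loss_and_pred, and the log(sample_rate) logit shift is an assumed form of the usual negative-sampling bias correction, not a guaranteed description of what Monolith does internally.

import tensorflow as tf


def sigmoid_loss_and_pred_sketch(logits, label, sample_rate=None,
                                 sample_bias=False):
  # Flatten (batch_size, 1) logits to (batch_size,).
  logits = tf.reshape(logits, shape=(-1,))
  if sample_bias and sample_rate is not None:
    # Assumed correction: if a sample survives sampling with probability
    # sample_rate, adding log(sample_rate) shifts the logit from the
    # sampled distribution back toward the true distribution.
    logits += tf.math.log(tf.reshape(sample_rate, shape=(-1,)))
  # Predicted click probability.
  pred = tf.sigmoid(logits)
  loss = None
  if label is not None:
    # Mean sigmoid cross-entropy over the batch.
    loss = tf.reduce_mean(
        tf.nn.sigmoid_cross_entropy_with_logits(labels=label, logits=logits))
  return loss, pred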