模型结构

Monolith 在 TensorFlow 基础上扩展了动态 Embedding Hash Table,除此之外,模型结构的定义几乎与原生 TensorFlow 无异。 同时,Monolith 预置提供了封装好的 dense layer 供用户使用。

1. 精排模型

demo 展示了一个典型精排模型的编写。

2. 召回模型

模型结构

目前召回一般是 FFM 结构

ffm

每个特征被划分成 K 个 field,特征在每个 field 都会有单独的 embedding,计算二次项参数时, 根据两个特征所属的 field 使用不同的 embedding 计算点乘。目前通用的召回模型是简化版的三域 FFM, 即将 user,group(item),context 作为三个 field,每个特征都有两部分 embedding, 分别对应其他两个 field。 如下图所示,128 维的 embedding 分成低 64 维和高 64 维,两两点乘实现交叉,为了减少计算,field内部先进行了 sum_pooling 处理。

recall

因此 FFM 的计算公式转化成:

recall_score

其中

  • group 和 context 是已知的

  • g_c * c_g 部分和 group bias 可以离线计算得到

  • u_c 、c_u 、context bias、user_bias在一次请求中是固定的,与 group 排序无关,可以省略

  • u_g * g_u 部分需要在线计算

因此 FFM 线上 serving 时,需要实现两部分工作:

  1. 离线计算 offline_score = g_c * c_g + group bias,这部分通过 fermat 模块实现

  2. 在线计算 u_g * g_u + offline score , 这部分是 viking 实现

模型编写

召回模型与精排模型编写几乎无异,但值得注意的是,由于召回模型在 serving 阶段天然与精排模型有差异,所以召回模型导出 serving 图时需要一些额外操作。 一般类似如下代码定义了召回 FFM 所需要的所有 part

def model_fn(self, features: Dict[str, tf.Tensor],
             mode: tf.estimator.ModeKeys):
  item_embeddings = self.lookup_embedding_slice(features=ITEM_FNAMES,
                                                slice_name='item_emb',
                                                slice_dim=32, **deep)

  user_embeddings = self.lookup_embedding_slice(features=USER_FNAMES,
                                                slice_name='user_emb',
                                                slice_dim=32, **deep)d

  context_embeddings = self.lookup_embedding_slice(features=CONTEXT_FNAMES,
                                                   slice_name='context_emb',
                                                   slice_dim=32, **deep)

  user_lr_concat = self.lookup_embedding_slice(features=USER_FNAMES,
                                               slice_name='lr_weight',
                                               slice_dim=1, **wide)

  item_lr_concat = self.lookup_embedding_slice(features=ITEM_FNAMES,
                                               slice_name='lr_weight',
                                               slice_dim=1, **wide)

  context_lr_concat = self.lookup_embedding_slice(features=CONTEXT_FNAMES,
                                                  slice_name='lr_weight',
                                                  slice_dim=1, **wide)

  user_bias = tf.reduce_sum(tf.add_n(user_lr_concat), axis=1, name='user_lr')
  item_bias = tf.reduce_sum(tf.add_n(item_lr_concat), axis=1, name='item_lr')
  context_bias = tf.reduce_sum(tf.add_n(context_lr_concat), axis=1, name='context')

  item_sum_pooling = tf.add_n(item_embeddings)
  user_sum_pooling = tf.add_n(user_embeddings)
  context_sum_pooling = tf.add_n(context_embeddings)

  user_embedding = tf.identity(user_sum_pooling, name='user_embedding')
  item_embedding = tf.identity(item_sum_pooling, name='item_embedding')
  context_embedding = tf.identity(context_sum_pooling, name='cid_tensor')

model_fn 中要特别加上以下代码,以便 serving 图正确导出

if mode == tf.estimator.ModeKeys.PREDICT:
  self.add_extra_output(
      "user_subgraph", {
          "user_embedding": user_embedding,
          "cid_tensor": context_embedding,
          "user_lr": user_bias,
          "context": context_bias
      })
  self.add_extra_output("item_subgraph", {
      "item_embedding": item_embedding,
      "item_lr": item_bias
  })

3. Multi-task 模型

Monolith 同样支持 multi-task(multi-head) 模型训练,只需要保证 model_fn 返回的 EstimatorSpec 符合 multi-task 编写规则即可

def input_fn(self, mode) -> 'DatasetV2':

  def parser(tensor):
    extra_features = ['sample_rate', 'actions']
    extra_feature_shapes = [1] * len(extra_features)
    features = parse_examples(tensor,
                              sparse_features=VALID_NAMES,
                              extra_features=extra_features,
                              extra_feature_shapes=extra_feature_shapes)

    if mode != tf.estimator.ModeKeys.PREDICT:
      actions = tf.reshape(features['actions'], shape=(-1,))
      features['label_ctr'] = tf.where(tf.math.equal(actions, 1),
                                       tf.ones_like(actions, dtype=tf.float32),
                                       tf.zeros_like(actions, dtype=tf.float32))
      features['label_cvr'] = tf.where(tf.math.equal(actions, 2),
                                       tf.ones_like(actions, dtype=tf.float32),
                                       tf.zeros_like(actions, dtype=tf.float32))
      features['sample_rate'] = tf.reshape(features['sample_rate'], shape=(-1,))
    return features

  dataset = PBDataset()
  dataset = dataset.batch(self.batch_size, drop_remainder=True)
  dataset = dataset.map(parser)
  return dataset

def model_fn(self, features: Dict[str, tf.Tensor], mode: tf.estimator.ModeKeys):
  ...
  label_ctr = features.get('label_ctr', None)
  label_cvr = features.get('label_cvr', None)
  return EstimatorSpec(label=[label_ctr, label_cvr],
                       pred=[pred_ctr, pred_cvr],
                       head_name=['ctr', 'cvr'],
                       loss=loss,
                       optimizer=optimizer,
                       classification=[True, True])

其中,labels, preds, head_names, classifications 是等长的数组,分别存储对应 task(head) 的 label(Tensor), pred(Tensor), head_name(str), classification(bool)。