数据流

Monolith 训练时从标准输入(stdin)或磁盘文件读取数据,此时数据一般是列式存储的 ExampleBatchparquet。用户对样本可感知的操作有 4 类

  • 训练阶段,1 条样本变 0 条,即过滤样本,比如根据 FIDactions 过滤样本

  • 训练阶段,1 条样本变 N(N>=1) 条,即生成样本,比如随机生成负例

  • 训练阶段,dataset.batch() 之后使用 parse_examples 解析成 tf.Tensor 入图训练

  • 推理阶段,使用 parse_example_batch 将序列化的请求数据解析成 tf.Tensor 入图推理

1. 过滤样本

以下代码的过滤逻辑表示:如果样本中包含了 FID 100 或 200,并且其 actions 值包含 1 或 2,则样本被保留,其余被丢弃。

def filter_fn(variant):
  return filter_by_fids(variant, has_fids=[100, 200], has_actions=[1, 2])


dataset = PBDataset()
dataset = dataset.filter(filter_fn)

与之等价的新版写法为

dataset = PBDataset()
dataset = dataset.transform(
    transforms.Compose([
        transforms.FilterByFid(has_fids=[100, 200]),
        transforms.FilterByAction(has_actions=[1, 2])
    ]))

2. 负例生成

一个典型的负例生成使用方式如下

dataset = PBDataset()
dataset = dataset.negative_gen(neg_num=10,
                               channel_feature="f_page",
                               item_features=["f_item_id", "f_item_cat"],
                               per_channel=True,
                               start_num=1000,
                               max_item_num=5000,
                               negative_action=int(Actions.RANDOM_NEG),
                               positive_actions=[int(Actions.CLICK)])

按如上参数设定

  • 它为每个正例生成 10 个负例

  • 它指定 per_channel=Truechannel_feature="f_page",则生成负例时会根据 f_page 字段进行分桶,如果 f_page 一共有 3 可能值,则会积攒 3 个 item 池

  • start_num=1000, max_item_num=5000 意味着某个 item 池只有积攒达到 1000 时才会开始生成负例,但该池子最多存 5000 个,超过 5000 之后按 FIFO(first in first out) 原则更新池子

  • 它判定样本里 action [int(Actions.CLICK)] 的为正例

  • 新生成的负例样本的 action=int(Actions.RANDOM_NEG)

3. 数据解析

3.1 input_fn

input_fn 对应训练阶段,解析样本使用 parse_examples。以下是一个典型的例子

def parser(tensor):
  extra_features = ['sample_rate', 'actions']
  extra_feature_shapes = [1] * len(extra_features)
  features = parse_examples(tensor,
                            sparse_features=VALID_FNAMES,
                            extra_features=extra_features,
                            extra_feature_shapes=extra_feature_shapes)
  if mode != tf.estimator.ModeKeys.PREDICT:
    actions = tf.reshape(features['actions'], shape=(-1,))
    features['label'] = tf.where(tf.math.equal(actions, int(Actions.CLICK)),
                                 tf.ones_like(actions, dtype=tf.float32),
                                 tf.zeros_like(actions, dtype=tf.float32))
    features['sample_rate'] = tf.reshape(features['sample_rate'], shape=(-1,))
  return features

dataset = PBDataset()
dataset = dataset.shuffle(100).batch(256, drop_remainder=True)
dataset = dataset.map(parser)
  • 由于训练阶段普遍存在过滤/改写样本的操作,即使数据源头是列存格式,也会在 PBDataset 出来之后展开成行存样本格式

  • dataset.batch() 之后数据流的 element 变成 List[Example]

  • parse_examples 负责将 List[Example] parse 成 tf.Tensor 以便后续入图


值得注意的是,我们推荐 drop_remainder=True,表示当前 worker 训练将近末了, 剩余的数据已经不足凑够 batch_size(这里是 256)时,则剩余的样本扔掉不进训练器。假设某个训练任务 batch_size=256, drop_remainder=True,并且 worker_num=100, 则最坏的情况下会有 255 x 100 = 25500 条数据被丢弃不进训练器。


3.2 serving_input_receiver_fn

serving_input_receiver_fn 对应推理阶段,解析样本使用 parse_example_batch。以下是一个典型的例子

def serving_input_receiver_fn(self):
  receiver_tensors = {}
  examples_placeholder = tf.compat.v1.placeholder(dtype=tf.string,
                                                  shape=(None,))
  receiver_tensors["example_batch"] = examples_placeholder
  parsed_results = parse_example_batch(examples_placeholder,
                                       sparse_features=VALID_FNAMES)
  return tf.estimator.export.ServingInputReceiver(parsed_results,
                                                  receiver_tensors)
  • receiver_tensors["example_batch"] 是固定写法,不能改动

3.3 extra_features/dense_features

大部分同学有时对 extra_features/dense_features 表示困惑。 简单来讲,extra_features 只解析 LineId 中的字段,而 dense_features 则更加广泛,所有非 FID 特征都可以解析,可以认为后者是前者的超集。

message LineId
    optional fixed64 uid = 2;                        // 用户原始ID
    optional int64 req_time = 3;                     // 请求时间
    optional fixed64 item_id = 4;                    // 物品ID
    optional string req_id = 5;                      // 请求ID
    repeated int32 actions = 6 [packed=true];        // 行为列表
    optional int64 generate_time = 20;               // 样本生成时间
    optional int32 emit_type = 21;                   // 样本发送类型
    repeated int32 pre_actions = 23 [packed=true];   // 上一次行为列表
    optional string model_names = 25;                // 模型名,multi-task样本中用到
    optional float sample_rate = 27 [default = 1.0]; // 负样本采样率
}

LineId 是历史遗留问题,后续会渐渐弃用。目前数据流侧在进行双写操作,以 actions 字段为例, 它目前同时出现在样本 LineId.actions 中以及 __meta__actions 字段中。所以下面的代码是等价的

features = parse_examples(tensor,
                          extra_features=['actions'],
                          extra_feature_shapes=[1],
                          dense_features=['img_embedding'],
                          dense_feature_shapes=[1024],
                          dense_feature_types=[tf.float32])
# 等价于
features = parse_examples(tensor,
                          dense_features=['__meta__actions', 'img_embedding'],
                          dense_feature_shapes=[1, 1024],
                          dense_feature_types=[tf.int64, tf.float32])

值得注意的是

  • 解析 extra_features 时不需要指定其字段类型,因为框架可以从 LineId proto 定义中预先获取。 但 dense_features 字段类型只能由用户来指定,只有用户清楚其数据类型。