数据流¶
Monolith 训练时从标准输入(stdin)或磁盘文件读取数据,此时数据一般是列式存储的 ExampleBatch
或 parquet
。用户对样本可感知的操作有 4 类
训练阶段,1 条样本变 0 条,即过滤样本,比如根据
FID
或actions
过滤样本训练阶段,1 条样本变 N(N>=1) 条,即生成样本,比如随机生成负例
训练阶段,
dataset.batch()
之后使用parse_examples
解析成tf.Tensor
入图训练推理阶段,使用
parse_example_batch
将序列化的请求数据解析成tf.Tensor
入图推理
1. 过滤样本¶
以下代码的过滤逻辑表示:如果样本中包含了 FID
100 或 200,并且其 actions
值包含 1 或 2,则样本被保留,其余被丢弃。
def filter_fn(variant):
return filter_by_fids(variant, has_fids=[100, 200], has_actions=[1, 2])
dataset = PBDataset()
dataset = dataset.filter(filter_fn)
与之等价的新版写法为
dataset = PBDataset()
dataset = dataset.transform(
transforms.Compose([
transforms.FilterByFid(has_fids=[100, 200]),
transforms.FilterByAction(has_actions=[1, 2])
]))
2. 负例生成¶
一个典型的负例生成使用方式如下
dataset = PBDataset()
dataset = dataset.negative_gen(neg_num=10,
channel_feature="f_page",
item_features=["f_item_id", "f_item_cat"],
per_channel=True,
start_num=1000,
max_item_num=5000,
negative_action=int(Actions.RANDOM_NEG),
positive_actions=[int(Actions.CLICK)])
按如上参数设定
它为每个正例生成 10 个负例
它指定
per_channel=True
且channel_feature="f_page"
,则生成负例时会根据f_page
字段进行分桶,如果f_page
一共有 3 可能值,则会积攒 3 个 item 池start_num=1000
,max_item_num=5000
意味着某个 item 池只有积攒达到 1000 时才会开始生成负例,但该池子最多存 5000 个,超过 5000 之后按 FIFO(first in first out) 原则更新池子它判定样本里
action ∈ [int(Actions.CLICK)]
的为正例新生成的负例样本的
action=int(Actions.RANDOM_NEG)
3. 数据解析¶
3.1 input_fn
¶
input_fn
对应训练阶段,解析样本使用 parse_examples
。以下是一个典型的例子
def parser(tensor):
extra_features = ['sample_rate', 'actions']
extra_feature_shapes = [1] * len(extra_features)
features = parse_examples(tensor,
sparse_features=VALID_FNAMES,
extra_features=extra_features,
extra_feature_shapes=extra_feature_shapes)
if mode != tf.estimator.ModeKeys.PREDICT:
actions = tf.reshape(features['actions'], shape=(-1,))
features['label'] = tf.where(tf.math.equal(actions, int(Actions.CLICK)),
tf.ones_like(actions, dtype=tf.float32),
tf.zeros_like(actions, dtype=tf.float32))
features['sample_rate'] = tf.reshape(features['sample_rate'], shape=(-1,))
return features
dataset = PBDataset()
dataset = dataset.shuffle(100).batch(256, drop_remainder=True)
dataset = dataset.map(parser)
由于训练阶段普遍存在过滤/改写样本的操作,即使数据源头是列存格式,也会在
PBDataset
出来之后展开成行存样本格式dataset.batch()
之后数据流的element
变成List[Example]
。parse_examples
负责将List[Example]
parse 成tf.Tensor
以便后续入图
值得注意的是,我们推荐
drop_remainder=True
,表示当前 worker 训练将近末了, 剩余的数据已经不足凑够batch_size
(这里是 256)时,则剩余的样本扔掉不进训练器。假设某个训练任务batch_size=256, drop_remainder=True
,并且worker_num=100
, 则最坏的情况下会有255 x 100 = 25500
条数据被丢弃不进训练器。
3.2 serving_input_receiver_fn
¶
serving_input_receiver_fn
对应推理阶段,解析样本使用 parse_example_batch
。以下是一个典型的例子
def serving_input_receiver_fn(self):
receiver_tensors = {}
examples_placeholder = tf.compat.v1.placeholder(dtype=tf.string,
shape=(None,))
receiver_tensors["example_batch"] = examples_placeholder
parsed_results = parse_example_batch(examples_placeholder,
sparse_features=VALID_FNAMES)
return tf.estimator.export.ServingInputReceiver(parsed_results,
receiver_tensors)
receiver_tensors["example_batch"]
是固定写法,不能改动
3.3 extra_features/dense_features
¶
大部分同学有时对 extra_features/dense_features
表示困惑。
简单来讲,extra_features
只解析 LineId
中的字段,而 dense_features
则更加广泛,所有非 FID 特征都可以解析,可以认为后者是前者的超集。
message LineId
optional fixed64 uid = 2; // 用户原始ID
optional int64 req_time = 3; // 请求时间
optional fixed64 item_id = 4; // 物品ID
optional string req_id = 5; // 请求ID
repeated int32 actions = 6 [packed=true]; // 行为列表
optional int64 generate_time = 20; // 样本生成时间
optional int32 emit_type = 21; // 样本发送类型
repeated int32 pre_actions = 23 [packed=true]; // 上一次行为列表
optional string model_names = 25; // 模型名,multi-task样本中用到
optional float sample_rate = 27 [default = 1.0]; // 负样本采样率
}
LineId
是历史遗留问题,后续会渐渐弃用。目前数据流侧在进行双写操作,以 actions
字段为例,
它目前同时出现在样本 LineId.actions
中以及 __meta__actions
字段中。所以下面的代码是等价的
features = parse_examples(tensor,
extra_features=['actions'],
extra_feature_shapes=[1],
dense_features=['img_embedding'],
dense_feature_shapes=[1024],
dense_feature_types=[tf.float32])
# 等价于
features = parse_examples(tensor,
dense_features=['__meta__actions', 'img_embedding'],
dense_feature_shapes=[1, 1024],
dense_feature_types=[tf.int64, tf.float32])
值得注意的是
解析
extra_features
时不需要指定其字段类型,因为框架可以从LineId
proto 定义中预先获取。 但dense_features
字段类型只能由用户来指定,只有用户清楚其数据类型。