Data

datasets

class PBDataset(*args, **kwargs)

Monolith 训练阶段的数据源头,它从标准输入/磁盘文件中读取数据。 其后可串联 Dataset (如 negative_gen )对样本进行改写,也可使用 filter_by_fids, filter_by_feature_value, negative_sample 等 Op 过滤样本。

Parameters:
  • *args – 用户无需关心,由调度侧自动注入

  • **kwargs – 用户无需关心,由调度侧自动注入

Raises:
  • TypeError – 如果有任何参数与类型不匹配, 则抛TypeError

  • ValueError – 如果有任何值与期望不匹配, 则抛ValueError

negative_gen(self, neg_num, per_channel=False, channel_feature='', item_features=[], start_num=500, max_item_num=100000, positive_label=1, negative_label=-1, negative_action=-99999, positive_actions=[], label_index=0, action_priority='', index_feature='', throw_origin=False, throw_origin_neg=False, cache_only_pos=False, cache_negative_actions=[], real_neg_instance_weight=1.0, sampled_neg_instance_weight=-1.0, unbias_sampled_neg=True, origin_neg_in_pool_proba=1.0, neg_sample_declay_factor=1.0, easy_hard_ratio=0.0, **kwargs)
如果样本中只有正例,没有负例,negative_gen 可根据需要生成随机负例。推荐系统中的样本通常是由 user 侧,item 侧两部分组成。 这里的做法是:
  • 先收集每个样本的 item 侧信息,生成一个 item 池子

  • item 池子并不是平铺的,而是按某个特征(channel_feature)分类组织。 如果在同一个 channel 随机取 item 得到的是 hard 负例,在其它 channel 中抽样得到的是 easy 负例

  • 并不是一开始就生成负例,而是要等 item 池子积累到一定大小才开始生成负例

Example

>>> from monolith.native_training.data import PBDataset
>>> dataset = PBDataset()
>>> dataset = dataset.negative_gen(neg_num=10,
...                                channel_feature="f_page",
...                                item_features=["f_item_id", "f_item_cat"],
...                                per_channel=True,
...                                start_num=1000,
...                                max_item_num=5000,
...                                negative_action=int(Actions.RANDOM_NEG),
...                                positive_actions=[int(Actions.CLICK)])
Parameters:
  • neg_num (int) – 为一个正例生成`neg_num`个负例

  • per_channel (bool) – 是否分桶

  • channel_feature (string) – 用于当 item 分桶的字段

  • item_features – (List[str]): item 侧的特征名列表

  • start_num (int) – 在 item 池子中积累多少个后才开始采样

  • max_iten_num (int) – 每一个 channel(如果分桶)最多收集多少个 item

  • positive_actions (List[int]) – 判定样本是否为正例的 action 值列表,若样本的 action positive_actions,则该样本是正例。如果样本的 action 有多个值,可以通过 action_priority 指定优先级, 选择优先级最高的一个。

  • negative_action (int) – 对于新生成的负例会为其 action 添加该 negative_action 值,即如果 negative_action!=-99999,则新生成的样本 action 值为 negative_action

  • positive_label (int) – 判定样本是否为正例的 label 值,若样本的 label[label_index] == positive_label ,则该样本是正例。注意:它的优先级低于 positive_actions,也就是两者同时指定时以 positive_actions 为准。

  • label_index (int) – 默认值为 0,根据样本的 label 值判定样本是否是正例时,如果样本 label 有多个,则用 label_index 指定真正起作用的 label。

  • negative_label (int) – 对于新生成的负例会为其 label 添加该 negative_label

  • cache_only_pos (bool) – 原始负例是否入池,cache_only_pos=True,原始负例不入池,负例池中的 item features 都来自原始正例。

  • easy_hard_ratio (float) – 当使用 per_channel 的时候,hard 和 easy 负例之间的比例。取值在 0 ~ 1 之间。举例:0.8就是大致 80% easy负例

Warning

  • positive_actionspositive_label 不要同时指定,同时指定时以 positive_actions 为准。

Raises:
  • TypeError – 如果有任何参数与类型不匹配, 则抛TypeError

  • ValueError – 如果有任何值与期望不匹配, 则抛ValueError

transform(self, t, **kwargs)

新版样本过滤/改写逻辑,在过滤条件复杂时比原来的写法更加高效。它接受一个 Transform 对象,然后执行该对象所定义的过滤改写逻辑。

Example

>>> from monolith.native_training.data import PBDataset, filter_by_fids, transforms
>>> # 原来的写法
>>> def filter_fn(variant):
...   return filter_by_fids(variant, has_fids=[100], has_actions=[1, 2])
>>>
>>> dataset = PBDataset()
>>> dataset = dataset.filter(filter_fn)
>>>
>>> # 新版本写法,两者过滤逻辑等价
>>> dataset = PBDataset()
>>> dataset = dataset.transform(
...     transforms.Compose([
...         transforms.FilterByFid(has_fids=[100]),
...         transforms.FilterByAction(has_actions=[1, 2])
...     ]))

Note

  • dataset.transform(...) 目前仅支持过滤行为,负例生成仍然需要使用 dataset.negative_gen(...)

Parameters:

t (Transform) – 样本改写方式

Raises:
  • TypeError – 如果有任何参数与类型不匹配,则抛TypeError

  • ValueError – 如果有任何值与期望不匹配,则抛ValueError

dataset ops

filter_by_fids(variant, filter_fids=None, has_fids=None, select_fids=None, has_actions=None, req_time_min=0, select_slots=None, variant_type='instance')

根据离散特征的 FID 值进行过滤

Parameters:
  • variant (Tensor) – 输入数据,必须是 variant 类型

  • filter_fids (List[int]) – 任意一个FID出现 filter_fids 中,则样本被过滤

  • has_fids (List[int]) – 任意一个FID出现在 has_fids 中,则样本被选择

  • select_fids (List[int]) – 所有 select_fids 均出现在样本中,样本才被选择

  • has_actions (List[int]) – 样本 actions 字段任意一个值出现在 has_actions 中,则样本被选择

  • req_time_min (int) – 请求时间最小值

  • select_slots (List[int]) – 所有 select_slots 均出现在样本中,样本才被选择

Warning

  • select_slots 即将被废弃,尽量不要使用

Example

>>> from monolith.native_training.data import PBDataset, filter_by_fids
>>> def filter_fn(variant):
...   return filter_by_fids(variant, has_actions=[1, 2], filter_fids=[100])
>>>
>>> dataset = PBDataset()
>>> dataset = dataset.filter(filter_fn)
Returns:

True 表示样本被保留,False 表示样本被丢弃

Return type:

bool

filter_by_feature_value(variant, field_name, op, operand, field_type, keep_empty=False, operand_filepath=None)

根据连续特征值进行过滤

Example

>>> from monolith.native_training.data import PBDataset, filter_by_feature_value
>>> def filter_fn(variant):
...   return filter_by_feature_value(variant, field_name='__meta__att_traced', op='neq', operand=1, field_type='int64')
>>>
>>> dataset = PBDataset()
>>> dataset = dataset.filter(filter_fn)
Parameters:
  • variant (Tensor) – 输入数据,必须是 variant 类型

  • field_name (List[int]) – 样本在指定 field_name 进行过滤

  • op (str) – 比较运算符,可以是 gt/ge/eq/lt/le/neq/between/in/not-in 等 布尔运算,也可以是 all/any/diff 等集合布尔运算

  • operand (Union[float, int, str, List[float], List[int], List[str]]) – 操作数,用于比较,可以为值或者 List

  • keep_empty (bool) – 默认 False

  • field_type (str) – 需要显式指定字段类型,可以为 int64/float/double/bytes/string ( bytes or string ,含义相同)

Returns:

True 表示样本被保留,False 表示样本被丢弃

Return type:

bool

negative_sample(variant, drop_rate, label_index=0, threshold=0.0, variant_type='instance', action_priority=None, per_action_drop_rate=None)

负例采样

Example

>>> from monolith.native_training.data import PBDataset, negative_sample
>>> def filter_fn(variant):
...   return negative_sample(dataset, drop_rate=0.5, label_index=0, threshold=0)
>>>
>>> dataset = PBDataset()
>>> dataset = dataset.filter(filter_fn)
Parameters:
  • variant (Tensor) – 输入数据,必须是 variant 类型

  • drop_rate (float) – 负例丢弃比例,取值区间为[0, 1),sample_rate = 1 - drop_rate

  • label_index (int) – 样本中 label 字段是一个 float 数组,label_index 表示本次启用哪一个 index 对应的 label

  • threshold (float) – label 值大于 threshold 的是正样本

Returns:

True 表示样本被保留,False 表示样本被丢弃

Return type:

bool

transforms

class Compose(transforms)

Composes several transforms together.

Parameters:

transforms (List[Transform]) – list of transforms to compose.

Example

>>> from monolith.native_training.data import transforms
>>> transforms.Compose([
...     transforms.FilterByFid(has_fids=[100]),
...     transforms.FilterByAction(has_actions=[1, 2]),
... ])
class FilterByFid(has_fids=None, filter_fids=None, select_fids=None, feature_name=None)

根据 Sparse Feature 的 FID 值进行过滤

Parameters:
  • has_fids (List[int]) – 包含给定任意一个 FID 值,则样本被保留。

  • filter_fids (List[int]) – 包含给定任意一个 FID 值,则样本被丢弃。

  • select_fids (List[int]) – 包含给定所有 FID 值,样本才被保留。

Example

>>> from monolith.native_training.data import transforms
>>> transforms.FilterByFid(has_fids=[100])
class FilterByAction(has_actions=None)

根据 action 的值进行过滤

Parameters:

has_actions (List[int]) – 包含给定任意一个 action 值,则样本被保留。

Example

>>> from monolith.native_training.data import transforms
>>> transforms.FilterByAction(has_actions=[1, 2])
class NegativeSample(drop_rate, label_index, threshold)

负例采样

Parameters:
  • drop_rate (float) – 负例丢弃比例,取值区间为[0, 1),sample_rate = 1 - drop_rate

  • label_index (int) – 样本中 labels 是一个列表,label_index表示本次启用哪一个 index 对应的 label

  • threshold (float) – label 值大于 threshold 的是正样本

Example

>>> from monolith.native_training.data import transforms
>>> transforms.NegativeSample(drop_rate=0.5, label_index=0, threshold=0)
class FilterByFeature(field_name, op, operand, field_type, keep_empty=False)

根据给定 dense 特征值过滤

Parameters:
  • field_name (List[int]) – 样本在指定 field_name 进行过滤

  • op (str) – 比较运算符,可以是 gt/ge/eq/lt/le/neq/between/in/not-in 等布尔运算,也可以是 all/any/diff 等集合布尔运算

  • operand (float) – 操作数,用于比较,可以为值或者 List

  • keep_empty (bool) – 默认 False

  • field_type (str) – 需要显式指定字段类型,可以为 int64/float/double/bytes/string ( bytes or string,含义相同)

Example

>>> from monolith.native_training.data import transforms
>>> transforms.FilterByFeature(field_name='__meta__att_traced',
...                            op='neq',
...                            operand=1,
...                            field_type='int64')
class LogicalOr(x, y)

对两个 Transform 进行或操作

Parameters:
  • x (Transform) – left hand side Transform

  • y (Transform) – right hand side Transform

Example

>>> from monolith.native_training.data import transforms
>>> transforms.LogicalOr(transforms.FilterByFid(has_fids=[100]),
...                      transforms.FilterByAction(has_actions=[1, 2]))
class LogicalAnd(x, y)

对两个 Transform 进行与操作

Parameters:
  • x (Transform) – left hand side Transform

  • y (Transform) – right hand side Transform

Example

>>> from monolith.native_training.data import transforms
>>> transforms.LogicalAnd(transforms.FilterByFidFilterByFid(has_fids=[100]),
...                       transforms.FilterByAction(has_actions=[1, 2]))
class LogicalNot(x)

Transform 取反

Parameters:

x (Transform) – Transform 操作

Example

>>> from monolith.native_training.data import transforms
>>> transforms.LogicalNot(transforms.FilterByFid(has_fids=[100]))

parsers

parse_examples(tensor, sparse_features, dense_features=None, dense_feature_shapes=None, dense_feature_types=None, extra_features=None, extra_feature_shapes=None)

将 List[Example] variant/string 解析成 Dict[str, Tensor] 以便后续入图

Example 格式中,所有特征均存于 feature 中,没有平铺特征。Sparse 特征由于长度不固定,输出 tf.RaggedTensor,其他特征输出 tf.Tensor

Example

>>> from monolith.native_training.data import PBDataset, parse_examples
>>> def parser(tensor):
...   extra_features = ['sample_rate', 'actions']
...   extra_feature_shapes = [1] * len(extra_features)
...   features = parse_examples(tensor,
...                             sparse_features=VALID_FNAMES,
...                             extra_features=extra_features,
...                             extra_feature_shapes=extra_feature_shapes)
...   if mode != tf.estimator.ModeKeys.PREDICT:
...     actions = tf.reshape(features['actions'], shape=(-1,))
...     features['label'] = tf.where(tf.math.equal(actions, int(Actions.CLICK)),
...                                  tf.ones_like(actions, dtype=tf.float32),
...                                  tf.zeros_like(actions, dtype=tf.float32))
...     features['sample_rate'] = tf.reshape(features['sample_rate'], shape=(-1,))
...   return features
>>>
>>> dataset = PBDataset()
>>> dataset = dataset.shuffle(100).batch(256, drop_remainder=True)
>>> dataset = dataset.map(parser)
Parameters:
  • tensor (tf.Tensor) – 输入样本

  • sparse_features (List[str]) – 稀疏特征名称,可以有多个

  • dense_features (List[str]) – 稠密特征(或 Label)名称,可以有多个,也可以有不同类型

  • dense_feature_shapes (List[int]) – 稠密特征名称的 shape

  • dense_feature_types (List[dtype]) – 稠密特征名称的数据类型,默认为 tf.float32

  • extra_features (List[str]) – 主要指 LineId 中的字段,可以有多个,Monolith会自动从 LineId 中提取数据类型

  • extra_feature_shapes (List[int]) – extra 特征名称的 shape

Warning

  • 解析 extra_features 时不需要指定其字段类型,因为框架可以从 LineId proto 定义中预先获取。

  • 解析 dense_features 字段类型只能由用户来指定,只有用户清楚其数据类型

Returns:

Dict[str, Tensor] 解析出特征名到特征的字典

parse_example_batch(tensor, sparse_features, dense_features=None, dense_feature_shapes=None, dense_feature_types=None, extra_features=None, extra_feature_shapes=None)

将 ExampleBatch variant/string tensor 解析成 Dict[str, Tensor] 以便后续入图

ExampleBatch 格式中,所有特征均存于 feature 中,没有平铺特征。Sparse 特征由于长度不固定,输出 tf.RaggedTensor,其他特征输出 tf.Tensor

Example

>>> from monolith.native_training.data import PBDataset, parse_example_batch
>>> def serving_input_receiver_fn(self):
...   receiver_tensors = {}
...   examples_placeholder = tf.compat.v1.placeholder(dtype=tf.string,
...                                                   shape=(None,))
...   receiver_tensors["example_batch"] = examples_placeholder
...   parsed_results = parse_example_batch(examples_placeholder,
...                                        sparse_features=VALID_FNAMES)
...   return tf.estimator.export.ServingInputReceiver(parsed_results,
...                                                   receiver_tensors)
Parameters:
  • tensor (tf.Tensor) – 输入样本

  • sparse_features (List[str]) – 稀疏特征名称,可以有多个

  • dense_features (List[str]) – 稠密特征(或 Label)名称,可以有多个,也可以有不同类型

  • dense_feature_shapes (List[int]) – 稠密特征名称的 shape

  • dense_feature_types (List[dtype]) – 稠密特征名称的数据类型,默认为 tf.float32

  • extra_features (List[str]) – 主要指 LineId 中的字段,可以有多个,Monolith 会自动从 LineId 中提取数据类型

  • extra_feature_shapes (List[int]) – extra 特征名称的 shape

Warning

  • 解析 extra_features 时不需要指定其字段类型,因为框架可以从 LineId proto 定义中预先获取。

  • 解析 dense_features 字段类型只能由用户来指定,只有用户清楚其数据类型

Returns:

Dict[str, Tensor] 解析出特征名到特征的字典