data

datasets

class PbType(value)[source]

An enumeration.

INSTANCE = 1
EXAMPLEBATCH = 2
EXAMPLE = 3
PLAINTEXT = 4
class FilePBDataset(file_name='', buffer_size=None, input_pb_type=None, output_pb_type=None, feature_pruning_type=2, disable_iterator_save_restore=True, use_snappy=None, **kwargs)[source]

从标准输入/pb文件中读取序列化数据,并将其反序列化存于TF的Variant类型中。这样做的好处是可以直接对PB对象进行过滤与修改, 不用等到parse以后。Monolith提供了一系列工具操作Variant变量,如filter_by_fids,filter_by_value,negative_sample等

另外,InstanceReweightDataset/NegativeGenDataset 这些DataSet也可以直接作用于Variant

Parameters:
  • file_name (str) – 文件名,如果为空,则从stdin读取数据

  • buffer_size (int) – 读取文件时缓存大小,默认100MB

  • input_pb_type (str) – 输入pb类型,可以是example/example_batch/instance

  • output_pb_type (str) – 输入pb类型,可以是example/instance/plaintext

Raises:
  • TypeError – 如果有任何参数与类型不匹配,则抛TypeError

  • ValueError – 如果有任何值与期望不匹配,则抛ValueError

class InstanceReweightDataset(input_dataset, action_priority=None, reweight=None, variant_type='example')[source]

样本重加权,并根据action给样本打标签,使用方式为 dataset.instance_reweight

一个样本可能有多个action,按`action_priority`,找到最高优的action。再用action找到对应的 action:weight:label, 让样本重复weight次(也有可能是0次,即删除样本),然后给样本打上label指定的标签

Parameters:
  • input_dataset (dataset) – 输入数据集

  • action_priority (str) – action用int表示,以逗号分隔的int数组,排在前面的优先级高

  • reweight (str) – 基本单元是`action:weight:label`,可以用逗号分隔多个基本单元 1) action: 动作,用int表示,与业务相关,如download,install,click,exposure等 2) weight: 权重,用int表示,表示样本重复的次数 3) label: 标签,一般用1/-1表示。

  • variant_type (str) – 输入数据是variant类型的,支持两种格式,instance/example

Raises:
  • TypeError – 如果有任何参数与类型不匹配,则抛TypeError

  • ValueError – 如果有任何值与期望不匹配,则抛ValueError

class NegativeGenDataset(input_dataset, neg_num, per_channel=False, channel_feature='', item_features=[], start_num=500, max_item_num=100000, positive_label=1, negative_label=-1, negative_action=-99999, positive_actions=[], label_index=0, action_priority='', index_feature='', throw_origin=False, throw_origin_neg=False, cache_only_pos=True, real_neg_instance_weight=1.0, sampled_neg_instance_weight=-1.0, unbias_sampled_neg=True, origin_neg_in_pool_proba=1.0, neg_sample_declay_factor=1.0, variant_type='example')[source]

负例生成。有时,样本中只有正例,没有负例,需要随机生成负例

推荐系统中的样本通常是由user侧,item侧两部分组成。这里的做法是:
  • 先收集每个样本的item侧信息,生成一个item池子

  • item池子并不是平铺的,而是按某个特征(channel_slot)分类组织的。如果在同一个channel随机取item得到的是hard负例,在其它channel中抽样得到的是easy负例

  • 并不是一开始就生成负例,而是要等item池子积累到一定大小才开始生成负例

Parameters:
  • input_dataset (dataset) – 输入数据集

  • neg_num (int) – 为一个正例生成`neg_num`个负例

  • channel_feature (string) – 用于当item分类的字段

  • per_channel (bool) – 是否分类

  • start_num (int) – 在item池子中积累多少个后才开始采样

  • max_iten_num (int) – 每一个channel最多收集多注个item

  • item_features – (List[str]): item侧的特征名列表

  • positive_label – 正例的label,仅为正例生成负例

  • negative_label – 生成的负例的被打上的label

Raises:
  • TypeError – 如果有任何参数与类型不匹配,则抛TypeError

  • ValueError – 如果有任何值与期望不匹配,则抛ValueError

parsers

parse_examples(tensor, sparse_features, dense_features=None, dense_feature_shapes=None, dense_feature_types=None, extra_features=None, extra_feature_shapes=None)[source]

从Tensor中解析example

Example格式中,所有特征均存于feature中,没有平铺特征。Sparse特征由于长度不定,输出RaggedTensor,其它特征输出Tensor

Parameters:
  • tensor (tf.Tensor) – 输入样本

  • sparse_features (List[str]) – 稀疏特征名称,可以有多个

  • dense_features (List[str]) – 稠密特征(或Label)名称,可以有多个,也可以有不同类型

  • dense_feature_shapes (List[int]) – 稠密特征名称的shape

  • dense_feature_types (List[dtype]) – 稠密特征名称的数据类型,默认为`tf.float32`

  • extra_features (List[str]) – 主要指LineId中的字段,可以有多个,Monolith会自动从LineId中提取数据类型

  • extra_feature_shapes (List[int]) – extra特征名称的shape

Returns:

Dict[str,Tensor] 解析出特征名到特征的字典

parse_example_batch(tensor, sparse_features, dense_features=None, dense_feature_shapes=None, dense_feature_types=None, extra_features=None, extra_feature_shapes=None)[source]

从Tensor中解析example_batch

Example_batch格式中,所有特征均存于feature中,没有平铺特征。Sparse特征由于长度不定,输出RaggedTensor,其它特征输出Tensor

Parameters:
  • tensor (tf.Tensor) – 输入样本

  • sparse_features (List[str]) – 稀疏特征名称,可以有多个

  • dense_features (List[str]) – 稠密特征(或Label)名称,可以有多个,也可以有不同类型

  • dense_feature_shapes (List[int]) – 稠密特征名称的shape

  • dense_feature_types (List[dtype]) – 稠密特征名称的数据类型,默认为`tf.float32`

  • extra_features (List[str]) – 主要指LineId中的字段,可以有多个,Monolith会自动从LineId中提取数据类型

  • extra_feature_shapes (List[int]) – extra特征名称的shape

Returns:

Dict[str,Tensor] 解析出特征名到特征的字典

feature_utils

filter_by_fids(variant, filter_fids=None, has_fids=None, select_fids=None, has_actions=None, req_time_min=0, select_slots=None, variant_type='instance')[source]

通过特征ID (FID) 过滤,离散特征过滤

Parameters:
  • variant (Tensor) – 输入数据,必须是variant类型

  • filter_fids (List[int]) – 任意一个FID出现`filter_fids`中,样本被过滤

  • has_fids (List[int]) – 任意一个FID出现在`has_fids`中,则样本被选择

  • select_fids (List[int]) – 所有`select_fids`均出现在样本中,则样本被选择

  • has_actions (List[int]) – 任意一个action出现在`has_actions`中,则样本被选择

  • req_time_min (int) – 请求时间最小值

  • select_slots (List[int]) – 所有`select_slots`均出现在样本中,样本才被选择

  • variant_type (str) – variant类型,可以为instance/example

Returns:

variant tensor,过滤后的数据,variant类型

filter_by_value(variant, field_name, op, operand, variant_type='instance', keep_empty=False, operand_filepath=None)[source]

通过值过滤,连续特征过滤,

Parameters:
  • variant (Tensor) – 输入数据,必须是variant类型

  • field_name (List[int]) – 任意一个FID出现`filter_fids`中,样本被过滤

  • op (str) – 比较运算符,可以是 gt/ge/eq/lt/le/neq/between/in/not-in 等 布尔运算,也可以是 all/any/diff 等集合布尔运算

  • operand (float) – 操作数,用于比较

  • variant_type (str) – variant类型,可以为instance/example

  • keep_empty (bool) – False

Returns:

variant tensor,过滤后的数据,variant类型

add_action(variant, field_name, op, operand, action, variant_type='example')[source]

根据指定 LineId 字段经过简单的关系运算,决定是否为 actions 字段增加值

Parameters:
  • variant (Tensor) – 输入数据,必须是 variant 类型

  • field_name (List[int]) – 根据 field_name 对应值进行条件判断

  • op (str) – 比较运算符,可以是 gt/ge/eq/lt/le/neq/between/in

  • operand (float) – 操作数,用于比较

  • action (int) – 当条件满足时,需要往 LineId.actions 添加的值

  • variant_type (str) – ‘instance’ 或 ‘example’

Returns:

variant tensor,改写后的数据,variant 类型

add_label(variant, config, negative_value, new_sample_rate, variant_type='example')[source]
根据给定配置决定是否添加 label,支持 multi-task label 生成,请务必配合

filter_by_label 过滤算子同时使用,否则可能会有无效样本被喂入训练器。

举例 config=’1,2:3:1.0;4::0.5’,表示一共有两个 task(;分隔), task1 pos_actions = {1,2},neg_actions = {3},sample_rate = 1.0,而 task2 pos_actions = {4},neg_actions 为空,sample_rate = 0.5 add_label 的执行逻辑如下

  • 对于 task1,如果当前样本的 actions 包含 {1,2} 任一个则判定为正例,否则根据给定 采样率决定是否采样(sample_rate < 1.0 方可触发采样),若触发采样且在采样范围内 标为负例,不在采样范围内置为无效 label,若未触发采样直接标记为负例。这个例子里由于 task1 的 sample_rate=1.0,因此不会触发负采样

  • 对于 task2,如果当前样本的 actions 包含 {4} 则判定为正例,由于未指定 neg_actions 对于不包含 {4} 的样本直接进行负采样,在采样范围内标为负例,不在采样范围内置为 无效 label。这个例子里由于 task2 的 sample_rate=0.5,因此会对于不包含 {4} 的样本 触发负采样

Parameters:
  • variant (Tensor) – 输入数据,必须是 variant 类型

  • config (str) – 形如 ‘1,2:3:1.0;4::0.5’

  • negative_value (float) – 如 -1.0 或 0.0

  • new_sample_rate (float) – 为 LineId.sample_rate 赋值

  • variant_type (str) – ‘instance’ 或 ‘example’

Returns:

variant tensor,改写后的数据,variant 类型

scatter_label(variant, config, variant_type='example')[source]

根据给定配置 scatter label 以支持 multi-task label 生成,配置形如 ‘chnid0:index0,chnid1:index1’,请务必配合 filter_by_label 过滤算子使用, 否则可能会有无效样本被喂入训练器。举例 config=’100:3,200:1,300:4’, 表示一共有 5 个 task(最大的 index=4),scatter_label 的执行逻辑如下

1。获取 label_value = label[0],亦即默认待处理样本的 label.size() > 0 2。重置待处理样本的 label 长度为 5,并全部初始化为 INVALID_LABEL 3。if 样本的 chnid = 100,label[3] = label_value 4。else if 样本的 chnid = 200,label[1] = label_value 5。else if 样本的 chnid = 300,label[4] = label_value 6。else 样本的 chnid not in {100,200,300},则 label 中全部值为 INVALID_LABEL

Parameters:
  • variant (Tensor) – 输入数据,必须是 variant 类型

  • config (str) – 形如 ‘100:3,200:1,300:4’

  • variant_type (str) – ‘instance’ 或 ‘example’

Returns:

variant tensor,改写后的数据,variant 类型

filter_by_label(variant, label_threshold, filter_equal=False, variant_type='example')[source]

根据给定配置决定是否保留当前样本,支持 multi-task

Parameters:
  • variant (Tensor) – 输入数据,必须是 variant 类型

  • label_threshold (List[float]) – 样本任一 label 值 >= 相应 label_threshold

  • [-100.0,0.0],假设样本 (值则样本被保留,否则被丢弃。举例 label_threshold =) –

    • label = [-1000,-1],则该样本被丢弃,即不存在任何合法 label 值

    • label = [-1000,0],则该样本被保留,即第 2 个 label 值合法

    • label = [-1,-1],则该样本被保留,即第 1 个 label 值合法

    • label = [-1,1],则该样本被保留,即第 1,2 个 label 值均合法

  • filter_equal (bool) – Whether to filter when label equals to threshold。

  • variant_type (str) – ‘instance’ 或 ‘example’

Returns:

valid tensor,是否保留当前样本

special_strategy(variant, strategy_list, strategy_conf=None, variant_type='instance', keep_empty_strategy=True)[source]

用LineID中的special_strategy进行过滤,

Parameters:
  • variant (Tensor) – 输入数据,必须是variant类型

  • strategy_list (List[int]) – strategy列表

  • strategy_conf (str) – 配置方式为 strategy:sample_rate:label,如果有多个可以用逗号分割。 用于实现采样,包括对正例/负例/所有样本采样,并修改样本标签

  • variant_type (str) – variant类型,可以为instance/example

  • keep_empty_strategy (bool) – 是否保留strategy为空的样本,默认为False

Returns:

variant tensor,过滤后的数据,variant类型

negative_sample(variant, drop_rate, label_index=0, threshold=0.0, variant_type='instance')[source]

负例采样

Parameters:
  • variant (Tensor) – 输入数据,必须是variant类型

  • drop_rate (float) – 负例丢弃比例,取值区间为[0,1),sample_rate = 1 - drop_rate。

  • label_index (int) – 样本中labels是一个列表,label_index表示本次启用哪一个index对应的label

  • threshold (float) – label是一个实数,大于`threshold`的是正样本

  • variant_type (str) – variant类型,可以为instance/example

Returns:

variant tensor,过滤后的数据,variant类型

feature_combine(src1, src2, slot)[source]

特征交叉,用于对已抽取Sparse特征的交叉

Parameters:
  • src1 (RaggedTensor) – 参与交叉的sparse特征,可以是简单特征,也可以是序列特征

  • src1 – 参与交叉的sparse特征,可以是简单特征,也可以是序列特征

  • slot (int) – 输出特征的slot

Returns:

RaggedTensor,交叉后的特征

switch_slot(ragged, slot)[source]

对Sparse特征切换slot

Parameters:
  • ragged (RaggedTensor) – 输入sparse特征,可以是简单特征,也可以是序列特征

  • slot (int) – 输出特征的slot

Returns:

RaggedTensor,切换后的特征

label_upper_bound(variant, label_upper_bounds, variant_type='instance')[source]

给label设置upper_bound,instance的label超过upper_bound的会被设置成upper_bound。 :param variant: 输入数据,必须是 variant 类型 :type variant: Tensor :param label_upper_bounds: 样本任一 label 值 >= 相应 label_upper_bounds :type label_upper_bounds: List[float] :param 时,该label会被设置为upper_bound: :param variant_type: ‘instance’ 或 ‘example’ :type variant_type: str

Returns:

variant tensor,label根据upper_bound调整后的数据,variant类型

label_normalization(variant, norm_methods, norm_values, variant_type='instance')[source]

对Label进行normalization,instance的label会被修改为norm之后的数值。 :param variant: 输入数据,必须是 variant 类型 :type variant: Tensor :param norm_methods: normlization的方法,例如log,scale,repow,scalelog :type norm_methods: List[str] :param norm_values: 对应normalization方法使用的norm_value,长度需要与norm_methods保持一致 :type norm_values: List[float] :param variant_type: ‘instance’ 或 ‘example’ :type variant_type: str

Returns:

variant tensor,label根据upper_bound调整后的数据,variant类型

use_field_as_label(variant, field_name, overwrite_invalid_value=False, label_threshold=7200, variant_type='instance')[source]

用line_id里的field作为新的label。 :param variant: 输入数据,必须是 variant 类型 :type variant: Tensor :param overwrite_invalid_value: 是否对新field进行overwrite,如果overwrite会在value >= label_threshold时overwrite成0。 :type overwrite_invalid_value: bool :param label_threshold: 对新field进行overwrite的threshold值,如果value >= label_threshold则改写为0。 :type label_threshold: List[float] :param variant_type: ‘instance’ 或 ‘example’ :type variant_type: str

Returns:

variant tensor,label根据upper_bound调整后的数据,variant类型