data¶
datasets¶
- class PbType(value)[source]¶
An enumeration.
- INSTANCE = 1¶
- EXAMPLEBATCH = 2¶
- EXAMPLE = 3¶
- PLAINTEXT = 4¶
- class FilePBDataset(file_name='', buffer_size=None, input_pb_type=None, output_pb_type=None, feature_pruning_type=2, disable_iterator_save_restore=True, use_snappy=None, **kwargs)[source]¶
从标准输入/pb文件中读取序列化数据,并将其反序列化存于TF的Variant类型中。这样做的好处是可以直接对PB对象进行过滤与修改, 不用等到parse以后。Monolith提供了一系列工具操作Variant变量,如filter_by_fids,filter_by_value,negative_sample等
另外,InstanceReweightDataset/NegativeGenDataset 这些DataSet也可以直接作用于Variant
- Parameters:
file_name (
str
) – 文件名,如果为空,则从stdin读取数据buffer_size (
int
) – 读取文件时缓存大小,默认100MBinput_pb_type (
str
) – 输入pb类型,可以是example/example_batch/instanceoutput_pb_type (
str
) – 输入pb类型,可以是example/instance/plaintext
- Raises:
TypeError – 如果有任何参数与类型不匹配,则抛TypeError
ValueError – 如果有任何值与期望不匹配,则抛ValueError
- class InstanceReweightDataset(input_dataset, action_priority=None, reweight=None, variant_type='example')[source]¶
样本重加权,并根据action给样本打标签,使用方式为 dataset.instance_reweight
一个样本可能有多个action,按`action_priority`,找到最高优的action。再用action找到对应的 action:weight:label, 让样本重复weight次(也有可能是0次,即删除样本),然后给样本打上label指定的标签
- Parameters:
input_dataset (
dataset
) – 输入数据集action_priority (
str
) – action用int表示,以逗号分隔的int数组,排在前面的优先级高reweight (
str
) – 基本单元是`action:weight:label`,可以用逗号分隔多个基本单元 1) action: 动作,用int表示,与业务相关,如download,install,click,exposure等 2) weight: 权重,用int表示,表示样本重复的次数 3) label: 标签,一般用1/-1表示。variant_type (
str
) – 输入数据是variant类型的,支持两种格式,instance/example
- Raises:
TypeError – 如果有任何参数与类型不匹配,则抛TypeError
ValueError – 如果有任何值与期望不匹配,则抛ValueError
- class NegativeGenDataset(input_dataset, neg_num, per_channel=False, channel_feature='', item_features=[], start_num=500, max_item_num=100000, positive_label=1, negative_label=-1, negative_action=-99999, positive_actions=[], label_index=0, action_priority='', index_feature='', throw_origin=False, throw_origin_neg=False, cache_only_pos=True, real_neg_instance_weight=1.0, sampled_neg_instance_weight=-1.0, unbias_sampled_neg=True, origin_neg_in_pool_proba=1.0, neg_sample_declay_factor=1.0, variant_type='example')[source]¶
负例生成。有时,样本中只有正例,没有负例,需要随机生成负例
- 推荐系统中的样本通常是由user侧,item侧两部分组成。这里的做法是:
先收集每个样本的item侧信息,生成一个item池子
item池子并不是平铺的,而是按某个特征(channel_slot)分类组织的。如果在同一个channel随机取item得到的是hard负例,在其它channel中抽样得到的是easy负例
并不是一开始就生成负例,而是要等item池子积累到一定大小才开始生成负例
- Parameters:
input_dataset (
dataset
) – 输入数据集neg_num (
int
) – 为一个正例生成`neg_num`个负例channel_feature (
string
) – 用于当item分类的字段per_channel (
bool
) – 是否分类start_num (
int
) – 在item池子中积累多少个后才开始采样max_iten_num (
int
) – 每一个channel最多收集多注个itemitem_features – (
List[str]
): item侧的特征名列表positive_label – 正例的label,仅为正例生成负例
negative_label – 生成的负例的被打上的label
- Raises:
TypeError – 如果有任何参数与类型不匹配,则抛TypeError
ValueError – 如果有任何值与期望不匹配,则抛ValueError
parsers¶
- parse_examples(tensor, sparse_features, dense_features=None, dense_feature_shapes=None, dense_feature_types=None, extra_features=None, extra_feature_shapes=None)[source]¶
从Tensor中解析example
Example格式中,所有特征均存于feature中,没有平铺特征。Sparse特征由于长度不定,输出RaggedTensor,其它特征输出Tensor
- Parameters:
tensor (
tf.Tensor
) – 输入样本sparse_features (
List[str]
) – 稀疏特征名称,可以有多个dense_features (
List[str]
) – 稠密特征(或Label)名称,可以有多个,也可以有不同类型dense_feature_shapes (
List[int]
) – 稠密特征名称的shapedense_feature_types (
List[dtype]
) – 稠密特征名称的数据类型,默认为`tf.float32`extra_features (
List[str]
) – 主要指LineId中的字段,可以有多个,Monolith会自动从LineId中提取数据类型extra_feature_shapes (
List[int]
) – extra特征名称的shape
- Returns:
Dict[str,Tensor] 解析出特征名到特征的字典
- parse_example_batch(tensor, sparse_features, dense_features=None, dense_feature_shapes=None, dense_feature_types=None, extra_features=None, extra_feature_shapes=None)[source]¶
从Tensor中解析example_batch
Example_batch格式中,所有特征均存于feature中,没有平铺特征。Sparse特征由于长度不定,输出RaggedTensor,其它特征输出Tensor
- Parameters:
tensor (
tf.Tensor
) – 输入样本sparse_features (
List[str]
) – 稀疏特征名称,可以有多个dense_features (
List[str]
) – 稠密特征(或Label)名称,可以有多个,也可以有不同类型dense_feature_shapes (
List[int]
) – 稠密特征名称的shapedense_feature_types (
List[dtype]
) – 稠密特征名称的数据类型,默认为`tf.float32`extra_features (
List[str]
) – 主要指LineId中的字段,可以有多个,Monolith会自动从LineId中提取数据类型extra_feature_shapes (
List[int]
) – extra特征名称的shape
- Returns:
Dict[str,Tensor] 解析出特征名到特征的字典
feature_utils¶
- filter_by_fids(variant, filter_fids=None, has_fids=None, select_fids=None, has_actions=None, req_time_min=0, select_slots=None, variant_type='instance')[source]¶
通过特征ID (FID) 过滤,离散特征过滤
- Parameters:
variant (
Tensor
) – 输入数据,必须是variant类型filter_fids (
List[int]
) – 任意一个FID出现`filter_fids`中,样本被过滤has_fids (
List[int]
) – 任意一个FID出现在`has_fids`中,则样本被选择select_fids (
List[int]
) – 所有`select_fids`均出现在样本中,则样本被选择has_actions (
List[int]
) – 任意一个action出现在`has_actions`中,则样本被选择req_time_min (
int
) – 请求时间最小值select_slots (
List[int]
) – 所有`select_slots`均出现在样本中,样本才被选择variant_type (
str
) – variant类型,可以为instance/example
- Returns:
variant tensor,过滤后的数据,variant类型
- filter_by_value(variant, field_name, op, operand, variant_type='instance', keep_empty=False, operand_filepath=None)[source]¶
通过值过滤,连续特征过滤,
- Parameters:
variant (
Tensor
) – 输入数据,必须是variant类型field_name (
List[int]
) – 任意一个FID出现`filter_fids`中,样本被过滤op (
str
) – 比较运算符,可以是 gt/ge/eq/lt/le/neq/between/in/not-in 等 布尔运算,也可以是 all/any/diff 等集合布尔运算operand (
float
) – 操作数,用于比较variant_type (
str
) – variant类型,可以为instance/examplekeep_empty (
bool
) – False
- Returns:
variant tensor,过滤后的数据,variant类型
- add_action(variant, field_name, op, operand, action, variant_type='example')[source]¶
根据指定 LineId 字段经过简单的关系运算,决定是否为 actions 字段增加值
- Parameters:
variant (
Tensor
) – 输入数据,必须是 variant 类型field_name (
List[int]
) – 根据 field_name 对应值进行条件判断op (
str
) – 比较运算符,可以是 gt/ge/eq/lt/le/neq/between/inoperand (
float
) – 操作数,用于比较action (
int
) – 当条件满足时,需要往 LineId.actions 添加的值variant_type (
str
) – ‘instance’ 或 ‘example’
- Returns:
variant tensor,改写后的数据,variant 类型
- add_label(variant, config, negative_value, new_sample_rate, variant_type='example')[source]¶
- 根据给定配置决定是否添加 label,支持 multi-task label 生成,请务必配合
filter_by_label 过滤算子同时使用,否则可能会有无效样本被喂入训练器。
举例 config=’1,2:3:1.0;4::0.5’,表示一共有两个 task(;分隔), task1 pos_actions = {1,2},neg_actions = {3},sample_rate = 1.0,而 task2 pos_actions = {4},neg_actions 为空,sample_rate = 0.5 add_label 的执行逻辑如下
对于 task1,如果当前样本的 actions 包含 {1,2} 任一个则判定为正例,否则根据给定 采样率决定是否采样(sample_rate < 1.0 方可触发采样),若触发采样且在采样范围内 标为负例,不在采样范围内置为无效 label,若未触发采样直接标记为负例。这个例子里由于 task1 的 sample_rate=1.0,因此不会触发负采样
对于 task2,如果当前样本的 actions 包含 {4} 则判定为正例,由于未指定 neg_actions 对于不包含 {4} 的样本直接进行负采样,在采样范围内标为负例,不在采样范围内置为 无效 label。这个例子里由于 task2 的 sample_rate=0.5,因此会对于不包含 {4} 的样本 触发负采样
- Parameters:
variant (
Tensor
) – 输入数据,必须是 variant 类型config (
str
) – 形如 ‘1,2:3:1.0;4::0.5’negative_value (
float
) – 如 -1.0 或 0.0new_sample_rate (
float
) – 为 LineId.sample_rate 赋值variant_type (
str
) – ‘instance’ 或 ‘example’
- Returns:
variant tensor,改写后的数据,variant 类型
- scatter_label(variant, config, variant_type='example')[source]¶
根据给定配置 scatter label 以支持 multi-task label 生成,配置形如 ‘chnid0:index0,chnid1:index1’,请务必配合 filter_by_label 过滤算子使用, 否则可能会有无效样本被喂入训练器。举例 config=’100:3,200:1,300:4’, 表示一共有 5 个 task(最大的 index=4),scatter_label 的执行逻辑如下
1。获取 label_value = label[0],亦即默认待处理样本的 label.size() > 0 2。重置待处理样本的 label 长度为 5,并全部初始化为 INVALID_LABEL 3。if 样本的 chnid = 100,label[3] = label_value 4。else if 样本的 chnid = 200,label[1] = label_value 5。else if 样本的 chnid = 300,label[4] = label_value 6。else 样本的 chnid not in {100,200,300},则 label 中全部值为 INVALID_LABEL
- Parameters:
variant (
Tensor
) – 输入数据,必须是 variant 类型config (
str
) – 形如 ‘100:3,200:1,300:4’variant_type (
str
) – ‘instance’ 或 ‘example’
- Returns:
variant tensor,改写后的数据,variant 类型
- filter_by_label(variant, label_threshold, filter_equal=False, variant_type='example')[source]¶
根据给定配置决定是否保留当前样本,支持 multi-task
- Parameters:
variant (
Tensor
) – 输入数据,必须是 variant 类型label_threshold (
List[float]
) – 样本任一 label 值 >= 相应 label_threshold[-100.0,0.0],假设样本 (值则样本被保留,否则被丢弃。举例 label_threshold =) –
label = [-1000,-1],则该样本被丢弃,即不存在任何合法 label 值
label = [-1000,0],则该样本被保留,即第 2 个 label 值合法
label = [-1,-1],则该样本被保留,即第 1 个 label 值合法
label = [-1,1],则该样本被保留,即第 1,2 个 label 值均合法
filter_equal (
bool
) – Whether to filter when label equals to threshold。variant_type (
str
) – ‘instance’ 或 ‘example’
- Returns:
valid tensor,是否保留当前样本
- special_strategy(variant, strategy_list, strategy_conf=None, variant_type='instance', keep_empty_strategy=True)[source]¶
用LineID中的special_strategy进行过滤,
- Parameters:
variant (
Tensor
) – 输入数据,必须是variant类型strategy_list (
List[int]
) – strategy列表strategy_conf (
str
) – 配置方式为 strategy:sample_rate:label,如果有多个可以用逗号分割。 用于实现采样,包括对正例/负例/所有样本采样,并修改样本标签variant_type (
str
) – variant类型,可以为instance/examplekeep_empty_strategy (
bool
) – 是否保留strategy为空的样本,默认为False
- Returns:
variant tensor,过滤后的数据,variant类型
- negative_sample(variant, drop_rate, label_index=0, threshold=0.0, variant_type='instance')[source]¶
负例采样
- Parameters:
variant (
Tensor
) – 输入数据,必须是variant类型drop_rate (
float
) – 负例丢弃比例,取值区间为[0,1),sample_rate = 1 - drop_rate。label_index (
int
) – 样本中labels是一个列表,label_index表示本次启用哪一个index对应的labelthreshold (
float
) – label是一个实数,大于`threshold`的是正样本variant_type (
str
) – variant类型,可以为instance/example
- Returns:
variant tensor,过滤后的数据,variant类型
- feature_combine(src1, src2, slot)[source]¶
特征交叉,用于对已抽取Sparse特征的交叉
- Parameters:
src1 (
RaggedTensor
) – 参与交叉的sparse特征,可以是简单特征,也可以是序列特征src1 – 参与交叉的sparse特征,可以是简单特征,也可以是序列特征
slot (
int
) – 输出特征的slot
- Returns:
RaggedTensor,交叉后的特征
- switch_slot(ragged, slot)[source]¶
对Sparse特征切换slot
- Parameters:
ragged (
RaggedTensor
) – 输入sparse特征,可以是简单特征,也可以是序列特征slot (
int
) – 输出特征的slot
- Returns:
RaggedTensor,切换后的特征
- label_upper_bound(variant, label_upper_bounds, variant_type='instance')[source]¶
给label设置upper_bound,instance的label超过upper_bound的会被设置成upper_bound。 :param variant: 输入数据,必须是 variant 类型 :type variant:
Tensor
:param label_upper_bounds: 样本任一 label 值 >= 相应 label_upper_bounds :type label_upper_bounds:List[float]
:param 时,该label会被设置为upper_bound: :param variant_type: ‘instance’ 或 ‘example’ :type variant_type:str
- Returns:
variant tensor,label根据upper_bound调整后的数据,variant类型
- label_normalization(variant, norm_methods, norm_values, variant_type='instance')[source]¶
对Label进行normalization,instance的label会被修改为norm之后的数值。 :param variant: 输入数据,必须是 variant 类型 :type variant:
Tensor
:param norm_methods: normlization的方法,例如log,scale,repow,scalelog :type norm_methods:List[str]
:param norm_values: 对应normalization方法使用的norm_value,长度需要与norm_methods保持一致 :type norm_values:List[float]
:param variant_type: ‘instance’ 或 ‘example’ :type variant_type:str
- Returns:
variant tensor,label根据upper_bound调整后的数据,variant类型
- use_field_as_label(variant, field_name, overwrite_invalid_value=False, label_threshold=7200, variant_type='instance')[source]¶
用line_id里的field作为新的label。 :param variant: 输入数据,必须是 variant 类型 :type variant:
Tensor
:param overwrite_invalid_value: 是否对新field进行overwrite,如果overwrite会在value >= label_threshold时overwrite成0。 :type overwrite_invalid_value:bool
:param label_threshold: 对新field进行overwrite的threshold值,如果value >= label_threshold则改写为0。 :type label_threshold:List[float]
:param variant_type: ‘instance’ 或 ‘example’ :type variant_type:str
- Returns:
variant tensor,label根据upper_bound调整后的数据,variant类型