Data¶
datasets¶
- class PBDataset(*args, **kwargs)¶
Monolith 训练阶段的数据源头,它从标准输入/磁盘文件中读取数据。 其后可串联
Dataset
(如negative_gen
)对样本进行改写,也可使用filter_by_fids
,filter_by_feature_value
,negative_sample
等 Op 过滤样本。- Parameters:
*args – 用户无需关心,由调度侧自动注入
**kwargs – 用户无需关心,由调度侧自动注入
- Raises:
TypeError – 如果有任何参数与类型不匹配, 则抛TypeError
ValueError – 如果有任何值与期望不匹配, 则抛ValueError
- negative_gen(self, neg_num, per_channel=False, channel_feature='', item_features=[], start_num=500, max_item_num=100000, positive_label=1, negative_label=-1, negative_action=-99999, positive_actions=[], label_index=0, action_priority='', index_feature='', throw_origin=False, throw_origin_neg=False, cache_only_pos=False, cache_negative_actions=[], real_neg_instance_weight=1.0, sampled_neg_instance_weight=-1.0, unbias_sampled_neg=True, origin_neg_in_pool_proba=1.0, neg_sample_declay_factor=1.0, easy_hard_ratio=0.0, **kwargs)¶
- 如果样本中只有正例,没有负例,
negative_gen
可根据需要生成随机负例。推荐系统中的样本通常是由 user 侧,item 侧两部分组成。 这里的做法是: 先收集每个样本的 item 侧信息,生成一个 item 池子
item 池子并不是平铺的,而是按某个特征(
channel_feature
)分类组织。 如果在同一个 channel 随机取 item 得到的是 hard 负例,在其它 channel 中抽样得到的是 easy 负例并不是一开始就生成负例,而是要等 item 池子积累到一定大小才开始生成负例
Example
>>> from monolith.native_training.data import PBDataset >>> dataset = PBDataset() >>> dataset = dataset.negative_gen(neg_num=10, ... channel_feature="f_page", ... item_features=["f_item_id", "f_item_cat"], ... per_channel=True, ... start_num=1000, ... max_item_num=5000, ... negative_action=int(Actions.RANDOM_NEG), ... positive_actions=[int(Actions.CLICK)])
- Parameters:
neg_num (
int
) – 为一个正例生成`neg_num`个负例per_channel (
bool
) – 是否分桶channel_feature (
string
) – 用于当 item 分桶的字段item_features – (
List[str]
): item 侧的特征名列表start_num (
int
) – 在 item 池子中积累多少个后才开始采样max_iten_num (
int
) – 每一个 channel(如果分桶)最多收集多少个 itempositive_actions (
List[int]
) – 判定样本是否为正例的 action 值列表,若样本的action ∈ positive_actions
,则该样本是正例。如果样本的 action 有多个值,可以通过action_priority
指定优先级, 选择优先级最高的一个。negative_action (
int
) – 对于新生成的负例会为其 action 添加该negative_action
值,即如果negative_action!=-99999
,则新生成的样本 action 值为negative_action
positive_label (
int
) – 判定样本是否为正例的 label 值,若样本的label[label_index] == positive_label
,则该样本是正例。注意:它的优先级低于positive_actions
,也就是两者同时指定时以positive_actions
为准。label_index (
int
) – 默认值为 0,根据样本的 label 值判定样本是否是正例时,如果样本 label 有多个,则用label_index
指定真正起作用的 label。negative_label (
int
) – 对于新生成的负例会为其 label 添加该negative_label
值cache_only_pos (
bool
) – 原始负例是否入池,cache_only_pos=True
,原始负例不入池,负例池中的 item features 都来自原始正例。easy_hard_ratio (
float
) – 当使用 per_channel 的时候,hard 和 easy 负例之间的比例。取值在 0 ~ 1 之间。举例:0.8就是大致 80% easy负例
Warning
positive_actions
与positive_label
不要同时指定,同时指定时以positive_actions
为准。
- Raises:
TypeError – 如果有任何参数与类型不匹配, 则抛TypeError
ValueError – 如果有任何值与期望不匹配, 则抛ValueError
- 如果样本中只有正例,没有负例,
- transform(self, t, **kwargs)¶
新版样本过滤/改写逻辑,在过滤条件复杂时比原来的写法更加高效。它接受一个
Transform
对象,然后执行该对象所定义的过滤改写逻辑。Example
>>> from monolith.native_training.data import PBDataset, filter_by_fids, transforms >>> # 原来的写法 >>> def filter_fn(variant): ... return filter_by_fids(variant, has_fids=[100], has_actions=[1, 2]) >>> >>> dataset = PBDataset() >>> dataset = dataset.filter(filter_fn) >>> >>> # 新版本写法,两者过滤逻辑等价 >>> dataset = PBDataset() >>> dataset = dataset.transform( ... transforms.Compose([ ... transforms.FilterByFid(has_fids=[100]), ... transforms.FilterByAction(has_actions=[1, 2]) ... ]))
Note
dataset.transform(...)
目前仅支持过滤行为,负例生成仍然需要使用dataset.negative_gen(...)
- Parameters:
t (
Transform
) – 样本改写方式- Raises:
TypeError – 如果有任何参数与类型不匹配,则抛TypeError
ValueError – 如果有任何值与期望不匹配,则抛ValueError
dataset ops¶
- filter_by_fids(variant, filter_fids=None, has_fids=None, select_fids=None, has_actions=None, req_time_min=0, select_slots=None, variant_type='instance')¶
根据离散特征的 FID 值进行过滤
- Parameters:
variant (
Tensor
) – 输入数据,必须是variant
类型filter_fids (
List[int]
) – 任意一个FID出现filter_fids
中,则样本被过滤has_fids (
List[int]
) – 任意一个FID出现在has_fids
中,则样本被选择select_fids (
List[int]
) – 所有select_fids
均出现在样本中,样本才被选择has_actions (
List[int]
) – 样本actions
字段任意一个值出现在has_actions
中,则样本被选择req_time_min (
int
) – 请求时间最小值select_slots (
List[int]
) – 所有select_slots
均出现在样本中,样本才被选择
Warning
select_slots
即将被废弃,尽量不要使用
Example
>>> from monolith.native_training.data import PBDataset, filter_by_fids >>> def filter_fn(variant): ... return filter_by_fids(variant, has_actions=[1, 2], filter_fids=[100]) >>> >>> dataset = PBDataset() >>> dataset = dataset.filter(filter_fn)
- Returns:
True 表示样本被保留,False 表示样本被丢弃
- Return type:
bool
- filter_by_feature_value(variant, field_name, op, operand, field_type, keep_empty=False, operand_filepath=None)¶
根据连续特征值进行过滤
Example
>>> from monolith.native_training.data import PBDataset, filter_by_feature_value >>> def filter_fn(variant): ... return filter_by_feature_value(variant, field_name='__meta__att_traced', op='neq', operand=1, field_type='int64') >>> >>> dataset = PBDataset() >>> dataset = dataset.filter(filter_fn)
- Parameters:
variant (
Tensor
) – 输入数据,必须是variant
类型field_name (
List[int]
) – 样本在指定 field_name 进行过滤op (
str
) – 比较运算符,可以是 gt/ge/eq/lt/le/neq/between/in/not-in 等 布尔运算,也可以是 all/any/diff 等集合布尔运算operand (
Union[float, int, str, List[float], List[int], List[str]]
) – 操作数,用于比较,可以为值或者 Listkeep_empty (
bool
) – 默认 Falsefield_type (
str
) – 需要显式指定字段类型,可以为int64/float/double/bytes/string
(bytes
orstring
,含义相同)
- Returns:
True 表示样本被保留,False 表示样本被丢弃
- Return type:
bool
- negative_sample(variant, drop_rate, label_index=0, threshold=0.0, variant_type='instance', action_priority=None, per_action_drop_rate=None)¶
负例采样
Example
>>> from monolith.native_training.data import PBDataset, negative_sample >>> def filter_fn(variant): ... return negative_sample(dataset, drop_rate=0.5, label_index=0, threshold=0) >>> >>> dataset = PBDataset() >>> dataset = dataset.filter(filter_fn)
- Parameters:
variant (
Tensor
) – 输入数据,必须是variant
类型drop_rate (
float
) – 负例丢弃比例,取值区间为[0, 1),sample_rate = 1 - drop_ratelabel_index (
int
) – 样本中 label 字段是一个float
数组,label_index 表示本次启用哪一个 index 对应的 labelthreshold (
float
) – label 值大于threshold
的是正样本
- Returns:
True 表示样本被保留,False 表示样本被丢弃
- Return type:
bool
transforms¶
- class Compose(transforms)¶
Composes several transforms together.
- Parameters:
transforms (
List[Transform]
) – list of transforms to compose.
Example
>>> from monolith.native_training.data import transforms >>> transforms.Compose([ ... transforms.FilterByFid(has_fids=[100]), ... transforms.FilterByAction(has_actions=[1, 2]), ... ])
- class FilterByFid(has_fids=None, filter_fids=None, select_fids=None, feature_name=None)¶
根据 Sparse Feature 的 FID 值进行过滤
- Parameters:
has_fids (
List[int]
) – 包含给定任意一个 FID 值,则样本被保留。filter_fids (
List[int]
) – 包含给定任意一个 FID 值,则样本被丢弃。select_fids (
List[int]
) – 包含给定所有 FID 值,样本才被保留。
Example
>>> from monolith.native_training.data import transforms >>> transforms.FilterByFid(has_fids=[100])
- class FilterByAction(has_actions=None)¶
根据 action 的值进行过滤
- Parameters:
has_actions (
List[int]
) – 包含给定任意一个 action 值,则样本被保留。
Example
>>> from monolith.native_training.data import transforms >>> transforms.FilterByAction(has_actions=[1, 2])
- class NegativeSample(drop_rate, label_index, threshold)¶
负例采样
- Parameters:
drop_rate (
float
) – 负例丢弃比例,取值区间为[0, 1),sample_rate = 1 - drop_ratelabel_index (
int
) – 样本中 labels 是一个列表,label_index表示本次启用哪一个 index 对应的 labelthreshold (
float
) – label 值大于threshold
的是正样本
Example
>>> from monolith.native_training.data import transforms >>> transforms.NegativeSample(drop_rate=0.5, label_index=0, threshold=0)
- class FilterByFeature(field_name, op, operand, field_type, keep_empty=False)¶
根据给定 dense 特征值过滤
- Parameters:
field_name (
List[int]
) – 样本在指定 field_name 进行过滤op (
str
) – 比较运算符,可以是 gt/ge/eq/lt/le/neq/between/in/not-in 等布尔运算,也可以是 all/any/diff 等集合布尔运算operand (
float
) – 操作数,用于比较,可以为值或者 Listkeep_empty (
bool
) – 默认 Falsefield_type (
str
) – 需要显式指定字段类型,可以为int64/float/double/bytes/string
(bytes
orstring
,含义相同)
Example
>>> from monolith.native_training.data import transforms >>> transforms.FilterByFeature(field_name='__meta__att_traced', ... op='neq', ... operand=1, ... field_type='int64')
- class LogicalOr(x, y)¶
对两个
Transform
进行或操作- Parameters:
x (
Transform
) – left hand sideTransform
y (
Transform
) – right hand sideTransform
Example
>>> from monolith.native_training.data import transforms >>> transforms.LogicalOr(transforms.FilterByFid(has_fids=[100]), ... transforms.FilterByAction(has_actions=[1, 2]))
- class LogicalAnd(x, y)¶
对两个
Transform
进行与操作- Parameters:
x (
Transform
) – left hand sideTransform
y (
Transform
) – right hand sideTransform
Example
>>> from monolith.native_training.data import transforms >>> transforms.LogicalAnd(transforms.FilterByFidFilterByFid(has_fids=[100]), ... transforms.FilterByAction(has_actions=[1, 2]))
- class LogicalNot(x)¶
对
Transform
取反- Parameters:
x (
Transform
) –Transform
操作
Example
>>> from monolith.native_training.data import transforms >>> transforms.LogicalNot(transforms.FilterByFid(has_fids=[100]))
parsers¶
- parse_examples(tensor, sparse_features, dense_features=None, dense_feature_shapes=None, dense_feature_types=None, extra_features=None, extra_feature_shapes=None)¶
将 List[Example] variant/string 解析成 Dict[str, Tensor] 以便后续入图
Example 格式中,所有特征均存于 feature 中,没有平铺特征。Sparse 特征由于长度不固定,输出
tf.RaggedTensor
,其他特征输出tf.Tensor
Example
>>> from monolith.native_training.data import PBDataset, parse_examples >>> def parser(tensor): ... extra_features = ['sample_rate', 'actions'] ... extra_feature_shapes = [1] * len(extra_features) ... features = parse_examples(tensor, ... sparse_features=VALID_FNAMES, ... extra_features=extra_features, ... extra_feature_shapes=extra_feature_shapes) ... if mode != tf.estimator.ModeKeys.PREDICT: ... actions = tf.reshape(features['actions'], shape=(-1,)) ... features['label'] = tf.where(tf.math.equal(actions, int(Actions.CLICK)), ... tf.ones_like(actions, dtype=tf.float32), ... tf.zeros_like(actions, dtype=tf.float32)) ... features['sample_rate'] = tf.reshape(features['sample_rate'], shape=(-1,)) ... return features >>> >>> dataset = PBDataset() >>> dataset = dataset.shuffle(100).batch(256, drop_remainder=True) >>> dataset = dataset.map(parser)
- Parameters:
tensor (
tf.Tensor
) – 输入样本sparse_features (
List[str]
) – 稀疏特征名称,可以有多个dense_features (
List[str]
) – 稠密特征(或 Label)名称,可以有多个,也可以有不同类型dense_feature_shapes (
List[int]
) – 稠密特征名称的 shapedense_feature_types (
List[dtype]
) – 稠密特征名称的数据类型,默认为tf.float32
extra_features (
List[str]
) – 主要指 LineId 中的字段,可以有多个,Monolith会自动从 LineId 中提取数据类型extra_feature_shapes (
List[int]
) – extra 特征名称的 shape
Warning
解析
extra_features
时不需要指定其字段类型,因为框架可以从LineId
proto 定义中预先获取。解析
dense_features
字段类型只能由用户来指定,只有用户清楚其数据类型
- Returns:
Dict[str, Tensor] 解析出特征名到特征的字典
- parse_example_batch(tensor, sparse_features, dense_features=None, dense_feature_shapes=None, dense_feature_types=None, extra_features=None, extra_feature_shapes=None)¶
将 ExampleBatch variant/string tensor 解析成 Dict[str, Tensor] 以便后续入图
ExampleBatch 格式中,所有特征均存于 feature 中,没有平铺特征。Sparse 特征由于长度不固定,输出
tf.RaggedTensor
,其他特征输出tf.Tensor
Example
>>> from monolith.native_training.data import PBDataset, parse_example_batch >>> def serving_input_receiver_fn(self): ... receiver_tensors = {} ... examples_placeholder = tf.compat.v1.placeholder(dtype=tf.string, ... shape=(None,)) ... receiver_tensors["example_batch"] = examples_placeholder ... parsed_results = parse_example_batch(examples_placeholder, ... sparse_features=VALID_FNAMES) ... return tf.estimator.export.ServingInputReceiver(parsed_results, ... receiver_tensors)
- Parameters:
tensor (
tf.Tensor
) – 输入样本sparse_features (
List[str]
) – 稀疏特征名称,可以有多个dense_features (
List[str]
) – 稠密特征(或 Label)名称,可以有多个,也可以有不同类型dense_feature_shapes (
List[int]
) – 稠密特征名称的 shapedense_feature_types (
List[dtype]
) – 稠密特征名称的数据类型,默认为tf.float32
extra_features (
List[str]
) – 主要指 LineId 中的字段,可以有多个,Monolith 会自动从 LineId 中提取数据类型extra_feature_shapes (
List[int]
) – extra 特征名称的 shape
Warning
解析
extra_features
时不需要指定其字段类型,因为框架可以从LineId
proto 定义中预先获取。解析
dense_features
字段类型只能由用户来指定,只有用户清楚其数据类型
- Returns:
Dict[str, Tensor] 解析出特征名到特征的字典