发布人:王东新(资深开发工程师)、王新春(高级研发经理)、陆家凡(资深算法工程师)、谢章华(高级算法工程师)、詹益峰(资深开发工程师),来自唯品会 AI 团队
唯品会 (NYSE:VIPS) 成立于 2008 年 8 月,总部设在中国广州,旗下网站于同年 12 月 8 日上线。唯品会主营业务为互联网在线销售品牌折扣商品,涵盖名品服饰鞋包、美妆、母婴、居家、生活等全品类。
唯品会在中国开创了“名牌折扣+限时抢购+正品保障”的创新电商模式,并持续深化为“精选品牌+深度折扣+限时抢购”的正品特卖模式,每天准点上线数百个正品品牌特卖,为消费者提供超值的购物惊喜。2019 年 7 月唯品会通过收购杉杉奥莱,将线上特卖和线下特卖开始进行深度整合,打造全渠道的特卖体系。
唯品会 AI 平台为搜索、推荐、广告、自然语言、图片、风控等核心算法应用提供了公司级一站式的服务平台。搜索、推荐、广告等能通过提升用户购买的转化率,最终提升公司销售的 GMV;也能帮助用户解决信息过载的情况下,快速找到自己感兴趣的商品,增加用户购买的满意度。
在支持搜索、推荐、广告等场景的排序模型训练时,遇到了如下问题:
1. 对于电商搜推广场景的排序模型的特征选型,时下业界多采用大规模离散 ID 的思路,ID 特征(商品 ID、用户 ID、品牌 ID 等)规模大且稀疏,原生 TF 框架不适用。
2. TensorFlow variable 是固定大小的,不能动态增加 ID。
为了解决上面的问题,唯品会 AI 平台引进了 TFRA(TensorFlow Recommenders Addons),这款开源 TensoFlow 软件包,解决了大规模稀疏域隔离的训练方案,支持从训练、评估、到在线预估。
我们选择这款软件,主要有以下几个原因:
1. 支持动态 Embedding 特征动态水平扩展(TB 级别)。
2. TFRA API 兼容 Tensorflow 生态,不改变算法工程师建模习惯。
唯品会 AI 平台介绍
在唯品会,搜索推荐广告等场景有上百个,如搜索商品排序、推荐商品排序、品牌排序等核心的场景。通过 AI 平台能力来快速支持和满足业务的需求,提供了实时离线数据处理,特征工程,召回引擎,粗排精排,训练平台、在线预估、策略平台等端到端一站式的基础服务平台。
-
**基础设施:**提供了统一的云容器化平台,支持 CPU/GPU/25GE/RDMA 等不同的硬件组件;对应用发布、服务路由、监控中心等统一基础设施。
-
**数据处理:**对用户、商品、行为等数据,进行离线和实时处理;为了提高开发效率,支持可配置和自定义的特征工程;同时,也提供了元数据治理和数据服务的数据平台。
-
**召回引擎:**提供了倒排索引,向量索引,和 KV 引擎,支持对文本、语义、用户行为、知识图谱融合等不同场景的召回引擎。
-
**ABT / 实验平台:**实验平台是提供完整的实验生命周期管理流程、科学可靠的多层分流算法、强大的实验实时数据分析能力,从而帮助用户快速验证产品方案、运营策略、优化算法等,进而助力业务有效增长;ABT 核心是分流能力。
-
**策略平台:**提供对算法规则,运营规则、智能规则、创意规则等策略管理平台。
-
**模型训练:**支持超大规模深度机器学习训练和在线学习平台,对训练、评估、模型导出等不同阶段的生命周期管理,完善的监控指标体系,辅助用户分析和定位问题。
-
**模型预估:**提供预估服务统一入口,支持深度学习 TF Serving + TRT;对服务可进行版本管理、分发、灰度、回滚、流量录制回放等体系化的管理。
TFRA 动态 Embedding 组件在搜索推荐广告场景中的实践
2.1 业务介绍
TFRA 动态 Embedding 组件,已经应用到搜索推荐广告等十几个场景中。这次主要介绍搜索商品排序模型,特征体系包括用户基础属性特征、用户偏好特征、用户长期行为特征、用户实时行为特征、query 意图特征、商品 ID 特征、商品统计特征、context 特征和交叉特征。
2.2 ID 动态 Embedding 应用到 MMoE 网络结构
- ID 动态 Embedding 定义
ID 有 userID、ageID、itemID、queryID、goodID、contextID、spuID、crossID 等。结合 TFRA 动态 embedding 设计 embedding ID 2^64 时,2^12 空间表示特征的类别,2^52 空间表示特征 hash 值;如:不同的 userID,2^12 空间都是 000111110100,表示同一个类别,而 2^52 空间就是存放不同 userID 的 hash 值。每个 embedding ID 对应一个 embedding vector 存储在cuckoohash_map。
- MMoE 网络结构定义
MMoE 模型,全称为:Modeling Task Relationships in Multi-task Learning with Multi-gate Mixture-of-Experts,论文网址为:https://dl.acm.org/doi/epdf/10.1145/3219819.3220007。
在 MMoE 网络结构中,输入层有 User、Item、Query 等特征,都经过 Embedding 层。Embedding 层使用的是 TFRA 动态 Embedding,所有的 Embedding 进行拼接和扁平,把数据经过多个专家网络、门控网络以及自定义的神经网络,能得到点击率、加购率、转化率等。
2.3 代码示例
导入 TFRA 的 dynamic_embedding 组件
import tensorflow_recommenders_addons.dynamic_embedding as dynamic_embedding
创建一个动态 Embedding 变量
deep_dynamic_variables = dynamic_embedding.get_variable(
name="deep_dynamic_embeddings",
initializer=initializer,
dim=embedding_size,
devices=ps_list,
trainable=is_training,
partitioner=addone_partition_fn,
checkpoint=True
)
调用 embedding_lookup API 得到 embedding
dynamic_embedding.embedding_lookup(
params=deep_dynamic_variables,
ids=sparse_unique_ids,
name="deep_sparse_weights"
)
使用 TensorFlow Adam 优化器训练
optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
optimizer = tfra.dynamic_embedding.DynamicEmbeddingOptimizer(optimizer)
训练和保存模型
estimator = tf.estimator.Estimator(
model_fn=model_fn
, model_dir=model_dir
, params=config
, config=model_config
, warm_start_from=ws
)
train_spec = tf.estimator.TrainSpec(
input_fn=lambda: dataset.input_fn_shard(
train_files
, batch_size=config["parameters"]["batch_size"]
, num_epochs=config["parameters"]["num_epochs"]
, task_number=task_number
, task_index=task_index)
, max_steps=config["parameters"]["max_steps"], hooks=[log_hook])
eval_spec = tf.estimator.EvalSpec(
input_fn=lambda: dataset.input_fn_shard(
eval_path
, batch_size=config["parameters"]["batch_size"]
, num_epochs=config["parameters"]["num_epochs"]
, task_number=task_number
, task_index=task_index)
, steps=2000
, start_delay_secs=100
, throttle_secs=120
)
tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)
2.4 训练到在线预估
从上图看,从训练,评估,到在线预估都是例行化的。
1. 训练任务,是由调度平台触发,也可以手动触发来调试验证;
2. 经过离线评估,如果评估达到预期的 AUC,就可以生成推送模型版本,并且通过模型管理 API 汇报可发布的版本给平台;
3. 在线预估实例,由 Agent 调用模型管理 API 获取最新版本的或者指定版本的模型数据,拉取模型文件,更新模型配置文件,然后加载模型。
从训练到在线预估,我们 TensorFlow 版本选择的是 2.4.1,TF Serving 版本选择的是 2.4.0,TFRA 版本是社区 v0.1.0。
2.5 一致性验证
验证系统的正确性,保证离线的优化成果可以在线上得到正确且完整的体现。一致性验证包括:离线一致性验证和在线一致性验证:
1. 离线一致性,使用离线数据,对比离线评估打分(线 1)和 TF Serving 打分(线 2)的一致性,来确保离线打分一致性。
2. 在线一致性,在线数据会录制一份离线数据,离线评估打分(线 1)是使用离线数据来打分,TF Serving 打分(线 3)是使用在线数据来打分,对比来保证在线打分一致性。
具体讲一讲离线一致性验证:
1. 准备好模型的离线训练数据 (TFRecord);
2. 把数据提供给离线评估,得到离线的打分;
3. 再把数据提供给 TF Serving,得到 Serving 的打分;
4. 对比分数的一致性。
2.6 准备 TFRecord 离线数据
def input_fn(file_name, batch_size, num_epochs, is_shuffle=True):
def _parse(example_proto):
features = tf.io.parse_example(example_proto, feature_spec)
return features
data = tf.data.Dataset.list_files(file_name, shuffle=False)
data = data.apply(
tf.data.experimental.parallel_interleave(
lambda filename: tf.data.TFRecordDataset(
filename,
buffer_size=batch_size * 10
),
sloppy=True,
block_length=1,
cycle_length=16
)
)
if is_shuffle:
data = data.shuffle(buffer_size=batch_size * 10)
data = data.prefetch(buffer_size=batch_size * 10)
data = data.repeat(num_epochs)
data = data.map(_parse, num_parallel_calls=16)
data = data.batch(batch_size)
return data
dataset = input_fn(file_name, batch_size, num_epochs, is_shuffle=True)
获取离线打分
predictions = estimator.predict(
input_fn=lambda: dataset.input_fn_shard(
eval_files
, batch_size=config["parameters"]["batch_size"]
, num_epochs=config["parameters"]["num_epochs"]
, task_number=task_number
, task_index=task_index)
)
2.7 获取 TF Serving 打分
def get_request_tf(features, feature_names):
request = predict_pb2.PredictRequest()
request.model_spec.name = self.name
request.model_spec.signature_name = self.signature_name
for x in feature_names:
request.inputs[x].CopyFrom(tf.make_tensor_proto(features[x])
return request
for features in dataset:
request = client.get_request_tf(features)
result_future = client.stub.Predict.future(
request,
overtime_time
)
response = result_future.result().outputs
2.8 TFRA 带来的效果
对于搜索推荐广告业务,使用 TFRA 动态 Embedding 场景超过 10 个,验证模型数超过 25 个,线上模型数超过 10 个,最好的模型带来的转化率提升超过 10 个点。
实践中的 FAQ
我们在使用 TFRA 的时候也遇到一些问题,因此将之前的一些经验进行了总结并整理如下,希望对未来想将 TFRA 落地在实际业务上的同学有所帮助。
3.1 评估 AUC 比训练 AUC 低 5 个百分点
# 错误版本
sparse_weights = tf.dynamic_embedding.embedding_lookup(
params=deep_dynamic_variables,
ids=tf.reshape(ft_sparse_idx, [-1]),
name="deep_sparse_weights"
)
# 正确版本
# 是由于重复 id 未被正确更新,出现了相互覆盖的情况
# 根据社区开发者反馈,下一个版本将增加 dynamic_embedding.embedding_lookup_unique API,方便用户简化代码。PR:https://github.com/tensorflow/recommenders-addons/pull/62
sparse_unique_ids, sparse_unique_idx, _ = tf.unique_with_counts(tf.reshape(ft_sparse_idx, [-1]))
sparse_weights = tf.dynamic_embedding.embedding_lookup(
params=deep_dynamic_variables,
ids=sparse_unique_ids,
name="deep_sparse_weights"
)
deep_sparse_weights = tf.gather(sparse_weights, sparse_unique_idx)
3.2 模型训练阶段出现 Loss 震荡
# 错误版本
deep_dynamic_variables = tf.dynamic_embedding.get_variable(
name="deep_dynamic_embeddings",
devices=None,
initializer=tf.compat.v1.random_normal_initializer(0, 0.005),
dim=embedding_size,
trainable=is_training
)
# 正确代码
# 问题原因是由于 devices 应该设置为 ps list,否则 embedding 参数保存 worker 本地,导致 embedding 参数没有正确更新
devices_info = ["/job:ps/replica:0/task:{}/CPU:0".format(i) for i in range(params["parameters"]["ps_nums"])]
deep_dynamic_variables = tf.dynamic_embedding.get_variable(
name="deep_dynamic_embeddings",
devices=devices_info,
initializer=tf.compat.v1.random_normal_initializer(0, 0.005),
dim=embedding_size,
trainable=is_training
)
3.3 模型导出时出现参数丢失
# 错误版本
devices_info = ["/job:ps/replica:0/task:{}/CPU:0".format(i) for i in range(params["parameters"]["ps_nums"])]
logging.info("------ dynamic_embedding devices_info is {}-------".format(devices_info))
deep_dynamic_variables = tf.dynamic_embedding.get_variable(
name="deep_dynamic_embeddings",
devices=devices_info,
initializer=tf.compat.v1.random_normal_initializer(0, 0.005),
dim=embedding_size,
trainable=is_training
)
# 正确代码
# 问题原因是由于 estimator 推理是本地模式,无分布式 ps,需要为 dynamic_embedding.Variable 设置不同的 devices
if is_training:
devices_info = ["/job:ps/replica:0/task:{}/CPU:0".format(i) for i in range(params["parameters"]["ps_nums"])]
else:
devices_info = ["/job:localhost/replica:0/task:{}/CPU:0".format(0) for i in range(params["parameters"]["ps_nums"])]
logging.info("------ dynamic_embedding devices_info is {}-------".format(devices_info))
deep_dynamic_variables = tf.dynamic_embedding.get_variable(
name="deep_dynamic_embeddings",
devices=devices_info,
initializer=tf.compat.v1.random_normal_initializer(0, 0.005),
dim=embedding_size,
trainable=is_training
)
3.4 空请求出现打分值一直变化
# 错误版本
if is_training:
devices_info = ["/job:ps/replica:0/task:{}/CPU:0".format(i) for i in range(params["parameters"]["ps_nums"])]
initializer = tf.compat.v1.truncated_normal_initializer(0.0, 1e-2)
else:
devices_info = ["/job:localhost/replica:0/task:{}/CPU:0".format(0) for i in range(params["parameters"]["ps_nums"])]
logging.info("------ dynamic_embedding devices_info is {}-------".format(devices_info))
deep_dynamic_variables = tf.dynamic_embedding.get_variable(
name="deep_dynamic_embeddings",
devices=devices_info,
initializer=initializer,
dim=embedding_size,
trainable=is_training
)
# 正确代码
# 问题原因是由于推理时会遇到训练过程中未见过的特征,此时需要设置初始化器为 zero 类型,
# 避免未知 id 随机 embedding 干扰打分结果。
if is_training:
devices_info = ["/job:ps/replica:0/task:{}/CPU:0".format(i) for i in range(params["parameters"]["ps_nums"])]
initializer = tf.compat.v1.truncated_normal_initializer(0.0, 1e-2)
else:
devices_info = ["/job:localhost/replica:0/task:{}/CPU:0".format(0) for i in range(params["parameters"]["ps_nums"])]
initializer = tf.compat.v1.zeros_initializer()
logging.info("------ dynamic_embedding devices_info is {}-------".format(devices_info))
deep_dynamic_variables = tf.dynamic_embedding.get_variable(
name="deep_dynamic_embeddings",
devices=devices_info,
initializer=initializer,
dim=embedding_size,
trainable=is_training
)
未来展望
从社区和公司应用角度来看,我们期待在不久的将来可以实现:
- 超大动态 Embedding(TB级别)支持外部 KV 存储。
- GPU Embedding Cache 方案(hot embedding 放显存,cold embedding 放内存)。
致谢
TensorFlow Recommenders-Addons 是 SIG Recommenders-Addons 人员共同努力的成果。非常感谢腾讯微信看一看团队的戎海栋、程川、张亚霏提供的帮助和技术支持。
同时我们还要感谢 TensorFlow 团队成员:
- 李双峰、魏巍、Thea Lamkin 和 Joana Carrasqueira 在社区工作上的推动和支持。
- 周玥枫、谭振宇的技术建议和方案评审。