TFRA 动态 Embedding 组件在唯品会搜索推荐广告场景中的实践

发布人:王东新(资深开发工程师)、王新春(高级研发经理)、陆家凡(资深算法工程师)、谢章华(高级算法工程师)、詹益峰(资深开发工程师),来自唯品会 AI 团队

唯品会 (NYSE:VIPS) 成立于 2008 年 8 月,总部设在中国广州,旗下网站于同年 12 月 8 日上线。唯品会主营业务为互联网在线销售品牌折扣商品,涵盖名品服饰鞋包、美妆、母婴、居家、生活等全品类。

唯品会在中国开创了“名牌折扣+限时抢购+正品保障”的创新电商模式,并持续深化为“精选品牌+深度折扣+限时抢购”的正品特卖模式,每天准点上线数百个正品品牌特卖,为消费者提供超值的购物惊喜。2019 年 7 月唯品会通过收购杉杉奥莱,将线上特卖和线下特卖开始进行深度整合,打造全渠道的特卖体系。

唯品会 AI 平台为搜索、推荐、广告、自然语言、图片、风控等核心算法应用提供了公司级一站式的服务平台。搜索、推荐、广告等能通过提升用户购买的转化率,最终提升公司销售的 GMV;也能帮助用户解决信息过载的情况下,快速找到自己感兴趣的商品,增加用户购买的满意度。

在支持搜索、推荐、广告等场景的排序模型训练时,遇到了如下问题:

1. 对于电商搜推广场景的排序模型的特征选型,时下业界多采用大规模离散 ID 的思路,ID 特征(商品 ID、用户 ID、品牌 ID 等)规模大且稀疏,原生 TF 框架不适用。

2. TensorFlow variable 是固定大小的,不能动态增加 ID。

为了解决上面的问题,唯品会 AI 平台引进了 TFRA(TensorFlow Recommenders Addons),这款开源 TensoFlow 软件包,解决了大规模稀疏域隔离的训练方案,支持从训练、评估、到在线预估。

我们选择这款软件,主要有以下几个原因:

1. 支持动态 Embedding 特征动态水平扩展(TB 级别)。

2. TFRA API 兼容 Tensorflow 生态,不改变算法工程师建模习惯。

唯品会 AI 平台介绍

在唯品会,搜索推荐广告等场景有上百个,如搜索商品排序、推荐商品排序、品牌排序等核心的场景。通过 AI 平台能力来快速支持和满足业务的需求,提供了实时离线数据处理,特征工程,召回引擎,粗排精排,训练平台、在线预估、策略平台等端到端一站式的基础服务平台。

WeChat70984220f7c6d4d98c76a18f6fdc64ae

  • **基础设施:**提供了统一的云容器化平台,支持 CPU/GPU/25GE/RDMA 等不同的硬件组件;对应用发布、服务路由、监控中心等统一基础设施。

  • **数据处理:**对用户、商品、行为等数据,进行离线和实时处理;为了提高开发效率,支持可配置和自定义的特征工程;同时,也提供了元数据治理和数据服务的数据平台。

  • **召回引擎:**提供了倒排索引,向量索引,和 KV 引擎,支持对文本、语义、用户行为、知识图谱融合等不同场景的召回引擎。

  • **ABT / 实验平台:**实验平台是提供完整的实验生命周期管理流程、科学可靠的多层分流算法、强大的实验实时数据分析能力,从而帮助用户快速验证产品方案、运营策略、优化算法等,进而助力业务有效增长;ABT 核心是分流能力。

  • **策略平台:**提供对算法规则,运营规则、智能规则、创意规则等策略管理平台。

  • **模型训练:**支持超大规模深度机器学习训练和在线学习平台,对训练、评估、模型导出等不同阶段的生命周期管理,完善的监控指标体系,辅助用户分析和定位问题。

  • **模型预估:**提供预估服务统一入口,支持深度学习 TF Serving + TRT;对服务可进行版本管理、分发、灰度、回滚、流量录制回放等体系化的管理。

TFRA 动态 Embedding 组件在搜索推荐广告场景中的实践

2.1 业务介绍

TFRA 动态 Embedding 组件,已经应用到搜索推荐广告等十几个场景中。这次主要介绍搜索商品排序模型,特征体系包括用户基础属性特征、用户偏好特征、用户长期行为特征、用户实时行为特征、query 意图特征、商品 ID 特征、商品统计特征、context 特征和交叉特征。

2.2 ID 动态 Embedding 应用到 MMoE 网络结构

  • ID 动态 Embedding 定义

WeChat26501bc2beba8b1912ec3a3fdb85c0a3

ID 有 userID、ageID、itemID、queryID、goodID、contextID、spuID、crossID 等。结合 TFRA 动态 embedding 设计 embedding ID 2^64 时,2^12 空间表示特征的类别,2^52 空间表示特征 hash 值;如:不同的 userID,2^12 空间都是 000111110100,表示同一个类别,而 2^52 空间就是存放不同 userID 的 hash 值。每个 embedding ID 对应一个 embedding vector 存储在cuckoohash_map。

  • MMoE 网络结构定义

MMoE 模型,全称为:Modeling Task Relationships in Multi-task Learning with Multi-gate Mixture-of-Experts,论文网址为:https://dl.acm.org/doi/epdf/10.1145/3219819.3220007

WeChatfb795d1455098c2faa68ae8e99f7bb50

在 MMoE 网络结构中,输入层有 User、Item、Query 等特征,都经过 Embedding 层。Embedding 层使用的是 TFRA 动态 Embedding,所有的 Embedding 进行拼接和扁平,把数据经过多个专家网络、门控网络以及自定义的神经网络,能得到点击率、加购率、转化率等。

2.3 代码示例

导入 TFRA 的 dynamic_embedding 组件

import tensorflow_recommenders_addons.dynamic_embedding as dynamic_embedding

创建一个动态 Embedding 变量

deep_dynamic_variables = dynamic_embedding.get_variable(
    name="deep_dynamic_embeddings",
    initializer=initializer,
    dim=embedding_size,
    devices=ps_list,
    trainable=is_training,
    partitioner=addone_partition_fn,
    checkpoint=True
)

调用 embedding_lookup API 得到 embedding

dynamic_embedding.embedding_lookup(
    params=deep_dynamic_variables,
    ids=sparse_unique_ids,
    name="deep_sparse_weights"
)

使用 TensorFlow Adam 优化器训练

optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
optimizer = tfra.dynamic_embedding.DynamicEmbeddingOptimizer(optimizer)

训练和保存模型

estimator = tf.estimator.Estimator(
   model_fn=model_fn
   , model_dir=model_dir
   , params=config
   , config=model_config
   , warm_start_from=ws
)

train_spec = tf.estimator.TrainSpec(
    input_fn=lambda: dataset.input_fn_shard(
            train_files
            , batch_size=config["parameters"]["batch_size"]
            , num_epochs=config["parameters"]["num_epochs"]
            , task_number=task_number
            , task_index=task_index)
    , max_steps=config["parameters"]["max_steps"], hooks=[log_hook])

eval_spec = tf.estimator.EvalSpec(
     input_fn=lambda: dataset.input_fn_shard(
            eval_path
            , batch_size=config["parameters"]["batch_size"]
            , num_epochs=config["parameters"]["num_epochs"]
            , task_number=task_number
            , task_index=task_index)
     , steps=2000
     , start_delay_secs=100
     , throttle_secs=120
)

tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)

2.4 训练到在线预估

WeChat4c96455bf51483a9865b78925fa9e670

从上图看,从训练,评估,到在线预估都是例行化的。

1. 训练任务,是由调度平台触发,也可以手动触发来调试验证;

2. 经过离线评估,如果评估达到预期的 AUC,就可以生成推送模型版本,并且通过模型管理 API 汇报可发布的版本给平台;

3. 在线预估实例,由 Agent 调用模型管理 API 获取最新版本的或者指定版本的模型数据,拉取模型文件,更新模型配置文件,然后加载模型。

从训练到在线预估,我们 TensorFlow 版本选择的是 2.4.1,TF Serving 版本选择的是 2.4.0,TFRA 版本是社区 v0.1.0。

2.5 一致性验证

WeChat692d67cfa8aceb4039e68c75fcb83cab

验证系统的正确性,保证离线的优化成果可以在线上得到正确且完整的体现。一致性验证包括:离线一致性验证和在线一致性验证:

1. 离线一致性,使用离线数据,对比离线评估打分(线 1)和 TF Serving 打分(线 2)的一致性,来确保离线打分一致性。

2. 在线一致性,在线数据会录制一份离线数据,离线评估打分(线 1)是使用离线数据来打分,TF Serving 打分(线 3)是使用在线数据来打分,对比来保证在线打分一致性。

具体讲一讲离线一致性验证:

1. 准备好模型的离线训练数据 (TFRecord);

2. 把数据提供给离线评估,得到离线的打分;

3. 再把数据提供给 TF Serving,得到 Serving 的打分;

4. 对比分数的一致性。

2.6 准备 TFRecord 离线数据

def input_fn(file_name, batch_size, num_epochs, is_shuffle=True):
    def _parse(example_proto):
        features = tf.io.parse_example(example_proto, feature_spec)
        return features

    data = tf.data.Dataset.list_files(file_name, shuffle=False)

    data = data.apply(
        tf.data.experimental.parallel_interleave(
            lambda filename: tf.data.TFRecordDataset(
                filename,
                buffer_size=batch_size * 10
            ),
            sloppy=True,
            block_length=1,
            cycle_length=16
        )
    )

    if is_shuffle:
        data = data.shuffle(buffer_size=batch_size * 10)
    data = data.prefetch(buffer_size=batch_size * 10)

    data = data.repeat(num_epochs)
    data = data.map(_parse, num_parallel_calls=16)
    data = data.batch(batch_size)

    return data

dataset = input_fn(file_name, batch_size, num_epochs, is_shuffle=True)

获取离线打分

predictions = estimator.predict(
    input_fn=lambda: dataset.input_fn_shard(
         eval_files
         , batch_size=config["parameters"]["batch_size"]
         , num_epochs=config["parameters"]["num_epochs"]
         , task_number=task_number
         , task_index=task_index)
)

2.7 获取 TF Serving 打分

def get_request_tf(features, feature_names):
    request = predict_pb2.PredictRequest()
    request.model_spec.name = self.name
    request.model_spec.signature_name = self.signature_name
    for x in feature_names:
        request.inputs[x].CopyFrom(tf.make_tensor_proto(features[x])
    return request

for features in dataset:
    request = client.get_request_tf(features)
    result_future = client.stub.Predict.future(
        request,
        overtime_time
    )
    response = result_future.result().outputs

2.8 TFRA 带来的效果

WeChate143e83c335c275b7e0dd957586b3fe4

对于搜索推荐广告业务,使用 TFRA 动态 Embedding 场景超过 10 个,验证模型数超过 25 个,线上模型数超过 10 个,最好的模型带来的转化率提升超过 10 个点。

实践中的 FAQ

我们在使用 TFRA 的时候也遇到一些问题,因此将之前的一些经验进行了总结并整理如下,希望对未来想将 TFRA 落地在实际业务上的同学有所帮助。

3.1 评估 AUC 比训练 AUC 低 5 个百分点

# 错误版本
sparse_weights = tf.dynamic_embedding.embedding_lookup(
    params=deep_dynamic_variables,
    ids=tf.reshape(ft_sparse_idx, [-1]),
    name="deep_sparse_weights"
)
# 正确版本
# 是由于重复 id 未被正确更新,出现了相互覆盖的情况
# 根据社区开发者反馈,下一个版本将增加 dynamic_embedding.embedding_lookup_unique API,方便用户简化代码。PR:https://github.com/tensorflow/recommenders-addons/pull/62
sparse_unique_ids, sparse_unique_idx, _ = tf.unique_with_counts(tf.reshape(ft_sparse_idx, [-1]))
sparse_weights = tf.dynamic_embedding.embedding_lookup(
    params=deep_dynamic_variables,
    ids=sparse_unique_ids,
    name="deep_sparse_weights"
)
deep_sparse_weights = tf.gather(sparse_weights, sparse_unique_idx)

3.2 模型训练阶段出现 Loss 震荡

# 错误版本
deep_dynamic_variables = tf.dynamic_embedding.get_variable(
    name="deep_dynamic_embeddings",
    devices=None,
    initializer=tf.compat.v1.random_normal_initializer(0, 0.005),
    dim=embedding_size,
    trainable=is_training
)
# 正确代码
# 问题原因是由于 devices 应该设置为 ps list,否则 embedding 参数保存 worker 本地,导致 embedding 参数没有正确更新
devices_info = ["/job:ps/replica:0/task:{}/CPU:0".format(i) for i in range(params["parameters"]["ps_nums"])]
deep_dynamic_variables = tf.dynamic_embedding.get_variable(
    name="deep_dynamic_embeddings",
    devices=devices_info,
    initializer=tf.compat.v1.random_normal_initializer(0, 0.005),
    dim=embedding_size,
    trainable=is_training
)

3.3 模型导出时出现参数丢失

# 错误版本
devices_info = ["/job:ps/replica:0/task:{}/CPU:0".format(i) for i in range(params["parameters"]["ps_nums"])]
logging.info("------ dynamic_embedding devices_info is {}-------".format(devices_info))
deep_dynamic_variables = tf.dynamic_embedding.get_variable(
    name="deep_dynamic_embeddings",
    devices=devices_info,
    initializer=tf.compat.v1.random_normal_initializer(0, 0.005),
    dim=embedding_size,
    trainable=is_training
)
# 正确代码
# 问题原因是由于 estimator 推理是本地模式,无分布式 ps,需要为 dynamic_embedding.Variable 设置不同的 devices
if is_training:
    devices_info = ["/job:ps/replica:0/task:{}/CPU:0".format(i) for i in range(params["parameters"]["ps_nums"])]
else:
    devices_info = ["/job:localhost/replica:0/task:{}/CPU:0".format(0) for i in range(params["parameters"]["ps_nums"])]
logging.info("------ dynamic_embedding devices_info is {}-------".format(devices_info))
deep_dynamic_variables = tf.dynamic_embedding.get_variable(
    name="deep_dynamic_embeddings",
    devices=devices_info,
    initializer=tf.compat.v1.random_normal_initializer(0, 0.005),
    dim=embedding_size,
    trainable=is_training
)

3.4 空请求出现打分值一直变化

# 错误版本
if is_training:
    devices_info = ["/job:ps/replica:0/task:{}/CPU:0".format(i) for i in range(params["parameters"]["ps_nums"])]
    initializer = tf.compat.v1.truncated_normal_initializer(0.0, 1e-2)
else:
    devices_info = ["/job:localhost/replica:0/task:{}/CPU:0".format(0) for i in range(params["parameters"]["ps_nums"])]
logging.info("------ dynamic_embedding devices_info is {}-------".format(devices_info))
deep_dynamic_variables = tf.dynamic_embedding.get_variable(
    name="deep_dynamic_embeddings",
    devices=devices_info,
    initializer=initializer,
    dim=embedding_size,
    trainable=is_training
)
# 正确代码
# 问题原因是由于推理时会遇到训练过程中未见过的特征,此时需要设置初始化器为 zero 类型,
# 避免未知 id 随机 embedding 干扰打分结果。
if is_training:
    devices_info = ["/job:ps/replica:0/task:{}/CPU:0".format(i) for i in range(params["parameters"]["ps_nums"])]
    initializer = tf.compat.v1.truncated_normal_initializer(0.0, 1e-2)
else:
    devices_info = ["/job:localhost/replica:0/task:{}/CPU:0".format(0) for i in range(params["parameters"]["ps_nums"])]
    initializer = tf.compat.v1.zeros_initializer()
logging.info("------ dynamic_embedding devices_info is {}-------".format(devices_info))
deep_dynamic_variables = tf.dynamic_embedding.get_variable(
    name="deep_dynamic_embeddings",
    devices=devices_info,
    initializer=initializer,
    dim=embedding_size,
    trainable=is_training
)

未来展望

从社区和公司应用角度来看,我们期待在不久的将来可以实现:

  • 超大动态 Embedding(TB级别)支持外部 KV 存储。
  • GPU Embedding Cache 方案(hot embedding 放显存,cold embedding 放内存)。

致谢

TensorFlow Recommenders-Addons 是 SIG Recommenders-Addons 人员共同努力的成果。非常感谢腾讯微信看一看团队的戎海栋、程川、张亚霏提供的帮助和技术支持。

同时我们还要感谢 TensorFlow 团队成员:

  • 李双峰、魏巍、Thea Lamkin 和 Joana Carrasqueira‎ 在社区工作上的推动和支持。
  • 周玥枫、谭振宇的技术建议和方案评审。

中文:TensorFlow 公众号