新闻个性化推荐系统源代码之基于热门文章和新文章的在线召回


新闻个性化推荐系统源代码之基于热门文章和新文章的在线召回文章插图
在上篇文章中我们实现了基于内容的在线召回 , 接下来 , 我们将实现基于热门文章和新文章的在线召回 。 主要思路是根据点击次数 , 统计每个频道下的热门文章 , 根据发布时间统计每个频道下的新文章 , 当推荐文章不足时 , 可以根据这些文章进行补足 。
由于数据量较小 , 这里采用 Redis 存储热门文章和新文章的召回结果 , 数据结构如下所示
热门文章召回结构示例popular_recallch:{}:hotch:18:hot
新文章召回结构示例new_articlech:{}:newch:18:new
热门文章存储 , 键为 ch:频道ID:hot 值为 分数 和 文章ID
# ZINCRBY key increment member# ZSCORE# 为有序集 key 的成员 member 的 score 值加上增量 increment。 client.zincrby("ch:{}:hot".format(row['channelId']), 1, row['param']['articleId'])# ZREVRANGE key start stop [WITHSCORES]client.zrevrange(ch:{}:new, 0, -1)新文章存储 , 键为 ch:{频道ID}:new 值为 文章ID:时间戳
# ZADD ZRANGE# ZADD key score member [[score member] [score member] ...]# ZRANGE page_rank 0 -1client.zadd("ch:{}:new".format(channel_id), {article_id: time.time()})热门文章在线召回首先 , 添加 Spark Streaming 和 Kafka 的配置 , 热门文章读取由业务系统发送到 Kafka 的 click-trace 主题中的用户实时行为数据
KAFKA_SERVER = "192.168.19.137:9092"click_kafkaParams = {"metadata.broker.list": KAFKA_SERVER}HOT_DS = KafkaUtils.createDirectStream(stream_c, ['click-trace'], click_kafkaParams)接下来 , 利用 Spark Streaming 读取 Kafka 中的用户行为数据 , 筛选出被点击过的文章 , 将 Redis 中的文章热度分数进行累加即可
client = redis.StrictRedis(host=DefaultConfig.REDIS_HOST, port=DefaultConfig.REDIS_PORT, db=10)def update_hot_redis(self):"""收集用户行为 , 更新热门文章分数:return:"""def update_hot_article(rdd):for data in rdd.collect():# 过滤用户行为if data['param']['action'] in ['exposure', 'read']:passelse:client.zincrby("ch:{}:hot".format(data['channelId']), 1, data['param']['articleId'])HOT_DS.map(lambda x: json.loads(x[1])).foreachRDD(update_hot_article)测试 , 写入用户行为日志
echo {\"actionTime\":\"2019-04-10 21:04:39\",\"readTime\":\"\",\"channelId\":18,\"param\":{\"action\": \"click\", \"userId\": \"2\", \"articleId\": \"14299\", \"algorithmCombine\": \"C2\"}} >> userClick.log查询热门文章
127.0.0.1:6379[10]> keys *1) "ch:18:hot"127.0.0.1:6379[10]> ZRANGE "ch:18:hot" 0 -11) "14299"新文章在线召回首先 , 添加 Spark Streaming 和 Kafka 的配置 , 新文章读取由业务系统发送到 Kafka 的 new-article 主题中的最新发布文章数据
NEW_ARTICLE_DS = KafkaUtils.createDirectStream(stream_c, ['new-article'], click_kafkaParams)【新闻个性化推荐系统源代码之基于热门文章和新文章的在线召回】接下来 , 利用 Spark Streaming 读取 Kafka 的新文章 , 将其按频道添加到 Redis 中 , Redis 的值为当前时间
defupdate_new_redis(self):"""更新频道最新文章:return:"""def add_new_article(rdd):for row in rdd.collect():channel_id, article_id = row.split(',')client.zadd("ch:{}:new".format(channel_id), {article_id: time.time()})NEW_ARTICLE_DS.map(lambda x: x[1]).foreachRDD(add_new_article)还需要在 Kafka 的启动脚本中添加 new-article 主题监听配置 , 这样就可以收到业务系统发送过来的新文章了 , 重新启动 Flume 和 Kafka
/root/bigdata/kafka/bin/kafka-topics.sh --zookeeper 192.168.19.137:2181 --create --replication-factor 1 --topic new-article --partitions 1测试 , 向 Kafka 发送新文章数据
from kafka import KafkaProducer# kafka消息生产者kafka_producer = KafkaProducer(bootstrap_servers=['192.168.19.137:9092'])# 构造消息并发送msg = '{},{}'.format(18, 13891)kafka_producer.send('new-article', msg.encode())查看新文章
127.0.0.1:6379[10]> keys *1) "ch:18:hot"2) "ch:18:new"127.0.0.1:6379[10]> ZRANGE "ch:18:new" 0 -11) "13890"2) "13891"最后 , 修改 online_update.py , 加入基于热门文章和新文章的在线召回逻辑 , 开启实时运行即可
if __name__ == '__main__':ore = OnlineRecall()ore.update_content_recall()ore.update_hot_redis()ore.update_new_redis()stream_sc.start()# 使用 ctrl+c 可以退出服务_ONE_DAY_IN_SECONDS = 60 * 60 * 24try:while True:time.sleep(_ONE_DAY_IN_SECONDS)except KeyboardInterrupt:pass到这里 , 我们就完成了召回阶段的全部工作 , 包括基于模型和基于内容的离线召回 , 以及基于内容、热门文章和新文章的在线召回 。 通过召回 , 我们可以从数百万甚至上亿的原始物品数据中 , 筛选出和用户相关的几百、几千个可能感兴趣的物品 , 后面 , 我们将要进入到排序阶段 , 对召回的几百、几千个物品进行进一步的筛选和排序 。