This website requires JavaScript.

搜狗用户查询日志处理

本文使用PySpark对搜狗的用户查询日志进行简单关性排序处理。

数据源

来自搜狗实验室,完整文件大小2G。 数据格式为(官方说的时间这个字段是没有的,另外返回结果排名和用户点击顺序号相隔的是空格!。。。坑!!!) 用户ID\t[查询词]\t该URL在返回结果中的排名 用户点击的顺序号\t用户点击的URL 其中,用户ID是根据用户使用浏览器访问搜索引擎时的Cookie信息自动赋值,即同一次使用浏览器输入的不同查询对应同一个用户ID

启动Spark

## 如果启动出错检查下 yarn.scheduler.maximum-allocation-mb 和 yarn.nodemanager.resource.memory-mb设置。
## 任务执行时间过长会被系统Kill可以修改spark.dynamicAllocation.executorIdleTimeout。
pyspark --executor-memory 3g

搜索结果排名第1,但是点击次序排在第2的数据有多少

rdd1 = sc.textFile("/user/cloudera/SogouQ/SogouQ")
rdd2=rdd1.map(lambda x: x.split("\t")).filter(lambda x: len(x)==4).cache()
rdd2.count()
rdd3=rdd2.map(lambda x: x[2].split(" ")).cache()

def is_number(var):
    try:
        if int(var[0])==4 and int(var[1])==2 :
            return True
        else:
            return False
    except Exception:
        return False

rdd4=rdd3.filter(is_number).cache()
rdd4.take(5)
rdd4.toDebugString

session查询次数排行榜

from operator import add
rdd1 = sc.textFile("/user/cloudera/SogouQ/SogouQ")
rdd2=rdd1.map(lambda x: x.split("\t")).filter(lambda x: len(x)==4)
rdd3=rdd2.map(lambda x: (x[0],1)).reduceByKey(add).cache()
map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1))
rdd4=rdd3.map(lambda (x, y): (y, x)).sortByKey(False).map(lambda (x, y): (y, x))
rdd4.saveAsTextFile("/user/cloudera/SogouQ/output")

获取文件

hadoop fs -getmerge /user/cloudera/SogouQ/output result
0条评论
avatar