User:DCausse (WMF)/PySpark And Search Satisfaction
Appearance
Manipulate data from TestSatisfaction
imports
from pyspark.sql import Row
import requests
import json
from pyspark.sql import functions as F
Filter data from A/B test
# Load the raw TestSearchSatisfaction2 eventlogging records: sequence files
# whose values are JSON strings, one event per record.
raw_events = sc.sequenceFile(
    "/wmf/data/raw/eventlogging/eventlogging_TestSearchSatisfaction2/hourly/2017/10/*/*"
).map(lambda kv: kv[1])
df_all = spark.read.json(raw_events)
# Keep only the two A/B test buckets, fulltext source.
df_all = df_all.filter("event.subTest in ('rec_3t_80_66', 'rec_4t_80_66') AND event.source='fulltext'")
Generate click logs
# SERP events: one row per search result page, keyed by search token,
# carrying the query text and whether "did you mean" was shown.
df_searches = (
    df_all
    .filter('event.action = "searchResultPage"')
    .select(
        F.col('event.searchToken').alias('tok'),
        F.col('event.query').alias('query'),
        F.col('event.didYouMeanVisible').alias('dym'),
    )
)
# Click events: rows that carry both a clicked article id and a search token.
df_clicks = (
    df_all
    .filter(F.col('event.articleId').isNotNull())
    .filter(F.col('event.searchToken').isNotNull())
    .select(
        F.col('event.searchToken').alias('tok'),
        F.col('event.articleId'),
    )
)
# Join searches to their clicks on the shared token; dedupe and cache since
# the result is reused downstream.
df_clicklog = df_searches.join(df_clicks, 'tok').dropDuplicates().cache()
Generate a new dataset with articleId, page title and a boolean indicating whether the query could have matched
Function to collect info from Elasticsearch
def quelastic(searchQuery, id, dym):
    """Ask Elasticsearch whether *searchQuery* could have matched page *id*.

    Runs a bool query against enwiki_content: a named ``match`` clause on the
    ``all`` field (operator "and") placed under ``should``, restricted by an
    ``ids`` filter to the clicked page. Because the match clause is optional,
    the page is returned even when the query text does not match it; the
    response's ``matched_queries`` then tells us whether it did.

    :param searchQuery: the user's search string
    :param id: the clicked article's page id (note: shadows builtin ``id``;
        kept for caller compatibility)
    :param dym: whether the "did you mean" suggestion was visible
    :return: dict with keys id, found, query, title, can_match, dym
    """
    query = {
        "_source": [
            "_id",
            "title"
        ],
        "query": {
            "bool": {
                "should": [
                    {
                        "match": {
                            "all": {
                                # Named query: ES reports it in
                                # matched_queries only when it matched.
                                "_name": "matched",
                                "query": searchQuery,
                                "operator": "and",
                            }
                        }
                    }
                ],
                # Always select the clicked page, whether or not the
                # should clause above matches it.
                "filter": {
                    "ids": {
                        "type": "page",
                        "values": [id]
                    },
                },
            }
        }
    }
    # Use `json=` so requests serializes the body AND sets the
    # Content-Type: application/json header; the original
    # data=json.dumps(query) sent no content type, which newer
    # Elasticsearch versions reject.
    resp = requests.post(
        "http://elastic2010.codfw.wmnet:9200/enwiki_content/_search",
        json=query,
    )
    resp = resp.json()
    hits = resp['hits']['hits']
    if len(hits) == 0:
        # Page is no longer in the index (e.g. deleted since the click).
        return {
            "id": id,
            "found": False,
            "query": searchQuery,
            "title": None,
            "can_match": False,
            "dym": dym,
        }
    hit = hits[0]
    return {
        "id": id,
        "found": True,
        "query": searchQuery,
        "title": hit['_source']['title'],
        # The named "matched" clause shows up in matched_queries only when
        # the whole query text matched the page.
        "can_match": 'matched_queries' in hit and 'matched' in hit['matched_queries'],
        "dym": dym,
    }
Now build the dataset:
# Enrich every click-log row via Elasticsearch and turn the result dicts
# into Rows; cache since the frame is filtered twice below.
def _enrich(r):
    return Row(**quelastic(r['query'], r['articleId'], r['dym']))

df_cl_tr = sqlCtx.createDataFrame(df_clicklog.rdd.map(_enrich)).cache()
# output some examples:
df_cl_tr.filter('found = true').filter('can_match = false').show(10)
+---------+-----+--------+--------------------+--------------------+ |can_match|found| id| query| title| +---------+-----+--------+--------------------+--------------------+ | false| true| 1618529|XYZ | Sand filter| | false| true|11978561|XYZ | Arborfield Garrison| | false| true| 39034|XYZ |J. Robert Oppenhe...| | false| true|31282049|XYZ |Apologies, I Have...| | false| true| 144829|XYZ | Steve McQueen| | false| true| 4384721|XYZ |(Shake, Shake, Sh...| | false| true|44294098|XYZ | Comparison diagram| | false| true| 9087364|XYZ |Islamic State of ...| | false| true|17238607|XYZ | Faroe Islands| | false| true| 27698|XYZ | Sanskrit| +---------+-----+--------+--------------------+--------------------+
# Persist for further processing: pages that exist in the index but whose
# query text did not fully match them.
found_pages = df_cl_tr.filter('found = true')
df_partial_match = found_pages.filter('can_match = false')
df_partial_match.write.parquet('/user/dcausse/recall_ab_test_partial_match')