Jump to content

User:DCausse (WMF)/PySpark And Search Satisfaction

From Wikitech

Manipulate data from the TestSearchSatisfaction2 EventLogging schema

imports

from pyspark.sql import Row
import requests
import json
from pyspark.sql import functions as F

Filter data from A/B test

# Load the raw TestSearchSatisfaction2 EventLogging records for October 2017.
# The sequence files store (key, value) pairs; the JSON event payload is the
# value, so drop the key before handing the lines to the JSON reader.
data = sc.sequenceFile("/wmf/data/raw/eventlogging/eventlogging_TestSearchSatisfaction2/hourly/2017/10/*/*").map(lambda x: x[1])
df_all = spark.read.json(data)
# Keep only fulltext searches that belong to one of the two A/B-test buckets.
df_all = df_all.filter("event.subTest in ('rec_3t_80_66', 'rec_4t_80_66') AND event.source='fulltext'")

Generate click logs

# One row per search-results page: the search token (join key), the raw
# user query, and whether the "did you mean" suggestion was displayed.
df_searches = (
    df_all
    .filter('event.action = "searchResultPage"')
    .select(
        F.col('event.searchToken').alias('tok'),
        F.col('event.query').alias('query'),
        F.col('event.didYouMeanVisible').alias('dym'),
    )
)

# One row per result click: the search token and the clicked article id.
df_clicks = (
    df_all
    .filter(F.col('event.articleId').isNotNull())
    .filter(F.col('event.searchToken').isNotNull())
    .select(
        F.col('event.searchToken').alias('tok'),
        F.col('event.articleId'),
    )
)

# Pair every click with the search that produced it, de-duplicate, and cache
# since the click log is reused below.
df_clicklog = df_searches.join(df_clicks, 'tok').dropDuplicates().cache()

Generate a new dataset with articleId, page title and a boolean indicating whether the query could have matched the page

Function to collect info from Elasticsearch

def quelastic(searchQuery, id, dym):
    """Ask Elasticsearch whether ``searchQuery`` could have matched page ``id``.

    Runs an ``all``-field AND match as a named query ("matched") restricted
    by an ids filter to the single page, so the response tells us both
    whether the page exists and whether the query terms all occur in it.

    :param searchQuery: the raw user query string
    :param id: the wiki article/page id to test against (note: shadows the
        ``id`` builtin; kept for backward compatibility with callers)
    :param dym: whether the "did you mean" suggestion was visible; passed
        through unchanged into the result
    :return: dict with keys id, found, query, title, can_match, dym
    :raises requests.HTTPError: if Elasticsearch returns an error status
    """
    query = {
        "_source": [
            "_id",
            "title"
        ],
        "query": {
            "bool": {
                "should": [
                    {
                        "match": {
                            "all": {
                                "_name": "matched",
                                "query": searchQuery,
                                "operator": "and",
                            }
                        }
                    }
                ],
                "filter": {
                    "ids": {
                        "type": "page",
                        "values": [id]
                    },
                },
            }
        }
    }
    # Use ``json=`` so requests sets the Content-Type: application/json
    # header Elasticsearch requires (``data=json.dumps(...)`` sends none),
    # and bound the call with a timeout so a hung node cannot stall the job.
    resp = requests.post(
        "http://elastic2010.codfw.wmnet:9200/enwiki_content/_search",
        json=query,
        timeout=30,
    )
    # Fail loudly on an error response instead of raising a confusing
    # KeyError on 'hits' below.
    resp.raise_for_status()
    resp = resp.json()
    if len(resp['hits']['hits']) == 0:
        # The ids filter matched nothing: the page does not exist in the index.
        return {
            "id": id,
            "found": False,
            "query": searchQuery,
            "title": None,
            "can_match": False,
            "dym": dym,
        }
    hit = resp['hits']['hits'][0]
    return {
        "id": id,
        "found": True,
        "query": searchQuery,
        "title": hit['_source']['title'],
        # The named query only appears in matched_queries when the AND
        # match actually matched the page.
        "can_match": 'matched_queries' in hit and 'matched' in hit['matched_queries'],
        "dym": dym,
    }

Now build the dataset:

def _enrich_click(r):
    # Turn one clicklog row into a Row carrying the Elasticsearch match info.
    return Row(**quelastic(r['query'], r['articleId'], r['dym']))

df_cl_tr = sqlCtx.createDataFrame(df_clicklog.rdd.map(_enrich_click)).cache()
# Show a few examples of pages that exist but the query could not match:
df_cl_tr.filter('found = true').filter('can_match = false').show(10)
+---------+-----+--------+--------------------+--------------------+
|can_match|found|      id|               query|               title|
+---------+-----+--------+--------------------+--------------------+
|    false| true| 1618529|XYZ                 |         Sand filter|
|    false| true|11978561|XYZ                 | Arborfield Garrison|
|    false| true|   39034|XYZ                 |J. Robert Oppenhe...|
|    false| true|31282049|XYZ                 |Apologies, I Have...|
|    false| true|  144829|XYZ                 |       Steve McQueen|
|    false| true| 4384721|XYZ                 |(Shake, Shake, Sh...|
|    false| true|44294098|XYZ                 |  Comparison diagram|
|    false| true| 9087364|XYZ                 |Islamic State of ...|
|    false| true|17238607|XYZ                 |       Faroe Islands|
|    false| true|   27698|XYZ                 |            Sanskrit|
+---------+-----+--------+--------------------+--------------------+
# Persist the interesting subset (page exists, query could not match it)
# as parquet for further processing.
df_partial_match = (
    df_cl_tr
    .filter('found = true')
    .filter('can_match = false')
)
df_partial_match.write.parquet('/user/dcausse/recall_ab_test_partial_match')