Data Platform/Data Lake/Edits/Mediawiki history dumps/Python Pandas examples
Appearance
(Redirected from Analytics/Data Lake/Edits/Mediawiki history dumps/Python Pandas examples)
This page provides examples of using Python and Pandas to process the Mediawiki history dumps. There are slight difference between Pandas v1 and Pandas v2, therefore you can find an example for each version.
Pandas v1
# ---
# jupyter:
# jupytext:
# text_representation:
# extension: .py
# format_name: light
# format_version: '1.5'
# jupytext_version: 1.16.1
# kernelspec:
# display_name: Python [conda-mwhd_pandas1]
# language: python
# name: conda-env-conda-mwhd_pandas1-py
# ---
# + [markdown] editable=true slideshow={"slide_type": ""}
# # Processing MediaWiki HistoryDump files with Pandas
# +
import bz2
import csv
import pathlib
import tempfile
import pandas as pd
import matplotlib.pyplot as plt
# -
# %matplotlib inline
# let's use catalan Wikipedia as an example and let's assume the data are in
# the directory:
# data/mwhd/2023-10/cawiki/
LANG = "ca"
data_dir = pathlib.Path("data/mwhd/2023-10/{lang}wiki".format(lang=LANG))
# create a plots directory, if does not exist
plots_dir = pathlib.Path("plots")
plots_dir.mkdir(parents=True, exist_ok=True)
csv_files = sorted([f for f in data_dir.glob("*tsv.bz2")])
csv_files
tmpdir = tempfile.TemporaryDirectory(prefix="mwhd-pandas.")
tmpdir_path = pathlib.Path(tmpdir.name)
print(f"Created temporary directory at {tmpdir_path}")
# ### Helper functions
def decompress_bz2_files(file_paths, tmpdir):
decompressed_files = []
# Decompress each bz2 file
for file_path in file_paths:
# check if file is bz2 compressed, otherwise skip
if not file_path.suffix.endswith(".bz2"):
print(f"File {file_path} is not bz2 compressed, skipping...")
decompressed_files.append(file_path)
continue
# det the base name without .bz2 extension for the decompressed file
decompressed_file_path = pathlib.Path(tmpdir, file_path.stem)
# Read the bz2 file and write the decompressed data
with bz2.BZ2File(file_path, "rb") as file, decompressed_file_path.open(
"wb"
) as new_file:
# Copy the decompressed data to the new file
for data in iter(lambda: file.read(100 * 1024), b""):
new_file.write(data)
decompressed_files.append(decompressed_file_path)
print(f" - decompressed {file_path} to {decompressed_file_path}")
return decompressed_files
# ### Read input
# let's assume that there is a file with the MWHD fields in:
# data/mwhd/mwhd_fields.csv
fields_file = pathlib.Path("data/mwhd/mwhd_fields.csv")
CSV_FIELDS = []
CSV_FIELDS_META = {}
with fields_file.open("r") as infile:
reader = csv.reader(infile, delimiter="\t")
# skip header
next(reader)
for line in reader:
fclass = line[0]
fname = line[1]
dtype = line[2]
comment = line[3]
CSV_FIELDS.append(fname)
if dtype == "int":
dtype = "Int64"
elif dtype == "bigint":
dtype = "Int64"
elif dtype == "array<string>":
dtype = "object"
if "timestamp" in fname:
dtype = "object"
CSV_FIELDS_META[fname] = {"class": fclass, "dtype": dtype, "comment": comment}
# +
maxl = 60
print(f"id\t{'field': <{maxl}}\tdtype")
print("----\t" + "-" * maxl + "\t" + "------")
for id, field in enumerate(CSV_FIELDS, start=1):
print(f"{id}\t{field: <{maxl}}\t{CSV_FIELDS_META[field]['dtype']}")
# +
timestamp_fields = [
(id, field) for id, field in enumerate(CSV_FIELDS, start=1) if "timestamp" in field
]
print(f"id\t{'field': <{maxl}}\tdtype")
print("----\t" + "-" * maxl + "\t" + "------")
for id, field in timestamp_fields:
print(f"{id}\t{field: <{maxl}}\t{CSV_FIELDS_META[field]['dtype']}")
# +
# ## Load CSV with pandas
df_list = []
for file in csv_files:
tmpdf = pd.read_csv(
file,
delimiter="\t",
encoding="utf-8",
quotechar='"',
quoting=csv.QUOTE_NONE,
doublequote=False,
header=None,
names=CSV_FIELDS,
dtype={field: CSV_FIELDS_META[field]["dtype"] for field in CSV_FIELDS},
# date_format={field: "%Y-%m-%d %H:%M:%S.%f"
# for field in CSV_FIELDS
# if "timestamp" in field},
)
df_list.append(tmpdf)
df = pd.concat(df_list)
df.head()
del df_list
# -
for _, field in timestamp_fields:
df[field] = pd.to_datetime(
df[field], errors="coerce", format="%Y-%m-%d %H:%M:%S.%f"
)
for field in df.columns.tolist():
if pd.api.types.is_bool_dtype(df.dtypes[field]):
df[field] = df[field].astype("boolean")
df[field] = df[field].fillna(False)
df["event_date"] = df["event_timestamp"].dt.date
basedf = df[
[
"event_date",
"event_entity",
"event_type",
"event_user_is_created_by_self",
"event_user_is_created_by_system",
"event_user_is_created_by_peer",
"event_timestamp",
]
].copy()
basedf.dtypes
basedf.head()
event_entities = basedf["event_entity"].unique()
event_entities
event_user_df = basedf[basedf["event_entity"] == "user"]
event_user_df.head()
# +
# %%time
# number of new users created
new_users_all = (
event_user_df[
(event_user_df["event_type"] == "create")
& (
event_user_df["event_user_is_created_by_self"]
| event_user_df["event_user_is_created_by_system"]
| event_user_df["event_user_is_created_by_peer"]
)
]
.groupby("event_date")
.size()
)
# +
ax = new_users_all.plot()
plt.xticks(rotation=70)
# Save the figure to a file
plot_filename = "{lang}wiki.new_users_all.pandas-v1.png".format(lang=LANG)
plt.savefig(plots_dir / plot_filename, bbox_inches="tight", dpi=300)
plt.show()
# +
# %%time
# number of new users created
new_users_self = (
event_user_df[
(event_user_df["event_type"] == "create")
& (event_user_df["event_user_is_created_by_self"])
]
.groupby("event_date")
.size()
)
# +
ax = new_users_self.plot()
plt.xticks(rotation=70)
# Save the figure to a file
plot_filename = "{lang}wiki.new_users_self.pandas-v1.png".format(lang=LANG)
plt.savefig(plots_dir / plot_filename, bbox_inches="tight", dpi=300)
plt.show()
# +
# %%time
# number of new pages created
new_pages = (
basedf[(basedf["event_entity"] == "page") & (basedf["event_type"] == "create")]
.groupby("event_date")
.size()
)
# +
ax = new_pages.plot()
plt.xticks(rotation=70)
# Save the figure to a file
plot_filename = "{lang}wiki.new_pages.pandas-v1.png".format(lang=LANG)
plt.savefig(plots_dir / plot_filename, bbox_inches="tight", dpi=300)
plt.show()
# +
# %%time
# absolute value of total bytes added or removed
total_bytes = df["revision_text_bytes_diff"].abs().groupby(df["event_date"]).sum()
# +
ax = total_bytes.plot()
plt.xticks(rotation=70)
# Save the figure to a file
plot_filename = "{lang}wiki.total_bytes.pandas-v1.png".format(lang=LANG)
plt.savefig(plots_dir / plot_filename, bbox_inches="tight", dpi=300)
plt.show()
# +
# %%time
# number of new revisions created
new_revisions = (
basedf[(basedf["event_entity"] == "revision") & (basedf["event_type"] == "create")]
.groupby("event_date")
.size()
)
# +
ax = new_revisions.plot()
plt.xticks(rotation=70)
# Save the figure to a file
plot_filename = "{lang}wiki.new_revisions.pandas-v1.png".format(lang=LANG)
plt.savefig(plots_dir / plot_filename, bbox_inches="tight", dpi=300)
plt.show()
Requirements
Example conda/pip requirements file.
# This file may be used to create an environment using:
# $ conda create --name <env> --file <this file>
# platform: linux-64
matplotlib=3.7.2=py38h06a4308_0
matplotlib-base=3.7.2=py38h1128e8f_0
matplotlib-inline=0.1.6=py38h06a4308_0
pandas=1.5.3=py38h417a72b_0
Pandas v2
# ---
# jupyter:
# jupytext:
# text_representation:
# extension: .py
# format_name: light
# format_version: '1.5'
# jupytext_version: 1.16.1
# kernelspec:
# display_name: Python [conda-mwhd_pandas2]
# language: python
# name: conda-env-conda-mwhd_pandas2-py
# ---
# + [markdown] editable=true slideshow={"slide_type": ""}
# # Processing MediaWiki HistoryDump files with Pandas
# +
import bz2
import csv
import pathlib
import tempfile
import pandas as pd
import matplotlib.pyplot as plt
# -
# %matplotlib inline
# let's use catalan Wikipedia as an example and let's assume the data are in
# the directory:
# data/mwhd/2023-10/cawiki/
LANG = "ca"
data_dir = pathlib.Path("data/mwhd/2023-10/{lang}wiki".format(lang=LANG))
# create a plots directory, if does not exist
plots_dir = pathlib.Path("plots")
plots_dir.mkdir(parents=True, exist_ok=True)
csv_files = sorted([f for f in data_dir.glob("*tsv.bz2")])
csv_files
tmpdir = tempfile.TemporaryDirectory(prefix="mwhd-pandas.")
tmpdir_path = pathlib.Path(tmpdir.name)
print(f"Created temporary directory at {tmpdir_path}")
# ### Helper functions
def decompress_bz2_files(file_paths, tmpdir):
decompressed_files = []
# Decompress each bz2 file
for file_path in file_paths:
# check if file is bz2 compressed, otherwise skip
if not file_path.suffix.endswith(".bz2"):
print(f"File {file_path} is not bz2 compressed, skipping...")
decompressed_files.append(file_path)
continue
# det the base name without .bz2 extension for the decompressed file
decompressed_file_path = pathlib.Path(tmpdir, file_path.stem)
# Read the bz2 file and write the decompressed data
with bz2.BZ2File(file_path, "rb") as file, decompressed_file_path.open(
"wb"
) as new_file:
# Copy the decompressed data to the new file
for data in iter(lambda: file.read(100 * 1024), b""):
new_file.write(data)
decompressed_files.append(decompressed_file_path)
print(f" - decompressed {file_path} to {decompressed_file_path}")
return decompressed_files
# ### Read input
# let's assume that there is a file with the MWHD fields in:
# data/mwhd/mwhd_fields.csv
fields_file = pathlib.Path("data/mwhd/mwhd_fields.csv")
CSV_FIELDS = []
CSV_FIELDS_META = {}
with fields_file.open("r") as infile:
reader = csv.reader(infile, delimiter="\t")
# skip header
next(reader)
for line in reader:
fclass = line[0]
fname = line[1]
dtype = line[2]
comment = line[3]
CSV_FIELDS.append(fname)
if dtype == "int":
dtype = "Int64"
elif dtype == "bigint":
dtype = "Int64"
elif dtype == "array<string>":
dtype = "object"
if "timestamp" in fname:
dtype = "object"
CSV_FIELDS_META[fname] = {"class": fclass, "dtype": dtype, "comment": comment}
# +
maxl = 60
print(f"id\t{'field': <{maxl}}\tdtype")
print("----\t" + "-" * maxl + "\t" + "------")
for id, field in enumerate(CSV_FIELDS, start=1):
print(f"{id}\t{field: <{maxl}}\t{CSV_FIELDS_META[field]['dtype']}")
# +
timestamp_fields = [
(id, field) for id, field in enumerate(CSV_FIELDS, start=1) if "timestamp" in field
]
print(f"id\t{'field': <{maxl}}\tdtype")
print("----\t" + "-" * maxl + "\t" + "------")
for id, field in timestamp_fields:
print(f"{id}\t{field: <{maxl}}\t{CSV_FIELDS_META[field]['dtype']}")
# +
# ## Load CSV with pandas
df_list = []
for file in csv_files:
tmpdf = pd.read_csv(
file,
delimiter="\t",
encoding="utf-8",
quotechar='"',
quoting=csv.QUOTE_NONE,
doublequote=False,
header=None,
names=CSV_FIELDS,
dtype={field: CSV_FIELDS_META[field]["dtype"] for field in CSV_FIELDS},
date_format={
field: "%Y-%m-%d %H:%M:%S.%f"
for field in CSV_FIELDS
if "timestamp" in field
},
)
df_list.append(tmpdf)
df = pd.concat(df_list)
df.head()
del df_list
# -
for _, field in timestamp_fields:
df[field] = pd.to_datetime(
df[field], errors="coerce", format="%Y-%m-%d %H:%M:%S.%f"
)
for field in df.columns.tolist():
if df.dtypes[field] == "boolean":
df[field] = df[field].astype("boolean")
df[field] = df[field].fillna(False)
df["event_date"] = df["event_timestamp"].dt.date
basedf = df[
[
"event_date",
"event_entity",
"event_type",
"event_user_is_created_by_self",
"event_user_is_created_by_system",
"event_user_is_created_by_peer",
"event_timestamp",
]
].copy()
basedf.dtypes
basedf.head()
event_entities = basedf["event_entity"].unique()
event_entities
event_user_df = basedf[basedf["event_entity"] == "user"]
event_user_df.head()
# +
# %%time
# number of new users created
new_users_all = (
event_user_df[
(event_user_df["event_type"] == "create")
& (
event_user_df["event_user_is_created_by_self"]
| event_user_df["event_user_is_created_by_system"]
| event_user_df["event_user_is_created_by_peer"]
)
]
.groupby("event_date")
.size()
)
# +
ax = new_users_all.plot()
plt.xticks(rotation=70)
# Save the figure to a file
plot_filename = "{lang}wiki.new_users_all.pandas-v2.png".format(lang=LANG)
plt.savefig(plots_dir / plot_filename, bbox_inches="tight", dpi=300)
plt.show()
# +
# %%time
# number of new users created
new_users_self = (
event_user_df[
(event_user_df["event_type"] == "create")
& (event_user_df["event_user_is_created_by_self"])
]
.groupby("event_date")
.size()
)
# +
ax = new_users_self.plot()
plt.xticks(rotation=70)
# Save the figure to a file
plot_filename = "{lang}wiki.new_users_self.pandas-v2.png".format(lang=LANG)
plt.savefig(plots_dir / plot_filename, bbox_inches="tight", dpi=300)
plt.show()
# +
# %%time
# number of new pages created
new_pages = (
basedf[(basedf["event_entity"] == "page") & (basedf["event_type"] == "create")]
.groupby("event_date")
.size()
)
# +
ax = new_pages.plot()
plt.xticks(rotation=70)
# Save the figure to a file
plot_filename = "{lang}wiki.new_pages.pandas-v2.png".format(lang=LANG)
plt.savefig(plots_dir / plot_filename, bbox_inches="tight", dpi=300)
plt.show()
# +
# %%time
# absolute value of total bytes added or removed
total_bytes = df["revision_text_bytes_diff"].abs().groupby(df["event_date"]).sum()
# +
ax = total_bytes.plot()
plt.xticks(rotation=70)
# Save the figure to a file
plot_filename = "{lang}wiki.total_bytes.pandas-v2.png".format(lang=LANG)
plt.savefig(plots_dir / plot_filename, bbox_inches="tight", dpi=300)
plt.show()
# +
# %%time
# number of new revisions created
new_revisions = (
basedf[(basedf["event_entity"] == "revision") & (basedf["event_type"] == "create")]
.groupby("event_date")
.size()
)
# +
ax = new_revisions.plot()
plt.xticks(rotation=70)
# Save the figure to a file
plot_filename = "{lang}wiki.new_revisions.pandas-v2.png".format(lang=LANG)
plt.savefig(plots_dir / plot_filename, bbox_inches="tight", dpi=300)
plt.show()
Requirements
Example conda/pip requirements file.
# This file may be used to create an environment using:
# $ conda create --name <env> --file <this file>
# platform: linux-64
matplotlib=3.7.2=py38h06a4308_0
matplotlib-base=3.7.2=py38h1128e8f_0
matplotlib-inline=0.1.6=py38h06a4308_0
pandas=2.0.3=py38h1128e8f_0