User:Elukey/Analytics/Spark

From Wikitech


Docs

https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#hive-metastore-parquet-table-conversion
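
The linked section describes the `spark.sql.hive.convertMetastoreParquet` setting, which controls whether Spark SQL uses its own Parquet reader (the documented default) or the Hive SerDe for metastore Parquet tables. A minimal sketch of inspecting and overriding it for the current session, assuming a spark-shell where `spark` is already defined:

```scala
// Assumption: running inside spark-shell, so `spark` (SparkSession) is predefined.

// Read the current value; "true" is passed as the fallback because it is the documented default.
val before = spark.conf.get("spark.sql.hive.convertMetastoreParquet", "true")
println(s"convertMetastoreParquet before: $before")

// Use the Hive SerDe instead of Spark's built-in Parquet support, for this session only.
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "false")

val after = spark.conf.get("spark.sql.hive.convertMetastoreParquet", "true")
println(s"convertMetastoreParquet after: $after")
```

Switching it off can help when Spark and Hive disagree on a table's Parquet schema, at the cost of a slower read path.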

Read log files via Spark

https://phabricator.wikimedia.org/T236698#5676234

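// Load the aggregated YARN application log as a DataFrame with a single "value" column (one row per line)
val df = spark.read.text("/var/log/hadoop-yarn/apps/analytics/logs/application_1573208467349_68436")

// Flag the ReducerToCassandra DEBUG lines (message starts at character 15),
// then count lines and bytes for flagged vs. other lines.
// count, lit and sum come from org.apache.spark.sql.functions._, which spark-shell imports automatically.
df.
  selectExpr("(substr(value, 15) like 'main] DEBUG o.w.a.r.cassandra.ReducerToCassandra - Writing new result for line%') as removed_line", "octet_length(value) as size").
  groupBy("removed_line").
  agg(count(lit(1L)).as("count"), sum("size").as("size")).
  show(1000, false)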

+------------+---------+-----------+                                            
|removed_line|count    |size       |
+------------+---------+-----------+
|true        |88514207 |16763811434|
|false       |590467017|91221912694|
+------------+---------+-----------+
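
Before scanning a log of this size, the filter expression can be checked on a couple of in-memory lines. The sketch below assumes a spark-shell session; the two sample lines are made up, and their 14-character prefix is only a guess chosen so that `substr(value, 15)` starts at `main]`.

```scala
// Assumption: running inside spark-shell, so `spark` is predefined.
import org.apache.spark.sql.functions.{count, lit, sum}
import spark.implicits._ // needed for Seq(...).toDF

// Same LIKE pattern as in the query above.
val pattern = "main] DEBUG o.w.a.r.cassandra.ReducerToCassandra - Writing new result for line%"

// Hypothetical log lines: 14 characters of prefix, then the message.
val sample = Seq(
  "12:00:00,000 [main] DEBUG o.w.a.r.cassandra.ReducerToCassandra - Writing new result for line 1",
  "12:00:00,001 [main] INFO  o.w.a.r.cassandra.ReducerToCassandra - Done"
).toDF("value")

// Same aggregation as above, kept as a DataFrame so it can be inspected.
val summary = sample.
  selectExpr(s"(substr(value, 15) like '$pattern') as removed_line", "octet_length(value) as size").
  groupBy("removed_line").
  agg(count(lit(1L)).as("count"), sum("size").as("size"))

summary.show(false)
```

If the first sample line is not flagged as `removed_line = true`, the offset passed to `substr` does not match the log prefix and needs adjusting.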

// Sample ten rows from a single hourly partition of wmf.webrequest
spark.sql("SELECT * FROM wmf.webrequest WHERE year=2019 AND month=12 AND day=16 AND hour=0 LIMIT 10").show()
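
The same sample can also be taken with the DataFrame API, which is handy when the result feeds further transformations. This is a sketch assuming a spark-shell session with access to the Hive metastore; the partition columns are the ones used in the SQL query.

```scala
// Assumption: running inside spark-shell with Hive support, so `spark` is predefined
// and the table wmf.webrequest is visible.
val webrequestSample = spark.table("wmf.webrequest").
  where("year = 2019 and month = 12 and day = 16 and hour = 0"). // restrict to one hourly partition
  limit(10)

webrequestSample.show()
```

Filtering on the partition columns first keeps Spark from listing and reading the rest of the table.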

Check raw files content via Spark

// Read the SequenceFile as an RDD of (key, value) pairs
val rdd = sc.sequenceFile[org.apache.hadoop.io.LongWritable,String]("/wmf/data/raw/eventlogging/eventlogging_MobileWebUIClickTracking/hourly/2020/07/26/00")

// Check records (selecting only the "value" part)
rdd.map(x => x._2).take(100)

// Parse the JSON values into a DataFrame
val df = spark.read.json(rdd.map(x => x._2))

// Show the DataFrame
df.show()
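
`spark.read.json` on an `RDD[String]` is deprecated since Spark 2.2 in favour of the `Dataset[String]` overload. A possible variant of the steps above, assuming a spark-shell session on Spark 2.2 or later (`path`, `jsonLines` and `events` are names introduced here for illustration):

```scala
// Assumption: running inside spark-shell, so `spark` and `sc` are predefined.
import org.apache.hadoop.io.LongWritable
import spark.implicits._ // provides .toDS() on RDD[String]

val path = "/wmf/data/raw/eventlogging/eventlogging_MobileWebUIClickTracking/hourly/2020/07/26/00"

// Keep only the value of each (key, value) record: one JSON document per record.
val jsonLines = sc.sequenceFile[LongWritable, String](path).map(_._2)

// Convert to a Dataset[String] and let Spark infer the JSON schema.
val events = spark.read.json(jsonLines.toDS())

// Inspect the inferred schema before looking at rows.
events.printSchema()
events.show(10, false)
```

Printing the schema first makes it easier to spot fields that were inferred with an unexpected type.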