User:Milimetric/Learning iceberg/Copy large table

From Wikitech
# Start a Spark SQL shell on YARN with the Iceberg 1.0.0 runtime loaded and
# spark_catalog backed by Iceberg's SparkSessionCatalog (Hive type).
# Flags are grouped as: resources, Iceberg wiring, shuffle tuning, timeouts.
spark3-sql \
    --master yarn \
    --driver-memory 4G \
    --executor-memory 2G \
    --executor-cores 1 \
    --conf spark.executor.memoryOverhead=1G \
    --conf spark.dynamicAllocation.maxExecutors=256 \
    --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:1.0.0 \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
    --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
    --conf spark.sql.catalog.spark_catalog.type=hive \
    --conf spark.sql.shuffle.partitions=4096 \
    --conf spark.shuffle.file.buffer=16M \
    --conf spark.file.transferTo=False \
    --conf spark.executor.heartbeatInterval=30s \
    --conf spark.network.timeout=240s

NOTES
    * Many tasks failed until spark.executor.memoryOverhead was set to at least 1G; this setting was the key fix
    * still don't understand shuffle partitions vs. how the query actually distributes / coalesces, it seems to ignore the latter
    * I don't think spark.shuffle.file.buffer comes into play here, but it should help when there is spill
    * heartbeatInterval was timing out with 10s, but that was before we figured out the overhead setting, so maybe not relevant

Initially I had to run 4 separate jobs per wiki, but I'm hopeful that the new approach, with more memory overhead, can run everything at once

 -- Build milimetric.iceberg_wikitext_metadata as an Iceberg v2 table from the
 -- 2022-10 snapshot of wmf.mediawiki_wikitext_history (metadata columns only).
 -- The table is partitioned with truncate(10, revision_timestamp); the
 -- DISTRIBUTE BY / SORT BY at the bottom use the matching 10-character prefix.
 CREATE TABLE milimetric.iceberg_wikitext_metadata
 USING iceberg
 PARTITIONED BY (truncate(10, revision_timestamp))
 TBLPROPERTIES (
     'format-version'                             = '2',
     'write.format.default'                       = 'parquet',
     'write.parquet.row-group-size-bytes'         = '268435456',
     'write.parquet.page-size-bytes'              = '2097152',
     'write.metadata.previous-versions-max'       = '2',
     'write.metadata.delete-after-commit.enabled' = 'true',
     'write.delete.mode'                          = 'copy-on-write',
     'write.update.mode'                          = 'merge-on-read',
     'write.merge.mode'                           = 'merge-on-read'
 )
 AS
 -- Alternate header kept from the original, presumably for appending to an
 -- already-created table instead of running the CREATE TABLE ... AS above:
 -- insert into milimetric.iceberg_wikitext_metadata
 SELECT
     wiki_db,
     page_id,
     page_namespace,
     page_title,
     page_redirect_title,
     page_restrictions,
     user_id,
     user_text,
     revision_id,
     revision_parent_id,
     revision_timestamp,
     revision_minor_edit,
     revision_comment,
     revision_text_bytes,
     revision_text_sha1,
     revision_content_model,
     revision_content_format,
     -- Constant columns: an empty array<string> and a literal false.
     CAST(array() AS array<string>) AS revision_deleted_parts,
     false AS revision_is_latest
 FROM wmf.mediawiki_wikitext_history
 WHERE snapshot = '2022-10'
 -- Optional single-wiki filter kept from the original:
 --    and wiki_db = 'enwiki'
 -- NOTE(review): start position 0 is kept as written; Spark appears to treat
 -- it the same as 1 (first 10 characters) -- confirm before changing.
 DISTRIBUTE BY substring(revision_timestamp, 0, 10)
 SORT BY substring(revision_timestamp, 0, 10), wiki_db
 ;


Then optimize data files:

import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.hive.HiveCatalog
import org.apache.iceberg.Table
import org.apache.iceberg.spark.actions.SparkActions
import org.apache.iceberg.SortOrder
import org.apache.iceberg.SortDirection
import org.apache.iceberg.NullOrder
import org.apache.iceberg.actions.RewriteDataFiles

// Spark-shell snippet: rewrite (compact and sort) the data files of
// milimetric.iceberg_wikitext_metadata, then expire snapshots and delete orphan files.
// Relies on the shell-provided `spark` session.

// Open the table through a HiveCatalog configured from the session's Hadoop configuration.
val hiveCatalog = new HiveCatalog()
hiveCatalog.setConf(spark.sparkContext.hadoopConfiguration)
hiveCatalog.initialize("spark_catalog", new java.util.HashMap[String, String]())

val tableId = TableIdentifier.of("milimetric", "iceberg_wikitext_metadata")
val table = hiveCatalog.loadTable(tableId)

val actions = SparkActions.get()

// Sort rewritten files by wiki_db, page_id, user_id (all ascending, nulls last).
val rewriteOrder = SortOrder
  .builderFor(table.schema)
  .sortBy("wiki_db", SortDirection.ASC, NullOrder.NULLS_LAST)
  .sortBy("page_id", SortDirection.ASC, NullOrder.NULLS_LAST)
  .sortBy("user_id", SortDirection.ASC, NullOrder.NULLS_LAST)
  .build()

// Rewrite with up to 128 concurrent file-group rewrites and partial progress enabled.
actions
  .rewriteDataFiles(table)
  .option(RewriteDataFiles.MAX_CONCURRENT_FILE_GROUP_REWRITES, "128")
  .option(RewriteDataFiles.PARTIAL_PROGRESS_ENABLED, "true")
  .sort(rewriteOrder)
  .execute()

// Expire snapshots using the current time as the cutoff.
actions
  .expireSnapshots(table)
  .expireOlderThan(System.currentTimeMillis())
  .execute()

// Delete orphan files using the current time as the cutoff.
// NOTE(review): a cutoff of "now" leaves no safety margin for in-flight writes;
// confirm nothing else is writing to this table while this runs.
actions
  .deleteOrphanFiles(table)
  .olderThan(System.currentTimeMillis())
  .execute()