User:Milimetric/Learning iceberg/Copy large table
spark3-sql --master yarn --driver-memory 4G \
  --executor-cores 1 --executor-memory 2G \
  --conf spark.dynamicAllocation.maxExecutors=256 \
  --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:1.0.0 \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
  --conf spark.sql.catalog.spark_catalog.type=hive \
  --conf spark.sql.shuffle.partitions=4096 \
  --conf spark.shuffle.file.buffer=16M \
  --conf spark.file.transferTo=False \
  --conf spark.executor.heartbeatInterval=30s \
  --conf spark.executor.memoryOverhead=1G \
  --conf spark.network.timeout=240s

NOTES
* Tasks failed a lot without memoryOverhead >= 1G; this was the key setting.
* I still don't understand how spark.sql.shuffle.partitions interacts with how the query actually distributes / coalesces; it seems to ignore the latter.
* The shuffle file buffer probably doesn't come into play here, but it should help when there is spill.
* heartbeatInterval was timing out at 10s, but that was before we figured out the overhead setting, so it may no longer be relevant.
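A quick way to confirm which of these settings actually took effect is to read them back from the session conf. A minimal sketch, assuming a spark-shell session launched with the same flags as above (the expected values in the comments are assumptions based on that launch line):

// Read back the two settings that mattered most for this job.
spark.conf.get("spark.sql.shuffle.partitions")   // expected: 4096
spark.conf.get("spark.executor.memoryOverhead")  // expected: 1G

// spark.sql.shuffle.partitions is a runtime conf, so it can be changed per query
// without relaunching; the executor memory/overhead settings cannot.
spark.conf.set("spark.sql.shuffle.partitions", "2048")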
Initially I had to run 4 separate jobs, split by wiki, but with the higher memory overhead I'm hopeful the new approach can run everything at once.
create table milimetric.iceberg_wikitext_metadata
using iceberg
partitioned by (
    truncate(10, revision_timestamp)
)
tblproperties (
    'format-version'='2',
    'write.format.default'='parquet',
    'write.parquet.row-group-size-bytes'='268435456',
    'write.parquet.page-size-bytes'='2097152',
    'write.metadata.previous-versions-max'='2',
    'write.metadata.delete-after-commit.enabled'='true',
    'write.delete.mode'='copy-on-write',
    'write.update.mode'='merge-on-read',
    'write.merge.mode'='merge-on-read'
)
as
-- insert into milimetric.iceberg_wikitext_metadata
select wiki_db,
       page_id,
       page_namespace,
       page_title,
       page_redirect_title,
       page_restrictions,
       user_id,
       user_text,
       revision_id,
       revision_parent_id,
       revision_timestamp,
       revision_minor_edit,
       revision_comment,
       revision_text_bytes,
       revision_text_sha1,
       revision_content_model,
       revision_content_format,
       cast(array() as array<string>) as revision_deleted_parts,
       false as revision_is_latest
  from wmf.mediawiki_wikitext_history
 where snapshot = '2022-10'
   -- and wiki_db = 'enwiki'
distribute by substring(revision_timestamp, 0, 10)
     sort by substring(revision_timestamp, 0, 10), wiki_db
;
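Once the CTAS finishes, Iceberg's metadata tables are a convenient way to sanity-check what the distribute by / sort by actually produced. A sketch, run from a spark-shell session with the same catalog configuration as above (the table name is the one created above; the query itself is illustrative):

// Row count for the new table.
spark.sql("select count(*) from milimetric.iceberg_wikitext_metadata").show()

// Per-partition file counts and sizes from the `files` metadata table,
// to see whether the write produced a reasonable number of reasonably sized files.
spark.sql("""
  select partition,
         count(*)                as data_files,
         sum(file_size_in_bytes) as total_bytes,
         avg(file_size_in_bytes) as avg_bytes
    from milimetric.iceberg_wikitext_metadata.files
   group by partition
   order by total_bytes desc
""").show(20, false)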
Then optimize data files:
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.hive.HiveCatalog
import org.apache.iceberg.Table
import org.apache.iceberg.spark.actions.SparkActions
import org.apache.iceberg.SortOrder
import org.apache.iceberg.SortDirection
import org.apache.iceberg.NullOrder
import org.apache.iceberg.actions.RewriteDataFiles

// Load the table through the Hive catalog that backs spark_catalog.
val catalog = new HiveCatalog()
catalog.setConf(spark.sparkContext.hadoopConfiguration)
val properties = new java.util.HashMap[String, String]()
catalog.initialize("spark_catalog", properties)

val ice = TableIdentifier.of("milimetric", "iceberg_wikitext_metadata")
val iceT = catalog.loadTable(ice)

// Rewrite (compact and sort) the data files, committing partial progress
// as file groups finish instead of in one giant commit at the end.
val rewrite = SparkActions.get
  .rewriteDataFiles(iceT)
  .option(RewriteDataFiles.MAX_CONCURRENT_FILE_GROUP_REWRITES, "128")
  .option(RewriteDataFiles.PARTIAL_PROGRESS_ENABLED, "true")

val sortOrder = SortOrder.builderFor(iceT.schema)
  .sortBy("wiki_db", SortDirection.ASC, NullOrder.NULLS_LAST)
  .sortBy("page_id", SortDirection.ASC, NullOrder.NULLS_LAST)
  .sortBy("user_id", SortDirection.ASC, NullOrder.NULLS_LAST)
  .build()

rewrite.sort(sortOrder).execute()

// Clean up: expire all snapshots older than now (retaining the latest),
// then remove any orphaned files left behind.
SparkActions.get().expireSnapshots(iceT).expireOlderThan(System.currentTimeMillis()).execute()
SparkActions.get().deleteOrphanFiles(iceT).olderThan(System.currentTimeMillis()).execute()
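For reference, I believe the same sort-based rewrite can also be driven through Iceberg's SQL stored procedures, since the session loads IcebergSparkSessionExtensions; a hedged sketch run via spark.sql from the same session, where the option keys are the string forms of the constants used above, followed by a quick check that the snapshot expiry left only the latest snapshot:

// Assumed SQL-procedure equivalent of the rewriteDataFiles action above.
spark.sql("""
  CALL spark_catalog.system.rewrite_data_files(
    table      => 'milimetric.iceberg_wikitext_metadata',
    strategy   => 'sort',
    sort_order => 'wiki_db ASC NULLS LAST, page_id ASC NULLS LAST, user_id ASC NULLS LAST',
    options    => map(
      'max-concurrent-file-group-rewrites', '128',
      'partial-progress.enabled', 'true'
    )
  )
""").show(false)

// After expireSnapshots, only the most recent snapshot should remain.
spark.sql("select committed_at, snapshot_id, operation " +
          "from milimetric.iceberg_wikitext_metadata.snapshots").show(false)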