User:Milimetric/Learning iceberg/Spark streaming append
Tiny local spark SQL environment, just making a test table:
spark3-sql \
  --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:1.0.0 \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
  --conf spark.sql.catalog.spark_catalog.type=hive
Simple table with schema (string_key, string_value), fed by a stream of events with random keys ranging from 0 to 100. Partition by the first character of the key, so that we'll have roughly 10 partitions and lots of opportunities for collisions as time goes on.
create table milimetric.iceberg_test_update (
    string_key      string not null,
    string_value    string
)
using iceberg
partitioned by (
    truncate(1, string_key)
)
tblproperties (
    'format-version'='2',
    'write.format.default'='parquet',
    'write.parquet.row-group-size-bytes'='268435456',
    'write.parquet.page-size-bytes'='2097152',
    'write.metadata.previous-versions-max'='10',
    'write.metadata.delete-after-commit.enabled'='true',
    'write.delete.mode'='copy-on-write',
    'write.update.mode'='merge-on-read',
    'write.merge.mode'='merge-on-read'
);

insert into milimetric.iceberg_test_update
select '14' as string_key, 'first value' as string_value
;

insert into milimetric.iceberg_test_update
select '14' as string_key, 'second value' as string_value
;
Then feed data into it in different ways and see what happens.
The simplest way, according to the docs, is to use the append output mode of Structured Streaming.
spark3-shell --master yarn \
  --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:1.0.0 \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
  --conf spark.sql.catalog.spark_catalog.type=hive

import org.apache.spark.sql.streaming.Trigger
import java.util.concurrent.TimeUnit

// rate source: 10 rows per second, keys are value % 101, values are the raw counter as a string
val sdf = spark.readStream
  .format("rate")
  .option("rowsPerSecond", 10)
  .load()
  .selectExpr(
    "coalesce(cast(value % 101 as string), 0) as string_key",
    "cast(value as string) as string_value")

/**
 * Test:
 * sdf.writeStream.format("console").start()
 */

// append-mode sink into the Iceberg table, committing every 10 seconds
val job = sdf.writeStream
  .format("iceberg")
  .outputMode("append")
  .trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS))
  .option("path", "spark_catalog.milimetric.iceberg_test_update")
  .option("fanout-enabled", "true")
  .option("checkpointLocation", "/tmp/milimetric_test")
  .start()

job.awaitTermination()
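Optional: job.awaitTermination() blocks the shell, so one way to keep an eye on the stream is to skip that call (the query keeps running on its own threads) and poll the query handle between triggers. A quick sketch using the standard StreamingQuery API:

// is the trigger active / waiting for data?
job.status

// stats from the most recent micro-batch (rows per second, batch duration, sink); null before the first trigger
job.lastProgress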
Run that for a while and evaluate (same spark3-sql as above)
select * from iceberg_test_update where string_key = '14';

-- of course returns all the values inserted for this key, including the initial ones set up with the table.
-- so how do we get it to update?
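A sketch for poking at what those appends did to the table, via Iceberg's snapshots metadata table (run from the spark3-shell above; assuming the metadata table resolves through spark_catalog the same way the data table does). Each trigger that wrote rows should show up as its own append snapshot, which is exactly why the duplicates pile up:

spark.sql("""
  select committed_at, snapshot_id, operation, summary
  from spark_catalog.milimetric.iceberg_test_update.snapshots
  order by committed_at
""").show(50, false)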
Try 1. update identifier fields and insert again - no good
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.hive.HiveCatalog
import org.apache.iceberg.Table
import org.apache.iceberg.spark.actions.SparkActions
import org.apache.iceberg.SortOrder
import org.apache.iceberg.SortDirection
import org.apache.iceberg.NullOrder
import org.apache.iceberg.actions.RewriteDataFiles

// load the table through the Hive catalog so we can use the Iceberg Java API directly
val catalog = new HiveCatalog()
catalog.setConf(spark.sparkContext.hadoopConfiguration)
val properties = new java.util.HashMap[String, String]()
catalog.initialize("spark_catalog", properties)

val ice = TableIdentifier.of("milimetric", "iceberg_test_update")
val iceT = catalog.loadTable(ice)

// mark string_key as the row identifier and commit the schema change
iceT.updateSchema.setIdentifierFields("string_key").commit()
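For completeness, a sketch of what "no good" looks like here (the 'third value' row is just an example): identifier fields are only schema metadata, so a plain insert after setting them still appends a brand new row.

spark.sql("insert into spark_catalog.milimetric.iceberg_test_update select '14' as string_key, 'third value' as string_value")

// every row ever written for this key is still there, nothing got replaced
spark.sql("select * from spark_catalog.milimetric.iceberg_test_update where string_key = '14'").show(false)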
Try 2. with the schema set up as above, see if there's some upsert...
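One possible direction for this, sketched rather than tested: the streaming Iceberg sink itself only appends, so a common pattern is to do the upsert yourself in foreachBatch with MERGE INTO. This assumes the same rate source sdf as above; the checkpoint path and temp view name are made up for the example, and the dropDuplicates is there because MERGE refuses to run if several source rows match the same target row.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.Trigger
import java.util.concurrent.TimeUnit

val job = sdf.writeStream
  .trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS))
  .option("checkpointLocation", "/tmp/milimetric_test_merge")
  .foreachBatch { (batch: DataFrame, batchId: Long) =>
    // keep one row per key within the micro-batch, then merge it into the table
    batch.dropDuplicates("string_key").createOrReplaceTempView("batch_updates")
    batch.sparkSession.sql("""
      merge into spark_catalog.milimetric.iceberg_test_update t
      using batch_updates s
      on t.string_key = s.string_key
      when matched then update set t.string_value = s.string_value
      when not matched then insert *
    """)
  }
  .start()

Note this won't collapse rows that were already duplicated by the earlier append runs (MERGE updates all matching target rows, it doesn't dedupe them); it only keeps new batches from adding more copies.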