User:Milimetric/Learning iceberg/Spark streaming append

A tiny local Spark SQL environment, used only to create a test table:

spark3-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:1.0.0 \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
    --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
    --conf spark.sql.catalog.spark_catalog.type=hive

A simple table with schema (string_key, string_value), fed by a stream of events whose keys cycle through 0 to 100 (the stream below uses value % 101). Partition by the first character of the key, so that we'll have roughly 10 partitions and lots of opportunities for collisions as time goes on.

 create table milimetric.iceberg_test_update (
            string_key string not null,
            string_value string
        )
  using iceberg
 partitioned by (
    truncate(1, string_key)
 )
 tblproperties (
     'format-version'='2',
     'write.format.default'='parquet',
     'write.parquet.row-group-size-bytes'='268435456',
     'write.parquet.page-size-bytes'='2097152',
     'write.metadata.previous-versions-max'='10',
     'write.metadata.delete-after-commit.enabled'='true',
     'write.delete.mode'='copy-on-write',
     'write.update.mode'='merge-on-read',
     'write.merge.mode'='merge-on-read'
 );
 insert into milimetric.iceberg_test_update
 select '14' as string_key,
        'first value' as string_value
;
 insert into milimetric.iceberg_test_update
 select '14' as string_key,
        'second value' as string_value
;
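
As a sanity check on the "roughly 10 partitions" estimate, here is a minimal sketch in plain Scala (no Spark or Iceberg needed) that groups the keys by their first character. It assumes the keys are the strings "0" through "100", which is what value % 101 yields, and that truncate(1, string_key) keeps the first character of the string. The names keys and keysPerPartition are made up for this illustration.

```scala
// Keys the stream is expected to produce: "0" through "100" (assumption).
val keys: Seq[String] = (0 to 100).map(_.toString)

// Model of truncate(1, string_key): keep only the first character of each key,
// then count how many distinct keys land in each partition.
val keysPerPartition: Map[String, Int] =
  keys.groupBy(_.take(1)).map { case (prefix, ks) => prefix -> ks.size }

// Number of partitions the table should end up with under this model.
val partitionCount: Int = keysPerPartition.size
```

Under these assumptions there are 10 partitions, but they are not even: partition "0" holds a single key, partition "1" holds 12 keys ("1", "10" to "19", and "100"), and the others hold 11 each.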

Then feed data into it in different ways and see what happens.

The simplest way, according to the docs, is to use the append output mode of structured streaming.

  spark3-shell --master yarn \
    --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:1.0.0 \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
    --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
    --conf spark.sql.catalog.spark_catalog.type=hive

import org.apache.spark.sql.streaming.Trigger
import java.util.concurrent.TimeUnit

val sdf = spark.readStream
  .format("rate")
  .option("rowsPerSecond", 10)
  .load()
  .selectExpr("coalesce(cast(value % 101 as string), '0') as string_key", "cast(value as string) as string_value")
  
/**
 * Test (prints each micro-batch to the console instead of writing to Iceberg):
 * sdf.writeStream.format("console").start()
 */
    
val job = sdf.writeStream
    .format("iceberg")
    .outputMode("append")
    .trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS))
    .option("path", "spark_catalog.milimetric.iceberg_test_update")
    .option("fanout-enabled", "true")
    .option("checkpointLocation", "/tmp/milimetric_test")
    .start()

job.awaitTermination()
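
To know roughly what to expect when querying later, here is a back-of-the-envelope sketch in plain Scala. It assumes the rate source emits values 0, 1, 2, ... at exactly 10 rows per second with no ramp-up, which is only approximately true in practice. The helpers rowsForKey and expectedRows are hypothetical names for this illustration.

```scala
val rowsPerSecond = 10   // matches the rowsPerSecond option above
val keyCount = 101       // value % 101 yields keys 0 through 100

// Number of rows carrying `key` among the first `totalRows` values 0, 1, 2, ...
def rowsForKey(key: Int, totalRows: Long): Long =
  if (totalRows <= key) 0L else (totalRows - 1 - key) / keyCount + 1

// Approximate number of streamed rows for `key` after running for `seconds`.
def expectedRows(key: Int, seconds: Long): Long =
  rowsForKey(key, seconds * rowsPerSecond)
```

For example, expectedRows(14, 600) gives 60, so after ten minutes a query for key '14' should return about 60 streamed rows on top of the two inserted when the table was created.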

Run that for a while, then evaluate (same spark3-sql as above):

select * from milimetric.iceberg_test_update where string_key = '14';
-- of course returns all the values inserted for this key, including the two initial ones set up with the table.
-- so how do we get it to update?
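
To make the difference concrete, here is a minimal in-memory sketch in plain Scala of append versus upsert semantics. It is only a model of the behaviour being asked for, not Iceberg or Spark API: KeyValue, append, and upsert are hypothetical names for this illustration.

```scala
// Hypothetical in-memory stand-in for a table row; not an Iceberg or Spark type.
case class KeyValue(stringKey: String, stringValue: String)

// Append: every incoming row is kept, so a key accumulates values over time.
def append(table: Vector[KeyValue], batch: Seq[KeyValue]): Vector[KeyValue] =
  table ++ batch

// Upsert keyed on stringKey: the last incoming row per key replaces
// every existing row with that key.
def upsert(table: Vector[KeyValue], batch: Seq[KeyValue]): Vector[KeyValue] = {
  val latest = batch.groupBy(_.stringKey).map { case (k, rows) => k -> rows.last }
  table.filterNot(r => latest.contains(r.stringKey)) ++ latest.values
}

// The two rows inserted at table creation, plus one small streamed batch.
val initial = Vector(KeyValue("14", "first value"), KeyValue("14", "second value"))
val batch = Seq(KeyValue("14", "14"), KeyValue("15", "15"))

val appended = append(initial, batch) // key "14" now has three rows
val upserted = upsert(initial, batch) // key "14" has exactly one row
```

The streaming job above behaves like append; the tries below look for a way to get something like upsert out of Iceberg.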

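Try 1. Set identifier fields on the schema and insert again - no good; Iceberg does not enforce uniqueness of identifier fields, so appends still add duplicate keys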

import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.hive.HiveCatalog
import org.apache.iceberg.Table
import org.apache.iceberg.spark.actions.SparkActions
import org.apache.iceberg.SortOrder
import org.apache.iceberg.SortDirection
import org.apache.iceberg.NullOrder
import org.apache.iceberg.actions.RewriteDataFiles

// Load the table through Iceberg's own Hive catalog API rather than through Spark SQL.
val catalog = new HiveCatalog()
catalog.setConf(spark.sparkContext.hadoopConfiguration)
val properties = new java.util.HashMap[String, String]()
catalog.initialize("spark_catalog", properties)
val ice = TableIdentifier.of("milimetric", "iceberg_test_update")
val iceT = catalog.loadTable(ice)

// Declare string_key as the identifier field (allowed because the column is not null).
iceT.updateSchema.setIdentifierFields("string_key").commit()

Try 2. with the schema set up as above, see if there's some upsert...