Branches Iceberg. Pyspark

NOTE

Небольшой эксперимент по использованию веток в таблицах Iceberg.

Friend:: Branches Iceberg. Spark submit

  • Создаём Spark сессию (предварительно убив старую!!)

    spark.stop()
    spark = (
        SparkSession.builder
        .appName("IcebergBranch")
        .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog.spark_catalog.type", "hive")
        .config("spark.sql.catalog.spark_catalog.uri", "thrift://100.64.88.101:9083")
        .config("spark.hadoop.fs.defaultFS", "hdfs://100.64.88.101:9000")
        .config("spark.sql.catalog.spark_catalog.warehouse", "hdfs://100.64.88.101:9000/warehouse")
        .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        .getOrCreate()
    )
    Ссылка на оригинал

  • Получение тестовых данных из таблицы JDBC

    db_url = "jdbc:postgresql://db.mak-sim.ru/finance"
    db_properties = {
        "user": "maksim",
        "password": "pas$$",
        "driver": "org.postgresql.Driver"
    }
    table_or_query = "(SELECT * FROM btc) as result"
     
    df = spark.read.jdbc(url=db_url, table=table_or_query, properties=db_properties)
    Ссылка на оригинал

  • На всякий случай удаляем таблицу и записываем тестовые данные в формате Iceberg.

    spark.sql("DROP TABLE IF EXISTS spark_catalog.default.btc");
    df.write.format("iceberg").mode("append").saveAsTable("spark_catalog.default.btc")
    Ссылка на оригинал

  • Проверяем, что всё загрузилось успешно:

    spark.sql("SELECT * FROM spark_catalog.default.btc LIMIT 10").show()
    Ссылка на оригинал
    +--------------------+---------+
    |                date|    value|
    +--------------------+---------+
    |2024-08-26 10:56:...|5858493.0|
    |2024-08-26 11:05:...|5847901.0|
    |2024-08-27 00:20:...|5786144.0|
    |2024-08-27 14:40:...|5665017.0|
    |2024-08-28 05:00:...|5426125.0|
    |2024-08-28 19:20:...|5424955.0|
    |2024-08-29 09:40:...|5477816.0|
    |2024-08-30 00:00:...|5429526.0|
    |2024-08-30 14:20:...|5372384.0|
    |2024-08-26 11:06:...|5847901.0|
    +--------------------+---------+
    
    Ссылка на оригинал

  • Создаём новый бранч test

    spark.sql("ALTER TABLE spark_catalog.default.btc CREATE BRANCH test")
    Ссылка на оригинал

  • Создаём тестовый набор данных и записываемых в новый бранч

    from datetime import datetime
    data = [(datetime.now(), 1.0)]
    df_new = spark.createDataFrame(data, df.schema)
    Ссылка на оригинал
    df_new.writeTo("spark_catalog.default.btc.branch_test").append()
    Ссылка на оригинал

  • Проверяем количество строк в основном и тестовом бранче

    spark.sql("SELECT count(*) from spark_catalog.default.btc.branch_test").show()
    Ссылка на оригинал
    +--------+
    |count(1)|
    +--------+
    |  407917|
    +--------+
    
    Ссылка на оригинал
    spark.sql("SELECT count(*) from spark_catalog.default.btc").show()
    Ссылка на оригинал
    +--------+
    |count(1)|
    +--------+
    |  407916|
    +--------+
    
    Ссылка на оригинал

  • Сливаем новый брэнч в основной

    spark.sql("CALL spark_catalog.system.fast_forward('btc', 'main', 'test')")
    Ссылка на оригинал

  • Проверяем, что в основном брэнче теперь нужное количество строк

    spark.sql("SELECT count(*) from spark_catalog.default.btc").show()
    Ссылка на оригинал
    +--------+
    |count(1)|
    +--------+
    |  407917|
    +--------+
    
    Ссылка на оригинал

  • Удаляем новый брэнч

    spark.sql("ALTER TABLE spark_catalog.default.btc DROP BRANCH test")
    Ссылка на оригинал