Branches Iceberg. Pyspark
NOTE
Небольшой эксперимент по использованию веток в таблицах Iceberg.
Friend:: Branches Iceberg. Spark submit
-
Создаём Spark сессию (предварительно убив старую!!)
Ссылка на оригиналspark.stop() spark = ( SparkSession.builder .appName("IcebergBranch") .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog") .config("spark.sql.catalog.spark_catalog.type", "hive") .config("spark.sql.catalog.spark_catalog.uri", "thrift://100.64.88.101:9083") .config("spark.hadoop.fs.defaultFS", "hdfs://100.64.88.101:9000") .config("spark.sql.catalog.spark_catalog.warehouse", "hdfs://100.64.88.101:9000/warehouse") .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") .getOrCreate() )
-
Получение тестовых данных из таблицы JDBC
Ссылка на оригиналdb_url = "jdbc:postgresql://db.mak-sim.ru/finance" db_properties = { "user": "maksim", "password": "pas$$", "driver": "org.postgresql.Driver" } table_or_query = "(SELECT * FROM btc) as result" df = spark.read.jdbc(url=db_url, table=table_or_query, properties=db_properties)
-
На всякий случай удаляем таблицу и записываем тестовые данные в формате Iceberg.
Ссылка на оригиналspark.sql("DROP TABLE IF EXISTS spark_catalog.default.btc"); df.write.format("iceberg").mode("append").saveAsTable("spark_catalog.default.btc")
-
Проверяем, что всё загрузилось успешно:
Ссылка на оригиналspark.sql("SELECT * FROM spark_catalog.default.btc LIMIT 10").show()
Ссылка на оригинал+--------------------+---------+ | date| value| +--------------------+---------+ |2024-08-26 10:56:...|5858493.0| |2024-08-26 11:05:...|5847901.0| |2024-08-27 00:20:...|5786144.0| |2024-08-27 14:40:...|5665017.0| |2024-08-28 05:00:...|5426125.0| |2024-08-28 19:20:...|5424955.0| |2024-08-29 09:40:...|5477816.0| |2024-08-30 00:00:...|5429526.0| |2024-08-30 14:20:...|5372384.0| |2024-08-26 11:06:...|5847901.0| +--------------------+---------+
-
Создаём новый бранч
test
Ссылка на оригиналspark.sql("ALTER TABLE spark_catalog.default.btc CREATE BRANCH test")
-
Создаём тестовый набор данных и записываемых в новый бранч
Ссылка на оригиналfrom datetime import datetime data = [(datetime.now(), 1.0)] df_new = spark.createDataFrame(data, df.schema)
Ссылка на оригиналdf_new.writeTo("spark_catalog.default.btc.branch_test").append()
-
Проверяем количество строк в основном и тестовом бранче
Ссылка на оригиналspark.sql("SELECT count(*) from spark_catalog.default.btc.branch_test").show()
Ссылка на оригинал+--------+ |count(1)| +--------+ | 407917| +--------+
Ссылка на оригиналspark.sql("SELECT count(*) from spark_catalog.default.btc").show()
Ссылка на оригинал+--------+ |count(1)| +--------+ | 407916| +--------+
-
Сливаем новый брэнч в основной
Ссылка на оригиналspark.sql("CALL spark_catalog.system.fast_forward('btc', 'main', 'test')")
-
Проверяем, что в основном брэнче теперь нужное количество строк
Ссылка на оригиналspark.sql("SELECT count(*) from spark_catalog.default.btc").show()
Ссылка на оригинал+--------+ |count(1)| +--------+ | 407917| +--------+
-
Удаляем новый брэнч
Ссылка на оригиналspark.sql("ALTER TABLE spark_catalog.default.btc DROP BRANCH test")