Iceberg Branches (Jupyter Notebook)
from pyspark.sql import SparkSession

# Stop any session left over from a previous run: settings passed via
# .config() are silently ignored when getOrCreate() returns an already
# running session, so the Iceberg catalog configuration below would not
# take effect. Guarding with getActiveSession() also avoids a NameError
# on a fresh kernel, where `spark` is not yet defined.
_active = SparkSession.getActiveSession()
if _active is not None:
    _active.stop()

# Spark session wired to an Iceberg catalog backed by a Hive metastore
# and an HDFS warehouse. The Iceberg SQL extensions enable the branch
# DDL (CREATE/DROP BRANCH) and procedures (system.fast_forward) used
# later in this notebook.
spark = (
    SparkSession.builder
    .appName("IcebergBranch")
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.spark_catalog.type", "hive")
    .config("spark.sql.catalog.spark_catalog.uri", "thrift://100.64.88.101:9083")
    .config("spark.hadoop.fs.defaultFS", "hdfs://100.64.88.101:9000")
    .config("spark.sql.catalog.spark_catalog.warehouse", "hdfs://100.64.88.101:9000/warehouse")
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .getOrCreate()
)
# PostgreSQL source connection.
db_url = "jdbc:postgresql://db.mak-sim.ru/finance"
# SECURITY NOTE(review): credentials are hard-coded in the notebook;
# load them from the environment or a secrets manager before sharing.
db_properties = {
    "user": "maksim",
    "password": "pas$$",
    "driver": "org.postgresql.Driver"
}

# Push the query down to Postgres as a subquery; the JDBC reader
# requires a table-like expression, hence the "(...) as result" alias.
table_or_query = "(SELECT * FROM btc) as result"
df = spark.read.jdbc(url=db_url, table=table_or_query, properties=db_properties)

# Rebuild the Iceberg table from scratch with the freshly read data.
spark.sql("DROP TABLE IF EXISTS spark_catalog.default.btc")
df.write.format("iceberg").mode("append").saveAsTable("spark_catalog.default.btc")

# Sanity check: preview a few rows of the new table.
spark.sql("SELECT * FROM spark_catalog.default.btc LIMIT 10").show()
[Stage 1:> (0 + 1) / 1]
+--------------------+---------+
| date| value|
+--------------------+---------+
|2024-08-26 10:56:...|5858493.0|
|2024-08-26 11:05:...|5847901.0|
|2024-08-27 00:20:...|5786144.0|
|2024-08-27 14:40:...|5665017.0|
|2024-08-28 05:00:...|5426125.0|
|2024-08-28 19:20:...|5424955.0|
|2024-08-29 09:40:...|5477816.0|
|2024-08-30 00:00:...|5429526.0|
|2024-08-30 14:20:...|5372384.0|
|2024-08-26 11:06:...|5847901.0|
+--------------------+---------+
# Create a named branch "test" pointing at the table's current snapshot
# (Iceberg branch DDL, available via the Iceberg SQL extensions).
spark.sql("ALTER TABLE spark_catalog.default.btc CREATE BRANCH test")
DataFrame[]
from datetime import datetime

# Build one synthetic row matching the existing table schema (date,
# value) and append it to the "test" branch only; the branch is
# addressed through the branch_<name> identifier suffix on the table.
sample_rows = [(datetime.now(), 1.0)]
branch_df = spark.createDataFrame(sample_rows, df.schema)
branch_df.writeTo("spark_catalog.default.btc.branch_test").append()
[Stage 2:> (0 + 16) / 16]
# The branch sees the appended row (one more than main).
branch_count_df = spark.sql("SELECT count(*) from spark_catalog.default.btc.branch_test")
branch_count_df.show()
+--------+
|count(1)|
+--------+
| 407917|
+--------+
# main is untouched by the write that went to the branch.
main_count_df = spark.sql("SELECT count(*) from spark_catalog.default.btc")
main_count_df.show()
+--------+
|count(1)|
+--------+
| 407916|
+--------+
# Fast-forward the main branch to the head of the test branch (Iceberg
# system procedure); after this, main includes the row written to test.
spark.sql("CALL spark_catalog.system.fast_forward('btc', 'main', 'test')")
DataFrame[branch_updated: string, previous_ref: bigint, updated_ref: bigint]
# After the fast-forward, main now reflects the branch's extra row.
post_merge_count_df = spark.sql("SELECT count(*) from spark_catalog.default.btc")
post_merge_count_df.show()
+--------+
|count(1)|
+--------+
| 407917|
+--------+
# Clean up: remove the now-merged branch reference.
spark.sql("ALTER TABLE spark_catalog.default.btc DROP BRANCH test")
DataFrame[]