IcebergBranchesDemo.java
package ru.mak_sim.iceberg_branches_demo;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.*;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
public class IcebergBranchesDemo {
public static void main(String[] args) {
// 1. Π‘ΠΎΠ·Π΄Π°Π½ΠΈΠ΅ SparkSession Ρ Π½Π°ΡΡΡΠΎΠΉΠΊΠ°ΠΌΠΈ Iceberg
SparkSession spark = SparkSession.builder()
.appName("IcebergBranchesDemo")
.config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.spark_catalog.type", "hive")
.config("spark.sql.catalog.spark_catalog.uri", "thrift://100.64.88.101:9083")
.config("spark.hadoop.fs.defaultFS", "hdfs://100.64.88.101:9000")
.config("spark.sql.catalog.spark_catalog.warehouse", "hdfs://100.64.88.101:9000/warehouse")
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
.getOrCreate();
try {
// 2. ΠΠ°Π³ΡΡΠ·ΠΊΠ° Π΄Π°Π½Π½ΡΡ
ΠΈΠ· PostgreSQL
Properties dbProps = new Properties();
dbProps.setProperty("user", "user");
dbProps.setProperty("password", "pa$$");
dbProps.setProperty("driver", "org.postgresql.Driver");
Dataset<Row> df = spark.read()
.jdbc("jdbc:postgresql://db.mak-sim.ru/finance", "(SELECT * FROM btc) as result", dbProps);
// 3. Π£Π΄Π°Π»Π΅Π½ΠΈΠ΅ ΡΡΠ°ΡΠΎΠΉ ΡΠ°Π±Π»ΠΈΡΡ ΠΈ ΡΠΎΠ·Π΄Π°Π½ΠΈΠ΅ Π½ΠΎΠ²ΠΎΠΉ
spark.sql("DROP TABLE IF EXISTS spark_catalog.default.btc");
df.write().format("iceberg").mode("overwrite").saveAsTable("spark_catalog.default.btc");
// 4. ΠΡΠΎΠ²Π΅ΡΠΊΠ° Π½Π°ΡΠ°Π»ΡΠ½ΡΡ
Π΄Π°Π½Π½ΡΡ
System.out.println("Initial data preview:");
spark.sql("SELECT * FROM spark_catalog.default.btc LIMIT 10").show();
// 5. Π‘ΠΎΠ·Π΄Π°Π½ΠΈΠ΅ Π²Π΅ΡΠΊΠΈ test
spark.sql("ALTER TABLE spark_catalog.default.btc CREATE BRANCH test");
System.out.println("Branch 'test' created");
// 6. ΠΠ΅Π½Π΅ΡΠ°ΡΠΈΡ ΠΈ Π·Π°ΠΏΠΈΡΡ Π½ΠΎΠ²ΡΡ
Π΄Π°Π½Π½ΡΡ
Π² Π²Π΅ΡΠΊΡ test
StructType schema = df.schema();
List<Row> newData = Arrays.asList(
RowFactory.create(new Timestamp(System.currentTimeMillis()), 1.0f)
);
Dataset<Row> dfNew = spark.createDataFrame(newData, schema);
dfNew.writeTo("spark_catalog.default.btc.branch_test").append();
System.out.println("Data written to branch 'test'");
// 7. ΠΡΠΎΠ²Π΅ΡΠΊΠ° ΠΊΠΎΠ»ΠΈΡΠ΅ΡΡΠ²Π° Π·Π°ΠΏΠΈΡΠ΅ΠΉ
System.out.println("Row count in test branch:");
spark.sql("SELECT count(*) from spark_catalog.default.btc.branch_test").show();
System.out.println("Row count in main branch:");
spark.sql("SELECT count(*) from spark_catalog.default.btc").show();
// 8. Π‘Π»ΠΈΡΠ½ΠΈΠ΅ Π²Π΅ΡΠΎΠΊ
spark.sql("CALL spark_catalog.system.fast_forward('default.btc', 'main', 'test')");
System.out.println("Branches merged with fast_forward");
// 9. ΠΡΠΎΠ²Π΅ΡΠΊΠ° ΠΏΠΎΡΠ»Π΅ ΡΠ»ΠΈΡΠ½ΠΈΡ
System.out.println("Row count after merge:");
spark.sql("SELECT count(*) from spark_catalog.default.btc").show();
// 10. Π£Π΄Π°Π»Π΅Π½ΠΈΠ΅ Π²Π΅ΡΠΊΠΈ
spark.sql("ALTER TABLE spark_catalog.default.btc DROP BRANCH test");
System.out.println("Branch 'test' dropped");
} catch (Exception e) {
e.printStackTrace();
} finally {
spark.stop();
}
}
}
build.gradle
plugins {
    id 'java'
    id 'application'
    id 'com.github.johnrengelman.shadow' version '8.1.1'
}

group 'ru.mak_sim'
version '1.0-SNAPSHOT'

repositories {
    mavenCentral()
}

dependencies {
    // Spark is compileOnly: the cluster provides it at runtime, so it is
    // excluded from the shadow jar.
    compileOnly 'org.apache.spark:spark-core_2.12:3.5.6'
    compileOnly 'org.apache.spark:spark-sql_2.12:3.5.6'
    // Iceberg: only spark-runtime is declared; core and common come in
    // transitively (spark-runtime must be in implementation so it is shaded).
    implementation 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.1'
    // JDBC driver for PostgreSQL.
    implementation 'org.postgresql:postgresql:42.7.7'
    // HDFS client (minimal set: just hadoop-client rather than all of Hadoop).
    implementation 'org.apache.hadoop:hadoop-client:3.3.4'
}

application {
    mainClass = 'ru.mak_sim.iceberg_branches_demo.IcebergBranchesDemo'
}

java {
    toolchain {
        languageVersion = JavaLanguageVersion.of(11) // or 17 if your Spark supports it
    }
}

shadowJar {
    zip64 = true
    mergeServiceFiles()
    manifest {
        attributes 'Main-Class': application.mainClass
    }
    // Strip signature files that would otherwise break the merged jar.
    exclude 'META-INF/*.RSA'
    exclude 'META-INF/*.SF'
    exclude 'META-INF/*.DSA'
    // Additional exclude/relocate rules can be added here if needed;
    // relocating commons/google is normally unnecessary for standard Iceberg/Spark.
}
Π‘Π±ΠΎΡΠΊΠ° ΠΈ Π·Π°ΠΏΡΡΠΊ
./gradlew clean shadowJar
spark-submit --master spark://100.64.88.58:7077 --class ru.mak_sim.iceberg_branches_demo.IcebergBranchesDemo ~/SynologyDrive/dev/IcebergBranch/build/libs/IcebergBranch-1.0-SNAPSHOT-all.jar
Result: the Iceberg branching demo (create branch, branch write, fast-forward merge, drop branch) runs end-to-end via spark-submit.