Branches Iceberg. Spark submit

Java

package ru.mak_sim.iceberg_branches_demo;
 
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.*;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
 
public class IcebergBranchesDemo {
    public static void main(String[] args) {
        // 1. Create a SparkSession configured for Iceberg (these settings
        //    could also be supplied at submit time via --conf flags)
        SparkSession spark = SparkSession.builder()
                .appName("IcebergBranchesDemo")
                .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
                .config("spark.sql.catalog.spark_catalog.type", "hive")
                .config("spark.sql.catalog.spark_catalog.uri", "thrift://100.64.88.101:9083")
                .config("spark.hadoop.fs.defaultFS", "hdfs://100.64.88.101:9000")
                .config("spark.sql.catalog.spark_catalog.warehouse", "hdfs://100.64.88.101:9000/warehouse")
                .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
                .getOrCreate();
 
        try {
            // 2. Load data from PostgreSQL
            Properties dbProps = new Properties();
            dbProps.setProperty("user", "user");
            dbProps.setProperty("password", "pa$$");
            dbProps.setProperty("driver", "org.postgresql.Driver");
 
            Dataset<Row> df = spark.read()
                    .jdbc("jdbc:postgresql://db.mak-sim.ru/finance", "(SELECT * FROM btc) as result", dbProps);
 
            // 3. Drop the old table and create a new one
            spark.sql("DROP TABLE IF EXISTS spark_catalog.default.btc");
            df.write().format("iceberg").mode("overwrite").saveAsTable("spark_catalog.default.btc");
 
            // 4. Preview the initial data
            System.out.println("Initial data preview:");
            spark.sql("SELECT * FROM spark_catalog.default.btc LIMIT 10").show();
 
            // 5. Create the 'test' branch
            spark.sql("ALTER TABLE spark_catalog.default.btc CREATE BRANCH test");
            System.out.println("Branch 'test' created");
 
            // 6. Generate new rows and append them to the 'test' branch
            //    (Row values must match the column types of df.schema(); here
            //    the btc table is assumed to be (timestamp, float))
            StructType schema = df.schema();
            List<Row> newData = Arrays.asList(
                    RowFactory.create(new Timestamp(System.currentTimeMillis()), 1.0f)
            );
            Dataset<Row> dfNew = spark.createDataFrame(newData, schema);
 
            dfNew.writeTo("spark_catalog.default.btc.branch_test").append();
            System.out.println("Data written to branch 'test'");
 
            // 7. Compare row counts between the two branches
            System.out.println("Row count in test branch:");
            spark.sql("SELECT count(*) FROM spark_catalog.default.btc.branch_test").show();

            System.out.println("Row count in main branch:");
            spark.sql("SELECT count(*) FROM spark_catalog.default.btc").show();
 
            // 8. Merge the branches
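            // fast_forward moves the 'main' ref to the current head of 'test';
            // it succeeds only while 'main' has not advanced on its own since
            // the branch was created (a true fast-forward, not a general merge)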
            spark.sql("CALL spark_catalog.system.fast_forward('default.btc', 'main', 'test')");
            System.out.println("Branches merged with fast_forward");
 
            // 9. Verify the row count after the merge
            System.out.println("Row count after merge:");
            spark.sql("SELECT count(*) FROM spark_catalog.default.btc").show();
 
            // 10. Drop the branch
            spark.sql("ALTER TABLE spark_catalog.default.btc DROP BRANCH test");
            System.out.println("Branch 'test' dropped");
 
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            spark.stop();
        }
    }
}
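
A quick way to verify branch operations is Iceberg's refs metadata table, which lists every branch and tag together with the snapshot it points to. Below is a minimal sketch under the same catalog setup as in the demo; the printRefs helper name is illustrative:

static void printRefs(SparkSession spark) {
    // Each row of "refs": ref name, type (BRANCH or TAG), snapshot_id, retention settings
    spark.sql("SELECT name, type, snapshot_id FROM spark_catalog.default.btc.refs").show();

    // A branch or tag name can also be queried through the VERSION AS OF clause
    spark.sql("SELECT count(*) FROM spark_catalog.default.btc VERSION AS OF 'test'").show();
}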

build.gradle

plugins {
    id 'java'
    id 'application'
    id 'com.github.johnrengelman.shadow' version '8.1.1'
}
 
group 'ru.mak_sim'
version '1.0-SNAPSHOT'
 
repositories {
    mavenCentral()
}
 
dependencies {
    // Π˜ΡΠΊΠ»ΡŽΡ‡Π°Π΅ΠΌ Spark ΠΈΠ· ΠΈΡ‚ΠΎΠ³ΠΎΠ²ΠΎΠ³ΠΎ jar (Π±ΡƒΠ΄Π΅Ρ‚ прСдоставлСн кластСром)
    compileOnly 'org.apache.spark:spark-core_2.12:3.5.6'
    compileOnly 'org.apache.spark:spark-sql_2.12:3.5.6'
 
    // Iceberg: only the spark-runtime bundle is needed; iceberg-core and
    // iceberg-common are pulled in transitively (spark-runtime goes in implementation)
    implementation 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.1'
 
    // PostgreSQL JDBC driver
    implementation 'org.postgresql:postgresql:42.7.7'
 
    // HDFS client (minimal set: just the client, not all of Hadoop)
    implementation 'org.apache.hadoop:hadoop-client:3.3.4'
}
 
application {
    mainClass = 'ru.mak_sim.iceberg_branches_demo.IcebergBranchesDemo'
}
 
java {
    toolchain {
        languageVersion = JavaLanguageVersion.of(11) // or 17 if your Spark build supports it
    }
}
 
shadowJar {
    zip64 = true
    mergeServiceFiles()
    manifest {
        attributes 'Main-Class': application.mainClass
    }
 
    // Π˜ΡΠΊΠ»ΡŽΡ‡Π°Π΅ΠΌ ΠΊΠΎΠ½Ρ„Π»ΠΈΠΊΡ‚ΡƒΡŽΡ‰ΠΈΠ΅ ΠΌΠ΅Ρ‚Π°-Ρ„Π°ΠΉΠ»Ρ‹
    exclude 'META-INF/*.RSA'
    exclude 'META-INF/*.SF'
    exclude 'META-INF/*.DSA'
    // Additional exclude/relocate rules can be added here if needed;
    // relocating commons and google packages is usually unnecessary for stock Iceberg/Spark
}

Build and run

./gradlew clean shadowJar
 
spark-submit --master spark://100.64.88.58:7077 --class ru.mak_sim.iceberg_branches_demo.IcebergBranchesDemo ~/SynologyDrive/dev/IcebergBranch/build/libs/IcebergBranch-1.0-SNAPSHOT-all.jar 

Child:: Branches Iceberg. Spark submit. Result