Branches Iceberg. Pyspark
NOTE
A small experiment with branches in Iceberg tables.
Friend:: Branches Iceberg. Spark submit
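Create a Spark session (stop the old one first!):

```python
from pyspark.sql import SparkSession

# Stop the running session first: getOrCreate() would otherwise reuse it,
# and static settings such as spark.sql.extensions would not take effect.
active = SparkSession.getActiveSession()
if active is not None:
    active.stop()

spark = (
    SparkSession.builder
    .appName("IcebergBranch")
    # Iceberg catalog backed by the Hive Metastore, registered as spark_catalog
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.spark_catalog.type", "hive")
    .config("spark.sql.catalog.spark_catalog.uri", "thrift://100.64.88.101:9083")
    .config("spark.hadoop.fs.defaultFS", "hdfs://100.64.88.101:9000")
    .config("spark.sql.catalog.spark_catalog.warehouse", "hdfs://100.64.88.101:9000/warehouse")
    # Enables Iceberg SQL: ALTER TABLE ... CREATE/DROP BRANCH and CALL ... system.*
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .getOrCreate()
)
```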
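The same catalog settings are needed when the job runs through spark-submit instead of a notebook. The following is a minimal sketch that assumes the settings are kept in one Python dict and turned into `--conf key=value` arguments. `ICEBERG_CONF` and `submit_conf_args` are hypothetical names.

```python
from __future__ import annotations

import shlex

# The catalog settings from the session builder above (hosts are this note's cluster).
ICEBERG_CONF = {
    "spark.sql.catalog.spark_catalog": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.spark_catalog.type": "hive",
    "spark.sql.catalog.spark_catalog.uri": "thrift://100.64.88.101:9083",
    "spark.hadoop.fs.defaultFS": "hdfs://100.64.88.101:9000",
    "spark.sql.catalog.spark_catalog.warehouse": "hdfs://100.64.88.101:9000/warehouse",
    "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
}


def submit_conf_args(conf: dict[str, str]) -> list[str]:
    """Turn a settings dict into spark-submit `--conf key=value` arguments."""
    args: list[str] = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args


# Shell-quoted so the result can be pasted after `spark-submit`
print(" ".join(shlex.quote(arg) for arg in submit_conf_args(ICEBERG_CONF)))
```

In PySpark 3.4+ the same dict should also be accepted by `SparkSession.builder.config(map=ICEBERG_CONF)`. That keeps the notebook and the spark-submit variant from drifting apart.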
Fetch test data from a table over JDBC:

```python
db_url = "jdbc:postgresql://db.mak-sim.ru/finance"
db_properties = {
    "user": "maksim",
    "password": "pas$$",
    "driver": "org.postgresql.Driver",
}
# A subquery passed as `table` must be wrapped in parentheses and given an alias
table_or_query = "(SELECT * FROM btc) AS result"
df = spark.read.jdbc(url=db_url, table=table_or_query, properties=db_properties)
```
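The password can be kept out of the notebook by building the connection properties from an environment variable. The sketch below shows one way to do that. `jdbc_properties` and the `PG_PASSWORD` variable are hypothetical names.

```python
from __future__ import annotations

import os


def jdbc_properties(user: str, password_env: str = "PG_PASSWORD") -> dict[str, str]:
    """Build spark.read.jdbc() properties, taking the password from an env var.

    PG_PASSWORD is a hypothetical variable name chosen for this sketch.
    """
    password = os.environ.get(password_env)
    if password is None:
        raise RuntimeError(f"set {password_env} before reading from PostgreSQL")
    return {"user": user, "password": password, "driver": "org.postgresql.Driver"}
```

It would then be used as `spark.read.jdbc(url=db_url, table=table_or_query, properties=jdbc_properties("maksim"))`.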
Drop the table just in case, then write the test data in Iceberg format:

```python
spark.sql("DROP TABLE IF EXISTS spark_catalog.default.btc")
df.write.format("iceberg").mode("append").saveAsTable("spark_catalog.default.btc")
```
Check that everything loaded successfully:

```python
spark.sql("SELECT * FROM spark_catalog.default.btc LIMIT 10").show()
```

```
+--------------------+---------+
|                date|    value|
+--------------------+---------+
|2024-08-26 10:56:...|5858493.0|
|2024-08-26 11:05:...|5847901.0|
|2024-08-27 00:20:...|5786144.0|
|2024-08-27 14:40:...|5665017.0|
|2024-08-28 05:00:...|5426125.0|
|2024-08-28 19:20:...|5424955.0|
|2024-08-29 09:40:...|5477816.0|
|2024-08-30 00:00:...|5429526.0|
|2024-08-30 14:20:...|5372384.0|
|2024-08-26 11:06:...|5847901.0|
+--------------------+---------+
```
Create a new branch `test`:

```python
spark.sql("ALTER TABLE spark_catalog.default.btc CREATE BRANCH test")
```
Create a test dataset and write it to the new branch:

```python
from datetime import datetime

# One new row with the same schema as the source data
data = [(datetime.now(), 1.0)]
df_new = spark.createDataFrame(data, df.schema)

# The branch_<name> suffix sends the write to branch `test` only; main is untouched
df_new.writeTo("spark_catalog.default.btc.branch_test").append()
```
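Writes and reads address a branch by appending `branch_<name>` to the table identifier, as in `spark_catalog.default.btc.branch_test`. Below is a small hypothetical helper that builds such identifiers. It wraps names that are not plain identifiers (for example, ones containing a hyphen) in backticks. That quoting rule is an assumption based on Spark SQL's usual identifier rules.

```python
import re

# Assumption: letters, digits and underscores can stay unquoted; anything else
# (e.g. a hyphen) is wrapped in backticks, with inner backticks doubled.
_SIMPLE_NAME = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def branch_table(table: str, branch: str) -> str:
    """Return the identifier that addresses `branch` of an Iceberg `table` in Spark."""
    part = f"branch_{branch}"
    if not _SIMPLE_NAME.match(part):
        part = "`" + part.replace("`", "``") + "`"
    return f"{table}.{part}"


print(branch_table("spark_catalog.default.btc", "test"))
# spark_catalog.default.btc.branch_test
print(branch_table("spark_catalog.default.btc", "audit-1"))
# spark_catalog.default.btc.`branch_audit-1`
```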
Compare the row counts in the test branch and in main:

```python
spark.sql("SELECT count(*) FROM spark_catalog.default.btc.branch_test").show()
```

```
+--------+
|count(1)|
+--------+
|  407917|
+--------+
```

```python
spark.sql("SELECT count(*) FROM spark_catalog.default.btc").show()
```

```
+--------+
|count(1)|
+--------+
|  407916|
+--------+
```

The test branch has exactly one row more than main: the row appended above.
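Merge the new branch into main. Iceberg does this with the `fast_forward` procedure, which moves the `main` reference to the latest snapshot of `test`. This only works while `main` is still an ancestor of `test`, i.e. nothing has been committed to `main` since the branch was created.

```python
# Arguments: table, branch to move, branch whose head it moves to
spark.sql("CALL spark_catalog.system.fast_forward('btc', 'main', 'test')")
```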
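A toy model in plain Python shows why `fast_forward` works here and when it would not. This is not Iceberg's API. `ToyTable` is a hypothetical class in which snapshots form a parent chain and branches are named pointers to a snapshot, which is roughly how Iceberg references behave. The row counts are the ones from this experiment.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ToyTable:
    """Toy model of Iceberg-style branch refs, NOT the Iceberg API."""

    parents: dict[int, Optional[int]] = field(default_factory=dict)  # snapshot -> parent
    rows: dict[int, int] = field(default_factory=dict)  # snapshot -> total row count
    refs: dict[str, int] = field(default_factory=dict)  # branch name -> head snapshot
    next_id: int = 1

    def append(self, branch: str, n_rows: int) -> None:
        """Commit a new snapshot on top of `branch` (starting it if it does not exist)."""
        parent = self.refs.get(branch)
        sid = self.next_id
        self.next_id += 1
        self.parents[sid] = parent
        self.rows[sid] = (self.rows[parent] if parent is not None else 0) + n_rows
        self.refs[branch] = sid

    def create_branch(self, name: str, source: str = "main") -> None:
        """Like CREATE BRANCH: a new ref pointing at the source branch's head."""
        self.refs[name] = self.refs[source]

    def is_ancestor(self, ancestor: int, sid: Optional[int]) -> bool:
        """Walk the parent chain from `sid`; a snapshot counts as its own ancestor."""
        while sid is not None:
            if sid == ancestor:
                return True
            sid = self.parents[sid]
        return False

    def fast_forward(self, branch: str, to: str) -> None:
        """Move `branch` to the head of `to`, but only if no history would be lost."""
        if not self.is_ancestor(self.refs[branch], self.refs[to]):
            raise ValueError(f"cannot fast-forward: {branch} is not an ancestor of {to}")
        self.refs[branch] = self.refs[to]

    def drop_branch(self, name: str) -> None:
        """Like DROP BRANCH: remove the ref, keep the snapshots."""
        del self.refs[name]

    def count(self, branch: str = "main") -> int:
        return self.rows[self.refs[branch]]


t = ToyTable()
t.append("main", 407916)        # initial load into main
t.create_branch("test")         # test points at main's current snapshot
t.append("test", 1)             # the extra row goes to test only
print(t.count("main"), t.count("test"))  # 407916 407917
t.fast_forward("main", "test")  # main now points at test's head
print(t.count("main"))          # 407917
t.drop_branch("test")           # only the pointer is removed
```

If a row had been appended to `main` after `create_branch`, `fast_forward` in this model would raise instead of silently discarding that commit. Dropping `test` afterwards loses nothing, because its head snapshot is now `main`'s head.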
Check that main now has the expected number of rows:

```python
spark.sql("SELECT count(*) FROM spark_catalog.default.btc").show()
```

```
+--------+
|count(1)|
+--------+
|  407917|
+--------+
```
Delete the new branch:

```python
spark.sql("ALTER TABLE spark_catalog.default.btc DROP BRANCH test")
```