Navigating the Iceberg: unit testing Iceberg tables with PySpark
The Iceberg table format has gained a lot of traction and has caused great excitement across the data landscape. Its architecture enables interesting features like time travel, ACID transactions, partition evolution, schema evolution and data compaction, making it a great option for your data lake. So I was surprised that, when I looked for documentation and guides on setting up a development environment for running and testing Iceberg tables with PySpark, I ended up empty-handed.
That is precisely why I wrote this blog post. In the upcoming sections, we immerse ourselves in the depths of Iceberg execution in an isolated development environment: why it matters, how to set up an Iceberg environment with PySpark, and how to use it effectively in unit tests. By journey’s end, you’ll adeptly navigate the challenges of local Icebergs. Let’s dive! 🤿
Why execution in a development environment is important
Let’s first take a step back and recap why it is important to be able to execute your code in a development environment. An effective approach to convey this with clarity is by linking it to the data product lifecycle. If you are not familiar with this concept I suggest you read the following enlightening post.
So when creating a new product or advancing an existing one, you always need to go through the experimentation phase, and this goes hand in hand with execution inside your development environment. It provides a safe space to test various approaches without affecting production data or systems.
I deliberately chose to refer to a development environment rather than a “local” environment. Developers today have the luxury of using remote IDEs or notebooks, which are often a much better representation of the production environment while still leaving you freedom. If you have a microservices setup, you can use the same images and containers for your development environment as you would for your production environment. This closes the gap and mitigates the infamous “It Works on My Machine” problem as much as possible.
When building a data product it is very important to keep each iteration of this cycle as short as possible. If you want to reach your 10 deploys a day, you need to be able to iterate quickly. More often than not, developing and experimenting on testing and acceptance systems is slow and feedback takes a long time. When you can test your entire application inside your own development environment, you receive feedback much more rapidly, enabling faster iterations over this lifecycle.
When you are done with experimentation you industrialise your product, and with that comes (drum roll) ✨ TESTS ✨. Our favorite part of engineering (after this blog post at least)! When writing tests it is good practice to follow the test pyramid metaphor, as described in “The Practical Test Pyramid” on Martin Fowler’s site. If you haven’t read that post, I highly suggest you do. The post describes the details of building effective and readable automated tests. Not all test types can be automated or are as easy to automate. It comes down to this: the more integration a test requires, the slower it is to develop and run, and consequently the fewer of those tests you have. The opposite also holds true: you should have lots of unit tests that are quick to write and run, and thus have a high level of isolation.
In other words, you need a lot of automated unit tests in an isolated environment. I also mentioned earlier how important it is to reflect the production systems as much as possible, so a good DevOps practice is to integrate these unit tests into your CI/CD pipeline. As a result you have automated tests running where the gap between the development and production environments is very small.
So we see the importance of unit tests, which brings me to the next part. In the following sections I will show you how to easily create unit tests with Iceberg using PySpark in an isolated environment.
Establishing an isolated Iceberg environment with PySpark
In this section we briefly go over the Spark configuration needed to run Iceberg in any development environment and develop our tests. In this example we use PySpark 3.4 and everything is run on macOS. All code shown in this post can be found on GitHub.
To work with Iceberg tables in PySpark you need some extra configuration in your Spark session. The first thing you need is the iceberg-spark-runtime jar. You can find all versions in the Maven repository; just make sure to get the jar that matches your versions of Spark, Scala and Iceberg. In my case: iceberg-spark-runtime-3.4_2.12-1.3.0.jar. You can make sure Spark fetches the jar at runtime with the following config line:
spark_builder.config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.3.0")
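Since the artifact name encodes both the Spark and the Scala version, it can help to assemble the coordinate from its parts so that a version bump happens in one place. Below is a minimal sketch of that idea. The helper name iceberg_runtime_coordinate is my own invention for illustration and is not part of Spark or Iceberg; only the naming pattern comes from the jar mentioned above.

```python
def iceberg_runtime_coordinate(spark_version: str, scala_version: str, iceberg_version: str) -> str:
    """Build the Maven coordinate of the iceberg-spark-runtime jar.

    Hypothetical helper, for illustration only.
    """
    # The artifact name carries the Spark major.minor version only,
    # e.g. "3.4" for a Spark 3.4.1 installation.
    spark_major_minor = ".".join(spark_version.split(".")[:2])
    artifact = f"iceberg-spark-runtime-{spark_major_minor}_{scala_version}"
    return f"org.apache.iceberg:{artifact}:{iceberg_version}"


coordinate = iceberg_runtime_coordinate("3.4.1", "2.12", "1.3.0")
print(coordinate)
```

The resulting string is what you would pass as the value of spark.jars.packages.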
With some help from the Iceberg documentation and some trial and error, I ended up with the following snippet to create my Iceberg session. I create the session within a fixture so I can easily reuse it in all of my tests. If it is unclear why some settings are needed, I think it is best to read the Iceberg documentation in detail!
from typing import Final, Iterator

import pytest
from pyspark.sql import SparkSession

warehouse_path: Final[str] = "./warehouse"
default_database_name: Final[str] = "my_db"


@pytest.fixture(scope="session")
def spark() -> Iterator[SparkSession]:
    spark_builder = SparkSession.builder.appName("test_session")
    # Enable Iceberg's Spark SQL extensions
    spark_builder.config(
        "spark.sql.extensions",
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    )
    # Register an Iceberg catalog named "local" that stores its tables under warehouse_path
    spark_builder.config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
    spark_builder.config("spark.sql.catalog.local.type", "hadoop")
    spark_builder.config("spark.sql.catalog.local.warehouse", warehouse_path)
    # Let Spark download the Iceberg runtime jar when the session starts
    spark_builder.config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.3.0")
    spark_builder.config("spark.sql.catalog.local.default.write.metadata-flush-after-create", "true")
    # Resolve unqualified table names against the "local" catalog
    spark_builder.config("spark.sql.defaultCatalog", "local")
    spark: SparkSession = spark_builder.getOrCreate()
    spark.sql(f"CREATE DATABASE IF NOT EXISTS {default_database_name}")
    yield spark
    # Teardown: runs once, after the last test of the session
    spark.stop()
In this snippet we also create a database, with spark.sql(f"CREATE DATABASE IF NOT EXISTS {default_database_name}"), to do all of our work in. Depending on your use case you might need more than one, but in this dummy application we only work with a single database, so it is fine to create it together with our session.
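The catalog name "local" appears in several of the keys above, so one option is to collect the settings in a plain dictionary first. The sketch below shows this using only the settings from the session fixture. The function name iceberg_catalog_conf is hypothetical, and I left out the jar and flush settings to keep it short.

```python
from typing import Dict


def iceberg_catalog_conf(catalog_name: str, warehouse_path: str) -> Dict[str, str]:
    """Return the Spark settings for a Hadoop-type Iceberg catalog as a plain dict.

    Hypothetical helper, for illustration only.
    """
    prefix = f"spark.sql.catalog.{catalog_name}"
    return {
        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.type": "hadoop",
        f"{prefix}.warehouse": warehouse_path,
        "spark.sql.defaultCatalog": catalog_name,
    }


conf = iceberg_catalog_conf("local", "./warehouse")
for key, value in conf.items():
    # In the fixture this line would become: spark_builder.config(key, value)
    print(f"{key}={value}")
```

Renaming the catalog then only requires changing one argument instead of editing every key by hand.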
Since we use Hadoop as our catalog type (see config), a local warehouse directory structure is created when we run queries. The path under which it gets created is stored in the variable warehouse_path. This way of working has some consequences. The first is that every time we execute code that creates or updates a table, the result is persisted locally. This means that the next time you run your tests, all your previous results will still be there, so your tests are not idempotent. A simple solution is to delete the local warehouse every time a session completes. You can see an example of how to do this in the following pytest fixture:
import os
import shutil


@pytest.fixture(scope="session")
def cleanup_warehouse(request):
    yield
    # Remove the ./warehouse folder after the session tests are done
    if os.path.exists(warehouse_path):
        shutil.rmtree(warehouse_path)
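An alternative to deleting a fixed folder is to give every run its own throwaway directory, so that leftovers from a crashed run can never leak into the next one. Here is a small sketch of that idea using only the standard library. The name temporary_warehouse is made up for this example. In a pytest suite, the built-in tmp_path_factory fixture can serve the same purpose.

```python
import os
import shutil
import tempfile
from contextlib import contextmanager
from typing import Iterator


@contextmanager
def temporary_warehouse(prefix: str = "iceberg_warehouse_") -> Iterator[str]:
    """Create a fresh warehouse directory and remove it afterwards.

    Hypothetical helper, for illustration only.
    """
    path = tempfile.mkdtemp(prefix=prefix)
    try:
        yield path
    finally:
        # Runs even when the body raises, so no state survives the run
        shutil.rmtree(path, ignore_errors=True)


with temporary_warehouse() as warehouse:
    # This is the value you would pass to spark.sql.catalog.local.warehouse
    existed_during_run = os.path.isdir(warehouse)

removed_after_run = not os.path.exists(warehouse)
print(existed_during_run, removed_after_run)
```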
Another consequence is the manner in which the tables are written to your local filesystem. By default they are only persisted after the code has executed and the session is done. When testing this is not really desired, since you need to validate the table right after writing. We work around this by adding the following line to the Spark config:
spark_builder.config("spark.sql.catalog.local.default.write.metadata-flush-after-create", "true")
Note that it is already present in our config above, but for clarity I wanted to highlight this configuration option.
Before creating the tests, I will write three helper functions to aid in validating the application: two for creating an Iceberg table from a dataframe and one to verify that a table exists in our database.
from pyspark.sql import DataFrame, SparkSession


def create_iceberg_table_from_dataframe(
    spark: SparkSession, data: DataFrame, table_name: str, database_name: str
) -> None:
    # Expose the dataframe to SQL so the query below can select from it
    data.createOrReplaceTempView("dataframe")
    query: str = construct_create_or_replace_table_query(
        glue_database=database_name,
        glue_table=table_name,
    )
    spark.sql(query)


def table_exists_in_database(spark: SparkSession, database: str, table: str) -> bool:
    # Note: this switches the session's current database as a side effect
    spark.sql(f"USE {database}")
    tables_collection = spark.sql("SHOW TABLES").collect()
    table_names_in_db = [row["tableName"] for row in tables_collection]
    return table in table_names_in_db


def construct_create_or_replace_table_query(glue_database: str, glue_table: str) -> str:
    return f"""
    CREATE OR REPLACE TABLE {glue_database}.{glue_table}
    USING ICEBERG
    AS SELECT * from dataframe
    """
Now that we have our configuration and environment in place, it is time to write some tests! ✍️
Unit Testing my Iceberg application
So let’s create a dummy example. Suppose we have windmills that generate power, which we capture in the form of measurements. We are interested in the latest measurement at all times. We would like a table with all of those measurements in it so we can run some analysis on it. It would be handy if we could simply upsert the new records for each windmill so we don’t have to keep all of our measurements, and that is where Iceberg comes into play.
In the following snippet we define a data class WindmillMeasurement that represents a single measurement with a schema attached to it. We also define a class for our table, WindMillPowerTable, which includes a method with a MERGE INTO statement tailored to our application.
We will create two tests for this table: the first checks that the table is actually created, and the second tests our MERGE INTO.
from dataclasses import dataclass
from datetime import datetime
from typing import Iterable

from pyspark.sql import SparkSession
from pyspark.sql.types import FloatType, StringType, StructField, StructType, TimestampType


@dataclass
class WindmillMeasurement:
    windmill_id: str
    timestamp_start: datetime
    power_generated_kwh: float

    @classmethod
    def schema(cls) -> StructType:
        return StructType([
            StructField("windmill_id", StringType(), False),
            StructField("timestamp_start", TimestampType(), False),
            StructField("power_generated_kwh", FloatType(), False),
        ])


class WindMillPowerTable:
    database_name: str
    table_name: str = "windmill_power_timeseries"

    def __init__(self, database_name: str):
        self.database_name = database_name

    def merge_windmill_measurements(
        self, spark: SparkSession, windmill_measurements: Iterable[WindmillMeasurement]
    ) -> None:
        data = spark.createDataFrame(windmill_measurements, schema=WindmillMeasurement.schema())
        if not table_exists_in_database(spark=spark, database=self.database_name, table=self.table_name):
            # First write: create the table from the incoming measurements
            create_iceberg_table_from_dataframe(
                data=data, table_name=self.table_name, database_name=self.database_name, spark=spark
            )
        else:
            # Table exists: upsert, keeping only the newest measurement per windmill
            data.createOrReplaceTempView("source")
            spark.sql(
                f"""MERGE INTO {self.database_name}.{self.table_name} target
                USING (SELECT * FROM source) source
                ON source.windmill_id = target.windmill_id
                WHEN MATCHED AND source.timestamp_start > target.timestamp_start THEN UPDATE SET *
                WHEN NOT MATCHED THEN INSERT *
                """)
Since we need the table in both scenarios, we can create a fixture for it; that way we define it only once and can use it in both tests. Note that its scope is limited to a single test execution and not to the whole session.
from chispa.dataframe_comparer import assert_approx_df_equality


class TestWindMillPowerTable:
    @pytest.fixture(scope="function")
    def windmill_power_table(self, spark):
        data = [
            WindmillMeasurement("1", datetime(2023, 1, 1, 0, 0, 0), 1.0),
            WindmillMeasurement("2", datetime(2023, 1, 1, 0, 0, 0), 1.0),
        ]
        windmill_power_table: WindMillPowerTable = WindMillPowerTable(database_name=default_database_name)
        windmill_power_table.merge_windmill_measurements(spark=spark, windmill_measurements=data)
        yield
        # Teardown: drop the table so every test starts from a clean state
        spark.sql(f"DROP TABLE {default_database_name}.{WindMillPowerTable.table_name}")
This will create a table in our database with the given data. But we haven’t yet tested whether our table is actually written, so let’s check by adding our first test to our class. After executing it we can confirm our table is present with the correct data.
    def test_wind_mill_power_table(self, spark, cleanup_warehouse, windmill_power_table):
        expected = spark.createDataFrame([
            WindmillMeasurement("1", datetime(2023, 1, 1, 0, 0, 0), 1.0),
            WindmillMeasurement("2", datetime(2023, 1, 1, 0, 0, 0), 1.0),
        ], schema=WindmillMeasurement.schema()).orderBy("windmill_id")
        # Qualify the table with its database and sort it, since Spark does not guarantee row order
        result: DataFrame = spark.table(
            f"{default_database_name}.{WindMillPowerTable.table_name}"
        ).orderBy("windmill_id")
        # Here I use chispa, which is a handy library to compare dataframes
        assert_approx_df_equality(result, expected, precision=1e-5, ignore_nullable=True)
For our second scenario, we can reuse the fixture that creates the table and merge some extra records into it to check our logic. Lo and behold, we have tested our application that uses Iceberg tables with PySpark! ⭐️
    def test_merge_wind_mill_power_table(self, spark, windmill_power_table, cleanup_warehouse):
        data = [
            # Existing windmill with the same timestamp -> no update
            WindmillMeasurement("1", datetime(2023, 1, 1, 0, 0, 0), 2.0),
            # Existing windmill with a newer timestamp -> update
            WindmillMeasurement("2", datetime(2023, 1, 1, 0, 15, 0), 2.0),
            # New windmill -> insert
            WindmillMeasurement("3", datetime(2023, 1, 1, 0, 15, 0), 1.0),
        ]
        expected = spark.createDataFrame([
            WindmillMeasurement("1", datetime(2023, 1, 1, 0, 0, 0), 1.0),
            WindmillMeasurement("2", datetime(2023, 1, 1, 0, 15, 0), 2.0),
            WindmillMeasurement("3", datetime(2023, 1, 1, 0, 15, 0), 1.0),
        ], schema=WindmillMeasurement.schema()).orderBy("windmill_id")
        windmill_power_table: WindMillPowerTable = WindMillPowerTable(database_name=default_database_name)
        windmill_power_table.merge_windmill_measurements(spark=spark, windmill_measurements=data)
        # Qualify the table with its database and sort it, since Spark does not guarantee row order
        result: DataFrame = spark.table(
            f"{default_database_name}.{WindMillPowerTable.table_name}"
        ).orderBy("windmill_id")
        # Here I use chispa, which is a handy library to compare dataframes
        assert_approx_df_equality(result, expected, precision=1e-5, ignore_nullable=True)
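If the three merge cases feel abstract, it can help to model the rules of the MERGE INTO statement in plain Python and run the same data through them. The sketch below is only a mental model, not how Iceberg executes a merge. The names Measurement and merge_latest are my own, and it assumes each batch holds at most one measurement per windmill.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, Iterable


@dataclass(frozen=True)
class Measurement:
    windmill_id: str
    timestamp_start: datetime
    power_generated_kwh: float


def merge_latest(target: Dict[str, Measurement], source: Iterable[Measurement]) -> Dict[str, Measurement]:
    """Pure-Python model of the merge rules (assumes one source row per windmill)."""
    merged = dict(target)
    for row in source:
        current = merged.get(row.windmill_id)
        if current is None:
            # WHEN NOT MATCHED THEN INSERT *
            merged[row.windmill_id] = row
        elif row.timestamp_start > current.timestamp_start:
            # WHEN MATCHED AND the source row is newer THEN UPDATE SET *
            merged[row.windmill_id] = row
        # Matched but not newer: the target row is left untouched
    return merged


t0 = datetime(2023, 1, 1, 0, 0, 0)
t15 = datetime(2023, 1, 1, 0, 15, 0)
table = {"1": Measurement("1", t0, 1.0), "2": Measurement("2", t0, 1.0)}
batch = [
    Measurement("1", t0, 2.0),   # same timestamp -> no update
    Measurement("2", t15, 2.0),  # newer timestamp -> update
    Measurement("3", t15, 1.0),  # new windmill -> insert
]
result = merge_latest(table, batch)
for windmill_id in sorted(result):
    print(result[windmill_id])
```

The printed rows correspond to the expected dataframe in the test above: windmill 1 keeps its old value, windmill 2 is updated and windmill 3 is added.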
The cases we tested here are just dummy examples, but they should give you a head start in creating some tests of your own. Besides testing your application, it is also very interesting to play around with all the different Spark options for Iceberg. It is not always easy or feasible to test them on the real systems, so gaining a good understanding before you industrialise is good practice!
So, I hope this post helped you navigate your iceberg and encouraged you to write some tests for your Iceberg application! Good luck diving! ❄️ 🤿