
Broadcast variables in Spark work like they sound

Jun 21, 2023

Let us first look at what broadcast variables are.

Broadcast variables, as the name suggests, are ‘broadcast’ to the nodes of the Spark cluster to avoid shuffle operations, and they are read-only.

Broadcast variables are an essential component of distributed computing frameworks, such as Apache Spark and Apache Flink. They serve as a mechanism to efficiently share immutable data structures across a cluster of machines, minimizing the need for redundant data transfers. Broadcast variables are read-only and are cached on each machine that participates in the distributed computation, ensuring quick and easy access.

What do they do?

The primary purpose of broadcast variables is to address the challenge of data replication and distribution in distributed systems. Instead of replicating large datasets across multiple nodes, which can be both time-consuming and resource-intensive, broadcast variables enable the efficient transfer of data to all the machines in the cluster. By doing so, broadcast variables eliminate the need for repetitive data transfers and improve the performance of distributed computations.
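To see what that saves in practice, here is a minimal sketch (the lookup map and sample keys are made up for illustration) contrasting a plain local variable, which Spark serializes into the closure of every task, with a broadcast variable, which is shipped to each executor only once and then read locally:

import org.apache.spark.{SparkConf, SparkContext}

object ClosureVsBroadcast {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Closure vs Broadcast").setMaster("local")
    val sc = new SparkContext(conf)

    // Hypothetical reference data used by every task
    val lookup = Map("A" -> 1, "B" -> 2, "C" -> 3)
    val rdd = sc.parallelize(Seq("A", "B", "C", "A"))

    // Without broadcasting: `lookup` is captured by the closure and re-sent with every task
    val withoutBroadcast = rdd.map(key => lookup(key)).collect()

    // With broadcasting: `lookup` is sent to each executor once and cached there
    val lookupBroadcast = sc.broadcast(lookup)
    val withBroadcast = rdd.map(key => lookupBroadcast.value(key)).collect()

    println(withoutBroadcast.mkString(","))
    println(withBroadcast.mkString(","))
    sc.stop()
  }
}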

Benefits and Use Cases

1. Enhanced Performance: Broadcast variables significantly enhance the performance of data-intensive applications by minimizing data transfer overhead. Because the data is distributed to the machines beforehand, subsequent operations can access it locally, avoiding repeated network communication and reducing latency.

2. Memory Optimization: Since broadcast variables are cached on each machine, they enable memory optimization by avoiding unnecessary duplication of data. This aspect becomes especially valuable when dealing with large datasets, as memory resources can be better utilized by storing data only once.

3. Common Data Sharing: Broadcast variables are useful in scenarios where multiple operations or tasks need to access the same data simultaneously. By broadcasting the data, it becomes readily available to all the tasks, ensuring consistency and avoiding redundant data transfers.

4. Customizable Initialization: Broadcast variables are created and initialized on the driver, so developers control exactly how the data is prepared before it is shipped out for the distributed computation. This ensures that the data is appropriately prepared and ready for processing.

Example Usage

Let’s consider a practical example to illustrate the usefulness of broadcast variables. Suppose we have a distributed machine learning application that requires access to a pre-trained model. Instead of replicating the model across all the machines, which can consume a significant amount of network bandwidth and memory, we can broadcast the model as a variable. This approach allows each machine to access the model locally, reducing network overhead and improving overall performance.

Good, now we have some idea about broadcast variables in Spark.

Here is a code example of how to create a broadcast variable

import org.apache.spark.{SparkConf, SparkContext}

object BroadcastExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Broadcast Example").setMaster("local")
    val sc = new SparkContext(conf)

    // Broadcast a small read-only array to every executor
    val broadcastVar = sc.broadcast(Array(1, 2, 3, 4, 5))

    val rdd = sc.parallelize(Array(2, 3, 4, 5, 6))

    // Each task reads the broadcast value locally instead of shipping it with the closure
    val result = rdd.map(x => x * broadcastVar.value(0))
    result.foreach(println)

    sc.stop()
  }
}

Here, we initialize a broadcast variable broadcastVar with the array [1, 2, 3, 4, 5]. We then create an RDD rdd with elements [2, 3, 4, 5, 6] and multiply each element of rdd by the first element of the broadcast array. The result is printed, demonstrating how a broadcast variable makes shared data available to tasks across the cluster without shipping it with every task.

Some real-life use cases of broadcast variables:

  1. Machine Learning Models: Broadcasting pre-trained machine learning models, such as neural networks or decision trees, allows each machine in a cluster to access the model locally during the prediction phase. This avoids redundant transfers of the model and improves the inference performance.
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.Vectors

object BroadcastMachineLearningModel {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Broadcast Machine Learning Model")
      .master("local")
      .getOrCreate()
    import spark.implicits._

    // Initialize and train the machine learning model
    val trainingDF = Seq(
      (0.0, Vectors.dense(1.0)),
      (1.0, Vectors.dense(2.0)),
      (1.0, Vectors.dense(3.0))
    ).toDF("label", "features")
    val model = new LogisticRegression().setMaxIter(10).fit(trainingDF)

    // Broadcast the trained model so each executor holds a local copy
    val broadcastModel = spark.sparkContext.broadcast(model)

    // Simulate new data for prediction
    val newData = Seq(Vectors.dense(4.0), Vectors.dense(5.0), Vectors.dense(6.0))
    val rdd = spark.sparkContext.parallelize(newData)

    // Make predictions using the broadcasted model
    // (single-instance predict() is public in Spark 3.0+)
    val predictions = rdd.map(features => broadcastModel.value.predict(features))

    predictions.foreach(println)

    spark.stop()
  }
}

In the above example, we train a logistic regression model using the Spark ML API. Once the model is trained, we broadcast it using the sparkContext.broadcast() method. Then, we simulate new data and make predictions using the broadcasted model. This approach allows each machine to access the trained model locally during the prediction phase, improving efficiency.

2. Lookup Tables or Reference Data: In data processing pipelines, there are often lookup tables or reference data that are used for enrichment or filtering operations. Broadcasting these tables allows all nodes to access them locally, reducing network overhead and improving query performance.

import org.apache.spark.{SparkConf, SparkContext}

object BroadcastLookupTable {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Broadcast Lookup Table").setMaster("local")
    val sc = new SparkContext(conf)

    // Create a lookup table
    val lookupTable = Map("A" -> 1, "B" -> 2, "C" -> 3)

    // Broadcast the lookup table
    val broadcastTable = sc.broadcast(lookupTable)

    // Simulate data with keys
    val data = Seq("A", "B", "C", "B", "A")
    val rdd = sc.parallelize(data)

    // Use the broadcasted lookup table for value retrieval
    val result = rdd.map(key => broadcastTable.value(key))

    result.foreach(println)

    sc.stop()
  }
}

Above we created a lookup table as a Scala Map containing key-value pairs. We then broadcast the lookup table using the sc.broadcast() method. Next, we simulate data with keys and create an RDD from the data. Finally, we use the broadcasted lookup table to retrieve the corresponding values for each key in the RDD. Broadcasting the lookup table allows all nodes to access the data locally, improving performance when performing operations based on the lookup table.

Using Broadcast Variables with Spark SQL

Broadcast variables can be used in Spark SQL as well. Spark SQL integrates cleanly with broadcast variables, allowing you to leverage their benefits in SQL queries and DataFrame operations.

To use broadcast variables in Spark SQL, you can follow these steps:

1. Create a broadcast variable using the `sparkContext.broadcast()` method and specify the data you want to broadcast. For example:

val broadcastVar = spark.sparkContext.broadcast(Array(1, 2, 3, 4, 5))

2. Use the broadcast variable's value to build a temporary view in Spark SQL. This step lets subsequent SQL queries and DataFrame operations reference the broadcast data through that view. For example:


spark.sql(s"CREATE OR REPLACE TEMPORARY VIEW broadcast_table AS SELECT * FROM table WHERE column IN (${broadcastVar.value.mkString(",")})")

3. Use the resulting view in your SQL queries or DataFrame operations as needed. For example:


val result = spark.sql("SELECT * FROM table JOIN broadcast_table ON table.column = broadcast_table.column")

By utilizing broadcast variables in Spark SQL, you can optimize your queries by broadcasting smaller datasets or lookup tables to all worker nodes, reducing network communication and improving query performance.
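For DataFrame operations, the same idea is available through the broadcast() hint in org.apache.spark.sql.functions, which asks the optimizer to use a broadcast hash join. A minimal sketch (the largeDF and smallDF DataFrames are hypothetical):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

object BroadcastJoinHint {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Broadcast Join Hint")
      .master("local")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical fact table and small dimension table
    val largeDF = Seq(("A", 10), ("B", 20), ("C", 30)).toDF("key", "amount")
    val smallDF = Seq(("A", "alpha"), ("B", "beta")).toDF("key", "name")

    // The broadcast() hint asks the optimizer to ship smallDF to every executor
    // and perform a broadcast hash join, avoiding a shuffle of largeDF
    val joined = largeDF.join(broadcast(smallDF), "key")
    joined.show()

    spark.stop()
  }
}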

It’s worth noting that for joins, Spark SQL also decides on its own whether to broadcast a table, based on its estimated size and the spark.sql.autoBroadcastJoinThreshold configuration property. The spark.sql.broadcastTimeout property controls how long a broadcast may take to materialize before the query fails.
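As a rough sketch, assuming an existing SparkSession named spark, these properties can be adjusted at runtime (the 10 MB threshold below is only an illustrative value):

// Broadcast any join side whose estimated size is below ~10 MB (value in bytes)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10485760")

// Fail broadcast joins that take longer than 300 seconds to materialize
spark.conf.set("spark.sql.broadcastTimeout", "300")

The next example shows how a small static table can be distributed with a broadcast hint directly from SQL: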

import org.apache.spark.sql.SparkSession

object BroadcastStaticTable {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Broadcast Static Table")
      .master("local")
      .getOrCreate()
    import spark.implicits._

    // Create a static (reference) table DataFrame and a fact table DataFrame
    val staticTableDF = Seq(("A", 1), ("B", 2), ("C", 3)).toDF("key", "value")
    val table1DF = Seq(("A", "row1"), ("B", "row2"), ("C", "row3")).toDF("key", "data")

    // Register both DataFrames as temporary views so they can be referenced in SQL
    table1DF.createOrReplaceTempView("table1")
    staticTableDF.createOrReplaceTempView("broadcast_table")

    // The BROADCAST hint asks Spark SQL to ship the small static table to every
    // executor and perform a broadcast hash join instead of shuffling both sides
    val result = spark.sql(
      """SELECT /*+ BROADCAST(t2) */ t1.*, t2.value
        |FROM table1 t1 JOIN broadcast_table t2 ON t1.key = t2.key""".stripMargin)

    result.show()

    spark.stop()
  }
}

In the above example, we create a static table DataFrame staticTableDF from a Seq and register it, along with table1, as a temporary view. Because a DataFrame is itself a distributed, lazily evaluated structure, it is not wrapped in spark.sparkContext.broadcast(); instead, Spark SQL's BROADCAST join hint is used to distribute the small static table to the executors.

Next, we join table1 with the broadcasted table broadcast_table based on a common key column. The result is a DataFrame that includes the columns from table1 as well as the value column from the broadcasted static table.

By broadcasting the static table, Spark SQL ensures that the table data is efficiently distributed and accessible on each worker node. This helps in improving query performance by avoiding redundant data transfers and reducing network overhead.

Overall, using a static table as a broadcast variable in Spark SQL enables efficient data distribution and improves the performance of SQL queries involving lookup or reference data.

Be careful: know when not to use broadcast variables!

Although broadcast variables are powerful tools for optimizing distributed computations, there are scenarios where using them may degrade performance instead. One such example is when the size of the broadcasted data exceeds the available memory on the worker nodes, leading to excessive memory consumption and potential performance issues.

Below is an example where broadcasting a large dataset may not be the optimal solution:

import org.apache.spark.sql.SparkSession

object BroadcastLargeDataset {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Broadcast Large Dataset")
      .master("local")
      .getOrCreate()

    // Create a large dataset, materialized as an array of one million elements
    val largeData = (1 to 1000000).toArray

    // Broadcast the large dataset: every executor must now hold the full copy in memory
    val broadcastLargeData = spark.sparkContext.broadcast(largeData)

    // Perform some operations using the broadcasted large dataset
    // …

    spark.stop()
  }
}

In this example, we attempt to broadcast a large dataset with one million elements. However, if the worker nodes do not have sufficient memory to accommodate this dataset, it can lead to excessive memory consumption and potentially degrade performance.

Broadcasting large datasets can cause memory pressure on the worker nodes, as they need to store the entire dataset in memory. This may result in increased garbage collection overhead, longer task execution times, and even OutOfMemoryError exceptions if the available memory is exhausted.
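If a large broadcast variable does have to be created, its memory can at least be released once it is no longer needed. A small sketch, continuing from the example above, using the unpersist() and destroy() methods on the Broadcast handle:

// Remove the cached copies from the executors; the data is re-broadcast lazily if used again
broadcastLargeData.unpersist()

// Permanently release all resources for the broadcast variable; it must not be used after this call
broadcastLargeData.destroy()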

In such cases, it’s recommended to explore alternative strategies for sharing data, such as leveraging data partitioning or using distributed caching mechanisms. These approaches can provide better performance and memory utilization by distributing the data across the cluster in a more scalable and efficient manner.
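For instance, when the large dataset is really something to be joined against rather than a lookup table, a regular DataFrame join keeps both sides partitioned across the cluster instead of copying one side to every executor. A rough sketch with illustrative data:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object ShuffleJoinInsteadOfBroadcast {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Shuffle Join Instead Of Broadcast")
      .master("local")
      .getOrCreate()

    // Two large datasets; neither should be copied in full to every executor
    val largeDF = spark.range(0, 1000000).toDF("id")
    val otherLargeDF = spark.range(0, 1000000).withColumn("value", col("id") * 2)

    // A regular join keeps both sides partitioned across the cluster:
    // Spark shuffles matching partitions together instead of replicating either side
    val joined = largeDF.join(otherLargeDF, "id")
    joined.show(5)

    spark.stop()
  }
}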

It’s essential to carefully analyze the size of the dataset, available resources, and the specific requirements of the computation to determine whether broadcasting is the appropriate approach or if other methods should be considered to avoid performance degradation.
