Top Strategies for Optimizing Data Retrieval in Databricks — A Guide to Databricks Statement Execution

Databricks SQL SME
DBSQL SME Engineering
Apr 18, 2024

Benchmark results: DBSQL Connector vs Statement API (Total Processing Time)

Author: Claudio Tito, Delivery Solutions Architect @ Databricks

Introduction

In today’s data-driven world, quickly and efficiently retrieving large volumes of data is not just an advantage but a necessity. Organizations increasingly rely on big data analytics for insights that drive decision-making, operational efficiency, and innovation. As one of the leading platforms for big data analytics, Databricks offers robust solutions for processing vast datasets at scale. This blog post delves into a comparative analysis of two primary methods for querying large data volumes (millions of records) through Databricks: the Databricks SQL Connector for Python and the Statement Execution API.

  • Databricks SQL Connector for Python (DBSQL): This connector provides a high-performance, Python-native interface for executing SQL queries on Databricks clusters. With it, Python applications can interact directly with Databricks SQL, so developers can build Databricks' data processing capabilities into their Python applications. Learn more about the Databricks SQL Connector for Python.
  • Statement Execution API (External Links): Designed for versatility, the Statement Execution API allows for executing SQL queries in Databricks through simple HTTP requests. This REST API provides an alternative method for querying data, particularly beneficial for applications and services not written in Python, ensuring broader accessibility to Databricks’ data processing features. Explore the Statement Execution API.
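To make the REST path more concrete, here is a minimal sketch of the request body a client might send to the Statement Execution API when asking for results as external links. The endpoint path and field names follow the public REST API as we understand it; the helper name build_statement_request, the table name, and the warehouse ID are placeholders introduced for illustration, not the code used in the experiments.

```python
import json

# Endpoint path of the Statement Execution API, relative to the workspace URL.
STATEMENTS_ENDPOINT = "/api/2.0/sql/statements"

# Result formats accepted by the API (as we understand the public reference).
ALLOWED_FORMATS = {"CSV", "JSON_ARRAY", "ARROW_STREAM"}


def build_statement_request(statement, warehouse_id, result_format="CSV", wait_timeout="30s"):
    """Return the JSON body for a statement submission (hypothetical helper)."""
    if result_format not in ALLOWED_FORMATS:
        raise ValueError(f"Unsupported format: {result_format}")
    return {
        "statement": statement,
        "warehouse_id": warehouse_id,
        # EXTERNAL_LINKS asks for URLs to result chunks instead of inline rows.
        "disposition": "EXTERNAL_LINKS",
        "format": result_format,
        "wait_timeout": wait_timeout,
    }


# Placeholder query and warehouse ID, for illustration only.
body = build_statement_request(
    "SELECT * FROM my_catalog.my_schema.my_table LIMIT 1000000",
    "<warehouse-id>",
)
print(json.dumps(body, indent=2))
```

A client would POST this body to the workspace URL plus STATEMENTS_ENDPOINT with a bearer token, then download each chunk from the links returned in the response.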

A significant part of this analysis also focuses on converting queried data into two of the industry’s most widely used formats: CSV and JSON. Understanding how these formats impact the performance of data retrieval and processing is crucial for organisations aiming to optimise their data analytics workloads.

Furthermore, the role of parallel processing in enhancing performance will be examined in detail. The Databricks SQL Connector templates use thread-based parallelism, relying on Python's threading library to run tasks concurrently and speed up data retrieval and processing. The external-links templates, by contrast, use the concurrent.futures module, which runs tasks as futures on a thread or process pool.

  • Thread-Based Parallelism: Employs multiple threads to execute tasks concurrently, reducing overall processing time. In CPython the global interpreter lock lets only one thread run Python bytecode at a time, so the gains come mainly from I/O-bound work and from native code that releases the lock. Python's threading module serves as the foundation for implementing this parallelism.
  • External Links with concurrent.futures: Facilitates parallel task execution by abstracting thread management, allowing for scalable and efficient data processing. The concurrent.futures module provides a high-level interface for asynchronously executing callables.
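The two styles can be compared side by side on a toy task. The sketch below is illustrative only: convert_chunk is a stand-in for a real conversion step, and the function names are ours rather than taken from the experiment code.

```python
import queue
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Thread


def convert_chunk(chunk):
    """Stand-in for a real conversion step: returns the number of rows handled."""
    return len(chunk)


def _worker(chunk, results):
    # Threads cannot return values directly, so results go through a queue.
    results.put(convert_chunk(chunk))


def run_with_threads(chunks):
    """Start one thread per chunk and collect results through a queue."""
    results = queue.Queue()
    threads = [Thread(target=_worker, args=(chunk, results)) for chunk in chunks]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return sorted(results.get() for _ in threads)


def run_with_futures(chunks, max_workers=4):
    """Submit chunks to a bounded pool and read results from the futures."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(convert_chunk, chunk) for chunk in chunks]
        return sorted(future.result() for future in as_completed(futures))


chunks = [[1, 2, 3], [4, 5], [6]]
print(run_with_threads(chunks))  # [1, 2, 3]
print(run_with_futures(chunks))  # [1, 2, 3]
```

The thread version needs explicit bookkeeping (a queue, start and join), while the futures version carries return values and exceptions back to the caller and caps concurrency with max_workers.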

This comparative study is structured as an experiment conducted within Databricks Notebooks to minimise external influences such as network delays or JVM bottlenecks. By controlling these variables, the experiment aims to isolate the effects of thread-based parallelism, futures, and buffer size adjustments on data retrieval and processing efficiency.

Key variables in this experiment include:

  • Independent Variables: Parallel processing mechanisms (threads vs futures), buffer_size_bytes (determining the Thrift buffer size for data downloads), and query limits.
  • Dependent Variables: Total processing time (query execution plus data conversion), query time (for the Databricks SQL Connector) or time to retrieve external links, and conversion time (either multithreaded processing or future processing time for external links).

Efficient data retrieval is paramount in data-intensive environments like Databricks. Whether improving operational processes, deepening analytical insights, or enhancing real-time interactions with applications, the ability to swiftly navigate and process large datasets stands at the core of contemporary data strategies. This blog aims to illuminate how organisations can achieve these competencies, offering a guide to optimising data retrieval within the expansive landscape of Databricks.

Context

Code Architecture — Streamlining Data Retrieval and Conversion in Databricks:

Our exploration is structured around five distinct templates:

  • Single-Threaded CSV, Single-Threaded JSON
  • Multi-Threaded CSV, Multi-Threaded JSON
  • External links JSON/CSV

Each serves as a blueprint for our experiments. The architecture of our code bifurcates into two main segments (retrieval and conversion), tailored to harness the capabilities of the Databricks SQL Connector and External links effectively:

Retrieval: At the core of our retrieval process lies the execution of queries through the SQL Connector and the fetching of external links, which paves the way for the subsequent transformation phase. This keeps data handling streamlined for both SQL-based extraction and direct link retrieval.


# Databricks SQL Connector retrieval
import time

# Optionally disable LZ4 compression of results:
# connection.lz4_compression = False

with connection.cursor(buffer_size_bytes=tuned_buffer_size_bytes) as cursor:
    # Time only the query execution, not the later fetch and conversion
    start_query = time.time()
    cursor.execute(statement)
    end_query = time.time()
    total_query_time = end_query - start_query
    print(f"Query completed in {total_query_time:.1f} seconds")

Conversion: On the other hand, the transformation segment is where the data undergoes conversion from Arrow to CSV/JSON for data fetched via the DBSQL Connector and from raw data to JSON/CSV for data gathered through external links.

# External links conversion to JSON/CSV
import concurrent.futures as cf
import csv
import json

def save_json_from_futures(futures, output_file_path):
    all_data = []  # Accumulate all JSON data here
    # Process futures in completion order and accumulate the JSON rows
    for future in cf.as_completed(futures):
        response = future.result()
        if response.status_code == 200:
            data = json.loads(response.content.decode('utf-8'))
            all_data.extend(data)

    # Write all accumulated JSON data to a single file
    with open(output_file_path, 'w') as output_file:
        json.dump(all_data, output_file)

def save_csv_from_futures(futures, output_file_path):
    # Open the output CSV file in write mode
    with open(output_file_path, 'w', newline='') as output_file:
        writer = None  # Initialized with the CSV header from the first response
        # Chunks are written in completion order, not chunk order
        for future in cf.as_completed(futures):
            response = future.result()
            if response.status_code == 200:
                # Decode the content and parse it as CSV
                content = response.content.decode('utf-8')
                csv_reader = csv.reader(content.splitlines())
                header, *rows = list(csv_reader)
                # Write the header once, taken from the first response
                if writer is None:
                    writer = csv.writer(output_file)
                    writer.writerow(header)
                # Write rows to the output CSV file
                writer.writerows(rows)

Notably, the conversion exploits single or multithreaded DBSQL processes while always leaning on futures for external links, reflecting a tailored approach to parallel processing.
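The snippets above consume a list of futures but do not show how that list is produced. One possible way, sketched here under our own assumptions, is to submit one download per external link to a thread pool. The names fetch_links and FakeResponse are hypothetical, and fetch stands for any callable that takes a URL and returns a response-like object (requests.get would be a natural choice in practice).

```python
import concurrent.futures as cf
import json


def fetch_links(urls, fetch, max_workers=8):
    """Download every external link in parallel and return the futures.

    `fetch` is any callable taking a URL and returning an object with
    `status_code` and `content` attributes (an assumption of this sketch).
    """
    with cf.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(fetch, url) for url in urls]
    # Leaving the with-block waits until every download has finished.
    return futures


class FakeResponse:
    """Stand-in for an HTTP response so the sketch runs without a network."""

    status_code = 200

    def __init__(self, url):
        self.content = json.dumps([{"source": url}]).encode("utf-8")


# Placeholder URLs; real ones would come from the API response.
futures = fetch_links(["link-0", "link-1", "link-2"], FakeResponse)
for future in cf.as_completed(futures):
    print(json.loads(future.result().content))
```

The returned list can be handed to a consumer that iterates with cf.as_completed, as the conversion functions above do.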

  • JSON Conversion: The transformation from Arrow to JSON starts from the PyArrow table returned by the connector. PyArrow has no direct Arrow-to-JSON writer, so the table is first converted to a pandas DataFrame and then serialised to JSON.

# Databricks SQL Connector JSON conversion
import pyarrow as pa

def convert_to_json(result: pa.Table, file_path: str):
    # PyArrow has no JSON writer, so go through pandas
    df = result.to_pandas()
    original_data = df.to_json(orient='index', date_format='iso').encode()
    original_size_mb = len(original_data) / (1024 ** 2)
    print(f"Processed {len(result)} rows, size {original_size_mb:.0f}MB")

    # Save the JSON to a file
    with open(file_path, 'wb') as f:
        f.write(original_data)
    return original_size_mb

  • CSV Conversion: For CSV formats, pyarrow.csv provides a direct pathway from Arrow to CSV, with no intermediate pandas step.

# Databricks SQL Connector CSV conversion
import os
import pyarrow as pa
import pyarrow.csv as pc

def convert_to_csv(result: pa.Table, file_path: str):
    # Write the Arrow table straight to disk as CSV
    pc.write_csv(result, file_path)
    file_size_bytes = os.path.getsize(file_path)
    file_size_mb = file_size_bytes / (1024 ** 2)
    print(f"Processed {len(result)} rows")
    return file_size_mb

All templates save files directly to a Databricks Volume. For multithreaded processing with the DBSQL Connector, each iteration writes its own file, a design choice intended to maximise disk throughput.

A Testbed Engineered for Precision:

The backbone of our experiments is a carefully configured testbed: Databricks Notebooks located in the same region as our compute resources, a Large SQL Serverless instance dedicated to retrieval tasks, and a single-node cluster for conversions. The compute cluster is a Single Node i3.xlarge with 30GB of RAM and 4 CPU cores, running Databricks Runtime 13.3 LTS, which includes Apache Spark™ 3.4.1 and Scala 2.12.

This dual-compute setup minimizes potential network delays and matches the computing power to the task at hand: the Large SQL Serverless instance handles retrieval, while the single-node cluster handles the conversion tasks:

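# Multi-threaded CSV conversion
import queue
import time
from threading import Thread

# Queue that collects the file size reported by each thread
results_queue = queue.Queue()
threads = []

# Hypothetical wrapper: convert_to_csv takes (result, file_path), so the
# queue is filled here to hand the return value back to the main thread
def convert_chunk(result, results_queue, file_path):
    results_queue.put(convert_to_csv(result, file_path))

start_multi_thread = time.time()

chunk_index = 0
while True:
    # Fetch the next batch of rows as an Arrow table
    result = cursor.fetchmany_arrow(multi_thread_fetch)
    if len(result) == 0:
        break

    # One output file per chunk (placeholder path)
    file_path = f"/file_path_{chunk_index}.csv"
    thread = Thread(target=convert_chunk, args=(result, results_queue, file_path))
    thread.start()
    threads.append(thread)
    chunk_index += 1

# Wait for all conversions to finish before stopping the clock
for thread in threads:
    thread.join()

end_multi_thread = time.time()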

File Size and Network Traffic

To assess the impact of our data processing strategies, we analyse network traffic and file sizes across 90 experimental runs. We use rdpcap from scapy.all (https://scapy.readthedocs.io/en/latest/usage.html) to measure the network dump in MB, giving a granular view of the network traffic induced by each processing strategy.

# Packet sizing with Scapy (tcpdump needs root privileges)
import subprocess
from scapy.all import rdpcap

def start_tcpdump(capture_file="capture.pcap"):
    # Capture on all interfaces, excluding SSH traffic; -U writes each packet as it arrives
    command = ["sudo", "tcpdump", "-w", capture_file, "-i", "any", "-U", "not port 22"]
    process = subprocess.Popen(command)
    return process

def stop_tcpdump(process):
    process.terminate()
    process.wait()  # Wait for the process to terminate

def analyze_capture(capture_file="capture.pcap"):
    # Print a compact, unresolved listing of the captured packets
    command = ["tcpdump", "-r", capture_file, "-nn", "-q"]
    result = subprocess.run(command, capture_output=True, text=True)
    print(result.stdout)

def analyze_pcap_with_scapy(pcap_file):
    # Sum the size of every captured packet and report it in MB
    packets = rdpcap(pcap_file)
    total_size = sum(len(packet) for packet in packets) / (1024 ** 2)
    print(f"Total packets: {len(packets)}")
    print(f"Total size: {total_size} MB")

This detailed analysis, coupled with the evaluation of uncompressed Arrow sizes and final disk file sizes, paints a vivid picture of the data lifecycle and its footprint across our infrastructure:

  • This analysis sheds light on the nuanced differences in network traffic generated by the Databricks SQL Connector and external links, each employing distinct compression techniques (LZ4 and GZIP, respectively).
  • Examining uncompressed Arrow sizes, final disk file sizes and the implications of (simulated) GZIP compression on externally linked data offers a holistic view of data volume and network utilisation.
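The post does not spell out how the GZIP compression was simulated. One plausible way, shown here as a sketch only, is to compress the downloaded payload in memory with Python's gzip module and compare sizes. The helper name simulated_gzip_size_mb and the sample rows are our own illustration, not data from the experiments.

```python
import gzip
import json


def simulated_gzip_size_mb(payload: bytes, compresslevel: int = 6):
    """Return (raw_mb, gzip_mb) for a payload compressed in memory."""
    raw_mb = len(payload) / (1024 ** 2)
    compressed = gzip.compress(payload, compresslevel=compresslevel)
    gzip_mb = len(compressed) / (1024 ** 2)
    return raw_mb, gzip_mb


# Synthetic rows, used only to have something to compress.
rows = [{"id": i, "city": "Amsterdam", "amount": i * 1.5} for i in range(10_000)]
payload = json.dumps(rows).encode("utf-8")

raw_mb, gzip_mb = simulated_gzip_size_mb(payload)
print(f"raw {raw_mb:.2f} MB, gzip {gzip_mb:.2f} MB, ratio {raw_mb / gzip_mb:.1f}x")
```

The ratio depends heavily on how repetitive the data is, so figures from synthetic rows like these should not be read as representative of real tables.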

Below is a visualization of total file sizes displayed by connector type:

Author screenshots: total file sizes by connector type

Timely Insights — Measuring Efficiency in Seconds:

Every experiment is timed, recording the total processing, retrieval, and conversion durations in seconds. These measurements provide a clear basis for assessing the efficiency of each template and guide our understanding of the optimal strategies for data retrieval and conversion within Databricks.
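As an illustration of how the three durations relate, the sketch below times two phases separately and derives the total from them. The timed context manager is a hypothetical helper of ours, and the sleep calls merely stand in for a real query and a real conversion.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label, timings):
    """Record the wall-clock duration of the enclosed block under `label`."""
    start = time.perf_counter()
    try:
        yield
    finally:
        timings[label] = time.perf_counter() - start


timings = {}

with timed("query", timings):
    time.sleep(0.01)  # stand-in for cursor.execute(...)

with timed("conversion", timings):
    time.sleep(0.02)  # stand-in for the CSV/JSON conversion

# Total processing time is query time plus conversion time.
timings["total"] = timings["query"] + timings["conversion"]

for label, seconds in timings.items():
    print(f"{label}: {seconds:.3f} s")
```

Using time.perf_counter rather than time.time avoids jumps from system clock adjustments, which matters little for multi-second runs but costs nothing.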

Results

Our test results are visualised in Databricks Lakeview, offering a granular view of the performance differences across several key metrics.

1) Databricks SQL Connector vs. External Links

Our investigation reveals a notable trend: the Databricks SQL Connector consistently outpaces external links in processing speed. The following line charts, showing the average total processing time, retrieval time, and conversion time, illustrate this gap. As the volume of data grows, so does the performance gap between the two approaches, with the Databricks SQL Connector maintaining its lead.

Benchmark results: DBSQL Connector vs Statement API (Total Processing Time)
Benchmark results: DBSQL Connector vs Statement API (Avg Query Time)
Benchmark results: DBSQL Connector vs Statement API (Avg File Conversion Time)

This divergence can be partially attributed to the larger network dump associated with external links, which adds substantial overhead and may slow data retrieval and processing.

Benchmark Results: DBSQL Connector vs Statement API (Avg Network Dump File Size)
Benchmark Results: DBSQL Connector vs Statement API (Avg Disk File Size)

2) CSV vs. JSON Conversion Results

Across all tested scenarios, converting data to CSV was faster than converting to JSON. The advantage shows in both average total processing time and conversion time, while retrieval time is roughly the same for both formats.

Benchmark Results: DBSQL Connector vs Statement API (By Data Type)

The inherent simplicity of the CSV format, coupled with PyArrow’s direct handling of Arrow-to-CSV conversions, likely contributes to this speed advantage. Additionally, CSV files exhibit a smaller footprint on disk storage, further emphasising the format’s efficiency.
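Part of the size difference is structural and can be seen without any Databricks tooling. The sketch below serialises the same synthetic rows as CSV and as row-keyed JSON, a layout similar in shape to the orient='index' output used in the JSON template. The helper names and sample rows are ours, and the standard-library serialisers are only an approximation of what pandas and PyArrow write.

```python
import csv
import io
import json


def csv_bytes(rows, columns):
    """Serialise rows as CSV: column names appear once, in the header."""
    buffer = io.StringIO()
    writer = csv.writer(buffer)
    writer.writerow(columns)
    writer.writerows([row[column] for column in columns] for row in rows)
    return buffer.getvalue().encode("utf-8")


def json_index_bytes(rows):
    """Serialise rows as row-keyed JSON: column names repeat in every row."""
    keyed = {str(index): row for index, row in enumerate(rows)}
    return json.dumps(keyed).encode("utf-8")


# Synthetic rows, for illustration only.
columns = ["trip_id", "distance_km", "city"]
rows = [{"trip_id": i, "distance_km": round(i * 0.37, 2), "city": "Rome"} for i in range(1000)]

csv_size = len(csv_bytes(rows, columns))
json_size = len(json_index_bytes(rows))
print(f"CSV: {csv_size} bytes, JSON: {json_size} bytes")
```

Because JSON repeats every column name in every record, its overhead grows with both the row count and the length of the column names.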

3) Impact of Query Buffer Size

The query buffer size plays a pivotal role in processing times. The experimental results suggest a clear trend: larger datasets benefit from larger buffers. For instance, when processing 5 million rows, setting the buffer to five times its default value reduced processing time by approximately 2 seconds compared with the default setting. This finding highlights the buffer size's impact on data processing efficiency, especially as data volume grows.
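A buffer-size sweep of this kind can be set up by expressing each candidate as a multiple of the default. The sketch below is only an illustration: the 100 MiB default is our assumption and should be checked against the installed connector version, and buffer_size_candidates and the multipliers are hypothetical.

```python
# Assumption: the connector default is 100 MiB; verify for your version.
ASSUMED_DEFAULT_BUFFER_BYTES = 100 * 1024 ** 2


def buffer_size_candidates(default_bytes, multipliers=(1, 2, 5)):
    """Map each multiplier to a buffer size in bytes (hypothetical helper)."""
    return {multiplier: default_bytes * multiplier for multiplier in multipliers}


candidates = buffer_size_candidates(ASSUMED_DEFAULT_BUFFER_BYTES)
for multiplier, size in candidates.items():
    # Each size would be passed as connection.cursor(buffer_size_bytes=size).
    print(f"{multiplier}x default -> {size} bytes ({size / 1024 ** 2:.0f} MiB)")
```

Larger buffers hold more of the result in memory at once, so any sweep should also watch memory use on the machine running the connector.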

4) Multithreading with the DBSQL Connector

Multithreading with the DBSQL Connector significantly reduces processing times, particularly for CSV files. For datasets of 5 million rows, multithreading saved an average of 24 seconds on CSV conversion compared with single-threaded processing. JSON files also benefited, although to a lesser extent, with an average reduction of 3 seconds. These results underscore the value of parallel processing in accelerating data conversion tasks.

Conversion Time By Parallelism Type

Conclusions

Our comprehensive examination of various data retrieval and processing methodologies within the Databricks environment has yielded insightful revelations, particularly highlighting the Databricks SQL Connector’s superior performance in the contexts of our testbed, data volume, and network requirements when compared to External Links.

The flexibility of the API and the inherent characteristics of different file formats have also surfaced as critical factors influencing processing efficiency. Notably, CSV was quicker to process and smaller to store than JSON. This variance underscores the importance of selecting the file format based on the specific use case and performance objectives, as it significantly affects the overall efficiency of data handling operations.

Looking forward, exploring buffer_size_bytes presents an intriguing avenue for further investigation. An in-depth analysis of larger buffer sizes could offer valuable insights into their impact on hosting compute performance and overall processing time, potentially unveiling optimisation strategies that further enhance data processing workflows.

Moreover, conducting these experiments from a client perspective to gauge the effects of typical network delays on processing times emerges as a compelling next step. Such an approach could provide a more holistic understanding of the performance dynamics in real-world scenarios where network latency plays a crucial role.
