Find Median from a Data Stream

Ethan Davis
Data Structures and Algorithms DSA
3 min read · Apr 2, 2024
Data Structures and Algorithms

Statement

Create a custom data structure that stores a stream of integers, and that finds the median of the integers seen so far in constant time O(1).

Constraints

  • -10⁵ ≤ n ≤ 10⁵, where n is an integer received from the data stream

Solution

"""
production algorithm
"""

from data_structures.heap.heap import Heap


class DataStreamMedian:
    """
    Running median of a stream of numbers, kept with two heaps.

    ``low`` holds the smaller half of the numbers seen so far, wrapped in
    ``MaxHeapData`` (whose ``<`` is reversed), and ``high`` holds the larger
    half, wrapped in ``MinHeapData``.  ``rebalance`` runs after every
    insertion so that ``high.size() <= low.size() <= high.size() + 1``.

    NOTE(review): this assumes ``Heap`` keeps the item that compares
    smallest under ``<`` at ``top()`` -- confirm against the Heap class.
    """

    def __init__(self):
        self.low = Heap()  # lower half of the stream (used as a max heap)
        self.high = Heap()  # upper half of the stream (used as a min heap)

    def insert(self, number):
        """
        Add ``number`` to the stream and restore the size balance.

        time complexity O(logn)
        space complexity O(1)
        """
        # While either heap is still empty there is no split point to
        # compare against, so the number goes to ``low`` and ``rebalance``
        # moves the larger value across when needed.
        if (
            self.low.empty()
            or self.high.empty()
            or number <= self.low.top().data
        ):
            self.low.push(MaxHeapData(number))
        else:
            self.high.push(MinHeapData(number))
        self.rebalance()

    def median(self):
        """
        Return the median of the numbers inserted so far.

        When both heaps hold the same number of items the result is the
        mean of the two tops (a float for integer input); otherwise it is
        the top of ``low``, returned unchanged.

        Raises IndexError if nothing has been inserted yet.

        time complexity O(1)
        space complexity O(1)
        """
        # Fail with a clear message instead of whatever an empty
        # ``Heap.top()`` happens to do.
        if self.low.empty():
            raise IndexError("median of an empty data stream")
        if self.low.size() == self.high.size():
            return self.low.top().data / 2 + self.high.top().data / 2
        return self.low.top().data

    def rebalance(self):
        """
        Move at most one item between the heaps so that ``low`` holds
        either the same number of items as ``high`` or exactly one more.

        time complexity O(logn)
        space complexity O(1)
        """
        if self.low.size() < self.high.size():
            # ``high`` got ahead: hand its top over to ``low``.
            self.low.push(MaxHeapData(self.high.pop().data))
        elif self.high.size() + 1 < self.low.size():
            # ``low`` is more than one ahead: hand its top over to ``high``.
            self.high.push(MinHeapData(self.low.pop().data))

    def __repr__(self):
        return f"(maxheap={self.low}, minheap={self.high})"


class MinHeapData:
    """Heap item ordered by its payload: the smaller payload compares as less."""

    def __init__(self, data):
        # Payload used by __lt__ for ordering and by __repr__ for display.
        self.data = data

    def __lt__(self, other):
        """Return True when this item's payload is less than ``other``'s."""
        mine, theirs = self.data, other.data
        return mine < theirs

    def __repr__(self):
        """Return the ``str()`` form of the payload."""
        return f"{self.data!s}"


class MaxHeapData:
    """Heap item with reversed ordering: the larger payload compares as less."""

    def __init__(self, data):
        # Payload used by __lt__ for ordering and by __repr__ for display.
        self.data = data

    def __lt__(self, other):
        """Return True when ``other``'s payload is less than this item's."""
        theirs, mine = other.data, self.data
        return theirs < mine

    def __repr__(self):
        """Return the ``str()`` form of the payload."""
        return f"{self.data!s}"
"""
unit tests
"""

from unittest import TestCase
from algorithms.two_heaps.data_stream_median.data_stream_median import DataStreamMedian


class DataStreamMedianTestCase(TestCase):
    """
    Checks the heap layout (through ``str()``) and the median reported by
    DataStreamMedian for increasing and decreasing input, with an odd and
    an even count of numbers.
    """

    def test_insert_strictly_increasing_odd_parity_numbers(self):
        # Given
        data_stream_median = DataStreamMedian()
        numbers = [51, 53, 55, 57, 59]

        # When
        for number in numbers:
            data_stream_median.insert(number)
        actual = str(data_stream_median)

        # Then
        expected_maxheap = [55, 53, 51]
        expected_minheap = [57, 59]
        expected = f"(maxheap={expected_maxheap}, minheap={expected_minheap})"
        self.assertEqual(actual, expected)

    def test_median_strictly_increasing_odd_parity_numbers(self):
        # Given
        data_stream_median = DataStreamMedian()
        numbers = [51, 53, 55, 57, 59]
        for number in numbers:
            data_stream_median.insert(number)

        # When
        actual = data_stream_median.median()

        # Then
        expected = 55
        self.assertEqual(actual, expected)

    def test_insert_strictly_decreasing_odd_parity_numbers(self):
        # Given
        data_stream_median = DataStreamMedian()
        numbers = [59, 57, 55, 53, 51]

        # When
        for number in numbers:
            data_stream_median.insert(number)
        actual = str(data_stream_median)

        # Then
        expected_maxheap = [55, 53, 51]
        expected_minheap = [57, 59]
        expected = f"(maxheap={expected_maxheap}, minheap={expected_minheap})"
        self.assertEqual(actual, expected)

    def test_median_strictly_decreasing_odd_parity_numbers(self):
        # Given
        data_stream_median = DataStreamMedian()
        numbers = [59, 57, 55, 53, 51]
        for number in numbers:
            data_stream_median.insert(number)

        # When
        actual = data_stream_median.median()

        # Then
        expected = 55
        self.assertEqual(actual, expected)

    def test_insert_strictly_increasing_even_parity_numbers(self):
        # Given
        data_stream_median = DataStreamMedian()
        numbers = [51, 53, 55, 57, 59, 61]

        # When
        for number in numbers:
            data_stream_median.insert(number)
        actual = str(data_stream_median)

        # Then
        expected_maxheap = [55, 53, 51]
        expected_minheap = [57, 59, 61]
        expected = f"(maxheap={expected_maxheap}, minheap={expected_minheap})"
        self.assertEqual(actual, expected)

    def test_median_strictly_increasing_even_parity_numbers(self):
        # Given
        data_stream_median = DataStreamMedian()
        numbers = [51, 53, 55, 57, 59, 61]
        for number in numbers:
            data_stream_median.insert(number)

        # When
        actual = data_stream_median.median()

        # Then
        # Even count: mean of the two middle values, 55 and 57.
        expected = 56
        self.assertEqual(actual, expected)

    def test_insert_strictly_decreasing_even_parity_numbers(self):
        # Given
        data_stream_median = DataStreamMedian()
        numbers = [61, 59, 57, 55, 53, 51]

        # When
        for number in numbers:
            data_stream_median.insert(number)
        actual = str(data_stream_median)

        # Then
        expected_maxheap = [55, 53, 51]
        expected_minheap = [57, 59, 61]
        expected = f"(maxheap={expected_maxheap}, minheap={expected_minheap})"
        self.assertEqual(actual, expected)

    def test_median_strictly_decreasing_even_parity_numbers(self):
        # Given
        data_stream_median = DataStreamMedian()
        numbers = [61, 59, 57, 55, 53, 51]
        for number in numbers:
            data_stream_median.insert(number)

        # When
        actual = data_stream_median.median()

        # Then
        # Even count: mean of the two middle values, 55 and 57.
        expected = 56
        self.assertEqual(actual, expected)

Strategy

Two Heaps.

Explanation

The two heaps strategy is used to create the custom data structure: a max heap stores the low half of the integers, and a min heap stores the high half of the integers. The two heaps are balanced such that if there’s an odd number of integers seen so far, then the max heap holds exactly one more integer than the min heap. Therefore, if the sizes of the max heap and the min heap are equal, i.e. there’s an even number of total integers seen so far, then the median is calculated as the mean of the tops of the two heaps. Otherwise the size of the max heap is greater than the size of the min heap, i.e. there’s an odd number of total integers seen so far, and so the median is at the top of the max heap.

Time Complexity

The time complexity of insertion and rebalance of the two heaps is O(logn) because these operations involve pushing to and popping from a heap. The time complexity to get the median is O(1) because the operation involves reading from the top of a heap.

Space Complexity

The auxiliary space complexity of all operations is O(1).

Links

--

--