Find Median from a Data Stream
Statement
Create a custom data structure that stores a stream of integers, and that finds the median of the integers seen so far in constant time O(1)
.
Constraints
- -10⁵ ≤
n
≤ 10⁵, wheren
is an integer received from the data stream
Solution
"""
production algorithm
"""
from data_structures.heap.heap import Heap
class DataStreamMedian:
def __init__(self):
self.low = Heap()
self.high = Heap()
def insert(self, number):
"""
time complexity O(logn)
space complexity O(1)
"""
if self.low.empty() or self.high.empty():
self.low.push(MaxHeapData(number))
self.rebalance()
return
if number <= self.low.top().data:
self.low.push(MaxHeapData(number))
self.rebalance()
return
self.high.push(MinHeapData(number))
self.rebalance()
def median(self):
"""
time complexity O(1)
space complexity O(1)
"""
if self.low.size() == self.high.size():
return self.low.top().data / 2 + self.high.top().data / 2
return self.low.top().data
def rebalance(self):
"""
time complexity O(logn)
space complexity O(1)
"""
if self.low.size() < self.high.size():
data = self.high.pop().data
self.low.push(MaxHeapData(data))
if self.high.size() + 1 < self.low.size():
data = self.low.pop().data
self.high.push(MinHeapData(data))
def __repr__(self):
return f"(maxheap={self.low}, minheap={self.high})"
class MinHeapData:
def __init__(self, data):
self.data = data
def __lt__(self, other):
return self.data < other.data
def __repr__(self):
return str(self.data)
class MaxHeapData:
def __init__(self, data):
self.data = data
def __lt__(self, other):
return other.data < self.data
def __repr__(self):
return str(self.data)
"""
unit tests
"""
from unittest import TestCase
from algorithms.two_heaps.data_stream_median.data_stream_median import DataStreamMedian
class DataStreamMedianTestCase(TestCase):
def test_insert_strictly_increasing_odd_parity_numbers(self):
# Given
data_stream_median = DataStreamMedian()
numbers = [51, 53, 55, 57, 59]
# When
[data_stream_median.insert(number) for number in numbers]
actual = str(data_stream_median)
# Then
expected_maxheap = [55, 53, 51]
expected_minheap = [57, 59]
expected = f"(maxheap={expected_maxheap}, minheap={expected_minheap})"
self.assertEqual(actual, expected)
def test_median_strictly_increasing_odd_parity_numbers(self):
# Given
data_stream_median = DataStreamMedian()
numbers = [51, 53, 55, 57, 59]
[data_stream_median.insert(number) for number in numbers]
# When
actual = data_stream_median.median()
# Then
expected = 55
self.assertEqual(actual, expected)
def test_insert_strictly_decreasing_odd_parity_numbers(self):
# Given
data_stream_median = DataStreamMedian()
numbers = [59, 57, 55, 53, 51]
# When
[data_stream_median.insert(number) for number in numbers]
actual = str(data_stream_median)
# Then
expected_maxheap = [55, 53, 51]
expected_minheap = [57, 59]
expected = f"(maxheap={expected_maxheap}, minheap={expected_minheap})"
self.assertEqual(actual, expected)
def test_median_strictly_decreasing_odd_parity_numbers(self):
# Given
data_stream_median = DataStreamMedian()
numbers = [59, 57, 55, 53, 51]
[data_stream_median.insert(number) for number in numbers]
# When
actual = data_stream_median.median()
# Then
expected = 55
self.assertEqual(actual, expected)
def test_insert_stricly_increasing_even_parity_numbers(self):
# Given
data_stream_median = DataStreamMedian()
numbers = [51, 53, 55, 57, 59, 61]
# When
[data_stream_median.insert(number) for number in numbers]
actual = str(data_stream_median)
# Then
expected_maxheap = [55, 53, 51]
expected_minheap = [57, 59, 61]
expected = f"(maxheap={expected_maxheap}, minheap={expected_minheap})"
self.assertEqual(actual, expected)
def test_median_stricly_increasing_even_parity_numbers(self):
# Given
data_stream_median = DataStreamMedian()
numbers = [51, 53, 55, 57, 59, 61]
[data_stream_median.insert(number) for number in numbers]
# When
actual = data_stream_median.median()
# Then
expected = 56
self.assertEqual(actual, expected)
def test_insert_strictly_decreasing_even_parity_numbers(self):
# Given
data_stream_median = DataStreamMedian()
numbers = [61, 59, 57, 55, 53, 51]
# When
[data_stream_median.insert(number) for number in numbers]
actual = str(data_stream_median)
# Then
expected_maxheap = [55, 53, 51]
expected_minheap = [57, 59, 61]
expected = f"(maxheap={expected_maxheap}, minheap={expected_minheap})"
self.assertEqual(actual, expected)
def test_median_striclty_decreasing_even_parity_numbers(self):
# Given
data_stream_median = DataStreamMedian()
numbers = [61, 59, 57, 55, 53, 51]
[data_stream_median.insert(number) for number in numbers]
# When
actual = data_stream_median.median()
# Then
expected = 56
self.assertEqual(actual, expected)
Strategy
Two Heaps.
Explanation
The two heaps strategy is used to create the custom data structure, a max heap stores the low half of integers, and a min heap stores the high half of integers. The two heaps are balanced such that if there’s an odd number of integers seen so far then the max heap has a greater number of integers than the min heap. Therefore, if the size of the max heap and min heap are equal, i.e. there’s an even number of total integers seen so far, then then median is calculated as the mean of the top of the two heaps, otherwise the size of the max heap is greater than the size of the min heap, i.e. there’s an odd number of total integers seen so far, and so the median is at the top of the max heap.
Time Complexity
The time complexity of insertion and rebalance of the two heaps is O(logn)
because these operations involve pushing to and popping from a heap. The time complexity to get the median is O(1)
because the operation involves reading from the top of a heap.
Space Complexity
The auxiliary space complexity of all operations is O(1)
.