Find Median from a Data Stream

3 min read · Apr 2, 2024
Create a custom data structure that stores a stream of integers, and that finds the median of the integers seen so far in constant time O(1).


  • -10⁵ ≤ n ≤ 10⁵, where n is an integer received from the data stream


production algorithm

from data_structures.heap.heap import Heap

class DataStreamMedian:
    """Running median over a stream of integers, backed by two heaps.

    ``low`` is used as a max heap holding the smaller half of the numbers and
    ``high`` as a min heap holding the larger half. After every insertion the
    heaps are rebalanced so that ``low`` holds either the same number of
    items as ``high`` or exactly one more.

    NOTE(review): the published listing this was restored from had lost
    several lines. ``Heap.empty()``, ``Heap.size()`` and ``Heap.pop()`` are
    taken from the surviving code; ``Heap.push(item)`` and ``Heap.top()`` are
    assumed names -- confirm against ``data_structures.heap.heap.Heap``.
    """

    def __init__(self):
        self.low = Heap()   # max heap: lower half, wrapped in MaxHeapData
        self.high = Heap()  # min heap: upper half, wrapped in MinHeapData

    def insert(self, number):
        """Add ``number`` to the stream.

        time complexity O(log n), assuming heap push/pop are O(log n)
        space complexity O(1)
        """
        # Anything not larger than the current lower-half maximum belongs in
        # the lower half; everything else goes to the upper half.
        if self.low.empty() or number <= self.low.top().data:
            self.low.push(MaxHeapData(number))
        else:
            self.high.push(MinHeapData(number))
        self.rebalance()

    def median(self):
        """Return the median of all numbers inserted so far.

        With an even count the result is the mean of the two middle values
        (a float); with an odd count it is the top of the lower half.

        time complexity O(1)
        space complexity O(1)

        Raises:
            ValueError: if nothing has been inserted yet.
        """
        if self.low.empty():
            raise ValueError("median of an empty data stream is undefined")
        if self.low.size() == self.high.size():
            return self.low.top().data / 2 + self.high.top().data / 2
        # Odd count: rebalance() guarantees the extra item sits in ``low``.
        return self.low.top().data

    def rebalance(self):
        """Restore ``high.size() <= low.size() <= high.size() + 1``.

        time complexity O(log n), assuming heap push/pop are O(log n)
        space complexity O(1)
        """
        if self.low.size() < self.high.size():
            # Upper half grew too large: move its smallest value down.
            data = self.high.pop().data
            self.low.push(MaxHeapData(data))
        if self.high.size() + 1 < self.low.size():
            # Lower half is ahead by more than one: move its largest value up.
            data = self.low.pop().data
            self.high.push(MinHeapData(data))

    def __repr__(self):
        return f"(maxheap={self.low}, minheap={self.high})"

class MinHeapData:
    """Heap item that orders by ascending ``data``.

    Intended for a ``<``-ordered heap that should behave as a min heap: the
    item with the smallest ``data`` compares as the least element.
    """

    def __init__(self, data):
        self.data = data

    def __lt__(self, other):
        # Natural ordering: smaller payload sorts first.
        return self.data < other.data

    def __repr__(self):
        # Render as the bare payload so a container of items prints like a
        # container of plain values.
        return str(self.data)

class MaxHeapData:
    """Heap item that orders by descending ``data``.

    The comparison is inverted on purpose, so a ``<``-ordered heap treats the
    item with the largest ``data`` as its least element and therefore behaves
    as a max heap.
    """

    def __init__(self, data):
        self.data = data

    def __lt__(self, other):
        # Inverted ordering: larger payload sorts first.
        return other.data < self.data

    def __repr__(self):
        # Render as the bare payload so a container of items prints like a
        # container of plain values.
        return str(self.data)
unit tests

from unittest import TestCase
from algorithms.two_heaps.data_stream_median.data_stream_median import DataStreamMedian

class DataStreamMedianTestCase(TestCase):
    """Checks heap contents and median for increasing/decreasing streams.

    Each scenario is covered twice: once asserting the string rendering of
    the two heaps after all insertions, once asserting the reported median.
    """

    @staticmethod
    def _stream_of(numbers):
        """Return a DataStreamMedian that has received ``numbers`` in order."""
        data_stream_median = DataStreamMedian()
        # Plain loop: insert() is called for its side effect only.
        for number in numbers:
            data_stream_median.insert(number)
        return data_stream_median

    def test_insert_strictly_increasing_odd_parity_numbers(self):
        # Given
        numbers = [51, 53, 55, 57, 59]

        # When
        data_stream_median = self._stream_of(numbers)
        actual = str(data_stream_median)

        # Then
        expected_maxheap = [55, 53, 51]
        expected_minheap = [57, 59]
        expected = f"(maxheap={expected_maxheap}, minheap={expected_minheap})"
        self.assertEqual(actual, expected)

    def test_median_strictly_increasing_odd_parity_numbers(self):
        # Given
        numbers = [51, 53, 55, 57, 59]
        data_stream_median = self._stream_of(numbers)

        # When
        actual = data_stream_median.median()

        # Then
        expected = 55
        self.assertEqual(actual, expected)

    def test_insert_strictly_decreasing_odd_parity_numbers(self):
        # Given
        numbers = [59, 57, 55, 53, 51]

        # When
        data_stream_median = self._stream_of(numbers)
        actual = str(data_stream_median)

        # Then
        expected_maxheap = [55, 53, 51]
        expected_minheap = [57, 59]
        expected = f"(maxheap={expected_maxheap}, minheap={expected_minheap})"
        self.assertEqual(actual, expected)

    def test_median_strictly_decreasing_odd_parity_numbers(self):
        # Given
        numbers = [59, 57, 55, 53, 51]
        data_stream_median = self._stream_of(numbers)

        # When
        actual = data_stream_median.median()

        # Then
        expected = 55
        self.assertEqual(actual, expected)

    def test_insert_stricly_increasing_even_parity_numbers(self):
        # Given
        numbers = [51, 53, 55, 57, 59, 61]

        # When
        data_stream_median = self._stream_of(numbers)
        actual = str(data_stream_median)

        # Then
        expected_maxheap = [55, 53, 51]
        expected_minheap = [57, 59, 61]
        expected = f"(maxheap={expected_maxheap}, minheap={expected_minheap})"
        self.assertEqual(actual, expected)

    def test_median_stricly_increasing_even_parity_numbers(self):
        # Given
        numbers = [51, 53, 55, 57, 59, 61]
        data_stream_median = self._stream_of(numbers)

        # When
        actual = data_stream_median.median()

        # Then
        # Even count: mean of the two middle values, 55 and 57.
        expected = 56
        self.assertEqual(actual, expected)

    def test_insert_strictly_decreasing_even_parity_numbers(self):
        # Given
        numbers = [61, 59, 57, 55, 53, 51]

        # When
        data_stream_median = self._stream_of(numbers)
        actual = str(data_stream_median)

        # Then
        expected_maxheap = [55, 53, 51]
        expected_minheap = [57, 59, 61]
        expected = f"(maxheap={expected_maxheap}, minheap={expected_minheap})"
        self.assertEqual(actual, expected)

    def test_median_striclty_decreasing_even_parity_numbers(self):
        # Given
        numbers = [61, 59, 57, 55, 53, 51]
        data_stream_median = self._stream_of(numbers)

        # When
        actual = data_stream_median.median()

        # Then
        # Even count: mean of the two middle values, 55 and 57.
        expected = 56
        self.assertEqual(actual, expected)


Two Heaps.


The two heaps strategy is used to create the custom data structure: a max heap stores the low half of the integers, and a min heap stores the high half. The two heaps are kept balanced so that, when an odd number of integers has been seen so far, the max heap holds exactly one more integer than the min heap. Therefore, if the max heap and the min heap are the same size, i.e. there’s an even number of total integers seen so far, then the median is calculated as the mean of the tops of the two heaps. Otherwise the max heap is larger than the min heap, i.e. there’s an odd number of total integers seen so far, and so the median is at the top of the max heap.

Time Complexity

The time complexity of insertion and of rebalancing the two heaps is O(log n), because these operations involve pushing to and popping from a heap. The time complexity of getting the median is O(1), because the operation only reads from the top of a heap.

Space Complexity

The auxiliary space complexity of all operations is O(1).


