Sliding Window Median

Ethan Davis
Data Structures and Algorithms DSA
5 min read · May 30, 2024

Statement

Given an integer array s and an integer k, return an array of the median of each subarray of s of size k.
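As a reference point for the statement, a brute-force version can sort each window and read off its median. This is only a minimal sketch for checking results on small inputs; the function name brute_force_medians is hypothetical and does not appear in the original solution.

```python
def brute_force_medians(s, k):
    """Sort every window of size k and read off its median."""
    medians = []
    mid = k // 2
    for start in range(len(s) - k + 1):
        window = sorted(s[start:start + k])
        if k % 2 == 0:
            # Even window: average the two middle values.
            medians.append(window[mid - 1] / 2 + window[mid] / 2)
        else:
            # Odd window: the middle value is the median.
            medians.append(window[mid])
    return medians
```

Each window costs O(k log k) to sort, so this is useful mainly as a baseline to compare a faster approach against.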

Constraints

  • 1 ≤ k ≤ s.length() ≤ 10³
  • -2³¹ ≤ s[i] ≤ 2³¹ - 1

Solution

"""
production algorithm
"""


from collections import defaultdict
from data_structures.heap.heap import Heap


class Solution:
    def median_sliding_window(self, nums, k):
        """
        time complexity O(nlogn)
        space complexity O(n)
        """
        n = len(nums)
        low, high = Heap(), Heap()
        visited = defaultdict(int)
        result = list()

        # build window
        for i in range(k):
            low.push(MaxHeapData(nums[i]))
        for i in range(k // 2):
            high.push(MinHeapData(low.pop().data))

        # record result
        if k % 2 == 0:
            result.append(low.top().data / 2 + high.top().data / 2)
        else:
            result.append(low.top().data)

        # slide window
        for i in range(k, n):

            # stage pop of low index of window
            balance = 0
            visited[nums[i - k]] += 1
            if nums[i - k] <= low.top().data:
                balance -= 1
            else:
                balance += 1

            # push high index of window
            if nums[i] <= low.top().data:
                low.push(MaxHeapData(nums[i]))
                balance += 1
            else:
                high.push(MinHeapData(nums[i]))
                balance -= 1

            # rebalance window
            if balance < 0:
                low.push(MaxHeapData(high.pop().data))
            if 0 < balance:
                high.push(MinHeapData(low.pop().data))

            # pop staged low indexes of window
            while not low.empty() and 0 < visited[low.top().data]:
                visited[low.top().data] -= 1
                low.pop()
            while not high.empty() and 0 < visited[high.top().data]:
                visited[high.top().data] -= 1
                high.pop()

            # record result
            if k % 2 == 0:
                result.append(low.top().data / 2 + high.top().data / 2)
            else:
                result.append(low.top().data)

        return result


class MinHeapData:
    def __init__(self, data):
        self.data = data

    def __lt__(self, other):
        return self.data < other.data


class MaxHeapData:
    def __init__(self, data):
        self.data = data

    def __lt__(self, other):
        return other.data < self.data


"""
unit tests
"""

from unittest import TestCase
from algorithms.two_heaps.sliding_window_median.solution import Solution


class SolutionTestCase(TestCase):
    def test_strictly_increasing_array_even_length_subarray(self):
        # Given
        nums = [1, 2, 5, 6, 8, 10, 12, 16, 17, 18, 20, 21, 23, 26, 30,
                31, 34, 37, 38, 40, 41, 45, 48, 49, 51, 53, 55, 56, 57, 59]
        k = 4
        solution = Solution()

        # When
        actual = solution.median_sliding_window(nums, k)

        # Then
        expected = [3.5, 5.5, 7.0, 9.0, 11.0, 14.0, 16.5, 17.5, 19.0, 20.5, 22.0, 24.5, 28.0,
                    30.5, 32.5, 35.5, 37.5, 39.0, 40.5, 43.0, 46.5, 48.5, 50.0, 52.0, 54.0, 55.5, 56.5]
        self.assertEqual(actual, expected)

    def test_strictly_increasing_array_odd_length_subarray(self):
        # Given
        nums = [1, 2, 5, 6, 8, 10, 12, 16, 17, 18, 20, 21, 23, 26, 30,
                31, 34, 37, 38, 40, 41, 45, 48, 49, 51, 53, 55, 56, 57, 59]
        k = 5
        solution = Solution()

        # When
        actual = solution.median_sliding_window(nums, k)

        # Then
        expected = [5, 6, 8, 10, 12, 16, 17, 18, 20, 21, 23, 26,
                    30, 31, 34, 37, 38, 40, 41, 45, 48, 49, 51, 53, 55, 56]
        self.assertEqual(actual, expected)

    def test_strictly_decreasing_array_even_length_subarray(self):
        # Given
        nums = [58, 53, 51, 50, 49, 47, 46, 44, 42, 39, 37, 35, 33, 31,
                30, 29, 28, 24, 23, 20, 18, 17, 16, 15, 13, 9, 8, 5, 4, 1]
        k = 4
        solution = Solution()

        # When
        actual = solution.median_sliding_window(nums, k)

        # Then
        expected = [52.0, 50.5, 49.5, 48.0, 46.5, 45.0, 43.0, 40.5, 38.0, 36.0, 34.0, 32.0,
                    30.5, 29.5, 28.5, 26.0, 23.5, 21.5, 19.0, 17.5, 16.5, 15.5, 14.0, 11.0, 8.5, 6.5, 4.5]
        self.assertEqual(actual, expected)

    def test_strictly_decreasing_array_odd_length_subarray(self):
        # Given
        nums = [58, 53, 51, 50, 49, 47, 46, 44, 42, 39, 37, 35, 33, 31,
                30, 29, 28, 24, 23, 20, 18, 17, 16, 15, 13, 9, 8, 5, 4, 1]
        k = 5
        solution = Solution()

        # When
        actual = solution.median_sliding_window(nums, k)

        # Then
        expected = [51, 50, 49, 47, 46, 44, 42, 39, 37, 35, 33, 31,
                    30, 29, 28, 24, 23, 20, 18, 17, 16, 15, 13, 9, 8, 5]
        self.assertEqual(actual, expected)

    def test_random_array_even_length_subarray(self):
        # Given
        nums = [38, 20, 9, 31, 55, 44, 42, 18, 34, 5, 16, 37, 52, 47,
                26, 6, 2, 36, 11, 43, 59, 15, 10, 51, 56, 48, 7, 33, 27, 28]
        k = 4
        solution = Solution()

        # When
        actual = solution.median_sliding_window(nums, k)

        # Then
        expected = [25.5, 25.5, 37.5, 43.0, 43.0, 38.0, 26.0, 17.0, 25.0, 26.5, 42.0, 42.0, 36.5,
                    16.0, 16.0, 8.5, 23.5, 39.5, 29.0, 29.0, 33.0, 33.0, 49.5, 49.5, 40.5, 30.0, 27.5]
        self.assertEqual(actual, expected)

    def test_random_array_odd_length_subarray(self):
        # Given
        nums = [38, 20, 9, 31, 55, 44, 42, 18, 34, 5, 16, 37, 52, 47,
                26, 6, 2, 36, 11, 43, 59, 15, 10, 51, 56, 48, 7, 33, 27, 28]
        k = 5
        solution = Solution()

        # When
        actual = solution.median_sliding_window(nums, k)

        # Then
        expected = [31, 31, 42, 42, 42, 34, 18, 18, 34, 37, 37, 37,
                    26, 26, 11, 11, 36, 36, 15, 43, 51, 48, 48, 48, 33, 28]
        self.assertEqual(actual, expected)

Strategy

Two Heaps.

Explanation

First, the two balanced heaps are built: a max heap that holds the lower half of the window and a min heap that holds the upper half. If the window size is even, the two heaps are the same size. If the window size is odd, the max heap holds one more value than the min heap. Once built, the two heaps represent the first window, so its median is recorded.
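The build step can be sketched with Python's standard heapq module. This is an assumption made for illustration: the solution above uses its own Heap class, while heapq only provides a min heap, so the max heap here stores negated values. The names build_heaps and window_median are hypothetical.

```python
import heapq


def build_heaps(window):
    """Split a window into a max heap (lower half) and a min heap (upper half)."""
    # heapq is a min heap, so the max heap stores negated values (an assumption
    # of this sketch, not how the article's Heap class works).
    low = [-value for value in window]
    heapq.heapify(low)
    high = []
    # Move the largest k // 2 values from the lower half to the upper half.
    for _ in range(len(window) // 2):
        heapq.heappush(high, -heapq.heappop(low))
    return low, high


def window_median(low, high, k):
    """Read the median off the tops of the two heaps."""
    if k % 2 == 0:
        return -low[0] / 2 + high[0] / 2
    return -low[0]
```

For the window [1, 2, 5, 6], the max heap ends up holding 1 and 2 and the min heap holds 5 and 6, so the median is read from the tops 2 and 5.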

Next, for each iteration of the sliding window, record which of the two heaps the value at the low index of the window will be popped from. That value may not be at the top of its heap, so it cannot always be popped immediately. Then push the value at the high index of the window to its heap. If the heap to be popped from and the heap that was pushed to are the same, the two heaps are still balanced. Otherwise, rebalance the two heaps by popping the top of the heap that was pushed to and pushing it to the heap that will be popped from.
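The balance bookkeeping in this step can be isolated into a small function. The sketch below mirrors the balance variable of the solution; the function name balance_after_slide and its parameters are hypothetical.

```python
def balance_after_slide(outgoing, incoming, low_top):
    """Return the net change in size of the max heap relative to the min heap.

    outgoing: value leaving the window (staged for removal, not yet popped)
    incoming: value entering the window
    low_top:  current top of the max heap, before the push
    """
    balance = 0
    # The outgoing value belongs to the max heap if it is at most its top.
    balance += -1 if outgoing <= low_top else 1
    # The incoming value is pushed to the max heap under the same test.
    balance += 1 if incoming <= low_top else -1
    return balance
```

A result of 0 means the same heap loses and gains a value, so nothing moves. A result of -2 means the max heap is one value short, so the top of the min heap moves over. A result of 2 means the opposite, so the top of the max heap moves to the min heap.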

At the end of each iteration of the sliding window, pop from both heaps while their top is a value that has been recorded for removal. Values that have already left the window can sit beneath the top of either heap as the median of the sliding window shifts. Rather than search a heap for the outgoing value in each iteration, the algorithm only records that the value is to be removed, and pops it later if it surfaces at the top. The last step of each iteration is to record the median of the current window.
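The lazy removal described here can be sketched as a helper over a heapq list. As an assumption for illustration, the pending removals are kept in a defaultdict of counts, like the visited dictionary in the solution, and a sign argument handles a max heap stored as negated values. The name prune is hypothetical.

```python
import heapq
from collections import defaultdict


def prune(heap, stale, sign):
    """Pop values off the top of a heap while they are pending removal.

    heap:  a heapq list
    stale: defaultdict(int) mapping a value to its pending removal count
    sign:  1 for a min heap, -1 for a max heap stored as negated values
    """
    while heap and stale[sign * heap[0]] > 0:
        stale[sign * heap[0]] -= 1
        heapq.heappop(heap)


# Usage: 1 and 4 have left the window, but only 1 is at the top.
high = [1, 3, 4]
pending = defaultdict(int, {1: 1, 4: 1})
prune(high, pending, 1)
# 1 is popped; 4 stays buried beneath 3 until it surfaces.
```

The buried value costs nothing until it reaches the top, which is why the search for the outgoing value can be skipped.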

Finally, once the window has slid across the whole array, the medians of all subarrays of length k have been recorded.
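Putting the steps together, the whole procedure can be sketched without the custom Heap class. This is a hedged, self-contained variant rather than the author's implementation: it assumes Python's standard heapq, simulates the max heap with negated values, and uses a hypothetical dictionary named stale in place of visited.

```python
import heapq
from collections import defaultdict


def median_sliding_window(nums, k):
    low, high = [], []         # low: max heap (negated values), high: min heap
    stale = defaultdict(int)   # value -> number of pending lazy removals
    result = []

    # Build the first window: everything into low, then move k // 2 values up.
    for value in nums[:k]:
        heapq.heappush(low, -value)
    for _ in range(k // 2):
        heapq.heappush(high, -heapq.heappop(low))

    def current_median():
        if k % 2 == 0:
            return -low[0] / 2 + high[0] / 2
        return -low[0]

    result.append(current_median())

    for i in range(k, len(nums)):
        outgoing, incoming = nums[i - k], nums[i]
        low_top = -low[0]

        # Stage the removal of the outgoing value.
        stale[outgoing] += 1
        balance = -1 if outgoing <= low_top else 1

        # Push the incoming value to its heap.
        if incoming <= low_top:
            heapq.heappush(low, -incoming)
            balance += 1
        else:
            heapq.heappush(high, incoming)
            balance -= 1

        # Rebalance by moving one value across if needed.
        if balance < 0:
            heapq.heappush(low, -heapq.heappop(high))
        elif balance > 0:
            heapq.heappush(high, -heapq.heappop(low))

        # Pop staged values that have surfaced at the top of either heap.
        while low and stale[-low[0]] > 0:
            stale[-low[0]] -= 1
            heapq.heappop(low)
        while high and stale[high[0]] > 0:
            stale[high[0]] -= 1
            heapq.heappop(high)

        result.append(current_median())

    return result
```

Because it relies only on the standard library, this variant can be run directly against the inputs and expected values from the unit tests above.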

Time Complexity

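The time complexity of the algorithm depends on the size of the two heaps used. Because values that leave the window are removed lazily, at worst n - k stale values can accumulate in one of the heaps, so a heap can hold O(n) values and each push or pop costs O(log n). Each slide of the window performs a constant number of pushes, and each value is popped at most a constant number of times overall. Therefore, the time complexity of the algorithm is O(n log n).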

Space Complexity

At worst, the heaps hold n - k stale values in addition to the k values of the current window. Therefore, the auxiliary space complexity of the algorithm is O(n).
