Economics of LEGO Sets with Data Science
As a data enthusiast and LEGO fan, I decided to delve into the world of LEGO using historical data. My goal was to understand the trends, pricing, and characteristics of LEGO sets over time. Using datasets from Rebrickable and analysis tools like Pandas, Matplotlib, and Scikit-Learn, I conducted a comprehensive analysis. Here’s a journey through the history and economics of LEGO sets.
Dataset Overview
The datasets used for this analysis include various aspects of LEGO sets, parts, and themes:
- colors.csv: Information on LEGO colors, including unique IDs, names, RGB values, and transparency.
- inventories.csv: Inventory details, including unique IDs, versions, and set numbers.
- inventory_parts.csv: Part inventories, including part numbers, colors, quantities, and spare parts.
- inventory_sets.csv: Information on which inventory is included in which sets.
- part_categories.csv: Part categories and their unique IDs.
- part_relationships.csv: Relationships between different parts.
- parts.csv: Information on LEGO parts, including part numbers, names, and categories.
- sets.csv: Details of LEGO sets, including set numbers, names, release years, themes, and part counts.
- themes.csv: Information on LEGO themes, including unique IDs, names, and parent themes.
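Throughout the set and theme analysis below, these tables are loaded into Pandas DataFrames. A minimal loading sketch, assuming the CSVs were downloaded into a local data/ directory (adjust the paths to your own layout):
import pandas as pd
import matplotlib.pyplot as plt

DATA_DIR = "data"  # assumed location of the Rebrickable CSV downloads

sets = pd.read_csv(f"{DATA_DIR}/sets.csv")      # set_num, name, year, theme_id, num_parts
themes = pd.read_csv(f"{DATA_DIR}/themes.csv")  # id, name, parent_id

print(sets.shape, themes.shape)
The sets and themes frames are the ones the snippets below operate on.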
Set Analysis: Trends Over the Years
I explored the trends in LEGO sets over the years by visualizing the number of sets released each year and the average number of parts per set.
# Count how many distinct sets were released each year
sets.groupby('year')['name'].nunique().plot(kind='bar')
plt.title("Number of Sets by Year")
plt.xlabel("Year")
plt.ylabel("Number of Sets")
plt.show()
parts_by_year = sets[['year', 'num_parts']].groupby('year', as_index=False).mean()
parts_by_year.plot(x='year', y='num_parts', color="purple")
plt.title("Average Number of Parts by Year")
plt.xlabel("Year")
plt.ylabel("Parts")
plt.show()
Theme Analysis: Top 10 Themes
To identify the most popular LEGO themes, I merged set counts with the themes table and plotted the ten top-level themes (those without a parent theme) that contain the most sets.
set_themes = sets["theme_id"].value_counts()
set_themes = pd.DataFrame({"id": set_themes.index, "count": set_themes.values})
set_themes = pd.merge(set_themes, themes, on="id")
set_themes_no_parent = set_themes[pd.isnull(set_themes["parent_id"])]
set_themes_top_10 = set_themes_no_parent.sort_values(by=["count"], ascending=False)[:10]
top_10 = set_themes_top_10["count"]
top_10.index = set_themes_top_10["name"]
top_10.plot.bar(color="gold", rot=30)
plt.title("Top 10 Themes That Have Most Sets")
plt.show()
Gathering Data with a Scraper
To obtain historical and current data for LEGO sets, I developed a web scraper using Playwright, asyncio, pydantic, and aiohttp. Initially, I intended to use datasets from Rebrickable, but I found that the specific historical pricing data I wanted wasn’t available. Thus, I turned to BrickEconomy, a website that provides detailed information on LEGO sets, including historical prices. The scraper automates the data collection process, ensuring we have comprehensive data for analysis.
Setting Up the Environment
First, we need to install the required packages (asyncio ships with the Python standard library, so only the third-party packages need installing):
!pip install playwright pydantic aiohttp
!playwright install
Imports and Initial Setup
The necessary libraries are imported, and the initial setup is done. Playwright is used for web scraping, asyncio for asynchronous programming, pydantic for data validation, and aiohttp for asynchronous HTTP requests.
import csv
from pydantic import BaseModel
from typing import Dict, List, Optional
from playwright.async_api import async_playwright
import asyncio
import json
import re
from datetime import datetime
Data Models
We define data models using pydantic to structure the data we will scrape. These models help ensure the data is clean and well-organized.
class SetDetails(BaseModel):
name: str
value: str
class HistoryEntry(BaseModel):
date: datetime
number: float
tooltip: Optional[str]
annotation: Optional[str]
annotationText: Optional[str]
class NewEntry(BaseModel):
date: datetime
value1: float
value2: float
value3: float
value4: float
description: Optional[str] = None
class LegoSet(BaseModel):
details: List[SetDetails]
pricing: List[SetDetails]
quick_buy: List[SetDetails]
set_predictions: List[SetDetails]
set_facts: str
subtheme_analysis: List[SetDetails]
Scraper Class
The LegoAPI class is responsible for scraping the data from BrickEconomy. Given a file of LEGO set numbers, it navigates to the BrickEconomy website and extracts the required information for each set.
class LegoAPI:
root_url = "https://www.brickeconomy.com"
def __init__(self, set_list):
self.set_list = set_list
self.output_file = "lego_sets.csv"
async def start(self):
try:
with open(self.set_list, "r") as f:
set_list = [line.rstrip() for line in f.readlines()]
except Exception as e:
print("Error opening input file")
raise e
async with async_playwright() as p:
browser = await p.chromium.launch(headless=False)
page = await browser.new_page()
for set_num in set_list:
search_url = f"{self.root_url}/search?query={set_num}"
await page.wait_for_load_state("load")
await page.goto(search_url)
try:
possible_links = await page.query_selector_all(
"#ContentPlaceHolder1_ctlSetsOverview_GridViewSets > tbody > tr:nth-child(2) > td.ctlsets-left > div.mb-5 > h4 > a"
)
except Exception as e:
raise ValueError(f"Error parsing HTML: {e}")
if not possible_links:
raise ValueError(f"No links found for set number: {set_num}")
for link in possible_links:
href = await link.get_attribute("href")
print(href)
test_num = href.split("/")[2].split("-")[0]
print(test_num)
if str(test_num) in str(set_num):
set_details = href.split("/")[2:4]
await page.goto(self.root_url + href)
await page.wait_for_load_state("load")
await self.parse_history(page, set_num)
await self.parse_set(page, set_details)
await browser.close()
Initialization and Input Handling:
- The constructor (__init__) stores the path to the file of LEGO set numbers and the output file name.
- The start method reads the set numbers from that file and starts the Playwright browser.
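For reference, set_list.txt is just a plain-text file with one set number per line, using BrickEconomy's numbering. A quick way to create it, sketched here with the single set number that appears later in this post:
# Write the input file the scraper expects: one set number per line
with open("set_list.txt", "w") as f:
    f.write("001-1\n")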
Navigation and Data Extraction:
- For each set number, the scraper navigates to the search results page on BrickEconomy.
- It extracts links to individual set pages and checks if the set number matches.
- The scraper then navigates to the set’s page and calls methods to parse historical data and set details.
The historical pricing data is embedded in a script tag near the end of the page's HTML, inside data.addRows([...]) calls (the data used to draw the site's price charts).
Parsing Historical Data
The parse_history method extracts historical pricing data from the set's page.
async def parse_history(self, page, set_num):
try:
script_tags = await page.query_selector_all("script")
desired_script_content = None
for script_tag in script_tags:
script_content = await script_tag.inner_text()
if "data.addRows([" in script_content:
desired_script_content = script_content
break
if desired_script_content:
pattern = r"data\.addRows\((\[.*?\]\));"
matches = re.findall(pattern, desired_script_content, re.DOTALL)
if matches:
history_data = matches[0].replace("\n", "").replace("null", "'null'")
history_entries = []
pattern_date = re.compile(r"new Date\((\d+), (\d+), (\d+)\), (\d+\.?\d*), '([^']*)', '([^']*)'(?:, '([^']*)')?(?:, '([^']*)')?")
for match in pattern_date.finditer(history_data):
year, month, day = map(int, match.groups()[:3])
month += 1
date = datetime(year, month, day)
value = match.group(4)
currency_value = match.group(5)
status = match.group(6) if match.group(6) else None
description = match.group(7) if match.group(7) else None
history_entries.append(
HistoryEntry(
date=date,
number=value,
tooltip=currency_value,
annotation=status,
annotationText=description,
)
)
with open(f"{set_num}_history.csv", mode="w", newline="", encoding="utf-8") as file:
writer = csv.writer(file)
writer.writerow(
["Date", "Value", "Currency Value", "Status", "Description"]
)
for entry in history_entries:
writer.writerow(
[
entry.date,
entry.number,
entry.tooltip,
entry.annotation,
entry.annotationText,
]
)
if len(matches) > 1:
new_data = matches[1].replace("\n", "").replace("null", "'null'")
pattern_new = re.compile(r"new Date\((\d+), (\d+), (\d+)\), (\d+\.?\d*), (\d+\.?\d*), (\d+\.?\d*), (\d+\.?\d*), '([^']*)'")
new_entries = []
for match in pattern_new.finditer(new_data):
year, month, day = map(int, match.groups()[:3])
month += 1
date = datetime(year, month, day)
value1, value2, value3, value4 = map(float, match.groups()[3:7])
description = match.group(8)
new_entries.append(
NewEntry(
date=date,
value1=value1,
value2=value2,
value3=value3,
value4=value4,
description=description,
)
)
with open(f"{set_num}_new.csv", mode="w", newline="", encoding="utf-8") as file:
writer = csv.writer(file)
writer.writerow(
["Date", "Value 1", "Value 2", "Value 3", "Value 4", "Description"]
)
for entry in new_entries:
writer.writerow(
[
entry.date,
entry.value1,
entry.value2,
entry.value3,
entry.value4,
entry.description,
]
)
else:
pass
else:
print("Could not find 'data.addRows([...]);' in the script content.")
else:
print("Script tag with 'data.addRows([' not found.")
except Exception as e:
print(f"An error occurred while extracting data: {e}")
Extracting Script Content:
- The method searches the page's script tags for the one containing historical data in the data.addRows format.
Parsing and Writing Data:
- If found, the data is extracted and parsed with regular expressions to create HistoryEntry objects.
- The parsed entries are then written to a CSV file.
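To make the regular expression concrete, here is a small standalone sketch that runs the same date pattern against an invented fragment of a data.addRows row (the sample values are made up for illustration, not taken from BrickEconomy):
import re
from datetime import datetime

# A made-up example of one row inside data.addRows([...]), after the scraper's "null" -> "'null'" substitution
sample = "[new Date(2015, 0, 15), 42.5, '$42.50', 'null', 'null']"

pattern_date = re.compile(r"new Date\((\d+), (\d+), (\d+)\), (\d+\.?\d*), '([^']*)', '([^']*)'(?:, '([^']*)')?(?:, '([^']*)')?")

for match in pattern_date.finditer(sample):
    year, month, day = map(int, match.groups()[:3])
    month += 1  # JavaScript Date months are zero-based, hence the shift in parse_history
    print(datetime(year, month, day), match.group(4), match.group(5))
# Prints: 2015-01-15 00:00:00 42.5 $42.50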
Parsing Set Details
The parse_set method extracts various details about the LEGO set, including pricing, quick buy options, predictions, and subtheme analysis.
async def parse_set(self, page, set_details):
set_details_div = await page.query_selector("div#ContentPlaceHolder1_SetDetails")
set_details_rows = await set_details_div.query_selector_all(".row.rowlist")
set_info = []
for row in set_details_rows:
key_element = await row.query_selector(".text-muted")
value_element = await row.query_selector(".col-xs-7")
if key_element and value_element:
key = await key_element.inner_text()
value = await value_element.inner_text()
set_info.append(SetDetails(name=key.strip(), value=value.strip()))
set_pricing_div = await page.query_selector("div#ContentPlaceHolder1_PanelSetPricing")
pricing_rows = await set_pricing_div.query_selector_all(".row.rowlist")
pricing_info = []
for row in pricing_rows:
key_element = await row.query_selector(".text-muted")
value_element = await row.query_selector(".col-xs-7")
if key_element and value_element:
key = await key_element.inner_text()
value = await value_element.inner_text()
pricing_info.append(SetDetails(name=key.strip(), value=value.strip()))
quick_buy_div = await page.query_selector("div#ContentPlaceHolder1_PanelSetBuying")
quick_buy_rows = await quick_buy_div.query_selector_all(".row.rowlist")
quick_buy_info = []
for row in quick_buy_rows:
key_element = await row.query_selector(".text-muted")
value_element = await row.query_selector(".col-xs-7")
if key_element and value_element:
key = await key_element.inner_text()
value = await value_element.inner_text()
quick_buy_info.append(SetDetails(name=key.strip(), value=value.strip()))
set_predictions_div = await page.query_selector("div#ContentPlaceHolder1_PanelSetPredictions")
set_predictions_rows = await set_predictions_div.query_selector_all(".row.rowlist")
set_predictions_info = []
for row in set_predictions_rows:
key_element = await row.query_selector(".text-muted")
value_element = await row.query_selector(".col-xs-7")
if key_element and value_element:
key = await key_element.inner_text()
value = await value_element.inner_text()
set_predictions_info.append(SetDetails(name=key.strip(), value=value.strip()))
set_facts_div = await page.query_selector("div#ContentPlaceHolder1_PanelSetFacts")
if set_facts_div:
set_facts = await set_facts_div.inner_text()
set_facts = set_facts.strip()
else:
set_facts = "No set facts available"
subtheme_analysis_div = await page.query_selector("div#ContentPlaceHolder1_PanelSetAnalysis")
subtheme_analysis_rows = await subtheme_analysis_div.query_selector_all(".row.rowlist")
subtheme_analysis_info = []
for row in subtheme_analysis_rows:
key_element = await row.query_selector(".text-muted")
value_element = await row.query_selector(".col-xs-7")
if key_element and value_element:
key = await key_element.inner_text()
value = await value_element.inner_text()
subtheme_analysis_info.append(SetDetails(name=key.strip(), value=value.strip()))
lego_set = LegoSet(
details=set_info,
pricing=pricing_info,
quick_buy=quick_buy_info,
set_predictions=set_predictions_info,
set_facts=set_facts,
subtheme_analysis=subtheme_analysis_info,
)
await self.write_to_csv(lego_set)
Extracting Set Details:
- The method extracts various details about the set, such as general information, pricing, quick buy options, predictions, and subtheme analysis.
Creating and Writing Data:
- These details are stored in SetDetails objects and combined into a LegoSet object, which is then written to a CSV file.
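Since all five panels share the same key/value markup, the repeated loops in parse_set could be collapsed into one small helper. This is only a refactoring sketch (extract_rows is my name, not part of the original class); it reuses the SetDetails model defined earlier:
from typing import List
from playwright.async_api import Page

async def extract_rows(page: Page, container_selector: str) -> List[SetDetails]:
    # Collect the .text-muted / .col-xs-7 pairs from one BrickEconomy panel
    container = await page.query_selector(container_selector)
    if container is None:
        return []
    info = []
    for row in await container.query_selector_all(".row.rowlist"):
        key_element = await row.query_selector(".text-muted")
        value_element = await row.query_selector(".col-xs-7")
        if key_element and value_element:
            key = await key_element.inner_text()
            value = await value_element.inner_text()
            info.append(SetDetails(name=key.strip(), value=value.strip()))
    return info

# Inside parse_set, each panel would then reduce to a single call, e.g.:
# set_info = await extract_rows(page, "div#ContentPlaceHolder1_SetDetails")
# pricing_info = await extract_rows(page, "div#ContentPlaceHolder1_PanelSetPricing")
This keeps the selectors in one place and makes it straightforward to add or drop a panel later.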
Writing Data to CSV
The write_to_csv method writes the scraped data to a CSV file.
async def write_to_csv(self, lego_set):
with open(self.output_file, mode="a", newline="", encoding="utf-8") as file:
writer = csv.writer(file)
writer.writerow(
[
"Details",
"Pricing",
"Quick Buy",
"Set Predictions",
"Set Facts",
"Subtheme Analysis",
]
)
max_length = max(
len(lego_set.details),
len(lego_set.pricing),
len(lego_set.quick_buy),
len(lego_set.set_predictions),
len(lego_set.subtheme_analysis),
)
for i in range(max_length):
row = [
lego_set.details[i].value if i < len(lego_set.details) else "",
lego_set.pricing[i].value if i < len(lego_set.pricing) else "",
lego_set.quick_buy[i].value if i < len(lego_set.quick_buy) else "",
lego_set.set_predictions[i].value if i < len(lego_set.set_predictions) else "",
lego_set.set_facts if i == 0 else "",
lego_set.subtheme_analysis[i].value if i < len(lego_set.subtheme_analysis) else "",
]
writer.writerow(row)
Preparing the CSV File:
- The method opens the CSV file in append mode and writes the header row; because the file is appended to for every set, this header is repeated once per scraped set.
Writing Rows:
- It writes rows of data to the CSV file, ensuring that all details are included, even if some lists are shorter than others.
Running the Scraper
Finally, the main function runs the scraper.
async def main():
with open("lego_sets.csv", mode="w", newline="", encoding="utf-8") as file:
pass
lego_api = LegoAPI("set_list.txt")
await lego_api.start()
# Scheduling the coroutine on the already-running event loop works in a notebook;
# in a standalone script, asyncio.run(main()) would be used instead.
loop = asyncio.get_event_loop()
asyncio.run_coroutine_threadsafe(main(), loop)
By using this scraper, I was able to gather detailed historical and current data on LEGO sets, enabling comprehensive analysis and insights into the world of LEGO.
Running the scraper for set 001-1 produced two output files: 001-1_history.csv and 001-1_new.csv.
Price Analysis: Historical and Future Predictions
I focused on the historical pricing of a specific LEGO set (001-1) and predicted its future prices using linear regression.
Historical Data Cleaning
Cleaning the historical data is a crucial step to ensure the accuracy and reliability of the analysis. Here’s a detailed breakdown of the data cleaning process:
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.linear_model import LinearRegression
from sklearn.tree import DecisionTreeRegressor
from sklearn.ensemble import RandomForestRegressor
from sklearn.ensemble import GradientBoostingRegressor
import numpy as np
from scipy import stats
from scipy.stats import norm, skew
df = pd.read_csv("data/lego_sets.csv")
# Remove duplicate rows
df.drop_duplicates(inplace=True)
# Handle missing values in other columns
df['Pricing'] = df['Pricing'].fillna(0)
# Convert "Details" column to string data type
df['Details'] = df['Details'].astype(str)
# Standardize currency values to USD
# Ensure the column containing currency values exists and has the correct name
if 'Pricing' in df.columns:
# Remove currency symbols and any other non-numeric characters
df['Pricing'] = df['Pricing'].str.replace('[^\d.]', '', regex=True)
# Convert to float
df['Pricing'] = pd.to_numeric(df['Pricing'], errors='coerce')
if 'Set Predictions' in df.columns:
df['Set Predictions'] = df['Set Predictions'].astype(str)
df['Set Predictions'].fillna('', inplace=True)
df['Set Predictions'] = df['Set Predictions'].str.replace('[^\d.]', '', regex=True)
df['Set Predictions'] = pd.to_numeric(df['Set Predictions'], errors='coerce')
# Drop irrelevant columns
df.drop(['Subtheme Analysis'], axis=1, inplace=True)
# Rename columns for clarity
df.rename(columns={'Details': 'Set Number', 'Set Facts': 'Facts'}, inplace=True)
# Remove leading and trailing whitespace from the "Facts" column
df['Facts'] = df['Facts'].str.strip().replace('\n', '', regex=True)
# Replace non-numeric characters in "Quick Buy" with empty strings
if 'Quick Buy' in df.columns and df['Quick Buy'].dtype == 'object':
df['Quick Buy'] = df['Quick Buy'].str.replace('[^\d.]', '', regex=True)
# Convert "Quick Buy" column to numeric, coercing errors to NaN
if 'Quick Buy' in df.columns:
df['Quick Buy'] = pd.to_numeric(df['Quick Buy'], errors='coerce')
# Drop rows with NaN values in the "Quick Buy" column
df.dropna(subset=['Quick Buy'], inplace=True)
# Find the Lego set with the lowest price in the "Quick Buy" section
if 'Quick Buy' in df.columns:
lowest_price_set = df.loc[df['Quick Buy'].idxmin()]
# Load the history data the scraper produced for set 001-1
# (the path below is an assumption; point it at your copy of 001-1_history.csv)
df_0011 = pd.read_csv("data/001-1_history.csv")
# Step 1: Remove duplicate rows
df_0011.drop_duplicates(inplace=True)
# Step 2: Handle missing values in the 'Currency Value' column
df_0011['Currency Value'] = df_0011['Currency Value'].fillna(0)
# Step 3: Standardize currency values to USD
# Remove currency symbols and any other non-numeric characters
df_0011['Currency Value'] = df_0011['Currency Value'].str.replace('[^\d.]', '', regex=True)
# Convert to float
df_0011['Currency Value'] = pd.to_numeric(df_0011['Currency Value'], errors='coerce')
# Step 4: Drop unnecessary columns
df_0011.drop(['Value', 'Status'], axis=1, inplace=True)
df_0011['Date'] = pd.to_datetime(df_0011['Date'], errors='coerce')
# Step 5: Rename columns for clarity
df_0011.rename(columns={'Currency Value': 'USD Value', 'Description': 'Set Description'}, inplace=True)
# remove NAN values
df_0011['USD Value'] = df_0011['USD Value'].fillna(0)
df_0011 = df_0011[df_0011['USD Value'] != 0]
# remove nan values description remplace by ''
df_0011['Set Description'] = df_0011['Set Description'].fillna('')
Linear Regression and Future Predictions
Once the historical data is cleaned, we can use it to make future price predictions using linear regression.
X = df_0011['Date'].map(pd.Timestamp.toordinal).values.reshape(-1, 1)
y = df_0011['USD Value'].values
# Fit the model
model = LinearRegression().fit(X, y)
# Prepare future dates and predict
future_dates = pd.date_range('2008-12-31', '2030-12-31', freq='Y')
future_dates_ordinal = future_dates.map(pd.Timestamp.toordinal).values.reshape(-1, 1)
future_predictions = model.predict(future_dates_ordinal)
# Plotting, making sure to convert ordinal dates back to datetime for readability
plt.figure(figsize=(10, 6))
plt.scatter(df_0011['Date'], df_0011['USD Value'], color='blue', label='Data points')
plt.plot(df_0011['Date'], model.predict(X), color='red', label='Linear regression')
plt.plot(future_dates, future_predictions, color='green', label='Future predictions', linestyle='--')
plt.title('Linear Regression of Lego Set 001-1')
plt.xlabel('Date')
plt.ylabel('USD Value')
plt.xticks(rotation=45)
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()
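The cleaning section's imports also pull in several tree-based regressors. As a rough sanity check on the linear fit, the same ordinal-date feature can be fed to them and the in-sample fits compared on R². This is only a sketch under the same assumptions as the regression above (df_0011 already cleaned, X and y as defined there):
from sklearn.metrics import r2_score

models = {
    "Linear Regression": LinearRegression(),
    "Decision Tree": DecisionTreeRegressor(max_depth=3),
    "Random Forest": RandomForestRegressor(n_estimators=100, random_state=0),
    "Gradient Boosting": GradientBoostingRegressor(random_state=0),
}

# Fit each model on the ordinal dates and report its in-sample R² score
for name, candidate in models.items():
    candidate.fit(X, y)
    print(f"{name}: R^2 = {r2_score(y, candidate.predict(X)):.3f}")
In-sample scores flatter the tree models, and unlike the linear model they cannot extrapolate beyond the observed date range, so a proper train/test split would be needed before trusting any of them for the 2030 predictions.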
We can also look at how the set's value has appreciated over time. The plots below chart a Gain column alongside the USD value; the cleaning steps above never create that column, so I assume it was derived from USD Value (for example, the change since the previous observation), as sketched below.
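A possible definition, purely an assumption on my part since the cleaning code above never creates the column:
# Assumed definition of Gain: change in USD Value since the previous observation
df_0011 = df_0011.sort_values('Date').reset_index(drop=True)
df_0011['Gain'] = df_0011['USD Value'].diff().fillna(0)
Resetting the index also gives the clean 0..n-1 positions that the trend-line fit below (stats.linregress on df_0011.index) relies on.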
# Create subplots
fig, axes = plt.subplots(nrows=2, ncols=1, figsize=(12, 12))
# Plot 1: Gain Over Time
axes[0].plot(df_0011['Date'], df_0011['Gain'], marker='o', linestyle='-', color='b', label='Gain')
axes[0].set_title('Gain Over Time')
axes[0].set_xlabel('Date')
axes[0].set_ylabel('Gain')
axes[0].grid(True)
axes[0].legend()
# Add trend line to the first plot
slope, intercept, r_value, p_value, std_err = stats.linregress(df_0011.index, df_0011['Gain'])
axes[0].plot(df_0011['Date'], intercept + slope * df_0011.index, color='red', label='Trend line')
# Annotate the trend line equation
axes[0].text(df_0011['Date'].iloc[5], df_0011['Gain'].iloc[5], f'y = {slope:.2f}x + {intercept:.2f}', fontsize=10, color='black')
# Rotate x-axis labels for better readability
axes[0].tick_params(axis='x', rotation=45)
# Annotate the maximum gain if df_0011 is not empty
if not df_0011.empty:
max_gain_index = df_0011['Gain'].idxmax()
if not pd.isna(max_gain_index):
max_gain_date = df_0011.loc[max_gain_index, 'Date']
max_gain_value = df_0011.loc[max_gain_index, 'Gain']
axes[0].annotate('Max Gain', xy=(max_gain_date, max_gain_value),
xytext=(max_gain_date - pd.Timedelta(days=100), max_gain_value + 0.5),
arrowprops=dict(facecolor='black', arrowstyle='->'))
# Highlight points of interest if df_0011 is not empty
if not df_0011.empty:
points_of_interest_index = [5, 11, 13, 23]
points_of_interest = df_0011.iloc[points_of_interest_index]
axes[0].scatter(points_of_interest['Date'], points_of_interest['Gain'], color='red', label='Points of Interest')
# Plot 2: USD Value Over Time
axes[1].scatter(df_0011['Date'], df_0011['USD Value'], color='blue', label='Data points')
axes[1].set_title('USD Value Over Time')
axes[1].set_xlabel('Date')
axes[1].set_ylabel('USD Value')
axes[1].legend()
axes[1].grid(True)
# Linear Regression for USD Value over Time
X = df_0011['Date'].astype(int).values.reshape(-1, 1) # Convert datetime to numerical representation
y = df_0011['USD Value'].values
model = LinearRegression().fit(X, y)
axes[1].plot(df_0011['Date'], model.predict(X), color='red', label='Linear regression')
# Rotate x-axis labels for better readability
for ax in axes:
ax.tick_params(axis='x', rotation=45)
plt.tight_layout()
plt.show()
Conclusion
This analysis provided a detailed look into the world of LEGO, from release trends and theme popularity to historical pricing and future price predictions. With data science tools, we can uncover fascinating insights about a beloved toy, making the data journey both educational and enjoyable.