(ML) 永豐AI GO競賽-攻房戰

YEN HUNG CHENG
Nov 19, 2023


Photo by Rowan Heuvel on Unsplash

My final competition ranking was 184th, with a MAPE of 9.808724 on the Private leaderboard.
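For context, MAPE (mean absolute percentage error) is the competition's scoring metric. A minimal sketch of the formula (my own illustration, not the organiser's official scoring code):

import numpy as np

def mape(y_true, y_pred):
    ''' Mean absolute percentage error, expressed in percent '''
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)
    return np.mean(np.abs((y_true - y_pred) / y_true)) * 100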

After signing up, the Dataset can be downloaded.

The training_data csv contains 22 features in total (not counting ID):

縣市、鄉鎮市區、路名、土地面積、使用分區、移轉層次、總樓層數、主要用途、主要建材、建物型態、屋齡、建物面積、車位面積、車位個數、橫坐標、縱坐標、備註、主建物面積、陽台面積、附屬建物面積、單價

The fields in bold are the ones that were not de-identified.

Removing unneeded features

My first step was to drop the features I did not plan to use (ID, 鄉鎮市區, 路名, 使用分區, 橫坐標, 縱坐標, 備註).
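Before dropping anything, the two csv files are read into df_tr (training set) and df_tt (test set). A minimal sketch, with assumed file names:

import pandas as pd

# Assumed file names; adjust to the actual names in the downloaded dataset
df_tr = pd.read_csv('training_data.csv')    # training data, includes the target 單價
df_tt = pd.read_csv('public_dataset.csv')   # test data, no target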

# Drop ID, 鄉鎮市區, 路名, 使用分區, 橫坐標, 縱坐標, 備註 from df_tr and df_tt
df_tr = df_tr.drop(['ID', '鄉鎮市區', '路名', '使用分區', '橫坐標', '縱坐標', '備註'], axis=1)
df_tt = df_tt.drop(['ID', '鄉鎮市區', '路名', '使用分區', '橫坐標', '縱坐標', '備註'], axis=1)

print(df_tr.columns[:])
print(df_tt.columns[:])

鄉鎮市區 contains 121 distinct townships/districts; since I was not sure how to handle it at this point, I removed it.

路名 has 3,058 distinct street names, so I removed it as well.

使用分區 is mostly None. It could be looked up through the national land-use zoning query system (tcd.gov.tw) by entering the coordinates, but that is time-consuming, so I removed 使用分區 too.

橫坐標 and 縱坐標 were removed as well.

備註 is mostly blank, so it was also removed.

Replace [縣市] with the average house price, then convert it to a z-score

from sklearn.preprocessing import StandardScaler

# Mapping from 縣市 (city/county) to its average house price
city_to_price = {
'台北市': 74.9,
'新北市': 43.5,
'新竹市': 41.8,
'新竹縣': 36.4,
'基隆市': 35.1,
'桃園市': 33.7,
'台中市': 32.8,
'高雄市': 28.5,
'台南市': 26.9,
'彰化縣': 23.4,
'苗栗縣': 22.9,
'嘉義市': 22.3,
'宜蘭縣': 21.9,
'雲林縣': 21.3,
'屏東縣': 20.6,
'花蓮縣': 19.8,
'金門縣': 18.7
}

# Replace the 縣市 column in each DataFrame with the corresponding average house price
df_tr['縣市'] = df_tr['縣市'].replace(city_to_price)
df_tt['縣市'] = df_tt['縣市'].replace(city_to_price)

# Standardize 縣市 to z-scores
features = df_tr[['縣市']]
scaler = StandardScaler()
scaled_features = scaler.fit_transform(features)
df_tr['縣市'] = scaled_features[:, 0]

features = df_tt[['縣市']]
scaler = StandardScaler()
scaled_features = scaler.fit_transform(features)
df_tt['縣市'] = scaled_features[:, 0]


print(df_tr.columns[:])
print(df_tt.columns[:])

Convert [移轉層次, 總樓層數] to z-scores

# df_tr contains the features 移轉層次 (transferred floor) and 總樓層數 (total floors)
features = df_tr[['移轉層次', '總樓層數']]

scaler = StandardScaler()
scaled_features = scaler.fit_transform(features)

# Write the standardized features back into the original DataFrame
df_tr['移轉層次'] = scaled_features[:, 0]
df_tr['總樓層數'] = scaled_features[:, 1]


features = df_tt[['移轉層次', '總樓層數']]
scaler = StandardScaler()
scaled_features = scaler.fit_transform(features)
df_tt['移轉層次'] = scaled_features[:, 0]
df_tt['總樓層數'] = scaled_features[:, 1]



print(df_tr.columns[:])
print(df_tr.shape)

print(df_tt.columns[:])
print(df_tt.shape)

Replace [主要用途] with the average house price, then convert to z-scores

# Mapping from 主要用途 (main use) to its average house price
use_to_price = {
'店鋪': 136.5,
'商業用': 121.2,
'辦公室': 106.8,
'住商用': 87.9,
'集合住宅': 69.9,
'住家用': 67.4,
'國民住宅': 64.5,
'一般事務所': 63.8,
'廠房': 50.4,
'工業用': 49.7,
'住工用': 48.6,
'其他': 46.9
}

# Replace the 主要用途 column with the corresponding average house price
df_tr['主要用途'] = df_tr['主要用途'].replace(use_to_price)

# Standardize 主要用途 to z-scores
features = df_tr[['主要用途']]
scaler = StandardScaler()
scaled_features = scaler.fit_transform(features)
df_tr['主要用途'] = scaled_features[:, 0]
print(df_tr.columns[:])

df_tt['主要用途'] = df_tt['主要用途'].replace(use_to_price)
features = df_tt[['主要用途']]
scaler = StandardScaler()
scaled_features = scaler.fit_transform(features)
df_tt['主要用途'] = scaled_features[:, 0]
print(df_tt.columns[:])

Replace [主要建材] with the average house price, then convert to z-scores

# Mapping from 主要建材 (main building material) to its average house price
material_to_price = {
'鋼筋混凝土造': 65.4,
'鋼骨造': 60.2,
'鋼筋混凝土加強磚造': 48.6,
'加強磚造': 41.8,
'磚造': 38.4,
'其他': 36.7
}

# Replace the 主要建材 column with the corresponding average house price
df_tr['主要建材'] = df_tr['主要建材'].replace(material_to_price)

# Standardize 主要建材 to z-scores
features = df_tr[['主要建材']]
scaler = StandardScaler()
scaled_features = scaler.fit_transform(features)
df_tr['主要建材'] = scaled_features[:, 0]
print(df_tr.columns[:])

df_tt['主要建材'] = df_tt['主要建材'].replace(material_to_price)
features = df_tt[['主要建材']]
scaler = StandardScaler()
scaled_features = scaler.fit_transform(features)
df_tt['主要建材'] = scaled_features[:, 0]
print(df_tt.columns[:])

Replace [建物型態] with the average house price, then convert to z-scores

# Mapping from 建物型態 (building type) to its average house price
type_to_price = {
'住宅大樓(11層含以上有電梯)': 69.9,
'華廈(10層含以下有電梯)': 63.8,
'公寓(5樓含以下無電梯)': 43.6,
'透天厝': 40.2
}

# Replace the 建物型態 column with the corresponding average house price
df_tr['建物型態'] = df_tr['建物型態'].replace(type_to_price)

# Standardize 建物型態 to z-scores
features = df_tr[['建物型態']]
scaler = StandardScaler()
scaled_features = scaler.fit_transform(features)
df_tr['建物型態'] = scaled_features[:, 0]
print(df_tr.columns[:])

df_tt['建物型態'] = df_tt['建物型態'].replace(type_to_price)
features = df_tt[['建物型態']]
scaler = StandardScaler()
scaled_features = scaler.fit_transform(features)
df_tt['建物型態'] = scaled_features[:, 0]
print(df_tt.columns[:])

Convert [屋齡] to int, then to z-scores

# Convert 屋齡 (building age) to integers
df_tr['屋齡'] = df_tr['屋齡'].astype(int)

# Standardize 屋齡 to z-scores
features = df_tr[['屋齡']]
scaler = StandardScaler()
scaled_features = scaler.fit_transform(features)
df_tr['屋齡'] = scaled_features[:, 0]
print(df_tr.columns[:])

df_tt['屋齡'] = df_tt['屋齡'].astype(int)
features = df_tt[['屋齡']]
scaler = StandardScaler()
scaled_features = scaler.fit_transform(features)
df_tt['屋齡'] = scaled_features[:, 0]
print(df_tt.columns[:])

Convert [車位個數] to z-scores

# Standardize 車位個數 (number of parking spaces) to z-scores
features = df_tr[['車位個數']]
scaler = StandardScaler()
scaled_features = scaler.fit_transform(features)
df_tr['車位個數'] = scaled_features[:, 0]
print(df_tr.columns[:])

features = df_tt[['車位個數']]
scaler = StandardScaler()
scaled_features = scaler.fit_transform(features)
df_tt['車位個數'] = scaled_features[:, 0]
print(df_tt.columns[:])

Check the processed data

print(df_tr.iloc[0])

Import Some Packages

# PyTorch
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

# For data preprocess
import numpy as np
import csv
import os

# For plotting
import matplotlib.pyplot as plt
from matplotlib.pyplot import figure

myseed = 42069 # set a random seed for reproducibility
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False


np.random.seed(myseed)
torch.manual_seed(myseed)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(myseed)

Some Utilities

# CUDA
# def get_device():
#     ''' Get device (if GPU is available, use GPU) '''
#     return 'cuda' if torch.cuda.is_available() else 'cpu'

# MPS (Apple Metal)
def get_device():
    ''' Get device (prefer MPS, then CUDA, otherwise CPU) '''
    return "mps" if getattr(torch, 'has_mps', False) \
        else "cuda" if torch.cuda.is_available() else "cpu"




def plot_learning_curve(loss_record, title=''):
    ''' Plot learning curve of your DNN (train & dev loss) '''
    total_steps = len(loss_record['train'])
    x_1 = range(total_steps)
    x_2 = x_1[::len(loss_record['train']) // len(loss_record['dev'])]
    figure(figsize=(6, 4))
    plt.plot(x_1, loss_record['train'], c='tab:red', label='train')
    plt.plot(x_2, loss_record['dev'], c='tab:cyan', label='dev')
    plt.ylim(0.0, 5.)
    plt.xlabel('Training steps')
    plt.ylabel('MSE loss')
    plt.title('Learning curve of {}'.format(title))
    plt.legend()
    plt.show()

Preprocess

  • train : for training
  • dev : for validation
  • test : for testing (w/o target value)

class CustomDataset(Dataset):
    ''' Generic Dataset for loading and preprocessing any dataset from a Pandas DataFrame '''
    def __init__(self, df, mode='train', target_only=True):
        self.mode = mode

        if mode == 'test':
            # Testing data
            data = df.values
            self.data = torch.FloatTensor(data)
        else:
            # Training data (train/dev sets)
            target = df.iloc[:, -1].values
            data = df.values[:, :-1]  # Remove the last column, which is the target

            # Split the training data into train & dev sets
            if mode == 'train':
                indices = [i for i in range(len(data)) if i % 10 != 0]
            elif mode == 'dev':
                indices = [i for i in range(len(data)) if i % 10 == 0]

            # Convert data into PyTorch tensors
            self.data = torch.FloatTensor(data[indices])
            self.target = torch.FloatTensor(target[indices])

        self.dim = self.data.shape[1]

        print('Finished reading the {} set of Custom Dataset ({} samples found, each dim = {})'
              .format(mode, len(self.data), self.dim))

    def __getitem__(self, index):
        # Returns one sample at a time
        if self.mode in ['train', 'dev']:
            # For training
            return self.data[index], self.target[index]
        else:
            # For testing (no target)
            return self.data[index]

    def __len__(self):
        # Returns the size of the dataset
        return len(self.data)

DataLoader

def prep_dataloader(df, mode, batch_size, n_jobs=0, target_only=False):
    ''' Generates a dataset, then wraps it in a dataloader. '''
    dataset = CustomDataset(df, mode=mode, target_only=target_only)  # Construct dataset
    dataloader = DataLoader(
        dataset, batch_size,
        shuffle=(mode == 'train'), drop_last=False,
        num_workers=n_jobs, pin_memory=True)  # Construct dataloader
    return dataloader

Deep Neural Network

class NeuralNet(nn.Module):
    ''' A simple fully-connected deep neural network '''
    def __init__(self, input_dim):
        super(NeuralNet, self).__init__()

        self.net = nn.Sequential(
            nn.Linear(input_dim, 16),
            nn.BatchNorm1d(16),
            nn.Dropout(p=0.2),
            nn.ReLU(),
            nn.Linear(16, 1)
        )

        # Mean squared error loss
        self.criterion = nn.MSELoss(reduction='mean')

    def forward(self, x):
        ''' Given input of size (batch_size x input_dim), compute output of the network '''
        return self.net(x).squeeze(1)

    def cal_loss(self, pred, target):
        ''' Calculate loss (MSE plus an L2 regularization term) '''
        regularization_loss = 0
        for param in self.parameters():
            # L2 regularization over all parameters
            regularization_loss += torch.sum(param ** 2)
        return self.criterion(pred, target) + 0.00075 * regularization_loss

Training

def train(tr_set, dv_set, model, config, device):
    ''' DNN training '''

    n_epochs = config['n_epochs']  # Maximum number of epochs

    # Setup optimizer
    optimizer = getattr(torch.optim, config['optimizer'])(
        model.parameters(), **config['optim_hparas'])

    min_mse = 1000.
    loss_record = {'train': [], 'dev': []}  # for recording training loss
    early_stop_cnt = 0
    epoch = 0
    while epoch < n_epochs:
        model.train()                           # set model to training mode
        for x, y in tr_set:                     # iterate through the dataloader
            optimizer.zero_grad()               # set gradient to zero
            x, y = x.to(device), y.to(device)   # move data to device (cpu/cuda)
            pred = model(x)                     # forward pass (compute output)
            mse_loss = model.cal_loss(pred, y)  # compute loss
            mse_loss.backward()                 # compute gradient (backpropagation)
            optimizer.step()                    # update model with optimizer
            loss_record['train'].append(mse_loss.detach().cpu().item())

        # After each epoch, test your model on the validation (development) set.
        dev_mse = dev(dv_set, model, device)
        if dev_mse < min_mse:
            # Save model if your model improved
            min_mse = dev_mse
            print('Saving model (epoch = {:4d}, loss = {:.4f})'
                  .format(epoch + 1, min_mse))
            torch.save(model.state_dict(), config['save_path'])  # Save model to specified path
            early_stop_cnt = 0
        else:
            early_stop_cnt += 1

        epoch += 1
        loss_record['dev'].append(dev_mse)
        if early_stop_cnt > config['early_stop']:
            # Stop training if your model stops improving for "config['early_stop']" epochs.
            break

    print('Finished training after {} epochs'.format(epoch))
    return min_mse, loss_record

Validation

def dev(dv_set, model, device):
    model.eval()                                # set model to evaluation mode
    total_loss = 0
    for x, y in dv_set:                         # iterate through the dataloader
        x, y = x.to(device), y.to(device)       # move data to device (cpu/cuda)
        with torch.no_grad():                   # disable gradient calculation
            pred = model(x)                     # forward pass (compute output)
            mse_loss = model.cal_loss(pred, y)  # compute loss
        total_loss += mse_loss.detach().cpu().item() * len(x)  # accumulate loss
    total_loss = total_loss / len(dv_set.dataset)               # compute averaged loss

    return total_loss

Testing

def test(tt_set, model, device):
    model.eval()                               # set model to evaluation mode
    preds = []
    for x in tt_set:                           # iterate through the dataloader
        x = x.to(device)                       # move data to device (cpu/cuda)
        with torch.no_grad():                  # disable gradient calculation
            pred = model(x)                    # forward pass (compute output)
            preds.append(pred.detach().cpu())  # collect predictions
    preds = torch.cat(preds, dim=0).numpy()    # concatenate all predictions and convert to a numpy array
    return preds

Setup Hyper-parameters

device = get_device()                 # get the current available device ('cpu', 'cuda' or 'mps')
print(device)

os.makedirs('models', exist_ok=True) # The trained model will be saved to ./models/
target_only = True  # leftover from the course template; CustomDataset ignores this flag

# TODO: How to tune these hyper-parameters to improve your model's performance?
config = {
    'n_epochs': 500,       # maximum number of epochs
    'batch_size': 270,     # mini-batch size for dataloader
    'optimizer': 'Adam',   # optimization algorithm (optimizer in torch.optim)
    'optim_hparas': {      # hyper-parameters for the optimizer (depends on which optimizer you are using)
        'lr': 0.001,       # learning rate
        # 'momentum': 0.9  # momentum for SGD
    },
    'early_stop': 200,     # early stopping epochs (the number of epochs since your model's last improvement)
    'save_path': 'models/model.pth'  # your model will be saved here
}

Load data and model

tr_set = prep_dataloader(df_tr, 'train', config['batch_size'], target_only=target_only)
dv_set = prep_dataloader(df_tr, 'dev', config['batch_size'], target_only=target_only)
tt_set = prep_dataloader(df_tt, 'test', config['batch_size'], target_only=target_only)
model = NeuralNet(tr_set.dataset.dim).to(device)  # Construct model and move to device

Start Training!

model_loss, model_loss_record = train(tr_set, dv_set, model, config, device)
plot_learning_curve(model_loss_record, title='deep model')

Testing

def save_pred(preds, file):
    ''' Save predictions to specified file with custom IDs '''
    print('Saving results to {}'.format(file))
    with open(file, 'w') as fp:
        writer = csv.writer(fp)
        writer.writerow(['ID', 'predicted_price'])
        for i, p in enumerate(preds):
            writer.writerow(['PU-{}'.format(i + 1), p])

preds = test(tt_set, model, device) # predict house prices with your model

save_pred(preds, 'pred.csv') # save prediction file to pred.csv

First submission

The predictions were not very good: the Public MAPE was as high as 18.793825. I then tried many other approaches, none of which worked well, including one-hot encoding the 縣市 feature, but the MAPE stayed around 18 and would not improve.
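As a rough sketch of that one-hot attempt (hypothetical; applied in place of the average-price mapping for 縣市):

# Hypothetical: one-hot encode 縣市 instead of mapping it to an average price
df_tr = pd.get_dummies(df_tr, columns=['縣市'])
df_tt = pd.get_dummies(df_tt, columns=['縣市'])

# Make sure df_tt has every dummy column that appears in df_tr
for col in df_tr.columns:
    if col.startswith('縣市_') and col not in df_tt.columns:
        df_tt[col] = 0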

I then discussed the problem with a friend who has worked on house-price prediction. He pointed out that I had removed a pair of very important features, the 橫坐標 and 縱坐標 coordinates, which matter a great deal for house prices. So I added the coordinates back, increased the number of neurons, deepened the network, and removed the regularization and Dropout; the Public MAPE dropped from 18 to 11.68087.
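The exact architecture is not reproduced here; as a rough sketch of a deeper, wider variant of NeuralNet without Dropout or the L2 term (the layer sizes below are assumptions, not the exact ones I used):

class DeeperNet(nn.Module):
    ''' A deeper fully-connected network, without Dropout or L2 regularization '''
    def __init__(self, input_dim):
        super(DeeperNet, self).__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim, 128),
            nn.BatchNorm1d(128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.BatchNorm1d(64),
            nn.ReLU(),
            nn.Linear(64, 1)
        )
        self.criterion = nn.MSELoss(reduction='mean')

    def forward(self, x):
        return self.net(x).squeeze(1)

    def cal_loss(self, pred, target):
        # plain MSE, no regularization term
        return self.criterion(pred, target)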

Final code
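The final pipeline starts over from the raw csv files. A minimal sketch of re-loading them (same assumed file names as above):

import pandas as pd

# Assumed file names; adjust to the actual dataset file names
df_tr = pd.read_csv('training_data.csv')
df_tt = pd.read_csv('public_dataset.csv')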

# Remove the 嘉義縣 and 雲林縣 rows from the training data
df_tr = df_tr[df_tr['縣市'] != '嘉義縣']
df_tr = df_tr[df_tr['縣市'] != '雲林縣']

Examining the relationship between the target and the numeric features

import numpy as np

import pandas as pd
pd.set_option('display.float_format', lambda x: '{:.2f}'.format(x))

import seaborn as sns
color = sns.color_palette()
sns.set_style('darkgrid')


import matplotlib.pyplot as plt
plt.rcParams["font.sans-serif"] = ["SimHei"]
plt.rcParams["axes.unicode_minus"] = False  # display the minus sign correctly



# Separate numeric and categorical features
num_features = []
cate_features = []

for col in df_tt.columns:
    if df_tt[col].dtype == 'object':
        cate_features.append(col)
    else:
        num_features.append(col)
print('number of numeric features:', len(num_features))
print('number of categorical features:', len(cate_features))


plt.figure(figsize=(16, 20))
plt.subplots_adjust(hspace=0.3, wspace=0.3)

for i, feature in enumerate(num_features):
    plt.subplot(9, 4, i + 1)
    sns.scatterplot(x=feature, y='單價', data=df_tr, alpha=0.5)
    plt.xlabel(feature)
    plt.ylabel('單價')
plt.show()

Removing features that will not be used

df_tr = df_tr.drop(['ID', '縣市', '路名', '使用分區', '備註', '陽台面積', '附屬建物面積', '車位面積', '車位個數'], axis=1)
df_tt = df_tt.drop(['ID', '縣市', '路名', '使用分區', '備註', '陽台面積', '附屬建物面積', '車位面積', '車位個數'], axis=1)

The features used in the end

Next, I scraped the average house price of each 鄉鎮市區 from the web (the data is from 2023/10).

Handling the 鄉鎮市區 average house prices

from sklearn.preprocessing import StandardScaler

# Read 鄉鎮.csv, which holds the scraped average prices
df_town = pd.read_csv('鄉鎮.csv')
print(df_town.shape)

# Build a dict from 鄉鎮市區 to 平均房價 (新台幣/坪) using df_town
town_to_price = {}
for i in range(len(df_town)):
    town_to_price[df_town['鄉鎮市區'][i]] = df_town['平均房價 (新台幣/坪)'][i]

print(town_to_price)

# Replace the 鄉鎮市區 column in df_tr with the corresponding average house price
df_tr['鄉鎮市區'] = df_tr['鄉鎮市區'].replace(town_to_price)

# Standardize 鄉鎮市區 to z-scores
features = df_tr[['鄉鎮市區']]
scaler = StandardScaler()
scaled_features = scaler.fit_transform(features)
df_tr['鄉鎮市區'] = scaled_features[:, 0]



# Replace the 鄉鎮市區 column in df_tt with the corresponding average house price
df_tt['鄉鎮市區'] = df_tt['鄉鎮市區'].replace(town_to_price)

# Standardize 鄉鎮市區 to z-scores
features = df_tt[['鄉鎮市區']]
scaler = StandardScaler()
scaled_features = scaler.fit_transform(features)
df_tt['鄉鎮市區'] = scaled_features[:, 0]

Use PCA to reduce 移轉層次 and 總樓層數 to a single feature

from sklearn.decomposition import PCA

pca = PCA(n_components=1)

tr_features = df_tr[['移轉層次', '總樓層數']]
tt_features = df_tt[['移轉層次', '總樓層數']]

# Fit the PCA on the training features only, then transform both sets
pca.fit(tr_features)

pca_tr_features = pca.transform(tr_features)
pca_tt_features = pca.transform(tt_features)


# Add the PCA-transformed feature to the datasets
df_tr['PCA_Floor'] = pca_tr_features
df_tt['PCA_Floor'] = pca_tt_features

# Drop 移轉層次 and 總樓層數
df_tr = df_tr.drop(['移轉層次', '總樓層數'], axis=1)
df_tt = df_tt.drop(['移轉層次', '總樓層數'], axis=1)

Use PCA to fuse 土地面積, 建物面積, and 主建物面積 into a single feature

from sklearn.decomposition import PCA

# Reduce 土地面積, 建物面積, 主建物面積 to a single PCA feature
pca = PCA(n_components=1)

tr_features = df_tr[['土地面積', '建物面積', '主建物面積']]
tt_features = df_tt[['土地面積', '建物面積', '主建物面積']]

# Fit the PCA on the training features only, then transform both sets
pca.fit(tr_features)

pca_tr_features = pca.transform(tr_features)
pca_tt_features = pca.transform(tt_features)

# Add the PCA-transformed feature to the datasets
df_tr['PCA_Area'] = pca_tr_features
df_tt['PCA_Area'] = pca_tt_features

# Drop 土地面積, 建物面積, 主建物面積
df_tr = df_tr.drop(['土地面積', '建物面積', '主建物面積'], axis=1)
df_tt = df_tt.drop(['土地面積', '建物面積', '主建物面積'], axis=1)

Label-encode 主要用途 into numeric values

# Label-encode 主要用途
code_dic = {'其他': 0, '住工用': 1, '工業用': 2, '廠房': 3, '一般事務所': 4, '國民住宅': 5, '住家用': 6, '集合住宅': 7, '住商用': 8, '辦公室': 9, '商業用': 10, '店鋪': 11}

df_tr['主要用途'] = df_tr['主要用途'].map(code_dic)
df_tt['主要用途'] = df_tt['主要用途'].map(code_dic)

Label-encode 主要建材 into numeric values

code_dic = {'其他': 2, '磚造': 0, '加強磚造': 3, '鋼筋混凝土加強磚造': 1, '鋼骨造': 5, '鋼筋混凝土造': 4}

df_tr['主要建材'] = df_tr['主要建材'].map(code_dic)
df_tt['主要建材'] = df_tt['主要建材'].map(code_dic)

Label-encode 建物型態 into numeric values

code_dic = {'透天厝': 0, '公寓(5樓含以下無電梯)': 1, '華廈(10層含以下有電梯)': 2, '住宅大樓(11層含以上有電梯)': 3}

df_tr['建物型態'] = df_tr['建物型態'].map(code_dic)
df_tt['建物型態'] = df_tt['建物型態'].map(code_dic)

Convert [屋齡] to int, then to z-scores

from sklearn.preprocessing import StandardScaler
# Convert 屋齡 to integers
df_tr['屋齡'] = df_tr['屋齡'].astype(int)

# Standardize 屋齡 to z-scores
features = df_tr[['屋齡']]
scaler = StandardScaler()
scaled_features = scaler.fit_transform(features)
df_tr['屋齡'] = scaled_features[:, 0]
print(df_tr.columns[:])

df_tt['屋齡'] = df_tt['屋齡'].astype(int)
features = df_tt[['屋齡']]
scaler = StandardScaler()
scaled_features = scaler.fit_transform(features)
df_tt['屋齡'] = scaled_features[:, 0]
print(df_tt.columns[:])

Convert 橫坐標 and 縱坐標 to z-scores

# Standardize 橫坐標 to z-scores
features = df_tr[['橫坐標']]
scaler = StandardScaler()
scaled_features = scaler.fit_transform(features)
df_tr['橫坐標'] = scaled_features[:, 0]
print(df_tr.columns[:])

features = df_tt[['橫坐標']]
scaler = StandardScaler()
scaled_features = scaler.fit_transform(features)
df_tt['橫坐標'] = scaled_features[:, 0]
print(df_tt.columns[:])

# Standardize 縱坐標 to z-scores
features = df_tr[['縱坐標']]
scaler = StandardScaler()
scaled_features = scaler.fit_transform(features)
df_tr['縱坐標'] = scaled_features[:, 0]
print(df_tr.columns[:])

features = df_tt[['縱坐標']]
scaler = StandardScaler()
scaled_features = scaler.fit_transform(features)
df_tt['縱坐標'] = scaled_features[:, 0]
print(df_tt.columns[:])
# Move 單價 to the last column of df_tr
df_tr = df_tr[[col for col in df_tr if col != '單價'] + ['單價']]
print(df_tr.columns[:])

Examining the relationship between the target and the numeric features again

import numpy as np

import pandas as pd
pd.set_option('display.float_format', lambda x: '{:.2f}'.format(x))

import seaborn as sns
color = sns.color_palette()
sns.set_style('darkgrid')

import matplotlib.pyplot as plt
plt.rcParams["font.sans-serif"] = ["SimHei"]
plt.rcParams["axes.unicode_minus"] = False  # display the minus sign correctly

# Separate numeric and categorical features
num_features = []
cate_features = []

for col in df_tt.columns:
    if df_tt[col].dtype == 'object':
        cate_features.append(col)
    else:
        num_features.append(col)
print('number of numeric features:', len(num_features))
print('number of categorical features:', len(cate_features))

plt.figure(figsize=(16, 20))
plt.subplots_adjust(hspace=0.3, wspace=0.3)

for i, feature in enumerate(num_features):
    plt.subplot(9, 4, i + 1)
    sns.scatterplot(x=feature, y='單價', data=df_tr, alpha=0.5)
    plt.xlabel(feature)
    plt.ylabel('單價')
plt.show()

Handling 主要用途 outliers

# Remove the outliers in the upper-left corner
df_tr = df_tr.drop(df_tr[(df_tr['主要用途']<1) & (df_tr['單價']>10)].index)

sns.scatterplot(x='主要用途', y='單價', data=df_tr)

Handling 鄉鎮市區 outliers

# Remove the outliers in the upper-left corner
df_tr = df_tr.drop(df_tr[(df_tr['鄉鎮市區']<-0.5) & (df_tr['單價']>10)].index)

sns.scatterplot(x='鄉鎮市區', y='單價', data=df_tr)

Handling 主要建材 outliers

# Remove the outliers in the upper-left corner of the 主要建材 plot
df_tr = df_tr.drop(df_tr[(df_tr['主要建材']<=2) & (df_tr['單價']>9)].index)

sns.scatterplot(x='主要建材', y='單價', data=df_tr)

Handling 橫坐標 outliers

# Remove the outliers in the middle of the 橫坐標 plot
df_tr = df_tr.drop(df_tr[(df_tr['橫坐標']<=-2) & (df_tr['單價']>9)].index)

sns.scatterplot(x='橫坐標', y='單價', data=df_tr)

Handling 縱坐標 outliers

# Remove the outliers in the middle of the 縱坐標 plot
df_tr = df_tr.drop(df_tr[(df_tr['縱坐標']<=-2) & (df_tr['單價']>9)].index)

sns.scatterplot(x='縱坐標', y='單價', data=df_tr)
# Remove rows where 單價 is greater than 6
df_tr = df_tr[df_tr['單價']<=6]
sns.distplot(df_tr['單價'])

Correlations between features

# Correlations between all numeric features
numeric_columns = df_tr.select_dtypes(include=['float64', 'int64'])
corrs = numeric_columns.corr()
plt.figure(figsize=(16, 16))
sns.heatmap(corrs)

The ten variables most correlated with the target

cols_10 = corrs.nlargest(10, '單價')['單價'].index
corrs_10 = df_tr[cols_10].corr()
plt.figure(figsize=(6, 6))
sns.heatmap(corrs_10, annot=True)

Training with ExtraTreesRegressor

import numpy as np
from sklearn.metrics import mean_squared_error
from sklearn.linear_model import Lasso, LassoCV, ElasticNet, ElasticNetCV, Ridge, RidgeCV
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor, ExtraTreesRegressor
from xgboost import XGBRegressor
import xgboost as xgb


import torch
from torch import nn
import torch.optim as optim
import pandas as pd
from sklearn.model_selection import train_test_split


# Collect the training, validation, and test data from the dataloaders
x_train, y_train = [], []
x_valid, y_valid = [], []

x_test = []

for batch in tr_set:
    x_batch, y_batch = batch
    x_train.append(x_batch)
    y_train.append(y_batch)

for batch in dv_set:
    x_batch, y_batch = batch
    x_valid.append(x_batch)
    y_valid.append(y_batch)

for batch in tt_set:
    x_batch = batch
    x_test.append(x_batch)

x_train = torch.cat(x_train)
x_valid = torch.cat(x_valid)
y_train = torch.cat(y_train)
y_valid = torch.cat(y_valid)

x_test = torch.cat(x_test)


# List of candidate models
models = [
    # RandomForestRegressor(n_estimators=250, random_state=42),
    ExtraTreesRegressor(n_estimators=550, random_state=42),
    # XGBRegressor(random_state=42),
    # very low loss, but the submission results were poor:
    # xgb.XGBRegressor(max_depth=5, subsample=0.6, colsample_bytree=0.7, min_child_weight=3, seed=52, gamma=0, reg_alpha=0, reg_lambda=1, learning_rate=0.01, n_estimators=3000, random_state=42)
]

# Train and evaluate each model
mse_scores = []

for model in models:
    model.fit(x_train, y_train)
    y_pred = model.predict(x_valid)
    mse = mean_squared_error(y_valid, y_pred)
    mse_scores.append(mse)

# MSE of each model
for i, mse in enumerate(mse_scores):
    print(f"Model {i + 1} MSE: {mse}")

# Pick the best model
best_model_index = np.argmin(mse_scores)
best_model = models[best_model_index]

print(f"Best Model (Model {best_model_index + 1}) with MSE: {mse_scores[best_model_index]}")

In the end, ExtraTreesRegressor gave my best Public MAPE of 9.79891. I also found that XGBRegressor could reach the lowest MSE during training, but every submission made with it scored noticeably worse.
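The submission step for the tree model is not shown above; a minimal sketch, reusing save_pred from earlier (the validation MAPE check is my own addition, not part of the original pipeline):

# Sanity-check the best model on the validation split with the competition metric
valid_pred = best_model.predict(x_valid)
valid_mape = np.mean(np.abs((y_valid.numpy() - valid_pred) / y_valid.numpy())) * 100
print(f"Validation MAPE: {valid_mape:.4f}")

# Predict on the test set and write the submission file
tree_preds = best_model.predict(x_test)
save_pred(tree_preds, 'pred.csv')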

Final thoughts:

I tried several feature-engineering approaches and found that dropping the 縣市 feature and using the average house price of each 鄉鎮市區 instead brought the MAPE below 10. However, replacing 主要用途 with average house prices in the same way did not deliver the expected improvement, possibly because the average-price data I collected was inaccurate.
I also noticed that the validation and training data are not identically distributed: the training data additionally contains 嘉義縣, while the official competition data lacks 雲林縣. After removing these two counties from the training data, the MAPE improved as well.
At the beginning I mapped many features directly to average house prices, which introduces a large error within each city or county, so I later switched to label encoding for those features.
I also tried binning the building age into intervals, but the training results were not good; a rough sketch of that idea follows below.
The above is the process of, and my takeaways from, my first attempt at house-price prediction.
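As a rough sketch of the age-binning idea, applied to the raw 屋齡 values before standardization (the bin edges here are assumptions, not the ones I actually tried):

# Hypothetical example: bin 屋齡 into coarse intervals before encoding
age_bins = [0, 5, 10, 20, 30, 50, 100]       # assumed bin edges, in years
age_labels = list(range(len(age_bins) - 1))  # 0, 1, 2, ...
df_tr['屋齡_bin'] = pd.cut(df_tr['屋齡'], bins=age_bins, labels=age_labels, include_lowest=True)
df_tt['屋齡_bin'] = pd.cut(df_tt['屋齡'], bins=age_bins, labels=age_labels, include_lowest=True)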
