Towards urban flood susceptibility mapping using machine and deep learning models (part 5): Convolutional neural networks

Omar Seleem
Hydroinformatics
Published in
6 min readFeb 3, 2023

In the last article, we prepared a dataset to map urban flood susceptibility using convolutional neural networks (CNN) (image-based models). This article shows how to read the images, train the CNN model and use it to map urban flood susceptibility. This series of articles summarize and explain (with python code) the paper “ Towards urban flood susceptibility mapping using data-driven models in Berlin, Germany” published in Geomatics, Natural Hazards and Risk. The Python script and a data sample are found here

We prepared two folders in the last article (Flooded and NotFlooded). Each folder includes images representing the predictive feature at each location in the flood inventory. Now we need to read these images and prepare it to be used as input to the model.

Read the data and prepare it for the model

import numpy as np
import matplotlib.pyplot as plt
import rasterio
import os
import cv2
import pandas as pd

# Read the preditive features and the label (Notflooded=0 , flooded=1)
data_path=r"D:\Predictive_features" # created in data_preperation script (last article)
CATEGORIES= ["NotFlooded", "Flooded"]

IMG_SIZE=23

let us check that there is no error in the data by plotting a sample

# Check the images
# plot the first band in the first image
for category in CATEGORIES:
path=os.path.join(data_path,category) # Path to flooded and NotFlooded dir
for img in os.listdir(path):
img_open=rasterio.open(os.path.join(path,img))
img_array=img_open.read(1) # open the image and read the first band
#img_array= np.where(img_array < 0, np.nan,img_array)
#mean=np.nanmean(img_array)
#img_array= np.where(np.isnan(img_array), mean,img_array)
plt.imshow(img_array,cmap="gray")
plt.show()
#print(img_array)
break
break

We used 11 predictive features, each feature is represented as a band in the images. i.e., each image has 11 bands. We need to read the images (11 bands and save them as a NumPy array). I prefer to save each predictive feature in a list of arrays. Therefore, I created an empty list for each predictive feature and then I put them in a list with the same order of the bands in the composite raster images.

# create a list for each predective feature

DEM=[]
Slope=[]
TWI=[]
DTRoad=[]
DTRiver=[]
CN=[]
Rain=[]
Aspect=[]
Curve=[]
Freq=[]
DTDrainage=[]
y=[]
# we have 11 predictive features
# every feature is presented in one band
# create a list which include the 11 predictive feature to
# loop over the predictive features
predictive_features=[DEM, Slope, TWI, DTRoad, DTRiver, CN, Rain, Aspect, Curve, Freq, DTDrainage] # the features muss be in the same order as the bands in the composite raster

Now we can use the below function to read images and save them in the lists. Please note that we need also to save the label (0 if the location is not flooded or 1 if the location is flooded). As the flooded and not-flooded locations are saved in separate folders. I used this to get the label, if we read images from the Notflooded folder then they get a label 0 and 1 for images from the flooded folder.

def create_training_data():
for i in range(len(predictive_features)):
print(i+1)
for category in CATEGORIES:
path= os.path.join(data_path,category) # Path to flooded and NotFlooded dir
class_num= CATEGORIES.index(category)
for img in os.listdir(path):
try:
img_open=rasterio.open(os.path.join(path,img))
print(category,img,class_num)
img_array=img_open.read(i+1)
#img_array= np.where(img_array < 0, np.nan,img_array)
#mean=np.nanmean(img_array)
#img_array= np.where(np.isnan(img_array), mean,img_array)
#new_array = cv2.resize(img_array, (IMG_SIZE, IMG_SIZE)) # resize to normalize data size
predictive_features[i].append(img_array)

if i==0:
y.append(class_num)
#print(class_num)
except Exception as e:
pass
create_training_data()

Now we have all the images saved in lists of Numpy arrays. We need to convert these lists of arrays into arrays with the following dimensions (Number of images * image size * image size * the number of bands). Using the sample data (84 * 23 * 23 * 1). Then, we need to combine all of the predictive features in one array with dimensions ( 84 * 23 * 23 *11 )

# convert the predictive feature lists to numpy arrays
DEM_array=np.array(DEM).reshape(-1, IMG_SIZE, IMG_SIZE, 1)
Slope_array=np.array(Slope).reshape(-1, IMG_SIZE, IMG_SIZE, 1)
TWI_array=np.array(TWI).reshape(-1, IMG_SIZE, IMG_SIZE, 1)
DTRoad_array=np.array(DTRoad).reshape(-1, IMG_SIZE, IMG_SIZE, 1)
DTRiver_array=np.array(DTRiver).reshape(-1, IMG_SIZE, IMG_SIZE, 1)
CN_array=np.array(CN).reshape(-1, IMG_SIZE, IMG_SIZE, 1)
DTDrainage_array=np.array(DTDrainage).reshape(-1, IMG_SIZE, IMG_SIZE, 1)
Aspect_array=np.array(Aspect).reshape(-1, IMG_SIZE, IMG_SIZE, 1)
Curvature_array=np.array(Curve).reshape(-1, IMG_SIZE, IMG_SIZE, 1)
Freq_Curve_array=np.array(Freq).reshape(-1, IMG_SIZE, IMG_SIZE, 1)
Rain_array=np.array(Rain).reshape(-1, IMG_SIZE, IMG_SIZE, 1)

# concatenate all the predicvtive feature arrays into one array
X_array=np.concatenate([DEM_array,Slope_array,TWI_array,DTRoad_array, DTRiver_array,CN_array,Rain_array,Aspect_array, Curvature_array, Freq_Curve_array, DTDrainage_array], axis=-1)

Then we split the data into training (60 %), validation (20 %) and testing (20 %). Firstly I split it into training and testing then later using Keras I will split the training data into training and validation

# split the data into training (60%), validation (20%) and testing (20%)

from sklearn.model_selection import train_test_split
x_train,x_test,y_train,y_test=train_test_split(X_array,y,test_size=0.2,random_state=42)

CNN Model: LeNet5 — Architecture

In the paper, we used the LeNet5 architecture as shown in Fig 1. This architecture is simple and suitable when there is not much data to train the model. The model is explained in detail in the paper.

Fig 1. The used LeNet5 architecture
import keras 
from keras.models import Sequential
from keras.layers import Conv2D
from keras.layers import MaxPooling2D
from keras.layers import Flatten
from keras.layers import Dense
from keras.layers import Dropout
from keras.callbacks import TensorBoard

NAME = "LeNet"


model = Sequential()
#Layer 1
#Conv Layer 1
model.add(Conv2D(filters = 6,
kernel_size = 5,
strides = 1,
activation = 'relu',
input_shape = (23,23,11)))
#Pooling layer 1
model.add(MaxPooling2D(pool_size = 2, strides = 2))

#add a droupout
model.add(Dropout(0.4))

#Layer 2
#Conv Layer 2
model.add(Conv2D(filters = 16,
kernel_size = 5,
strides = 1,
activation = 'relu',
input_shape = (14,14,6)))
#Pooling Layer 2
model.add(MaxPooling2D(pool_size = 2, strides = 2))

#add a droupout
model.add(Dropout(0.4))


#Flatten
model.add(Flatten())


#Layer 3
#Fully connected layer 1
model.add(Dense(units = 120, activation = 'relu'))

#add a droupout
model.add(Dropout(0.4))


#Layer 4
#Fully connected layer 2
model.add(Dense(units = 84, activation = 'relu'))
model.add(Dropout(0.4))



#Layer 5
#Output Layer
model.add(Dense(units = 1, activation = 'sigmoid'))

from keras.callbacks import ModelCheckpoint, EarlyStopping
checkpoint = ModelCheckpoint("LeNet.h5", monitor='val_loss', verbose=1, save_best_only=True, save_weights_only=False, mode='auto', period=1)
early = EarlyStopping(monitor='val_loss', min_delta=0, patience=20, verbose=1, mode='auto')

tensorboard = TensorBoard(log_dir="logs/{}".format(NAME))

model.compile(optimizer = 'adam', loss = 'binary_crossentropy', metrics = ['accuracy'])


model.summary()
history=model.fit(x_train ,y_train, batch_size=1024, epochs = 1000, validation_split=0.25, callbacks=[checkpoint,early,tensorboard])

Model Evaluation

After training the model, we need first to check if the model is over or underfitting. We can look at the training and validation accuracy and loss at each epoch. The model must have similar performance on both datasets.

#plot the training and validation accuracy and loss at each epoch
loss = history.history['loss']
val_loss = history.history['val_loss']
epochs = range(1, len(loss) + 1)
plt.plot(epochs, loss, 'y', label='Training loss')
plt.plot(epochs, val_loss, 'r', label='Validation loss')
plt.title('Training and validation loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()

acc = history.history['accuracy']
val_acc = history.history['val_accuracy']
plt.plot(epochs, acc, 'y', label='Training acc')
plt.plot(epochs, val_acc, 'r', label='Validation acc')
plt.title('Training and validation accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()
plt.show()

Then, we can calculate performance indices as accuracy, kappa, and area under the ROC curve.

_, acc = model.evaluate(x_test, y_test)
print("Accuracy = ", (acc * 100.0), "%")

#Confusion matrix
#We compare labels and plot them based on correct or wrong predictions.
#Since sigmoid outputs probabilities we need to apply threshold to convert to label.

mythreshold=0.5
from sklearn.metrics import confusion_matrix

y_pred = (model.predict(x_test)>= mythreshold).astype(int)
cm=confusion_matrix(y_test, y_pred)
print(cm)

from sklearn.metrics import classification_report
target_names = ['NotFlooded', 'Flooded']
print(classification_report(y_test, y_pred_test, target_names=target_names))

from sklearn.metrics import cohen_kappa_score

cohen_kappa_score(y_test, y_pred, labels=None, weights=None)

#Check the confusion matrix for various thresholds. Which one is good?
#Need to balance positive, negative, false positive and false negative.
#ROC can help identify the right threshold.
#Receiver Operating Characteristic (ROC) Curve is a plot that helps us
#visualize the performance of a binary classifier when the threshold is varied.
#ROC

from sklearn.metrics import roc_curve
y_preds = model.predict(x_test).ravel()

fpr, tpr, thresholds = roc_curve(y_test, y_preds)
plt.figure(1)
plt.plot([0, 1], [0, 1], 'y--')
plt.plot(fpr, tpr, marker='.')
plt.xlabel('False positive rate')
plt.ylabel('True positive rate')
plt.title('ROC curve')
plt.show()

#AUC
#Area under the curve (AUC) for ROC plot can be used to understand how well a classifier is performing.
#% chance that the model can distinguish between positive and negative classes.

from sklearn.metrics import auc
auc_value = auc(fpr, tpr)
print("Area under curve, AUC = ", auc_value)

Finally, you can use the same steps to prepare data for the whole study area and predict the flood susceptibility for the whole area. It is not practical to use the CNN with a fine spatial resolution (< 10 m) as mapping the study area will take too much time. We found that the CNN model could only recognize the topographic depressions as areas with high flood susceptibility at the fine resolutions (5 and 2 m) as shown in Fig 2.

Fig 2. Flood susceptibility mapping at different spatial resolutions.

--

--

Omar Seleem
Hydroinformatics

Dr. -Ing | Hydrology | Data scientist | Machine learning