Coding Examples

Last updated: December 19th, 2022

Train Raster Models


Train DeepSatV2


Import Necessary Modules

import sys
import os
import numpy as np
import time
import torch
import torch.nn as nn
from torch.utils.data.sampler import SubsetRandomSampler
from torch.utils.data import DataLoader
import torchvision.transforms as transforms
from geotorchai.models.raster import DeepSatV2
from geotorchai.datasets.raster import EuroSAT
                                    

Initialize parameters

epoch_nums = 50
learning_rate = 0.0002
batch_size = 16
params = {'batch_size': batch_size, 'shuffle': False, 'drop_last':False, 'num_workers': 0}
validation_split = 0.2
shuffle_dataset = True

initial_checkpoint = PATH_TO_SAVE_MODEL  # path to the file where the best model checkpoint will be saved
LOAD_INITIAL = False
random_seed = int(time.time())
                                    

Define method for calculating validation accuracy

def get_validation_accuracy(model, val_generator, device):
    model.eval()
    total_sample = 0
    correct = 0
    # gradients are not needed during evaluation
    with torch.no_grad():
        for i, sample in enumerate(val_generator):
            inputs, labels, features = sample
            inputs = inputs.to(device)
            features = features.type(torch.FloatTensor).to(device)
            labels = labels.to(device)

            outputs = model(inputs, features)
            total_sample += len(labels)

            _, predicted = outputs.max(1)
            correct += predicted.eq(labels).sum().item()

    accuracy = 100 * correct / total_sample

    return accuracy
                                    

Define method for initializing the model and performing training and testing

def createModelAndTrain():
    
    # load data for calculating mean and standard deviation
    fullData = EuroSAT(root = "data/eurosat", include_additional_features = False)
    
    # calculate mean and standard deviation
    full_loader = DataLoader(fullData, batch_size= batch_size)
    channels_sum, channels_squared_sum, num_batches = 0, 0, 0
    for i, sample in enumerate(full_loader):
        data_temp, _ = sample
        channels_sum += torch.mean(data_temp, dim=[0, 2, 3])
        channels_squared_sum += torch.mean(data_temp**2, dim=[0, 2, 3])
        num_batches += 1

    mean = channels_sum / num_batches
    std = (channels_squared_sum / num_batches - mean ** 2) ** 0.5

    # define the normalize transformation operation
    sat_transform = transforms.Normalize(mean, std)
    fullData = EuroSAT(root = "data/eurosat", include_additional_features = True, transform = sat_transform)
    
    # Split data into train and test
    dataset_size = len(fullData)
    indices = list(range(dataset_size))

    split = int(np.floor(validation_split * dataset_size))
    if shuffle_dataset:
        np.random.seed(random_seed)
        np.random.shuffle(indices)
    train_indices, val_indices = indices[split:], indices[:split]

    train_sampler = SubsetRandomSampler(train_indices)
    valid_sampler = SubsetRandomSampler(val_indices)

    training_generator = DataLoader(fullData, **params, sampler=train_sampler)
    val_generator = DataLoader(fullData, **params, sampler=valid_sampler)

    # Total iterations
    total_iters = 5
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    test_accuracy = []
    total_time = 0
    epochs_run = 0

    for iteration in range(total_iters):
        # init model
        model = DeepSatV2(13, 64, 64, 10, len(fullData.ADDITIONAL_FEATURES))

        loss_fn = nn.CrossEntropyLoss()
        optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
        model.to(device)
        loss_fn.to(device)

        # Perform training
        max_val_accuracy = None
        for e in range(epoch_nums):
            model.train()  # switch back to training mode; validation puts the model in eval mode
            t_start = time.time()
            for i, sample in enumerate(training_generator):
                inputs, labels, features = sample
                inputs = inputs.to(device)
                features = features.type(torch.FloatTensor).to(device)
                labels = labels.to(device)

                # Forward pass
                outputs = model(inputs, features)
                loss = loss_fn(outputs, labels)

                # Backward and optimize
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

            t_end = time.time()
            total_time += t_end - t_start
            epochs_run += 1
            print('Epoch [{}/{}], Training Loss: {:.4f}'.format(e + 1, epoch_nums, loss.item()))

            val_accuracy = get_validation_accuracy(model, val_generator, device)
            print("Validation Accuracy: ", val_accuracy, "%")

            if max_val_accuracy is None or val_accuracy > max_val_accuracy:
                max_val_accuracy = val_accuracy
                torch.save(model.state_dict(), initial_checkpoint)
                print('best model saved!')

        model.load_state_dict(torch.load(initial_checkpoint, map_location=lambda storage, loc: storage))
        ## Perform the testing on the loaded model in the same way as the validation step shown earlier.
                                    
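
The testing step elided above can reuse get_validation_accuracy on the reloaded checkpoint. A minimal sketch, continuing inside the iteration loop of createModelAndTrain; since this example only creates a train/validation split, the validation loader stands in for a test set (substitute a dedicated test DataLoader if you have one):

        # evaluate the reloaded best checkpoint exactly like the validation pass
        test_acc = get_validation_accuracy(model, val_generator, device)
        test_accuracy.append(test_acc)
        print('Test Accuracy (iteration {}): {:.2f}%'.format(iteration + 1, test_acc))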

Call model training and testing from main

if __name__ == '__main__':
    
    createModelAndTrain()
                                    


Train SatCNN


The model training steps are almost identical to those of DeepSatV2; a sketch of the parts that differ is shown below. Please check the full code in our GitHub repository.
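
A sketch of the parts that differ from the DeepSatV2 example. The constructor arguments below are an assumption (input channels, height, width, and number of classes, i.e. DeepSatV2 without the additional-feature count); verify the exact signature against the repository example:

from geotorchai.models.raster import SatCNN
from geotorchai.datasets.raster import EuroSAT

# assumption: SatCNN(in_channels, in_height, in_width, num_classes)
model = SatCNN(13, 64, 64, 10)

# the dataset is loaded without the handcrafted additional features,
# so each batch yields only (inputs, labels)
fullData = EuroSAT(root = "data/eurosat", include_additional_features = False)

# inside the epoch loop of the DeepSatV2 example, the forward pass then becomes:
#     inputs, labels = sample
#     outputs = model(inputs.to(device))
#     loss = loss_fn(outputs, labels.to(device))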

Train UNet


The model training steps are almost identical to those of DeepSatV2. Please check the code in our GitHub repository.

Train FCN (Fully Convolutional Network)


The model training steps are almost identical to those of DeepSatV2. Please check the code in our GitHub repository.



Train Grid Models


Train STResNet


Import Necessary Modules

import os
import time
import numpy as np
import torch
from torch.utils.data import DataLoader
from torch.utils.data.sampler import SubsetRandomSampler
import torch.nn as nn
from geotorchai.models.grid import STResNet
from geotorchai.datasets.grid import BikeNYCDeepSTN
                                    

Initialize parameters

len_closeness = 3
len_period = 4
len_trend = 4
nb_residual_unit = 4

map_height, map_width = 21, 12
nb_flow = 2
nb_area = 81
T = 24

epoch_nums = 100
learning_rate = 0.0002
batch_size = 32
params = {'batch_size': batch_size, 'shuffle': False, 'drop_last':False, 'num_workers': 0}

validation_split = 0.1
shuffle_dataset = False

initial_checkpoint = PATH_TO_SAVE_MODEL  # path to the file where the best model checkpoint will be saved
LOAD_INITIAL = False
random_seed = int(time.time())
                                    

Define method for calculating three types of errors: MSE, MAE, RMSE

def compute_errors(preds, y_true):
    pred_mean = preds[:, 0:2]
    diff = y_true - pred_mean

    mse = np.mean(diff ** 2)
    rmse = np.sqrt(mse)
    mae = np.mean(np.abs(diff))

    return mse, mae, rmse
                                    
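
For reference, a quick usage sketch with dummy NumPy arrays shaped like the BikeNYC grids defined above (batch, flow channels, height, width); the values are random and purely illustrative:

import numpy as np

# 8 random samples with nb_flow = 2 channels on the 21 x 12 grid
preds = np.random.rand(8, 2, 21, 12)
y_true = np.random.rand(8, 2, 21, 12)

mse, mae, rmse = compute_errors(preds, y_true)
print(mse, mae, rmse)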

Define method for calculating validation loss

def get_validation_loss(model, val_generator, criterion, device):
    model.eval()
    mean_loss = []
    # gradients are not needed during evaluation
    with torch.no_grad():
        for i, sample in enumerate(val_generator):
            X_c = sample["x_closeness"].type(torch.FloatTensor).to(device)
            X_p = sample["x_period"].type(torch.FloatTensor).to(device)
            X_t = sample["x_trend"].type(torch.FloatTensor).to(device)
            Y_batch = sample["y_data"].type(torch.FloatTensor).to(device)

            outputs = model(X_c, X_p, X_t)
            mse = criterion(outputs, Y_batch).item()
            mean_loss.append(mse)

    mean_loss = np.mean(mean_loss)
    return mean_loss
                                    

Define method for initializing the model and performing training and testing

def createModelAndTrain():

    train_dataset = BikeNYCDeepSTN(root = "data/deepstn")
    test_dataset = BikeNYCDeepSTN(root = "data/deepstn", is_training_data = False)

    min_max_diff = train_dataset.get_min_max_difference()

    dataset_size = len(train_dataset)
    indices = list(range(dataset_size))
    split = int(np.floor(validation_split * dataset_size))
    if shuffle_dataset:
        np.random.seed(random_seed)
        np.random.shuffle(indices)
    train_indices, val_indices = indices[split:], indices[:split]


    train_sampler = SubsetRandomSampler(train_indices)
    valid_sampler = SubsetRandomSampler(val_indices)

    training_generator = DataLoader(train_dataset, **params, sampler=train_sampler)
    val_generator = DataLoader(train_dataset, **params, sampler=valid_sampler)
    test_generator = DataLoader(test_dataset, batch_size=batch_size)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    # Total iterations
    total_iters = 5

    test_mae = []
    test_mse = []
    test_rmse = []

    for iteration in range(total_iters):
        model = STResNet((len_closeness, nb_flow, map_height, map_width),
                     (len_period, nb_flow, map_height, map_width),
                     (len_trend, nb_flow , map_height, map_width),
                     external_dim = None, nb_residual_unit = nb_residual_unit)

        if LOAD_INITIAL:
            model.load_state_dict(torch.load(initial_checkpoint, map_location=lambda storage, loc: storage))

        loss_fn = nn.MSELoss()
        optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
        model.to(device)
        loss_fn.to(device)

        min_val_loss = None
        for e in range(epoch_nums):
            model.train()  # switch back to training mode; validation puts the model in eval mode
            for i, sample in enumerate(training_generator):
                X_c = sample["x_closeness"].type(torch.FloatTensor).to(device)
                X_p = sample["x_period"].type(torch.FloatTensor).to(device)
                X_t = sample["x_trend"].type(torch.FloatTensor).to(device)
                Y_batch = sample["y_data"].type(torch.FloatTensor).to(device)

                # Forward pass
                outputs = model(X_c, X_p, X_t)
                loss = loss_fn(outputs, Y_batch)

                # Backward and optimize
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

            print('Epoch [{}/{}], Loss: {:.4f}'.format(e + 1, epoch_nums, loss.item()))

            val_loss = get_validation_loss(model, val_generator, loss_fn, device)
            print('Mean validation loss:', val_loss)

            if min_val_loss is None or val_loss < min_val_loss:
                min_val_loss = val_loss
                torch.save(model.state_dict(), initial_checkpoint)
                print('best model saved!')

        model.load_state_dict(torch.load(initial_checkpoint, map_location=lambda storage, loc: storage))
        ## Perform the testing on the loaded model in the same way as the validation step shown earlier.
                                    
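
The testing step elided above mirrors the validation pass, this time iterating over test_generator and aggregating the metrics returned by compute_errors. A minimal sketch, continuing inside the iteration loop (rescaling the errors back to the original trip-count units with min_max_diff is omitted here):

        # evaluate the reloaded best checkpoint on the held-out test set
        model.eval()
        iteration_errors = []
        with torch.no_grad():
            for sample in test_generator:
                X_c = sample["x_closeness"].type(torch.FloatTensor).to(device)
                X_p = sample["x_period"].type(torch.FloatTensor).to(device)
                X_t = sample["x_trend"].type(torch.FloatTensor).to(device)
                Y_batch = sample["y_data"].type(torch.FloatTensor).to(device)

                outputs = model(X_c, X_p, X_t)
                iteration_errors.append(compute_errors(outputs.cpu().numpy(), Y_batch.cpu().numpy()))

        mse, mae, rmse = np.mean(iteration_errors, axis=0)
        test_mse.append(mse)
        test_mae.append(mae)
        test_rmse.append(rmse)
        print('Test MSE: {:.6f}, MAE: {:.6f}, RMSE: {:.6f}'.format(mse, mae, rmse))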

Call model training and testing from main

if __name__ == '__main__':
    
    createModelAndTrain()
                                    


Train ConvLSTM


The model training steps are almost identical to those of STResNet. Please check the code in our GitHub repository.

Train DeepSTN


The model training steps are almost identical to those of STResNet. Please check the code in our GitHub repository.



Deep Learning with GPU


To perform deep learning on a GPU, we first need to check whether a GPU is available. If it is, we initialize the device with the GPU; otherwise, the CPU is used as the default device.

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
                                

Now models, loss functions, and tensors can be moved to the device by calling the .to(device) method. Taking the STResNet example shown earlier, after initializing model and loss_fn we can move them to the GPU or CPU device with the following code snippet:

model.to(device)
loss_fn.to(device)
                                

Assume that x_data and y_data are two tensors that need to be moved to the GPU or CPU device. Unlike modules, calling .to(device) on a tensor returns a new tensor rather than moving it in place, so the result must be assigned back:

x_data = x_data.to(device)
y_data = y_data.to(device)
                                


Preprocessing Examples


Preprocess Raster Data


Import Necessary Modules

from pyspark.sql import SparkSession
from sedona.register import SedonaRegistrator
from sedona.utils import SedonaKryoRegistrator, KryoSerializer
from geotorchai.preprocessing import SparkRegistration, load_geotiff_image, write_geotiff_image
from geotorchai.preprocessing.raster import RasterProcessing as rp
                                    

Initialize SparkSession and register the session on Apache Sedona

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("Sedona App") \
    .config("spark.serializer", KryoSerializer.getName) \
    .config("spark.kryo.registrator", SedonaKryoRegistrator.getName) \
    .config("spark.jars.packages",
            "org.apache.sedona:sedona-python-adapter-3.0_2.12:1.2.0-incubating,"
            "org.datasyslab:geotools-wrapper:geotools-24.0") \
    .getOrCreate()
SedonaRegistrator.registerAll(spark)
sc = spark.sparkContext
sc.setSystemProperty("sedona.global.charset", "utf8")
                                    

Register the SparkSession on GeoTorchAI

SparkRegistration.set_spark_session(spark)
                                    

Perform preprocessing on raster images

raster_df = load_geotiff_image("data/raster_data",  options_dict = {"readToCRS": "EPSG:4326"})
raster_df.show()

band1_df = rp.get_raster_band(raster_df, 1, "data", "nBands", new_column_name = "new_band", return_full_dataframe = False)
band1_df.show()

norm_diff_df = rp.get_normalized_difference_index(raster_df, 2, 1, "data", "nBands", new_column_name = "norm_band", return_full_dataframe = False)
norm_diff_df.show()

appended_df = rp.append_normalized_difference_index(raster_df, 2, 1, "data", "nBands")
appended_df.show()

write_geotiff_image(appended_df, "data/raster_data_written", options_dict = {"fieldNBands": "nBands", "writeToCRS": "EPSG:4326"}, num_partitions = 1)
                                    


Preprocess Spatiotemporal Data


Import Necessary Modules

from pyspark.sql import SparkSession
from sedona.register import SedonaRegistrator
from sedona.utils import SedonaKryoRegistrator, KryoSerializer

from geotorchai.preprocessing import SparkRegistration, load_geo_data, load_data
from geotorchai.preprocessing.enums import GeoFileType
from geotorchai.preprocessing.enums import AggregationType
from geotorchai.preprocessing.enums import GeoRelationship
from geotorchai.preprocessing.grid import SpacePartition
from geotorchai.preprocessing.grid import STManager as stm
from geotorchai.preprocessing import Adapter
                                    

Initialize SparkSession and register the session on Apache Sedona

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("Sedona App") \
    .config("spark.serializer", KryoSerializer.getName) \
    .config("spark.kryo.registrator", SedonaKryoRegistrator.getName) \
    .config("spark.jars.packages",
            "org.apache.sedona:sedona-python-adapter-3.0_2.12:1.2.0-incubating,"
            "org.datasyslab:geotools-wrapper:geotools-24.0") \
    .getOrCreate()
SedonaRegistrator.registerAll(spark)
sc = spark.sparkContext
sc.setSystemProperty("sedona.global.charset", "utf8")
                                    

Register the SparkSession on GeoTorchAI

SparkRegistration.set_spark_session(spark)
                                    

Perform preprocessing on spatiotemporal data

taxi_csv_path = "data/taxi_trip/yellow_tripdata_2009-01.csv"
taxi_df = load_data(taxi_csv_path, data_format = "csv", header = "true")
taxi_df = taxi_df.select("Trip_Pickup_DateTime", "Start_Lon", "Start_Lat")
taxi_df.show(5, False)

taxi_df = stm.trim_on_datetime(taxi_df, target_column = "Trip_Pickup_DateTime", upper_date = "2009-01-04 15:43:00", lower_date = "2009-01-04 03:31:00")
taxi_df.show(5, False)

taxi_df = stm.get_unix_timestamp(taxi_df, "Trip_Pickup_DateTime", new_column_alias = "converted_unix_time").drop("Trip_Pickup_DateTime")
taxi_df.show(5, False)

taxi_df = stm.add_temporal_steps(taxi_df, timestamp_column = "converted_unix_time", step_duration = 3600, temporal_steps_alias = "timesteps_id").drop("converted_unix_time")
taxi_df.show(5, False)
total_temporal_steps = stm.get_temporal_steps_count(taxi_df, temporal_steps_column = "timesteps_id")

taxi_df = stm.add_spatial_points(taxi_df, lat_column="Start_Lat", lon_column="Start_Lon", new_column_alias="point_loc").drop(*("Start_Lat", "Start_Lon"))
taxi_df.show(5, False)

zones = load_geo_data("data/taxi_trip/taxi_zones_2", GeoFileType.SHAPE_FILE)
zones.CRSTransform("epsg:2263", "epsg:4326")

zones_df = Adapter.rdd_to_spatial_df(zones)
grid_df = SpacePartition.generate_grid_cells(zones_df, "geometry", 50, 50)
grid_df.show(5, False)

column_list = ["point_loc"]
agg_types_list = [AggregationType.COUNT]
alias_list = ["point_cnt"]
st_df = stm.aggregate_st_dfs(grid_df, taxi_df, "geometry", "point_loc", "cell_id", "timesteps_id", GeoRelationship.CONTAINS, column_list, agg_types_list, alias_list)
st_df.show(5, False)

st_tensor = stm.get_st_grid_array(st_df, "timesteps_id", "cell_id", alias_list, temporal_length = total_temporal_steps, height = 50, width = 50, missing_data = 0)
print(st_tensor[0])
print("Tensor shape:")
print(st_tensor.shape)
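
Assuming get_st_grid_array returns a NumPy array (the shape printed above suggests so), the resulting spatiotemporal grid can be handed off to PyTorch for training the grid models shown earlier:

import torch

# hypothetical hand-off from preprocessing to deep learning
st_torch = torch.as_tensor(st_tensor, dtype=torch.float32)
print(st_torch.shape)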