Import Necessary Modules
import sys
import os
import numpy as np
import time
import torch
import torch.nn as nn
from torch.utils.data.sampler import SubsetRandomSampler
from torch.utils.data import DataLoader
import torchvision.transforms as transforms
from geotorchai.models.raster import DeepSatV2
from geotorchai.datasets.raster import EuroSAT
Initialize parameters
epoch_nums = 50
learning_rate = 0.0002
batch_size = 16
params = {'batch_size': batch_size, 'shuffle': False, 'drop_last': False, 'num_workers': 0}
validation_split = 0.2
shuffle_dataset = True
initial_checkpoint = PATH_TO_SAVE_MODEL  # set this to the file path where the best checkpoint will be saved
LOAD_INITIAL = False
random_seed = int(time.time())
Define method for calculating validation accuracy
def get_validation_accuracy(model, val_generator, device):
    model.eval()
    total_sample = 0
    correct = 0
    with torch.no_grad():  # no gradients needed during evaluation
        for i, sample in enumerate(val_generator):
            inputs, labels, features = sample
            inputs = inputs.to(device)
            features = features.type(torch.FloatTensor).to(device)
            labels = labels.to(device)
            outputs = model(inputs, features)
            total_sample += len(labels)
            _, predicted = outputs.max(1)
            correct += predicted.eq(labels).sum().item()
    accuracy = 100 * correct / total_sample
    return accuracy
Define method for initializing the model and performing training and testing
def createModelAndTrain():
    # Load data once without extra features for calculating mean and standard deviation
    fullData = EuroSAT(root = "data/eurosat", include_additional_features = False)

    # Calculate per-channel mean and standard deviation
    full_loader = DataLoader(fullData, batch_size = batch_size)
    channels_sum, channels_squared_sum, num_batches = 0, 0, 0
    for i, sample in enumerate(full_loader):
        data_temp, _ = sample
        channels_sum += torch.mean(data_temp, dim=[0, 2, 3])
        channels_squared_sum += torch.mean(data_temp**2, dim=[0, 2, 3])
        num_batches += 1
    mean = channels_sum / num_batches
    std = (channels_squared_sum / num_batches - mean ** 2) ** 0.5

    # Define the normalize transformation and reload the dataset with additional features
    sat_transform = transforms.Normalize(mean, std)
    fullData = EuroSAT(root = "data/eurosat", include_additional_features = True, transform = sat_transform)
    # Split data into training and validation sets
    dataset_size = len(fullData)
    indices = list(range(dataset_size))
    split = int(np.floor(validation_split * dataset_size))
    if shuffle_dataset:
        np.random.seed(random_seed)
        np.random.shuffle(indices)
    train_indices, val_indices = indices[split:], indices[:split]
    train_sampler = SubsetRandomSampler(train_indices)
    valid_sampler = SubsetRandomSampler(val_indices)
    training_generator = DataLoader(fullData, **params, sampler=train_sampler)
    val_generator = DataLoader(fullData, **params, sampler=valid_sampler)
    # Total iterations
    total_iters = 5
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    test_accuracy = []
    total_time = 0
    epochs_run = 0
    for iteration in range(total_iters):
        # Initialize model
        model = DeepSatV2(13, 64, 64, 10, len(fullData.ADDITIONAL_FEATURES))
        loss_fn = nn.CrossEntropyLoss()
        optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
        model.to(device)
        loss_fn.to(device)

        # Perform training
        max_val_accuracy = None
        for e in range(epoch_nums):
            t_start = time.time()
            model.train()  # switch back to training mode; validation puts the model in eval mode
            for i, sample in enumerate(training_generator):
                inputs, labels, features = sample
                inputs = inputs.to(device)
                features = features.type(torch.FloatTensor).to(device)
                labels = labels.to(device)

                # Forward pass
                outputs = model(inputs, features)
                loss = loss_fn(outputs, labels)

                # Backward and optimize
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
            t_end = time.time()
            total_time += t_end - t_start
            epochs_run += 1
            print('Epoch [{}/{}], Training Loss: {:.4f}'.format(e + 1, epoch_nums, loss.item()))

            val_accuracy = get_validation_accuracy(model, val_generator, device)
            print("Validation Accuracy: ", val_accuracy, "%")
            if max_val_accuracy is None or val_accuracy > max_val_accuracy:
                max_val_accuracy = val_accuracy
                torch.save(model.state_dict(), initial_checkpoint)
                print('best model saved!')

        # Reload the best checkpoint found during training
        model.load_state_dict(torch.load(initial_checkpoint, map_location=lambda storage, loc: storage))
        ## Perform the testing on the loaded model, similar to the validation shown earlier.
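        # Hypothetical sketch of the elided testing step (an assumption, not the
        # repository's exact code): reuse the validation helper and record the
        # accuracy of the best model for this run.
        accuracy = get_validation_accuracy(model, val_generator, device)
        test_accuracy.append(accuracy)

    # Summarize the runs (assumed reporting; adjust as needed)
    print('Mean accuracy over {} runs: {:.2f}%'.format(total_iters, np.mean(test_accuracy)))
    print('Average training time per epoch: {:.2f}s'.format(total_time / epochs_run))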
Call model training and testing from main
if __name__ == '__main__':
    createModelAndTrain()
Model training steps are almost identical to those of DeepSatV2. Please check the code in our GitHub Repository
Import Necessary Modules
import os
import time
import numpy as np
import torch
from torch.utils.data import DataLoader
from torch.utils.data.sampler import SubsetRandomSampler
import torch.nn as nn
from geotorchai.models.grid import STResNet
from geotorchai.datasets.grid import BikeNYCDeepSTN
Initialize parameters
len_closeness = 3
len_period = 4
len_trend = 4
nb_residual_unit = 4
map_height, map_width = 21, 12
nb_flow = 2
nb_area = 81
T = 24
epoch_nums = 100
learning_rate = 0.0002
batch_size = 32
params = {'batch_size': batch_size, 'shuffle': False, 'drop_last': False, 'num_workers': 0}
validation_split = 0.1
shuffle_dataset = False
random_seed = int(time.time())  # used only when shuffle_dataset is True
initial_checkpoint = PATH_TO_SAVE_MODEL  # set this to the checkpoint file path, as in the DeepSatV2 example
LOAD_INITIAL = False
Define method for calculating three types of errors: MSE, MAE, RMSE
def compute_errors(preds, y_true):
    pred_mean = preds[:, 0:2]
    diff = y_true - pred_mean
    mse = np.mean(diff ** 2)
    rmse = np.sqrt(mse)
    mae = np.mean(np.abs(diff))
    return mse, mae, rmse
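A quick sanity check of compute_errors with randomly generated arrays (illustrative only; the shapes match nb_flow = 2 and the 21 x 12 grid defined above):

preds = np.random.rand(8, nb_flow, map_height, map_width)
y_true = np.random.rand(8, nb_flow, map_height, map_width)
mse, mae, rmse = compute_errors(preds, y_true)
print(mse, mae, rmse)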
Define method for calculating validation loss
def get_validation_loss(model, val_generator, criterion, device):
    model.eval()
    mean_loss = []
    with torch.no_grad():  # no gradients needed during evaluation
        for i, sample in enumerate(val_generator):
            X_c = sample["x_closeness"].type(torch.FloatTensor).to(device)
            X_p = sample["x_period"].type(torch.FloatTensor).to(device)
            X_t = sample["x_trend"].type(torch.FloatTensor).to(device)
            Y_batch = sample["y_data"].type(torch.FloatTensor).to(device)
            outputs = model(X_c, X_p, X_t)
            mse = criterion(outputs, Y_batch).item()
            mean_loss.append(mse)
    mean_loss = np.mean(mean_loss)
    return mean_loss
Define method for initializing the model and performing training and testing
def createModelAndTrain():
    train_dataset = BikeNYCDeepSTN(root = "data/deepstn")
    test_dataset = BikeNYCDeepSTN(root = "data/deepstn", is_training_data = False)
    min_max_diff = train_dataset.get_min_max_difference()

    dataset_size = len(train_dataset)
    indices = list(range(dataset_size))
    split = int(np.floor(validation_split * dataset_size))
    if shuffle_dataset:
        np.random.seed(random_seed)
        np.random.shuffle(indices)
    train_indices, val_indices = indices[split:], indices[:split]
    train_sampler = SubsetRandomSampler(train_indices)
    valid_sampler = SubsetRandomSampler(val_indices)
    training_generator = DataLoader(train_dataset, **params, sampler=train_sampler)
    val_generator = DataLoader(train_dataset, **params, sampler=valid_sampler)
    test_generator = DataLoader(test_dataset, batch_size=batch_size)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Total iterations
    total_iters = 5
    test_mae = []
    test_mse = []
    test_rmse = []
    for iteration in range(total_iters):
        model = STResNet((len_closeness, nb_flow, map_height, map_width),
                         (len_period, nb_flow, map_height, map_width),
                         (len_trend, nb_flow, map_height, map_width),
                         external_dim = None, nb_residual_unit = nb_residual_unit)
        if LOAD_INITIAL:
            model.load_state_dict(torch.load(initial_checkpoint, map_location=lambda storage, loc: storage))

        loss_fn = nn.MSELoss()
        optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
        model.to(device)
        loss_fn.to(device)

        min_val_loss = None
        for e in range(epoch_nums):
            model.train()  # switch back to training mode; validation puts the model in eval mode
            for i, sample in enumerate(training_generator):
                X_c = sample["x_closeness"].type(torch.FloatTensor).to(device)
                X_p = sample["x_period"].type(torch.FloatTensor).to(device)
                X_t = sample["x_trend"].type(torch.FloatTensor).to(device)
                Y_batch = sample["y_data"].type(torch.FloatTensor).to(device)

                # Forward pass
                outputs = model(X_c, X_p, X_t)
                loss = loss_fn(outputs, Y_batch)

                # Backward and optimize
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
            print('Epoch [{}/{}], Loss: {:.4f}'.format(e + 1, epoch_nums, loss.item()))

            val_loss = get_validation_loss(model, val_generator, loss_fn, device)
            print('Mean validation loss:', val_loss)
            if min_val_loss is None or val_loss < min_val_loss:
                min_val_loss = val_loss
                torch.save(model.state_dict(), initial_checkpoint)
                print('best model saved!')

        # Reload the best checkpoint found during training
        model.load_state_dict(torch.load(initial_checkpoint, map_location=lambda storage, loc: storage))
        ## Perform the testing on the loaded model, similar to the validation shown earlier.
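        # Hypothetical sketch of the elided testing step (an assumption, not the
        # repository's exact code): evaluate the reloaded model on test_generator
        # and record the errors for this run. min_max_diff, computed above, can be
        # used to rescale the errors back to the original flow range.
        model.eval()
        preds_all, y_all = [], []
        with torch.no_grad():
            for sample in test_generator:
                X_c = sample["x_closeness"].type(torch.FloatTensor).to(device)
                X_p = sample["x_period"].type(torch.FloatTensor).to(device)
                X_t = sample["x_trend"].type(torch.FloatTensor).to(device)
                preds_all.append(model(X_c, X_p, X_t).cpu().numpy())
                y_all.append(sample["y_data"].numpy())
        mse, mae, rmse = compute_errors(np.concatenate(preds_all), np.concatenate(y_all))
        test_mse.append(mse)
        test_mae.append(mae)
        test_rmse.append(rmse)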
Call model training and testing from main
if __name__ == '__main__':
    createModelAndTrain()
Model training steps are almost identical to those of STResNet. Please check the code in our GitHub Repository
To perform deep learning on a GPU, we first need to check whether a GPU is available. We initialize the device with the GPU if one is available; otherwise, the CPU is used as the default device.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
Now models, loss functions, and tensors can be moved to the device by calling the .to(device) method. Take the STResNet example above: after initializing model and loss_fn, we can move them to the GPU or CPU device with the following code:
model.to(device)
loss_fn.to(device)
Assume that x_data and y_data are two tensors that need to be moved to the GPU or CPU device. Unlike modules, calling .to(device) on a tensor returns a new tensor rather than moving the original in place, so the result must be assigned back:
x_data = x_data.to(device)
y_data = y_data.to(device)
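Putting these together, a minimal sketch of a single training step with explicit device placement (the names model, loss_fn, x_data, and y_data are illustrative, not tied to a specific example):

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)              # modules (models, losses) move in place
loss_fn.to(device)
x_data = x_data.to(device)    # tensors must be reassigned
y_data = y_data.to(device)
outputs = model(x_data)       # inputs and parameters now live on the same device
loss = loss_fn(outputs, y_data)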
Import Necessary Modules
from pyspark.sql import SparkSession
from sedona.register import SedonaRegistrator
from sedona.utils import SedonaKryoRegistrator, KryoSerializer
from geotorchai.preprocessing import SparkRegistration, load_geotiff_image, write_geotiff_image
from geotorchai.preprocessing.raster import RasterProcessing as rp
Initialize SparkSession and register the session on Apache Sedona
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("Sedona App") \
    .config("spark.serializer", KryoSerializer.getName) \
    .config("spark.kryo.registrator", SedonaKryoRegistrator.getName) \
    .config("spark.jars.packages", "org.apache.sedona:sedona-python-adapter-3.0_2.12:1.2.0-incubating,org.datasyslab:geotools-wrapper:geotools-24.0") \
    .getOrCreate()
SedonaRegistrator.registerAll(spark)
sc = spark.sparkContext
sc.setSystemProperty("sedona.global.charset", "utf8")
Register the SparkSession on GeoTorchAI
SparkRegistration.set_spark_session(spark)
Perform preprocessing on raster images
raster_df = load_geotiff_image("data/raster_data", options_dict = {"readToCRS": "EPSG:4326"})
raster_df.show()
band1_df = rp.get_raster_band(raster_df, 1, "data", "nBands", new_column_name = "new_band", return_full_dataframe = False)
band1_df.show()
norm_diff_df = rp.get_normalized_difference_index(raster_df, 2, 1, "data", "nBands", new_column_name = "norm_band", return_full_dataframe = False)
norm_diff_df.show()
appended_df = rp.append_normalized_difference_index(raster_df, 2, 1, "data", "nBands")
appended_df.show()
write_geotiff_image(appended_df, "data/raster_data_written", options_dict = {"fieldNBands": "nBands", "writeToCRS": "EPSG:4326"}, num_partitions = 1)
Import Necessary Modules
from pyspark.sql import SparkSession
from sedona.register import SedonaRegistrator
from sedona.utils import SedonaKryoRegistrator, KryoSerializer
from geotorchai.preprocessing import SparkRegistration, load_geo_data, load_data
from geotorchai.preprocessing.enums import GeoFileType
from geotorchai.preprocessing.enums import AggregationType
from geotorchai.preprocessing.enums import GeoRelationship
from geotorchai.preprocessing.grid import SpacePartition
from geotorchai.preprocessing.grid import STManager as stm
from geotorchai.preprocessing import Adapter
Initialize SparkSession and register the session on Apache Sedona
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("Sedona App") \
    .config("spark.serializer", KryoSerializer.getName) \
    .config("spark.kryo.registrator", SedonaKryoRegistrator.getName) \
    .config("spark.jars.packages", "org.apache.sedona:sedona-python-adapter-3.0_2.12:1.2.0-incubating,org.datasyslab:geotools-wrapper:geotools-24.0") \
    .getOrCreate()
SedonaRegistrator.registerAll(spark)
sc = spark.sparkContext
sc.setSystemProperty("sedona.global.charset", "utf8")
Register the SparkSession on GeoTorchAI
SparkRegistration.set_spark_session(spark)
Perform preprocessing on spatiotemporal (taxi trip) data
taxi_csv_path = "data/taxi_trip/yellow_tripdata_2009-01.csv"
taxi_df = load_data(taxi_csv_path, data_format = "csv", header = "true")
taxi_df = taxi_df.select("Trip_Pickup_DateTime", "Start_Lon", "Start_Lat")
taxi_df.show(5, False)
taxi_df = stm.trim_on_datetime(taxi_df, target_column = "Trip_Pickup_DateTime", upper_date = "2009-01-04 15:43:00", lower_date = "2009-01-04 03:31:00")
taxi_df.show(5, False)
taxi_df = stm.get_unix_timestamp(taxi_df, "Trip_Pickup_DateTime", new_column_alias = "converted_unix_time").drop("Trip_Pickup_DateTime")
taxi_df.show(5, False)
taxi_df = stm.add_temporal_steps(taxi_df, timestamp_column = "converted_unix_time", step_duration = 3600, temporal_steps_alias = "timesteps_id").drop("converted_unix_time")
taxi_df.show(5, False)
total_temporal_steps = stm.get_temporal_steps_count(taxi_df, temporal_steps_column = "timesteps_id")
taxi_df = stm.add_spatial_points(taxi_df, lat_column="Start_Lat", lon_column="Start_Lon", new_column_alias="point_loc").drop("Start_Lat", "Start_Lon")
taxi_df.show(5, False)
zones = load_geo_data("data/taxi_trip/taxi_zones_2", GeoFileType.SHAPE_FILE)
zones.CRSTransform("epsg:2263", "epsg:4326")
zones_df = Adapter.rdd_to_spatial_df(zones)
grid_df = SpacePartition.generate_grid_cells(zones_df, "geometry", 50, 50)
grid_df.show(5, False)
column_list = ["point_loc"]
agg_types_list = [AggregationType.COUNT]
alias_list = ["point_cnt"]
st_df = stm.aggregate_st_dfs(grid_df, taxi_df, "geometry", "point_loc", "cell_id", "timesteps_id", GeoRelationship.CONTAINS, column_list, agg_types_list, alias_list)
st_df.show(5, False)
st_tensor = stm.get_st_grid_array(st_df, "timesteps_id", "cell_id", alias_list, temporal_length = total_temporal_steps, height = 50, width = 50, missing_data = 0)
print(st_tensor[0])
print("Tensor shape:")
print(st_tensor.shape)
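The resulting spatiotemporal grid can now be fed to a grid-based model such as STResNet. As a minimal sketch, assuming get_st_grid_array returns a NumPy array (an assumption; check the API documentation), the grid can be wrapped as a PyTorch tensor:

import torch
# Assumption: st_tensor is a NumPy array of shape (temporal_steps, len(alias_list), height, width)
st_tensor_torch = torch.from_numpy(st_tensor).float()
print(st_tensor_torch.shape)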