Source code for trace_source.land_sfc

#! /usr/bin/env python3
# coding=utf-8
""""""
"""
Author: radenz@tropos.de

"""

import os
import rasterio
import numpy as np
from affine import Affine
from pyproj import Proj, transform
from fastkml import kml
from shapely.geometry.point import Point
import shapely.vectorized
import toml

from numba import jit

[docs]@jit(nopython=True) def nearest(point, array, delta): """ calculate the index directly Args: point (float): point to search for array (np.array): array to search in delta: step size in array Returns: ``(i, array[i])`` """ ## i = bisect.bisect_left(array, point) #i = int((point - array[0]) / delta) ## print("search nearest ", i, point, " | ", array[max(0,i-3):i+4]) #nearest = min(array[max(0, i - 10):i + 10], key=lambda t: abs(point - t)) #i = np.where(array == nearest)[0][0] i_calc = min(np.round((point - array[0])/delta), array.shape[0]-1) #print(i, i_calc, point, array[i-1:i+2], (point - array[0])/delta) #assert i == i_calc i = int(i_calc) #return (i, nearest) return (i, array[i])
[docs]def argnearest(value, array, delta): """ """ i = np.searchsorted(array, value) - 1 if not i == array.shape[0] - 1: if np.abs(array[i] - value) > np.abs(array[i + 1] - value): i = i + 1 return (i, array[i])
#@jit(nopython=True) def fast_land_sfc(land_sfc_data, lat, lon, lats, longs): land_sfc_category = np.zeros((len(lat),)) land_sfc_category_new = np.zeros((len(lat),)) ilat = np.round((lat - lats[0])/-0.1).astype(int) ilat[ilat >= lats.shape[0]-1] = lats.shape[0]-1 ilat[ilat < 0] = 0 ilon = np.round((lon - longs[0])/0.1).astype(int) ilon[ilon >= longs.shape[0]-1] = longs.shape[0]-1 ilon[ilon < 0] = 0 land_sfc_category = land_sfc_data[ilat, ilon] land_sfc_category[lat == -999.] = -1 return land_sfc_category
[docs]class land_sfc(): """ load the data from the geotiff ``data/resampledLCType.tif`` and concatenate to the simplified scheme =============== ========================= MODIS Category simplified =============== ========================= 0 0 water 1,2,3,4,5,6 1 forest 7,8,9 2 savanna, shrubland 10, 11, 12, 14 3 grass, cropland 13 4 urban 15 5 snow 16 6 barren =============== ========================= """ def __init__(self): filename = os.path.dirname(os.path.abspath(__file__)) +\ '/../data/resampledLCType.tif' #with rasterio.divers(): with rasterio.open(filename, 'r') as src: meta = src.meta width = meta['width'] height = meta['height'] count = meta['count'] dtype = meta['dtype'] self.shape = src.shape self.transform = src.transform T0 = src.transform p1 = Proj(src.crs) print('T0 aka affine transformation ', T0) print('src.crs', src.crs) print('p1', p1) # allocate memory for image im = np.empty([height, width], dtype) # read image into memory print(meta) im[:, :] = src.read(1) T1 = T0 * Affine.translation(0.5, 0.5) # Function to convert pixel row/column index (from 0) to easting/northing at centre print('affine transformation', T1) etemp, northings = T1 * (np.meshgrid(np.arange(1), np.arange(im.shape[0]))) eastings, ntemp = T1 * (np.meshgrid(np.arange(im.shape[1]), np.arange(1))) #print(eastings, northings) # the geotiff provided is already in WGS84 assert src.crs == 'EPSG:4326' # # Project all longitudes, latitudes # p2 = Proj(proj='latlong', datum='WGS84') # _, lats = transform(p1, p2, etemp, northings) # longs, _ = transform(p1, p2, eastings, ntemp) # slow version of the coordinate calculation # cols, rows = np.meshgrid(np.arange(im.shape[1]), np.arange(im.shape[0])) # # Get affine transform for pixel centres # T1 = T0 * Affine.translation(0.5, 0.5) # # Function to convert pixel row/column index (from 0) to easting/northing at centre # rc2en = lambda r, c: (c, r) * T1 # # All eastings and northings (there is probably a faster way to do this) # eastings, northings = np.vectorize(rc2en, otypes=[np.float, np.float])(rows, cols) # #print("eastings", eastings.nbytes, eastings) # #print("northings", northings.nbytes, northings) # # Project all longitudes, latitudes # p2 = Proj(proj='latlong', datum='WGS84') # longs, lats = transform(p1, p2, eastings, northings) if (eastings == eastings[0, :]).all(): self.longs = eastings[0, :] if (northings.T == northings[:, 0]).all(): self.lats = northings[:, 0] self.land_sfc_data = im # high green self.land_sfc_data[(im >= 1) & (im <= 6)] = 1 # med green self.land_sfc_data[(im >= 7) & (im <= 9)] = 2 # low green self.land_sfc_data[(im >= 10) & (im <= 12)] = 3 self.land_sfc_data[im == 14] = 3 # urban self.land_sfc_data[im == 13] = 4 # snow/ice self.land_sfc_data[im == 15] = 5 # desert self.land_sfc_data[im == 16] = 6 self.categories = {0:'water',1:'forest',2:'savanna/shrub', 3:'grass/crop',4:'urban',5:'snow',6:'barren'} #@jit(nopython=True) #@profile
[docs] def get_land_sfc(self, lat, lon): """ get the land use pixel for a given coordinate interpolation to the nearest pixel is done Args: lat (float, array): latitude lon (float, array): longitude Returns: array or int with the land use category """ if isinstance(lat, float): lat = [lat] lon = [lon] if isinstance(lat, np.ma.core.MaskedArray): lat = lat.filled(-999.).tolist() lon = lon.filled(-999.).tolist() elif isinstance(lat, np.ndarray): lat = lat.tolist() lon = lon.tolist() # im[lat, lon] land_sfc_data = self.land_sfc_data if lat: cats = fast_land_sfc(land_sfc_data, lat, lon, self.lats, self.longs) else: cats = np.array([]) return cats
[docs] def get_land_sfc_shape(self, shp): """ get the land use inside a given shape :param shp: shapely Multipolygon .. deprecated:: 0.1 use the ensemble trajectories instead """ import rasterio.features mask = rasterio.features.rasterize( shp, out_shape=self.shape, transform=self.transform) masked_land_sfc = np.ma.MaskedArray(self.land_sfc, mask=np.logical_not(mask), copy=True) #print(masked_land_sfc.compressed()) return masked_land_sfc
@jit() def fast_geonames(geo_names_data, polygons, lat, lon): geo_id = np.zeros((len(lat),)) geo_id[:] = -1 for j, name in geo_names_data.items(): is_within = shapely.vectorized.contains(polygons[name], lon, lat) geo_id[is_within] = j return geo_id
[docs]class named_geography(): """ handle a single trajectory Args: selected_type (str): name in the ``geonames_config.toml`` """ def __init__(self, selected_type): config_file = 'geonames_config.toml' with open(config_file) as config_file: self.config = toml.loads(config_file.read()) filename = os.path.dirname(os.path.abspath(__file__)) +\ '/../' + self.config[selected_type]['filename'] # filename = os.path.dirname(os.path.abspath(__file__)) +\ # '/../prior_examples/geo_names.kml' k = kml.KML() with open(filename, 'r') as f: k.from_string(bytes(bytearray(f.read(), encoding='utf-8'))) docu = list(k.features())[0] polygons = {} for p in list(docu.features()): print(p.name) print(p.geometry) polygons[p.name] = p.geometry self.polygons = polygons # self.geo_names = {0: 'cont_europe', 1: 'sahara', 2: 'arabian_peninsula', # 3: 'far_east_deserts', 4: 'persia', 5: 'india'} self.geo_names = {int(k): v for k, v in self.config[selected_type]['geo_names'].items()} assert set(polygons.keys()) == set(self.geo_names.values()), \ ('not all keys in polygon are matched', polygons.keys(), self.geo_names.values()) print('loaded the named_geography') print('available geo names ', self.geo_names) #@jit(nopython=True) #@profile
[docs] def get_geo_names(self, lat, lon): """ get the names defined in the shapefile for a given coordinate Args: lat (float, array): latitude lon (float, array): longitude Returns: array or int with the geoname category """ if isinstance(lat, float): lat = [lat] lon = [lon] if isinstance(lat, np.ma.core.MaskedArray): lat = lat.filled(-999.).tolist() lon = lon.filled(-999.).tolist() elif isinstance(lat, np.ndarray): lat = lat.tolist() lon = lon.tolist() # im[lat, lon] geo_id = fast_geonames(self.geo_names, self.polygons, lat, lon) return geo_id
if __name__ == '__main__': ng = named_geography() print(ng.get_geo_names(19.67, 22.16)) print(ng.get_geo_names(25.63, 42.00))