File size: 4,785 Bytes
d37b207
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151

from PIL import Image
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from glob import glob
import os
import utm
import rasterio
from tqdm import tqdm
#from xml.etree import ElementTree as et
import xmltodict

##
def cloud_masking(image, cld, threshold=30):
    """Replace the cloudy pixels of ``image`` with the image's mean value.

    A pixel is treated as cloudy when the matching entry of ``cld`` is
    strictly greater than ``threshold``.

    Args:
        image: NumPy array of pixel values. It is modified in place.
        cld: NumPy array of cloud scores. ``cld > threshold`` is used as a
            boolean index into ``image``, so its shape must be compatible
            with ``image`` under NumPy boolean indexing (same shape for a
            single 2-D band).
        threshold: Score above which a pixel counts as cloudy. Defaults to
            30, the value that used to be hard-coded.

    Returns:
        The same ``image`` object, after the in-place replacement.
    """
    cloud_mask = cld > threshold
    # The fill value is taken before any pixel is overwritten, so it is the
    # mean of the whole input, cloudy pixels included.
    band_mean = image.mean()
    image[cloud_mask] = band_mean
    return image

##
def load_file(fp):
    """Read an image file and return its pixels as a NumPy array.

    Args:
        fp: Path of the image, given as a string or as any object whose
            ``str()`` is the path (for example a ``pathlib.Path``).

    Returns:
        ``np.ndarray`` built from the decoded image.
    """
    # Pillow opens images lazily; the ``with`` block makes sure the underlying
    # file is closed as soon as the pixel data has been copied into the array.
    with Image.open(str(fp)) as img:
        return np.array(img)

def paths(name):
    """Collect the band, metadata and cloud-mask paths of one product folder.

    The folder is expected to contain ``GRANULE/<granule>/IMG_DATA/R10m``,
    ``R20m`` and ``R60m`` sub-folders, ``GRANULE/<granule>/QI_DATA`` with the
    ``MSK_CLDPRB_20m.jp2`` / ``MSK_CLDPRB_60m.jp2`` files, and an
    ``INSPIRE.xml`` file at its root. When a pattern matches several entries
    the first one returned by ``glob`` is used.

    Args:
        name: Path of the product folder (string).

    Returns:
        A 6-tuple ``(bandes_path_10, bandes_path_20, bandes_path_60,
        tile_path, path_cld_20, path_cld_60)``: three lists of band file
        paths (B02/B03/B04/B08 at 10 m, B05/B06/B07/B8A/B11/B12 at 20 m,
        B01/B09 at 60 m), the ``INSPIRE.xml`` path and the two cloud-mask
        paths. The band paths are built from the naming prefix, not globbed,
        so their existence is not checked.

    Raises:
        IndexError: If one of the expected folders or files is not found, or
            if the first 10 m file name has fewer than two ``_``-separated
            fields.
    """
    fold_band_10 = glob(name + "/GRANULE/*/IMG_DATA/R10m")[0]
    fold_band_20 = glob(name + "/GRANULE/*/IMG_DATA/R20m")[0]
    fold_band_60 = glob(name + "/GRANULE/*/IMG_DATA/R60m")[0]

    # Every band file shares the same "<field0>_<field1>" prefix; read it from
    # any 10 m image. os.path.basename is used instead of splitting on "/"
    # because glob joins matches with the OS-specific separator.
    first_10m = glob(name + "/GRANULE/*/IMG_DATA/R10m" + "/*.jp2")[0]
    fields = os.path.basename(first_10m).split("_")
    fixe = fields[0] + "_" + fields[1]

    band_10 = ['B02', 'B03', 'B04', 'B08']
    band_20 = ['B05', 'B06', 'B07', 'B8A', 'B11', 'B12']
    band_60 = ['B01', 'B09']

    bandes_path_10 = [
        os.path.join(fold_band_10, fixe + "_" + band + "_10m.jp2") for band in band_10
    ]
    bandes_path_20 = [
        os.path.join(fold_band_20, fixe + "_" + band + "_20m.jp2") for band in band_20
    ]
    bandes_path_60 = [
        os.path.join(fold_band_60, fixe + "_" + band + "_60m.jp2") for band in band_60
    ]

    tile_path = name + "/INSPIRE.xml"
    path_cld_20 = glob(name + "/GRANULE/*/QI_DATA/MSK_CLDPRB_20m.jp2")[0]
    path_cld_60 = glob(name + "/GRANULE/*/QI_DATA/MSK_CLDPRB_60m.jp2")[0]

    return bandes_path_10, bandes_path_20, bandes_path_60, tile_path, path_cld_20, path_cld_60

##
def coords_to_pixels(ref, utm, m=10):
    """Turn a UTM position into (x, y) pixel indices relative to ``ref``.

    Args:
        ref: ``(easting, northing)`` of the grid origin; x grows with
            easting and y grows as northing decreases.
        utm: ``(easting, northing)`` of the point to locate.
        m: Pixel size, in the same unit as the coordinates.

    Returns:
        Tuple ``(x, y)`` of ints; fractional offsets are truncated toward
        zero by ``int``.
    """
    easting_offset = utm[0] - ref[0]
    northing_offset = ref[1] - utm[1]
    return int(easting_offset / m), int(northing_offset / m)

##
def extract_sub_image(bandes_path, tile_path, area, resolution=10, d=3, cld_path=None):
    """Sample every band around a geographic point and return one value per band.

    The tile reference position is read from the metadata file, the point is
    converted to pixel indices with ``coords_to_pixels``, and each band is
    sampled at that location.

    Args:
        bandes_path: Sequence of band image paths (must support ``len``). All
            bands are expected to share the same pixel grid.
        tile_path: Path of the XML metadata file. The first two
            whitespace-separated tokens of its ``gmd:abstract`` string are
            read as the latitude and longitude of the tile reference point.
        area: ``(lon, lat)`` of the point of interest.
        resolution: Pixel size in metres. The value 60 selects single-pixel
            sampling; any other value averages a window.
        d: Half-size of the window, in pixels (the window is ``2*d`` wide).
            Unused when ``resolution`` is 60.
        cld_path: Optional path of a cloud mask image. When given, cloudy
            pixels of each band window are replaced through ``cloud_masking``
            before averaging. Ignored when ``resolution`` is 60, because a
            single pixel has nothing to be averaged with.

    Returns:
        1-D ``np.ndarray`` with one float per band: the pixel value when
        ``resolution`` is 60, otherwise the mean of the window.
    """
    # Read the metadata inside a context manager so the handle is released.
    with open(tile_path, "r") as xml_file:
        python_dict = xmltodict.parse(xml_file.read())
    tile_coordonnates = python_dict["gmd:MD_Metadata"]["gmd:identificationInfo"]["gmd:MD_DataIdentification"]["gmd:abstract"]["gco:CharacterString"].split()

    # Tile reference point (lat, lon) and point of interest, both in UTM.
    # NOTE(review): the two conversions are independent, so they presumably
    # assume both points fall in the same UTM zone - confirm for tiles that
    # straddle a zone boundary.
    lat, lon = float(tile_coordonnates[0]), float(tile_coordonnates[1])
    refx, refy, _, _ = utm.from_latlon(lat, lon)
    ax, ay, _, _ = utm.from_latlon(area[1], area[0])  # area is (lon, lat)
    x, y = coords_to_pixels([refx, refy], [ax, ay], resolution)

    single_pixel = resolution == 60
    # Clamp the window start at 0: a negative start would be interpreted by
    # NumPy as "from the end" and yield an empty window (NaN mean).
    rows = slice(max(y - d, 0), y + d)
    cols = slice(max(x - d, 0), x + d)

    images = []
    for band_path in tqdm(bandes_path, total=len(bandes_path)):
        image = load_file(band_path).astype(np.float32)
        images.append(image[y, x] if single_pixel else image[rows, cols])
    images = np.array(images)

    if single_pixel:
        return images

    if cld_path is not None:
        # NOTE(review): the mask is indexed with the band pixel coordinates,
        # so it is assumed to share the bands' pixel grid - confirm at call
        # sites.
        cld = load_file(cld_path).astype(np.float32)[rows, cols]
        # Mask band by band: the 2-D cloud mask cannot index the stacked
        # (bands, rows, cols) array directly, and each band must be filled
        # with its own mean rather than a mean mixed across bands.
        images = np.array([cloud_masking(band, cld) for band in images])

    return images.mean((1, 2))
  

def ndvi(area, tile_name, d=3):
    """Return the mean NDVI of a small window around a geographic point.

    NDVI is computed per pixel as ``(B08 - B04) / (B08 + B04)`` from the 10 m
    band files found under ``tile_name``, then averaged over the window.

    Args:
        area: ``(lon, lat)`` of the point of interest.
        tile_name: Path of the product folder. It must contain
            ``INSPIRE.xml`` and the ``*_B04_10m.jp2`` / ``*_B08_10m.jp2``
            files under ``GRANULE/*/IMG_DATA/R10m``.
        d: Half-size of the window, in pixels (the window is ``2*d`` wide).
            Defaults to 3, the value that used to be hard-coded.

    Returns:
        The mean NDVI as a NumPy float32 scalar. Pixels where both bands sum
        to zero are left out of the mean; NaN is returned when no pixel is
        usable.

    Raises:
        IndexError: If a band file cannot be found.
    """
    # Tile reference position (lat, lon) is read from the metadata abstract.
    tile_path = tile_name + "/INSPIRE.xml"
    with open(tile_path, "r") as xml_file:
        python_dict = xmltodict.parse(xml_file.read())
    tile_coordonnates = python_dict["gmd:MD_Metadata"]["gmd:identificationInfo"]["gmd:MD_DataIdentification"]["gmd:abstract"]["gco:CharacterString"].split()

    lat, lon = float(tile_coordonnates[0]), float(tile_coordonnates[1])
    refx, refy, _, _ = utm.from_latlon(lat, lon)
    ax, ay, _, _ = utm.from_latlon(area[1], area[0])  # area is (lon, lat)
    x, y = coords_to_pixels([refx, refy], [ax, ay])

    # Clamp the window start at 0: a negative start would be interpreted by
    # NumPy as "from the end" and yield an empty window.
    rows = slice(max(y - d, 0), y + d)
    cols = slice(max(x - d, 0), x + d)

    path_4 = tile_name + "/GRANULE/*/IMG_DATA/R10m/*_B04_10m.jp2"
    path_8 = tile_name + "/GRANULE/*/IMG_DATA/R10m/*_B08_10m.jp2"
    # Context managers close both datasets even if a read fails.
    with rasterio.open(glob(path_4)[0]) as red_object, \
            rasterio.open(glob(path_8)[0]) as nir_object:
        # float32 instead of float16: half precision cannot represent
        # integers above 2048 exactly and overflows above 65504, which
        # corrupts both the difference and the sum of the two bands.
        sub_red = red_object.read(1)[rows, cols].astype(np.float32)
        sub_nir = nir_object.read(1)[rows, cols].astype(np.float32)

    total = sub_nir + sub_red
    # A zero sum makes the ratio undefined (0/0); skip those pixels so they
    # do not turn the whole mean into NaN.
    valid = total != 0
    if not valid.any():
        return np.float32(np.nan)

    ndvi_values = (sub_nir[valid] - sub_red[valid]) / total[valid]
    return ndvi_values.mean()