Skip to content

Satellite Data Readers

This module provides specialized readers for various optical satellite missions. All these readers implement the GeoData protocol, which means they provide a consistent interface for spatial operations, data access, and manipulation.

These readers make it easy to work with official data formats from different Earth observation missions, and they can be used with all the functions available in the georeader.read module.

Readers available:

Sentinel-2 Reader

The Sentinel-2 reader provides functionality for reading Sentinel-2 L1C and L2A products in SAFE format. It supports:

  • Direct reading from local files or cloud storage (Google Cloud Storage)
  • Windowed reading for efficient memory usage
  • Conversion from digital numbers to radiance
  • Access to metadata, including viewing geometry and solar angles

Tutorial examples:

API Reference

Sentinel-2 reader inherited from https://github.com/IPL-UV/DL-L8S2-UV.

Authors: Gonzalo Mateo-García, Dan Lopez-Puigdollers

It has several enhancements:

  • Support for S2L2A images
  • It can read directly images from a GCP bucket (for example data from here)
  • Windowed read and read and reproject in the same function (see load_bands_bbox)
  • Creation of the image only involves reading one metadata file (xxx.SAFE/MTD_{self.producttype}.xml)
  • Compatible with georeader.read functions
  • It can read from the pyramid if available.

Sentinel-2 docs

S2Image

Base Sentinel-2 image reader for handling Sentinel-2 satellite products. Do Not use this class directly, use S2ImageL1C or S2ImageL2A instead.

This class provides functionality to read and manipulate Sentinel-2 satellite imagery. It handles the specific format and metadata of Sentinel-2 products, supporting operations like loading bands, masks, and converting digital numbers to radiance.

Parameters:

Name Type Description Default
s2folder str

Path to the Sentinel-2 SAFE product folder.

required
polygon Optional[Polygon]

Polygon defining the area of interest in EPSG:4326. Defaults to None (entire image).

None
granules Optional[Dict[str, str]]

Dictionary mapping band names to file paths. Defaults to None (automatically discovered).

None
out_res int

Output resolution in meters. Must be one of 10, 20, or 60. Defaults to 10.

10
window_focus Optional[Window]

Window to focus on a specific region of the image. Defaults to None (entire image).

None
bands Optional[List[str]]

List of bands to read. If None, all available bands will be loaded based on the product type.

None
metadata_msi Optional[str]

Path to metadata file. If None, it is assumed to be in the SAFE folder.

None

Attributes:

Name Type Description
mission str

Mission identifier (e.g., 'S2A', 'S2B').

producttype str

Product type identifier (e.g., 'MSIL1C', 'MSIL2A').

pdgs str

PDGS Processing Baseline number.

relorbitnum str

Relative Orbit number.

tile_number_field str

Tile Number field.

product_discriminator str

Product Discriminator.

name str

Base name of the product.

folder str

Path to the product folder.

datetime datetime

Acquisition datetime.

metadata_msi str

Path to the MSI metadata file.

out_res int

Output resolution in meters.

bands List[str]

List of bands to read.

dims Tuple[str]

Names of the dimensions ("band", "y", "x").

fill_value_default int

Default fill value (typically 0).

band_check str

Band used as template for reading.

granule_readers Dict[str, RasterioReader]

Dictionary of readers for each band.

window_focus Window

Current window focus.

transform

Affine transform for the window.

crs

Coordinate reference system.

shape

Shape of the data (bands, height, width).

bounds

Bounds of the window.

res Tuple[float, float]

Resolution of the data.

Source code in georeader/readers/S2_SAFE_reader.py
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
class S2Image:
    """
    Base Sentinel-2 image reader for handling Sentinel-2 satellite products.
    Do Not use this class directly, use S2ImageL1C or S2ImageL2A instead.

    This class provides functionality to read and manipulate Sentinel-2 satellite imagery.
    It handles the specific format and metadata of Sentinel-2 products, supporting operations
    like loading bands, masks, and converting digital numbers to radiance.

    Args:
        s2folder (str): Path to the Sentinel-2 SAFE product folder.
        polygon (Optional[Polygon]): Polygon defining the area of interest in EPSG:4326.
            Defaults to None (entire image).
        granules (Optional[Dict[str, str]]): Dictionary mapping band names to file paths.
            Defaults to None (automatically discovered).
        out_res (int): Output resolution in meters. Must be one of 10, 20, or 60. Defaults to 10.
        window_focus (Optional[rasterio.windows.Window]): Window to focus on a specific
            region of the image. Defaults to None (entire image).
        bands (Optional[List[str]]): List of bands to read. If None, all available bands
            will be loaded based on the product type.
        metadata_msi (Optional[str]): Path to metadata file. If None, it is assumed to be
            in the SAFE folder.

    Attributes:
        mission (str): Mission identifier (e.g., 'S2A', 'S2B').
        producttype (str): Product type identifier (e.g., 'MSIL1C', 'MSIL2A').
        pdgs (str): PDGS Processing Baseline number.
        relorbitnum (str): Relative Orbit number.
        tile_number_field (str): Tile Number field.
        product_discriminator (str): Product Discriminator.
        name (str): Base name of the product.
        folder (str): Path to the product folder.
        datetime (datetime): Acquisition datetime.
        metadata_msi (str): Path to the MSI metadata file.
        out_res (int): Output resolution in meters.
        bands (List[str]): List of bands to read.
        dims (Tuple[str]): Names of the dimensions ("band", "y", "x").
        fill_value_default (int): Default fill value (typically 0).
        band_check (str): Band used as template for reading.
        granule_readers (Dict[str, RasterioReader]): Dictionary of readers for each band.
        window_focus (rasterio.windows.Window): Current window focus.
        transform: Affine transform for the window.
        crs: Coordinate reference system.
        shape: Shape of the data (bands, height, width).
        bounds: Bounds of the window.
        res: Resolution of the data.

    """

    def __init__(
        self,
        s2folder: str,
        polygon: Optional[Polygon] = None,
        granules: Optional[Dict[str, str]] = None,
        out_res: int = 10,
        window_focus: Optional[rasterio.windows.Window] = None,
        bands: Optional[List[str]] = None,
        metadata_msi: Optional[str] = None,
    ):
        """
        Sentinel-2 image reader class.

        Args:
            s2folder: name of the SAFE product expects name
            polygon: in CRS EPSG:4326
            granules: dictionary with granule name and path
            out_res: output resolution in meters one of 10, 20, 60 (default 10)
            window_focus: rasterio window to read. All reads will be based on this window
            bands: list of bands to read. If None all bands are read.
            metadata_msi: path to metadata file. If None it is assumed to be in the SAFE folder

        """
        (
            self.mission,
            self.producttype,
            sensing_date_str,
            self.pdgs,
            self.relorbitnum,
            self.tile_number_field,
            self.product_discriminator,
        ) = s2_name_split(s2folder)

        # Remove last trailing slash
        s2folder = (
            s2folder[:-1]
            if (s2folder.endswith("/") or s2folder.endswith("\\"))
            else s2folder
        )
        self.name = os.path.basename(os.path.splitext(s2folder)[0])

        self.folder = s2folder
        self.datetime = datetime.datetime.strptime(
            sensing_date_str, "%Y%m%dT%H%M%S"
        ).replace(tzinfo=datetime.timezone.utc)

        info_granules_metadata = None

        if metadata_msi is None:
            info_granules_metadata = _get_info_granules_metadata(self.folder)
            if info_granules_metadata is not None:
                self.metadata_msi = info_granules_metadata["metadata_msi"]
                if "metadata_tl" in info_granules_metadata:
                    self.metadata_tl = info_granules_metadata["metadata_tl"]
            else:
                self.metadata_msi = os.path.join(
                    self.folder, f"MTD_{self.producttype}.xml"
                ).replace("\\", "/")

        else:
            self.metadata_msi = metadata_msi

        out_res = int(out_res)

        # TODO increase possible out_res to powers of 2 of 10 meters and 60 meters
        # rst = rasterio.open('gs://gcp-public-data-sentinel-2/tiles/49/S/GV/S2B_MSIL1C_20220527T030539_N0400_R075_T49SGV_20220527T051042.SAFE/GRANULE/L1C_T49SGV_A027271_20220527T031740/IMG_DATA/T49SGV_20220527T030539_B02.jp2')
        # rst.overviews(1) -> [2, 4, 8, 16]
        assert out_res in {10, 20, 60}, "Not valid output resolution.Choose 10, 20, 60"

        # Default resolution to read
        self.out_res = out_res

        if bands is None:
            if self.producttype == "MSIL2A":
                self.bands = list(BANDS_S2_L2A)
            else:
                self.bands = list(BANDS_S2)
        else:
            self.bands = normalize_band_names(bands)

        self.dims = ("band", "y", "x")
        self.fill_value_default = 0

        # Select the band that will be used as template when reading
        self.band_check = None
        for band in self.bands:
            if BANDS_RESOLUTION[band] == self.out_res:
                self.band_check = band
                break

        assert (
            self.band_check is not None
        ), f"Not band found of resolution {self.out_res} in {self.bands}"

        # This dict will be filled by the _get_reader function
        self.granule_readers: Dict[str, RasterioReader] = {}
        self.window_focus = window_focus
        self.root_metadata_msi = None
        self._radio_add_offsets = None
        self._solar_irradiance = None
        self._scale_factor_U = None
        self._quantification_value = None

        # The code below could be only triggered if required
        if not granules:
            # This is useful when copying with cache_product_to_local_dir func
            if info_granules_metadata is None:
                info_granules_metadata = _get_info_granules_metadata(self.folder)

            if info_granules_metadata is not None:
                self.granules = info_granules_metadata["granules"]

            else:
                self.load_metadata_msi()
                bands_elms = self.root_metadata_msi.findall(".//IMAGE_FILE")
                all_granules = [
                    os.path.join(self.folder, b.text + ".jp2").replace("\\", "/")
                    for b in bands_elms
                ]
                if self.producttype == "MSIL2A":
                    self.granules = {j.split("_")[-2]: j for j in all_granules}
                else:
                    self.granules = {
                        j.split("_")[-1].replace(".jp2", ""): j for j in all_granules
                    }
        else:
            self.granules = granules

        self._pol = polygon
        if self._pol is not None:
            self._pol_crs = window_utils.polygon_to_crs(
                self._pol, "EPSG:4326", self.crs
            )
        else:
            self._pol_crs = None

    def cache_product_to_local_dir(
        self,
        path_dest: Optional[str] = None,
        print_progress: bool = True,
        format_bands: Optional[str] = None,
    ) -> "__class__":
        """
        Copy the product to a local directory and return a new instance of the class with the new path

        Args:
            path_dest: path to the destination folder. If None, the current folder ()".") is used
            print_progress: print progress bar. Default True
            format_bands: format of the bands. Default None (keep original format). Options: "COG", "GeoTIFF"

        Returns:
            A new instance of the class pointing to the new path
        """
        if path_dest is None:
            path_dest = "."

        if format_bands is not None:
            assert format_bands in {
                "COG",
                "GeoTIFF",
            }, "Not valid format_bands. Choose 'COG' or 'GeoTIFF'"

        name_with_safe = f"{self.name}.SAFE"
        dest_folder = os.path.join(path_dest, name_with_safe)

        # Copy metadata
        metadata_filename = os.path.basename(self.metadata_msi)
        metadata_output_path = os.path.join(dest_folder, metadata_filename)
        if not os.path.exists(metadata_output_path):
            os.makedirs(dest_folder, exist_ok=True)
            self.load_metadata_msi()
            ET.ElementTree(self.root_metadata_msi).write(metadata_output_path)
            root_metadata_msi = self.root_metadata_msi
        else:
            root_metadata_msi = read_xml(metadata_output_path)

        bands_elms = root_metadata_msi.findall(".//IMAGE_FILE")
        if self.producttype == "MSIL2A":
            granules_name_metadata = {b.text.split("_")[-2]: b.text for b in bands_elms}
        else:
            granules_name_metadata = {b.text.split("_")[-1]: b.text for b in bands_elms}

        new_granules = {}
        with tqdm(total=len(self.bands), disable=not print_progress) as pbar:
            for b in self.bands:
                granule = self.granules[b]
                ext_origin = os.path.splitext(granule)[1]

                if format_bands is not None:
                    if ext_origin.startswith(".tif"):
                        convert = False
                    else:
                        convert = True

                    ext_dst = ".tif"
                else:
                    convert = False
                    ext_dst = ext_origin

                namefile = os.path.splitext(granules_name_metadata[b])[0]
                new_granules[b] = namefile + ext_dst
                new_granules_path = os.path.join(dest_folder, new_granules[b])
                if not os.path.exists(new_granules_path):
                    new_granules_path_tmp = os.path.join(
                        dest_folder, namefile + ext_origin
                    )
                    pbar.set_description(
                        f"Donwloading band {b} from {granule} to {new_granules_path}"
                    )
                    dir_granules_path = os.path.dirname(new_granules_path)
                    os.makedirs(dir_granules_path, exist_ok=True)
                    get_file(granule, new_granules_path_tmp)
                    if convert:
                        image = RasterioReader(new_granules_path_tmp).load().squeeze()
                        if format_bands == "COG":
                            save_cog(image, new_granules_path, descriptions=[b])
                        elif format_bands == "GeoTIFF":
                            save_tiled_geotiff(
                                image, new_granules_path, descriptions=[b]
                            )
                        else:
                            raise NotImplementedError(f"Not implemented {format_bands}")
                        os.remove(new_granules_path_tmp)

                pbar.update(1)

        # Save granules for fast reading
        granules_path = os.path.join(dest_folder, "granules.json").replace("\\", "/")
        if not os.path.exists(granules_path):
            with open(granules_path, "w") as fh:
                json.dump(
                    {"granules": new_granules, "metadata_msi": metadata_filename}, fh
                )

        new_granules_full_path = {
            k: os.path.join(dest_folder, v) for k, v in new_granules.items()
        }

        obj = s2loader(
            s2folder=dest_folder,
            out_res=self.out_res,
            window_focus=self.window_focus,
            bands=self.bands,
            granules=new_granules_full_path,
            polygon=self._pol,
            metadata_msi=metadata_output_path,
        )
        obj.root_metadata_msi = root_metadata_msi
        return obj

    def DN_to_radiance(self, dn_data: Optional[GeoTensor] = None) -> GeoTensor:
        return DN_to_radiance(self, dn_data)

    def load_metadata_msi(self) -> ET.Element:
        if self.root_metadata_msi is None:
            self.root_metadata_msi = read_xml(self.metadata_msi)
        return self.root_metadata_msi

    def footprint(self, crs: Optional[str] = None) -> Polygon:
        if self._pol_crs is None:
            self.load_metadata_msi()
            footprint_txt = self.root_metadata_msi.findall(".//EXT_POS_LIST")[0].text
            coords_split = footprint_txt.split(" ")[:-1]
            self._pol = Polygon(
                [
                    (float(lngstr), float(latstr))
                    for latstr, lngstr in zip(coords_split[::2], coords_split[1::2])
                ]
            )
            self._pol_crs = window_utils.polygon_to_crs(
                self._pol, "EPSG:4326", self.crs
            )

        pol_window = window_utils.window_polygon(
            self._get_reader().window_focus, self.transform
        )

        pol = self._pol_crs.intersection(pol_window)

        if (crs is None) or window_utils.compare_crs(self.crs, crs):
            return pol

        return window_utils.polygon_to_crs(pol, self.crs, crs)

    def radio_add_offsets(self) -> Dict[str, float]:
        if self._radio_add_offsets is None:
            self.load_metadata_msi()
            radio_add_offsets = self.root_metadata_msi.findall(".//RADIO_ADD_OFFSET")
            if len(radio_add_offsets) == 0:
                self._radio_add_offsets = {b: 0 for b in BANDS_S2}
            else:
                self._radio_add_offsets = {
                    BANDS_S2[int(r.attrib["band_id"])]: int(r.text)
                    for r in radio_add_offsets
                }

        return self._radio_add_offsets

    def solar_irradiance(self) -> Dict[str, float]:
        """
        Returns solar irradiance per nanometer: W/m²/nm

        Reads solar irradiance from metadata_msi:
            <SOLAR_IRRADIANCE bandId="0" unit="W/m²/µm">1874.3</SOLAR_IRRADIANCE>
        """
        if self._solar_irradiance is None:
            self.load_metadata_msi()
            sr = self.root_metadata_msi.findall(".//SOLAR_IRRADIANCE")
            self._solar_irradiance = {
                BANDS_S2[int(r.attrib["bandId"])]: float(r.text) / 1_000 for r in sr
            }

        return self._solar_irradiance

    def scale_factor_U(self) -> float:
        if self._scale_factor_U is None:
            self.load_metadata_msi()
            self._scale_factor_U = float(self.root_metadata_msi.find(".//U").text)

        return self._scale_factor_U

    def quantification_value(self) -> int:
        """Returns the quantification value stored in the metadata msi file (this is always: 10_000)"""
        if self._quantification_value is None:
            self.load_metadata_msi()
            self._quantification_value = int(
                self.root_metadata_msi.find(".//QUANTIFICATION_VALUE").text
            )

        return self._quantification_value

    def get_reader(
        self, band_names: Union[str, List[str]], overview_level: Optional[int] = None
    ) -> RasterioReader:
        """
        Provides a RasterioReader object to read all the bands at the same resolution

        Args:
            band_names: List of band names or band. raises assertion error if bands have different resolution.
            overview_level: level of the pyramid to read (same as in rasterio)

        Returns:
            RasterioReader

        """
        if isinstance(band_names, str):
            band_names = [band_names]

        band_names = normalize_band_names(band_names)

        assert all(
            BANDS_RESOLUTION[band_names[0]] == BANDS_RESOLUTION[b] for b in band_names
        ), f"Bands: {band_names} have different resolution"

        reader = RasterioReader(
            [self.granules[band_name] for band_name in band_names],
            window_focus=None,
            stack=False,
            fill_value_default=self.fill_value_default,
            overview_level=overview_level,
        )
        window_in = read.window_from_bounds(reader, self.bounds)
        window_in_rounded = read.round_outer_window(window_in)
        reader.set_window(window_in_rounded)
        return reader

    def _get_reader(self, band_name: Optional[str] = None) -> RasterioReader:
        if band_name is None:
            band_name = self.band_check

        if band_name not in self.granule_readers:
            # TODO handle different out_res than 10, 20, 60?
            if self.out_res == BANDS_RESOLUTION[band_name]:
                overview_level = None
                has_out_res = True
            elif self.out_res == BANDS_RESOLUTION[band_name] * 2:
                # out_res == 20 and BANDS_RESOLUTION[band_name]==10 -> read from first overview
                overview_level = 0
                has_out_res = True
            elif self.out_res > BANDS_RESOLUTION[band_name]:
                # out_res 60 and BANDS_RESOLUTION[band_name] == 10 or BANDS_RESOLUTION[band_name] == 20
                overview_level = 1 if BANDS_RESOLUTION[band_name] == 10 else 0
                has_out_res = False
            else:
                overview_level = None
                has_out_res = False

            # figure out which window_focus to set

            if band_name == self.band_check:
                window_focus = self.window_focus
                set_window_after = False
            elif has_out_res:
                window_focus = self.window_focus
                set_window_after = False
            else:
                set_window_after = True
                window_focus = None

            self.granule_readers[band_name] = RasterioReader(
                self.granules[band_name],
                window_focus=window_focus,
                fill_value_default=self.fill_value_default,
                overview_level=overview_level,
            )
            if set_window_after:
                window_in = read.window_from_bounds(
                    self.granule_readers[band_name], self.bounds
                )
                window_in_rounded = read.round_outer_window(window_in)
                self.granule_readers[band_name].set_window(window_in_rounded)

        return self.granule_readers[band_name]

    @property
    def dtype(self):
        # This is always np.uint16
        reader_band_check = self._get_reader()
        return reader_band_check.dtype

    @property
    def shape(self):
        reader_band_check = self._get_reader()
        return (len(self.bands),) + reader_band_check.shape[-2:]

    @property
    def transform(self):
        reader_band_check = self._get_reader()
        return reader_band_check.transform

    @property
    def crs(self):
        reader_band_check = self._get_reader()
        return reader_band_check.crs

    @property
    def bounds(self):
        reader_band_check = self._get_reader()
        return reader_band_check.bounds

    @property
    def res(self) -> Tuple[float, float]:
        reader_band_check = self._get_reader()
        return reader_band_check.res

    def __str__(self):
        return self.folder

    def __repr__(self) -> str:
        return f""" 
         {self.folder}
         Transform: {self.transform}
         Shape: {self.shape}
         Resolution: {self.res}
         Bounds: {self.bounds}
         CRS: {self.crs}
         bands: {self.bands}
         fill_value_default: {self.fill_value_default}
        """

    def read_from_band_names(self, band_names: List[str]) -> "__class__":
        """
        Read from band names

        Args:
            band_names: List of band names

        Returns:
            Copy of current object with band names set to band_names
        """
        s2obj = s2loader(
            s2folder=self.folder,
            out_res=self.out_res,
            window_focus=self.window_focus,
            bands=band_names,
            granules=self.granules,
            polygon=self._pol,
            metadata_msi=self.metadata_msi,
        )
        s2obj.root_metadata_msi = self.root_metadata_msi
        return s2obj

    def read_from_window(
        self, window: rasterio.windows.Window, boundless: bool = True
    ) -> "__class__":
        # return GeoTensor(values=self.values, transform=self.transform, crs=self.crs)

        reader_ref = self._get_reader()
        rasterio_reader_ref = reader_ref.read_from_window(
            window=window, boundless=boundless
        )
        s2obj = s2loader(
            s2folder=self.folder,
            out_res=self.out_res,
            window_focus=rasterio_reader_ref.window_focus,
            bands=self.bands,
            granules=self.granules,
            polygon=self._pol,
            metadata_msi=self.metadata_msi,
        )
        # Set band check to avoid re-reading
        s2obj.granule_readers[self.band_check] = rasterio_reader_ref
        s2obj.band_check = self.band_check

        s2obj.root_metadata_msi = self.root_metadata_msi

        return s2obj

    def load(self, boundless: bool = True) -> GeoTensor:
        reader_ref = self._get_reader()
        geotensor_ref = reader_ref.load(boundless=boundless)

        array_out = np.full(
            (len(self.bands),) + geotensor_ref.shape[-2:],
            fill_value=geotensor_ref.fill_value_default,
            dtype=np.int32,
        )

        # Deal with NODATA values
        invalids = (geotensor_ref.values == 0) | (geotensor_ref.values == (2**16) - 1)

        radio_add = self.radio_add_offsets()
        for idx, b in enumerate(self.bands):
            if b == self.band_check:

                # Avoid bug of band names without zero before
                if len(b) == 2:
                    b = f"B0{b[-1]}"

                geotensor_iter = geotensor_ref
            else:
                reader_iter = self._get_reader(b)
                if (
                    np.mean(
                        np.abs(np.array(reader_iter.res) - np.array(geotensor_ref.res))
                    )
                    < 1e-6
                ):
                    geotensor_iter = reader_iter.load(boundless=boundless)
                else:
                    geotensor_iter = read.read_reproject_like(
                        reader_iter, geotensor_ref
                    )

            # Important: Adds radio correction! otherwise images after 2022-01-25 shifted (PROCESSING_BASELINE '04.00' or above)
            array_out[idx] = geotensor_iter.values[0].astype(np.int32) + radio_add[b]

        array_out[:, invalids[0]] = self.fill_value_default

        if np.any(array_out < 0):
            raise ValueError("Negative values found in the image")

        array_out = array_out.astype(np.uint16)

        return GeoTensor(
            values=array_out,
            transform=geotensor_ref.transform,
            crs=geotensor_ref.crs,
            fill_value_default=self.fill_value_default,
        )

    @property
    def values(self) -> np.ndarray:
        return self.load().values

    def load_mask(self) -> GeoTensor:
        reader_ref = self._get_reader()
        geotensor_ref = reader_ref.load(boundless=True)
        geotensor_ref.values = (geotensor_ref.values == 0) | (
            geotensor_ref.values == (2**16) - 1
        )
        return geotensor_ref

__init__(s2folder, polygon=None, granules=None, out_res=10, window_focus=None, bands=None, metadata_msi=None)

Sentinel-2 image reader class.

Parameters:

Name Type Description Default
s2folder str

name of the SAFE product expects name

required
polygon Optional[Polygon]

in CRS EPSG:4326

None
granules Optional[Dict[str, str]]

dictionary with granule name and path

None
out_res int

output resolution in meters one of 10, 20, 60 (default 10)

10
window_focus Optional[Window]

rasterio window to read. All reads will be based on this window

None
bands Optional[List[str]]

list of bands to read. If None all bands are read.

None
metadata_msi Optional[str]

path to metadata file. If None it is assumed to be in the SAFE folder

None
Source code in georeader/readers/S2_SAFE_reader.py
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
def __init__(
    self,
    s2folder: str,
    polygon: Optional[Polygon] = None,
    granules: Optional[Dict[str, str]] = None,
    out_res: int = 10,
    window_focus: Optional[rasterio.windows.Window] = None,
    bands: Optional[List[str]] = None,
    metadata_msi: Optional[str] = None,
):
    """
    Sentinel-2 image reader class.

    Args:
        s2folder: name of the SAFE product expects name
        polygon: in CRS EPSG:4326
        granules: dictionary with granule name and path
        out_res: output resolution in meters one of 10, 20, 60 (default 10)
        window_focus: rasterio window to read. All reads will be based on this window
        bands: list of bands to read. If None all bands are read.
        metadata_msi: path to metadata file. If None it is assumed to be in the SAFE folder

    """
    (
        self.mission,
        self.producttype,
        sensing_date_str,
        self.pdgs,
        self.relorbitnum,
        self.tile_number_field,
        self.product_discriminator,
    ) = s2_name_split(s2folder)

    # Remove last trailing slash
    s2folder = (
        s2folder[:-1]
        if (s2folder.endswith("/") or s2folder.endswith("\\"))
        else s2folder
    )
    self.name = os.path.basename(os.path.splitext(s2folder)[0])

    self.folder = s2folder
    self.datetime = datetime.datetime.strptime(
        sensing_date_str, "%Y%m%dT%H%M%S"
    ).replace(tzinfo=datetime.timezone.utc)

    info_granules_metadata = None

    if metadata_msi is None:
        info_granules_metadata = _get_info_granules_metadata(self.folder)
        if info_granules_metadata is not None:
            self.metadata_msi = info_granules_metadata["metadata_msi"]
            if "metadata_tl" in info_granules_metadata:
                self.metadata_tl = info_granules_metadata["metadata_tl"]
        else:
            self.metadata_msi = os.path.join(
                self.folder, f"MTD_{self.producttype}.xml"
            ).replace("\\", "/")

    else:
        self.metadata_msi = metadata_msi

    out_res = int(out_res)

    # TODO increase possible out_res to powers of 2 of 10 meters and 60 meters
    # rst = rasterio.open('gs://gcp-public-data-sentinel-2/tiles/49/S/GV/S2B_MSIL1C_20220527T030539_N0400_R075_T49SGV_20220527T051042.SAFE/GRANULE/L1C_T49SGV_A027271_20220527T031740/IMG_DATA/T49SGV_20220527T030539_B02.jp2')
    # rst.overviews(1) -> [2, 4, 8, 16]
    assert out_res in {10, 20, 60}, "Not valid output resolution.Choose 10, 20, 60"

    # Default resolution to read
    self.out_res = out_res

    if bands is None:
        if self.producttype == "MSIL2A":
            self.bands = list(BANDS_S2_L2A)
        else:
            self.bands = list(BANDS_S2)
    else:
        self.bands = normalize_band_names(bands)

    self.dims = ("band", "y", "x")
    self.fill_value_default = 0

    # Select the band that will be used as template when reading
    self.band_check = None
    for band in self.bands:
        if BANDS_RESOLUTION[band] == self.out_res:
            self.band_check = band
            break

    assert (
        self.band_check is not None
    ), f"Not band found of resolution {self.out_res} in {self.bands}"

    # This dict will be filled by the _get_reader function
    self.granule_readers: Dict[str, RasterioReader] = {}
    self.window_focus = window_focus
    self.root_metadata_msi = None
    self._radio_add_offsets = None
    self._solar_irradiance = None
    self._scale_factor_U = None
    self._quantification_value = None

    # The code below could be only triggered if required
    if not granules:
        # This is useful when copying with cache_product_to_local_dir func
        if info_granules_metadata is None:
            info_granules_metadata = _get_info_granules_metadata(self.folder)

        if info_granules_metadata is not None:
            self.granules = info_granules_metadata["granules"]

        else:
            self.load_metadata_msi()
            bands_elms = self.root_metadata_msi.findall(".//IMAGE_FILE")
            all_granules = [
                os.path.join(self.folder, b.text + ".jp2").replace("\\", "/")
                for b in bands_elms
            ]
            if self.producttype == "MSIL2A":
                self.granules = {j.split("_")[-2]: j for j in all_granules}
            else:
                self.granules = {
                    j.split("_")[-1].replace(".jp2", ""): j for j in all_granules
                }
    else:
        self.granules = granules

    self._pol = polygon
    if self._pol is not None:
        self._pol_crs = window_utils.polygon_to_crs(
            self._pol, "EPSG:4326", self.crs
        )
    else:
        self._pol_crs = None

cache_product_to_local_dir(path_dest=None, print_progress=True, format_bands=None)

Copy the product to a local directory and return a new instance of the class with the new path

Parameters:

Name Type Description Default
path_dest Optional[str]

path to the destination folder. If None, the current folder ()".") is used

None
print_progress bool

print progress bar. Default True

True
format_bands Optional[str]

format of the bands. Default None (keep original format). Options: "COG", "GeoTIFF"

None

Returns:

Type Description
__class__

A new instance of the class pointing to the new path

Source code in georeader/readers/S2_SAFE_reader.py
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
def cache_product_to_local_dir(
    self,
    path_dest: Optional[str] = None,
    print_progress: bool = True,
    format_bands: Optional[str] = None,
) -> "__class__":
    """
    Copy the product to a local directory and return a new instance of the class with the new path

    Args:
        path_dest: path to the destination folder. If None, the current folder ()".") is used
        print_progress: print progress bar. Default True
        format_bands: format of the bands. Default None (keep original format). Options: "COG", "GeoTIFF"

    Returns:
        A new instance of the class pointing to the new path
    """
    if path_dest is None:
        path_dest = "."

    if format_bands is not None:
        assert format_bands in {
            "COG",
            "GeoTIFF",
        }, "Not valid format_bands. Choose 'COG' or 'GeoTIFF'"

    name_with_safe = f"{self.name}.SAFE"
    dest_folder = os.path.join(path_dest, name_with_safe)

    # Copy metadata
    metadata_filename = os.path.basename(self.metadata_msi)
    metadata_output_path = os.path.join(dest_folder, metadata_filename)
    if not os.path.exists(metadata_output_path):
        os.makedirs(dest_folder, exist_ok=True)
        self.load_metadata_msi()
        ET.ElementTree(self.root_metadata_msi).write(metadata_output_path)
        root_metadata_msi = self.root_metadata_msi
    else:
        root_metadata_msi = read_xml(metadata_output_path)

    bands_elms = root_metadata_msi.findall(".//IMAGE_FILE")
    if self.producttype == "MSIL2A":
        granules_name_metadata = {b.text.split("_")[-2]: b.text for b in bands_elms}
    else:
        granules_name_metadata = {b.text.split("_")[-1]: b.text for b in bands_elms}

    new_granules = {}
    with tqdm(total=len(self.bands), disable=not print_progress) as pbar:
        for b in self.bands:
            granule = self.granules[b]
            ext_origin = os.path.splitext(granule)[1]

            if format_bands is not None:
                if ext_origin.startswith(".tif"):
                    convert = False
                else:
                    convert = True

                ext_dst = ".tif"
            else:
                convert = False
                ext_dst = ext_origin

            namefile = os.path.splitext(granules_name_metadata[b])[0]
            new_granules[b] = namefile + ext_dst
            new_granules_path = os.path.join(dest_folder, new_granules[b])
            if not os.path.exists(new_granules_path):
                new_granules_path_tmp = os.path.join(
                    dest_folder, namefile + ext_origin
                )
                pbar.set_description(
                    f"Donwloading band {b} from {granule} to {new_granules_path}"
                )
                dir_granules_path = os.path.dirname(new_granules_path)
                os.makedirs(dir_granules_path, exist_ok=True)
                get_file(granule, new_granules_path_tmp)
                if convert:
                    image = RasterioReader(new_granules_path_tmp).load().squeeze()
                    if format_bands == "COG":
                        save_cog(image, new_granules_path, descriptions=[b])
                    elif format_bands == "GeoTIFF":
                        save_tiled_geotiff(
                            image, new_granules_path, descriptions=[b]
                        )
                    else:
                        raise NotImplementedError(f"Not implemented {format_bands}")
                    os.remove(new_granules_path_tmp)

            pbar.update(1)

    # Save granules for fast reading
    granules_path = os.path.join(dest_folder, "granules.json").replace("\\", "/")
    if not os.path.exists(granules_path):
        with open(granules_path, "w") as fh:
            json.dump(
                {"granules": new_granules, "metadata_msi": metadata_filename}, fh
            )

    new_granules_full_path = {
        k: os.path.join(dest_folder, v) for k, v in new_granules.items()
    }

    obj = s2loader(
        s2folder=dest_folder,
        out_res=self.out_res,
        window_focus=self.window_focus,
        bands=self.bands,
        granules=new_granules_full_path,
        polygon=self._pol,
        metadata_msi=metadata_output_path,
    )
    obj.root_metadata_msi = root_metadata_msi
    return obj

get_reader(band_names, overview_level=None)

Provides a RasterioReader object to read all the bands at the same resolution

Parameters:

Name Type Description Default
band_names Union[str, List[str]]

List of band names or band. raises assertion error if bands have different resolution.

required
overview_level Optional[int]

level of the pyramid to read (same as in rasterio)

None

Returns:

Type Description
RasterioReader

RasterioReader

Source code in georeader/readers/S2_SAFE_reader.py
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
def get_reader(
    self, band_names: Union[str, List[str]], overview_level: Optional[int] = None
) -> RasterioReader:
    """
    Provides a RasterioReader object to read all the bands at the same resolution

    Args:
        band_names: List of band names or band. raises assertion error if bands have different resolution.
        overview_level: level of the pyramid to read (same as in rasterio)

    Returns:
        RasterioReader

    """
    if isinstance(band_names, str):
        band_names = [band_names]

    band_names = normalize_band_names(band_names)

    assert all(
        BANDS_RESOLUTION[band_names[0]] == BANDS_RESOLUTION[b] for b in band_names
    ), f"Bands: {band_names} have different resolution"

    reader = RasterioReader(
        [self.granules[band_name] for band_name in band_names],
        window_focus=None,
        stack=False,
        fill_value_default=self.fill_value_default,
        overview_level=overview_level,
    )
    window_in = read.window_from_bounds(reader, self.bounds)
    window_in_rounded = read.round_outer_window(window_in)
    reader.set_window(window_in_rounded)
    return reader

quantification_value()

Returns the quantification value stored in the metadata msi file (this is always: 10_000)

Source code in georeader/readers/S2_SAFE_reader.py
536
537
538
539
540
541
542
543
544
def quantification_value(self) -> int:
    """Returns the quantification value stored in the metadata msi file (this is always: 10_000)"""
    if self._quantification_value is None:
        self.load_metadata_msi()
        self._quantification_value = int(
            self.root_metadata_msi.find(".//QUANTIFICATION_VALUE").text
        )

    return self._quantification_value

read_from_band_names(band_names)

Read from band names

Parameters:

Name Type Description Default
band_names List[str]

List of band names

required

Returns:

Type Description
__class__

Copy of current object with band names set to band_names

Source code in georeader/readers/S2_SAFE_reader.py
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
def read_from_band_names(self, band_names: List[str]) -> "__class__":
    """
    Read from band names

    Args:
        band_names: List of band names

    Returns:
        Copy of current object with band names set to band_names
    """
    s2obj = s2loader(
        s2folder=self.folder,
        out_res=self.out_res,
        window_focus=self.window_focus,
        bands=band_names,
        granules=self.granules,
        polygon=self._pol,
        metadata_msi=self.metadata_msi,
    )
    s2obj.root_metadata_msi = self.root_metadata_msi
    return s2obj

solar_irradiance()

Returns solar irradiance per nanometer: W/m²/nm

Source code in georeader/readers/S2_SAFE_reader.py
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
def solar_irradiance(self) -> Dict[str, float]:
    """
    Returns solar irradiance per nanometer: W/m²/nm

    Reads solar irradiance from metadata_msi:
        <SOLAR_IRRADIANCE bandId="0" unit="W/m²/µm">1874.3</SOLAR_IRRADIANCE>
    """
    if self._solar_irradiance is None:
        self.load_metadata_msi()
        sr = self.root_metadata_msi.findall(".//SOLAR_IRRADIANCE")
        self._solar_irradiance = {
            BANDS_S2[int(r.attrib["bandId"])]: float(r.text) / 1_000 for r in sr
        }

    return self._solar_irradiance

S2ImageL1C

Bases: S2Image

Sentinel-2 Level 1C (top of atmosphere reflectance) image reader.

This class extends the base S2Image class to handle Sentinel-2 Level 1C products, which provide calibrated and orthorectified top of atmosphere reflectance data. It also provides methods to access viewing and solar angle information.

Parameters:

Name Type Description Default
s2folder str

Path to the Sentinel-2 SAFE product folder.

required
granules Dict[str, str]

Dictionary mapping band names to file paths.

required
polygon Polygon

Polygon defining the area of interest in EPSG:4326.

required
out_res int

Output resolution in meters. Must be one of 10, 20, or 60. Defaults to 10.

10
window_focus Optional[Window]

Window to focus on a specific region of the image. Defaults to None (entire image).

None
bands Optional[List[str]]

List of bands to read. If None, all available bands will be loaded.

None
metadata_msi Optional[str]

Path to metadata file. If None, it is assumed to be in the SAFE folder.

None

Attributes:

Name Type Description
Additional to S2Image attributes
granule_folder str

Path to the granule folder.

msk_clouds_file str

Path to the cloud mask file.

metadata_tl str

Path to the TL metadata file.

root_metadata_tl

Root element of the TL metadata XML.

tileId str

Tile identifier.

satId str

Satellite identifier.

procLevel str

Processing level.

dimsByRes Dict

Dimensions by resolution.

ulxyByRes Dict

Upper-left coordinates by resolution.

tileAnglesNode Dict

Tile angles node from metadata.

mean_sza float

Mean solar zenith angle.

mean_saa float

Mean solar azimuth angle.

mean_vza Dict[str, float]

Mean viewing zenith angle per band.

mean_vaa Dict[str, float]

Mean viewing azimuth angle per band.

vaa Dict[str, GeoTensor]

Viewing azimuth angle as GeoTensor per band.

vza Dict[str, GeoTensor]

Viewing zenith angle as GeoTensor per band.

saa GeoTensor

Solar azimuth angle as GeoTensor.

sza GeoTensor

Solar zenith angle as GeoTensor.

anglesULXY Tuple[float, float]

Upper-left coordinates of the angle grids.

Examples:

>>> # Initialize the S2ImageL1C reader with a data path
>>> s2_l1c = S2ImageL1C('/path/to/S2A_MSIL1C_20170717T235959_N0205_R072_T01WCP_20170718T000256.SAFE',
...                     granules=granules_dict, polygon=aoi_polygon)
>>> # Load all bands
>>> l1c_data = s2_l1c.load()
>>> # Read angle information
>>> s2_l1c.read_metadata_tl()
>>> solar_zenith = s2_l1c.sza
>>> # Convert to radiance
>>> radiance_data = s2_l1c.DN_to_radiance()
Source code in georeader/readers/S2_SAFE_reader.py
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
class S2ImageL1C(S2Image):
    """
    Sentinel-2 Level 1C (top of atmosphere reflectance) image reader.

    This class extends the base S2Image class to handle Sentinel-2 Level 1C products,
    which provide calibrated and orthorectified top of atmosphere reflectance data.
    It also provides methods to access viewing and solar angle information.

    Args:
        s2folder (str): Path to the Sentinel-2 SAFE product folder.
        granules (Dict[str, str]): Dictionary mapping band names to file paths.
        polygon (Polygon): Polygon defining the area of interest in EPSG:4326.
        out_res (int): Output resolution in meters. Must be one of 10, 20, or 60. Defaults to 10.
        window_focus (Optional[rasterio.windows.Window]): Window to focus on a specific
            region of the image. Defaults to None (entire image).
        bands (Optional[List[str]]): List of bands to read. If None, all available bands will be loaded.
        metadata_msi (Optional[str]): Path to metadata file. If None, it is assumed to be
            in the SAFE folder.

    Attributes:
        Additional to S2Image attributes:
        granule_folder (str): Path to the granule folder.
        msk_clouds_file (str): Path to the cloud mask file.
        metadata_tl (str): Path to the TL metadata file.
        root_metadata_tl: Root element of the TL metadata XML.
        tileId (str): Tile identifier.
        satId (str): Satellite identifier.
        procLevel (str): Processing level.
        dimsByRes (Dict): Dimensions by resolution.
        ulxyByRes (Dict): Upper-left coordinates by resolution.
        tileAnglesNode: Tile angles node from metadata.
        mean_sza (float): Mean solar zenith angle.
        mean_saa (float): Mean solar azimuth angle.
        mean_vza (Dict[str, float]): Mean viewing zenith angle per band.
        mean_vaa (Dict[str, float]): Mean viewing azimuth angle per band.
        vaa (Dict[str, GeoTensor]): Viewing azimuth angle as GeoTensor per band.
        vza (Dict[str, GeoTensor]): Viewing zenith angle as GeoTensor per band.
        saa (GeoTensor): Solar azimuth angle as GeoTensor.
        sza (GeoTensor): Solar zenith angle as GeoTensor.
        anglesULXY (Tuple[float, float]): Upper-left coordinates of the angle grids.

    Examples:
        >>> # Initialize the S2ImageL1C reader with a data path
        >>> s2_l1c = S2ImageL1C('/path/to/S2A_MSIL1C_20170717T235959_N0205_R072_T01WCP_20170718T000256.SAFE',
        ...                     granules=granules_dict, polygon=aoi_polygon)
        >>> # Load all bands
        >>> l1c_data = s2_l1c.load()
        >>> # Read angle information
        >>> s2_l1c.read_metadata_tl()
        >>> solar_zenith = s2_l1c.sza
        >>> # Convert to radiance
        >>> radiance_data = s2_l1c.DN_to_radiance()
    """

    def __init__(
        self,
        s2folder,
        granules: Dict[str, str],
        polygon: Polygon,
        out_res: int = 10,
        window_focus: Optional[rasterio.windows.Window] = None,
        bands: Optional[List[str]] = None,
        metadata_msi: Optional[str] = None,
    ):
        super(S2ImageL1C, self).__init__(
            s2folder=s2folder,
            granules=granules,
            polygon=polygon,
            out_res=out_res,
            bands=bands,
            window_focus=window_focus,
            metadata_msi=metadata_msi,
        )

        assert (
            self.producttype == "MSIL1C"
        ), f"Unexpected product type {self.producttype} in image {self.folder}"

        first_granule = self.granules[list(self.granules.keys())[0]]
        self.granule_folder = os.path.dirname(os.path.dirname(first_granule))
        self.msk_clouds_file = os.path.join(
            self.granule_folder, "MSK_CLOUDS_B00.gml"
        ).replace("\\", "/")
        if not hasattr(self, "metadata_tl"):
            self.metadata_tl = os.path.join(self.granule_folder, "MTD_TL.xml").replace(
                "\\", "/"
            )

        self.root_metadata_tl = None

        # Granule in L1C does not include TCI
        # Assert bands in self.granule are ordered as in BANDS_S2
        # assert all(granule[-7:-4] == bname for bname, granule in zip(BANDS_S2, self.granule)), f"some granules are not in the expected order {self.granule}"

    def read_from_window(
        self, window: rasterio.windows.Window, boundless: bool = True
    ) -> "__class__":
        out = super().read_from_window(window, boundless=boundless)

        if self.root_metadata_tl is None:
            return out

        # copy all metadata from the original image
        for atribute in [
            "tileId",
            "root_metadata_tl",
            "satId",
            "procLevel",
            "dimsByRes",
            "ulxyByRes",
            "tileAnglesNode",
            "mean_sza",
            "mean_saa",
            "mean_vza",
            "mean_vaa",
            "vaa",
            "vza",
            "saa",
            "sza",
            "anglesULXY",
        ]:
            setattr(out, atribute, getattr(self, atribute))

        return out

    def cache_product_to_local_dir(
        self,
        path_dest: Optional[str] = None,
        print_progress: bool = True,
        format_bands: Optional[str] = None,
    ) -> "__class__":
        """
        Overrides the parent method to copy the MTD_TL.xml file

        Args:
            path_dest (Optional[str], optional): path to the destination folder. Defaults to None.
            print_progress (bool, optional): whether to print progress. Defaults to True.

        Returns:
            __class__: the cached object
        """
        new_obj = super().cache_product_to_local_dir(
            path_dest=path_dest,
            print_progress=print_progress,
            format_bands=format_bands,
        )

        if os.path.exists(new_obj.metadata_tl):
            # the cached product already exists. returns
            return new_obj

        if self.root_metadata_tl is not None:
            new_obj.root_metadata_tl = self.root_metadata_tl
            ET.ElementTree(new_obj.metadata_tl).write(new_obj.metadata_tl)
            # copy all metadata from the original image
            for atribute in [
                "tileId",
                "root_metadata_tl",
                "satId",
                "procLevel",
                "dimsByRes",
                "ulxyByRes",
                "tileAnglesNode",
                "mean_sza",
                "mean_saa",
                "mean_vza",
                "mean_vaa",
                "vaa",
                "vza",
                "saa",
                "sza",
                "anglesULXY",
            ]:
                if hasattr(self, atribute):
                    setattr(new_obj, atribute, getattr(self, atribute))
        else:
            get_file(self.metadata_tl, new_obj.metadata_tl)

        granule_folder_rel = new_obj.granule_folder.replace("\\", "/").replace(
            new_obj.folder.replace("\\", "/") + "/", ""
        )
        # Add metadata_tl to granules.json
        granules_path = os.path.join(new_obj.folder, "granules.json").replace("\\", "/")
        with open(granules_path, "r") as fh:
            info_granules_metadata = json.load(fh)
        info_granules_metadata["metadata_tl"] = os.path.join(
            granule_folder_rel, "MTD_TL.xml"
        ).replace("\\", "/")
        with open(granules_path, "w") as f:
            json.dump(info_granules_metadata, f)

        return new_obj

    def read_metadata_tl(self):
        """
        Read metadata TILE to parse information about the acquisition and properties of GRANULE bands.

        It populates the following attributes:
            - mean_sza
            - mean_saa
            - mean_vza
            - mean_vaa
            - vaa
            - vza
            - saa
            - sza
            - anglesULXY
            - tileId
            - satId
            - procLevel
            - epsg_code
            - dimsByRes
            - ulxyByRes
            - tileAnglesNode
            - root_metadata_tl

        """
        if self.root_metadata_tl is not None:
            return

        self.root_metadata_tl = read_xml(self.metadata_tl)

        # Stoopid XML namespace prefix
        nsPrefix = self.root_metadata_tl.tag[: self.root_metadata_tl.tag.index("}") + 1]
        nsDict = {"n1": nsPrefix[1:-1]}

        self.mean_sza = float(
            self.root_metadata_tl.find(".//Mean_Sun_Angle/ZENITH_ANGLE").text
        )
        self.mean_saa = float(
            self.root_metadata_tl.find(".//Mean_Sun_Angle/AZIMUTH_ANGLE").text
        )

        generalInfoNode = self.root_metadata_tl.find("n1:General_Info", nsDict)
        # N.B. I am still not entirely convinced that this SENSING_TIME is really
        # the acquisition time, but the documentation is rubbish.
        sensingTimeNode = generalInfoNode.find("SENSING_TIME")
        sensingTimeStr = sensingTimeNode.text.strip()
        # self.datetime = datetime.datetime.strptime(sensingTimeStr, "%Y-%m-%dT%H:%M:%S.%fZ")
        tileIdNode = generalInfoNode.find("TILE_ID")
        tileIdFullStr = tileIdNode.text.strip()
        self.tileId = tileIdFullStr.split("_")[-2]
        self.satId = tileIdFullStr[:3]
        self.procLevel = tileIdFullStr[
            13:16
        ]  # Not sure whether to use absolute pos or split by '_'....

        geomInfoNode = self.root_metadata_tl.find("n1:Geometric_Info", nsDict)
        geocodingNode = geomInfoNode.find("Tile_Geocoding")
        self.epsg_code = geocodingNode.find("HORIZONTAL_CS_CODE").text

        # Dimensions of images at different resolutions.
        self.dimsByRes = {}
        sizeNodeList = geocodingNode.findall("Size")
        for sizeNode in sizeNodeList:
            res = sizeNode.attrib["resolution"]
            nrows = int(sizeNode.find("NROWS").text)
            ncols = int(sizeNode.find("NCOLS").text)
            self.dimsByRes[res] = (nrows, ncols)

        # Upper-left corners of images at different resolutions. As far as I can
        # work out, these coords appear to be the upper left corner of the upper left
        # pixel, i.e. equivalent to GDAL's convention. This also means that they
        # are the same for the different resolutions, which is nice.
        self.ulxyByRes = {}
        posNodeList = geocodingNode.findall("Geoposition")
        for posNode in posNodeList:
            res = posNode.attrib["resolution"]
            ulx = float(posNode.find("ULX").text)
            uly = float(posNode.find("ULY").text)
            self.ulxyByRes[res] = (ulx, uly)

        # Sun and satellite angles.
        # Zenith
        self.tileAnglesNode = geomInfoNode.find("Tile_Angles")
        sunZenithNode = self.tileAnglesNode.find("Sun_Angles_Grid").find("Zenith")
        # <Zenith>
        #  <COL_STEP unit="m">5000</COL_STEP>
        #  <ROW_STEP unit="m">5000</ROW_STEP>
        angleGridXres = float(sunZenithNode.find("COL_STEP").text)
        angleGridYres = float(sunZenithNode.find("ROW_STEP").text)
        sza = self._makeValueArray(sunZenithNode.find("Values_List"))
        mask_nans = np.isnan(sza)
        if np.any(mask_nans):
            from skimage.restoration import inpaint_biharmonic

            sza = inpaint_biharmonic(sza, mask_nans)
        transform_zenith = rasterio.transform.from_origin(
            self.ulxyByRes[str(self.out_res)][0],
            self.ulxyByRes[str(self.out_res)][1],
            angleGridXres,
            angleGridYres,
        )

        self.sza = GeoTensor(sza, transform=transform_zenith, crs=self.epsg_code)

        # Azimuth
        sunAzimuthNode = self.tileAnglesNode.find("Sun_Angles_Grid").find("Azimuth")
        angleGridXres = float(sunAzimuthNode.find("COL_STEP").text)
        angleGridYres = float(sunAzimuthNode.find("ROW_STEP").text)
        saa = self._makeValueArray(sunAzimuthNode.find("Values_List"))
        mask_nans = np.isnan(saa)
        if np.any(mask_nans):
            from skimage.restoration import inpaint_biharmonic

            saa = inpaint_biharmonic(saa, mask_nans)
        transform_azimuth = rasterio.transform.from_origin(
            self.ulxyByRes[str(self.out_res)][0],
            self.ulxyByRes[str(self.out_res)][1],
            angleGridXres,
            angleGridYres,
        )
        self.saa = GeoTensor(saa, transform=transform_azimuth, crs=self.epsg_code)

        # Now build up the viewing angle per grid cell, from the separate layers
        # given for each detector for each band. Initially I am going to keep
        # the bands separate, just to see how that looks.
        # The names of things in the XML suggest that these are view angles,
        # but the numbers suggest that they are angles as seen from the pixel's
        # frame of reference on the ground, i.e. they are in fact what we ultimately want.
        viewingAngleNodeList = self.tileAnglesNode.findall(
            "Viewing_Incidence_Angles_Grids"
        )
        vza = self._buildViewAngleArr(viewingAngleNodeList, "Zenith")
        vaa = self._buildViewAngleArr(viewingAngleNodeList, "Azimuth")

        self.vaa = {}
        for k, varr in vaa.items():
            mask_nans = np.isnan(varr)
            if np.any(mask_nans):
                from skimage.restoration import inpaint_biharmonic

                varr = inpaint_biharmonic(varr, mask_nans)

            self.vaa[k] = GeoTensor(
                varr, transform=transform_azimuth, crs=self.epsg_code
            )

        self.vza = {}
        for k, varr in vza.items():
            mask_nans = np.isnan(varr)
            if np.any(mask_nans):
                from skimage.restoration import inpaint_biharmonic

                varr = inpaint_biharmonic(varr, mask_nans)
            self.vza[k] = GeoTensor(
                varr, transform=transform_zenith, crs=self.epsg_code
            )

        # Make a guess at the coordinates of the angle grids. These are not given
        # explicitly in the XML, and don't line up exactly with the other grids, so I am
        # making a rough estimate. Because the angles don't change rapidly across these
        # distances, it is not important if I am a bit wrong (although it would be nice
        # to be exactly correct!).
        (ulx, uly) = self.ulxyByRes["10"]
        self.anglesULXY = (ulx - angleGridXres / 2.0, uly + angleGridYres / 2.0)

        # Read mean viewing angles for each band.
        self.mean_vaa = {}
        self.mean_vza = {}
        for elm in self.tileAnglesNode.find("Mean_Viewing_Incidence_Angle_List"):
            band_name = BANDS_S2[int(elm.attrib["bandId"])]
            viewing_zenith_angle = float(elm.find("ZENITH_ANGLE").text)
            viewing_azimuth_angle = float(elm.find("AZIMUTH_ANGLE").text)
            self.mean_vza[band_name] = viewing_zenith_angle
            self.mean_vaa[band_name] = viewing_azimuth_angle

    def _buildViewAngleArr(self, viewingAngleNodeList, angleName):
        """
        Build up the named viewing angle array from the various detector strips given as
        separate arrays. I don't really understand this, and may need to re-write it once
        I have worked it out......

        The angleName is one of 'Zenith' or 'Azimuth'.
        Returns a dictionary of 2-d arrays, keyed by the bandId string.
        """
        angleArrDict = {}
        for viewingAngleNode in viewingAngleNodeList:
            band_name = BANDS_S2[int(viewingAngleNode.attrib["bandId"])]
            detectorId = viewingAngleNode.attrib["detectorId"]

            angleNode = viewingAngleNode.find(angleName)
            angleArr = self._makeValueArray(angleNode.find("Values_List"))
            if band_name not in angleArrDict:
                angleArrDict[band_name] = angleArr
            else:
                mask = ~np.isnan(angleArr)
                angleArrDict[band_name][mask] = angleArr[mask]
        return angleArrDict

    @staticmethod
    def _makeValueArray(valuesListNode):
        """
        Take a <Values_List> node from the XML, and return an array of the values contained
        within it. This will be a 2-d numpy array of float32 values (should I pass the dtype in??)

        """
        valuesList = valuesListNode.findall("VALUES")
        vals = []
        for valNode in valuesList:
            text = valNode.text
            vals.append([np.float32(x) for x in text.strip().split()])

        return np.array(vals)

cache_product_to_local_dir(path_dest=None, print_progress=True, format_bands=None)

Overrides the parent method to copy the MTD_TL.xml file

Parameters:

Name Type Description Default
path_dest Optional[str]

path to the destination folder. Defaults to None.

None
print_progress bool

whether to print progress. Defaults to True.

True

Returns:

Name Type Description
__class__ __class__

the cached object

Source code in georeader/readers/S2_SAFE_reader.py
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
def cache_product_to_local_dir(
    self,
    path_dest: Optional[str] = None,
    print_progress: bool = True,
    format_bands: Optional[str] = None,
) -> "__class__":
    """
    Overrides the parent method to copy the MTD_TL.xml file

    Args:
        path_dest (Optional[str], optional): path to the destination folder. Defaults to None.
        print_progress (bool, optional): whether to print progress. Defaults to True.

    Returns:
        __class__: the cached object
    """
    new_obj = super().cache_product_to_local_dir(
        path_dest=path_dest,
        print_progress=print_progress,
        format_bands=format_bands,
    )

    if os.path.exists(new_obj.metadata_tl):
        # the cached product already exists. returns
        return new_obj

    if self.root_metadata_tl is not None:
        new_obj.root_metadata_tl = self.root_metadata_tl
        ET.ElementTree(new_obj.metadata_tl).write(new_obj.metadata_tl)
        # copy all metadata from the original image
        for atribute in [
            "tileId",
            "root_metadata_tl",
            "satId",
            "procLevel",
            "dimsByRes",
            "ulxyByRes",
            "tileAnglesNode",
            "mean_sza",
            "mean_saa",
            "mean_vza",
            "mean_vaa",
            "vaa",
            "vza",
            "saa",
            "sza",
            "anglesULXY",
        ]:
            if hasattr(self, atribute):
                setattr(new_obj, atribute, getattr(self, atribute))
    else:
        get_file(self.metadata_tl, new_obj.metadata_tl)

    granule_folder_rel = new_obj.granule_folder.replace("\\", "/").replace(
        new_obj.folder.replace("\\", "/") + "/", ""
    )
    # Add metadata_tl to granules.json
    granules_path = os.path.join(new_obj.folder, "granules.json").replace("\\", "/")
    with open(granules_path, "r") as fh:
        info_granules_metadata = json.load(fh)
    info_granules_metadata["metadata_tl"] = os.path.join(
        granule_folder_rel, "MTD_TL.xml"
    ).replace("\\", "/")
    with open(granules_path, "w") as f:
        json.dump(info_granules_metadata, f)

    return new_obj

read_metadata_tl()

Read metadata TILE to parse information about the acquisition and properties of GRANULE bands.

It populates the following attributes
  • mean_sza
  • mean_saa
  • mean_vza
  • mean_vaa
  • vaa
  • vza
  • saa
  • sza
  • anglesULXY
  • tileId
  • satId
  • procLevel
  • epsg_code
  • dimsByRes
  • ulxyByRes
  • tileAnglesNode
  • root_metadata_tl
Source code in georeader/readers/S2_SAFE_reader.py
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
def read_metadata_tl(self):
    """
    Read metadata TILE to parse information about the acquisition and properties of GRANULE bands.

    It populates the following attributes:
        - mean_sza
        - mean_saa
        - mean_vza
        - mean_vaa
        - vaa
        - vza
        - saa
        - sza
        - anglesULXY
        - tileId
        - satId
        - procLevel
        - epsg_code
        - dimsByRes
        - ulxyByRes
        - tileAnglesNode
        - root_metadata_tl

    """
    if self.root_metadata_tl is not None:
        return

    self.root_metadata_tl = read_xml(self.metadata_tl)

    # Stoopid XML namespace prefix
    nsPrefix = self.root_metadata_tl.tag[: self.root_metadata_tl.tag.index("}") + 1]
    nsDict = {"n1": nsPrefix[1:-1]}

    self.mean_sza = float(
        self.root_metadata_tl.find(".//Mean_Sun_Angle/ZENITH_ANGLE").text
    )
    self.mean_saa = float(
        self.root_metadata_tl.find(".//Mean_Sun_Angle/AZIMUTH_ANGLE").text
    )

    generalInfoNode = self.root_metadata_tl.find("n1:General_Info", nsDict)
    # N.B. I am still not entirely convinced that this SENSING_TIME is really
    # the acquisition time, but the documentation is rubbish.
    sensingTimeNode = generalInfoNode.find("SENSING_TIME")
    sensingTimeStr = sensingTimeNode.text.strip()
    # self.datetime = datetime.datetime.strptime(sensingTimeStr, "%Y-%m-%dT%H:%M:%S.%fZ")
    tileIdNode = generalInfoNode.find("TILE_ID")
    tileIdFullStr = tileIdNode.text.strip()
    self.tileId = tileIdFullStr.split("_")[-2]
    self.satId = tileIdFullStr[:3]
    self.procLevel = tileIdFullStr[
        13:16
    ]  # Not sure whether to use absolute pos or split by '_'....

    geomInfoNode = self.root_metadata_tl.find("n1:Geometric_Info", nsDict)
    geocodingNode = geomInfoNode.find("Tile_Geocoding")
    self.epsg_code = geocodingNode.find("HORIZONTAL_CS_CODE").text

    # Dimensions of images at different resolutions.
    self.dimsByRes = {}
    sizeNodeList = geocodingNode.findall("Size")
    for sizeNode in sizeNodeList:
        res = sizeNode.attrib["resolution"]
        nrows = int(sizeNode.find("NROWS").text)
        ncols = int(sizeNode.find("NCOLS").text)
        self.dimsByRes[res] = (nrows, ncols)

    # Upper-left corners of images at different resolutions. As far as I can
    # work out, these coords appear to be the upper left corner of the upper left
    # pixel, i.e. equivalent to GDAL's convention. This also means that they
    # are the same for the different resolutions, which is nice.
    self.ulxyByRes = {}
    posNodeList = geocodingNode.findall("Geoposition")
    for posNode in posNodeList:
        res = posNode.attrib["resolution"]
        ulx = float(posNode.find("ULX").text)
        uly = float(posNode.find("ULY").text)
        self.ulxyByRes[res] = (ulx, uly)

    # Sun and satellite angles.
    # Zenith
    self.tileAnglesNode = geomInfoNode.find("Tile_Angles")
    sunZenithNode = self.tileAnglesNode.find("Sun_Angles_Grid").find("Zenith")
    # <Zenith>
    #  <COL_STEP unit="m">5000</COL_STEP>
    #  <ROW_STEP unit="m">5000</ROW_STEP>
    angleGridXres = float(sunZenithNode.find("COL_STEP").text)
    angleGridYres = float(sunZenithNode.find("ROW_STEP").text)
    sza = self._makeValueArray(sunZenithNode.find("Values_List"))
    mask_nans = np.isnan(sza)
    if np.any(mask_nans):
        from skimage.restoration import inpaint_biharmonic

        sza = inpaint_biharmonic(sza, mask_nans)
    transform_zenith = rasterio.transform.from_origin(
        self.ulxyByRes[str(self.out_res)][0],
        self.ulxyByRes[str(self.out_res)][1],
        angleGridXres,
        angleGridYres,
    )

    self.sza = GeoTensor(sza, transform=transform_zenith, crs=self.epsg_code)

    # Azimuth
    sunAzimuthNode = self.tileAnglesNode.find("Sun_Angles_Grid").find("Azimuth")
    angleGridXres = float(sunAzimuthNode.find("COL_STEP").text)
    angleGridYres = float(sunAzimuthNode.find("ROW_STEP").text)
    saa = self._makeValueArray(sunAzimuthNode.find("Values_List"))
    mask_nans = np.isnan(saa)
    if np.any(mask_nans):
        from skimage.restoration import inpaint_biharmonic

        saa = inpaint_biharmonic(saa, mask_nans)
    transform_azimuth = rasterio.transform.from_origin(
        self.ulxyByRes[str(self.out_res)][0],
        self.ulxyByRes[str(self.out_res)][1],
        angleGridXres,
        angleGridYres,
    )
    self.saa = GeoTensor(saa, transform=transform_azimuth, crs=self.epsg_code)

    # Now build up the viewing angle per grid cell, from the separate layers
    # given for each detector for each band. Initially I am going to keep
    # the bands separate, just to see how that looks.
    # The names of things in the XML suggest that these are view angles,
    # but the numbers suggest that they are angles as seen from the pixel's
    # frame of reference on the ground, i.e. they are in fact what we ultimately want.
    viewingAngleNodeList = self.tileAnglesNode.findall(
        "Viewing_Incidence_Angles_Grids"
    )
    vza = self._buildViewAngleArr(viewingAngleNodeList, "Zenith")
    vaa = self._buildViewAngleArr(viewingAngleNodeList, "Azimuth")

    self.vaa = {}
    for k, varr in vaa.items():
        mask_nans = np.isnan(varr)
        if np.any(mask_nans):
            from skimage.restoration import inpaint_biharmonic

            varr = inpaint_biharmonic(varr, mask_nans)

        self.vaa[k] = GeoTensor(
            varr, transform=transform_azimuth, crs=self.epsg_code
        )

    self.vza = {}
    for k, varr in vza.items():
        mask_nans = np.isnan(varr)
        if np.any(mask_nans):
            from skimage.restoration import inpaint_biharmonic

            varr = inpaint_biharmonic(varr, mask_nans)
        self.vza[k] = GeoTensor(
            varr, transform=transform_zenith, crs=self.epsg_code
        )

    # Make a guess at the coordinates of the angle grids. These are not given
    # explicitly in the XML, and don't line up exactly with the other grids, so I am
    # making a rough estimate. Because the angles don't change rapidly across these
    # distances, it is not important if I am a bit wrong (although it would be nice
    # to be exactly correct!).
    (ulx, uly) = self.ulxyByRes["10"]
    self.anglesULXY = (ulx - angleGridXres / 2.0, uly + angleGridYres / 2.0)

    # Read mean viewing angles for each band.
    self.mean_vaa = {}
    self.mean_vza = {}
    for elm in self.tileAnglesNode.find("Mean_Viewing_Incidence_Angle_List"):
        band_name = BANDS_S2[int(elm.attrib["bandId"])]
        viewing_zenith_angle = float(elm.find("ZENITH_ANGLE").text)
        viewing_azimuth_angle = float(elm.find("AZIMUTH_ANGLE").text)
        self.mean_vza[band_name] = viewing_zenith_angle
        self.mean_vaa[band_name] = viewing_azimuth_angle

S2ImageL2A

Bases: S2Image

Sentinel-2 Level 2A (surface reflectance) image reader.

This class extends the base S2Image class to handle Sentinel-2 Level 2A products, which provide surface reflectance data with atmospheric corrections applied.

Parameters:

Name Type Description Default
s2folder str

Path to the Sentinel-2 SAFE product folder.

required
granules Dict[str, str]

Dictionary mapping band names to file paths.

required
polygon Polygon

Polygon defining the area of interest in EPSG:4326.

required
out_res int

Output resolution in meters. Must be one of 10, 20, or 60. Defaults to 10.

10
window_focus Optional[Window]

Window to focus on a specific region of the image. Defaults to None (entire image).

None
bands Optional[List[str]]

List of bands to read. If None, the default L2A bands (excluding B10) will be loaded.

None
metadata_msi Optional[str]

Path to metadata file. If None, it is assumed to be in the SAFE folder.

None

Attributes:

Name Type Description
mission str

Mission identifier (e.g., 'S2A', 'S2B').

producttype str

Product type identifier (e.g., 'MSIL2A').

pdgs str

PDGS Processing Baseline number.

relorbitnum str

Relative Orbit number.

tile_number_field str

Tile Number field.

product_discriminator str

Product Discriminator.

name str

Base name of the product.

folder str

Path to the product folder.

datetime datetime

Acquisition datetime.

metadata_msi str

Path to the MSI metadata file.

out_res int

Output resolution in meters.

bands List[str]

List of bands to read.

dims Tuple[str]

Names of the dimensions ("band", "y", "x").

fill_value_default int

Default fill value (typically 0).

band_check str

Band used as template for reading.

granule_readers Dict[str, RasterioReader]

Dictionary of readers for each band.

window_focus Window

Current window focus.

Examples:

>>> # Initialize the S2ImageL2A reader with a data path
>>> s2_l2a = S2ImageL2A('/path/to/S2A_MSIL2A_20170717T235959_N0205_R072_T01WCP_20170718T000256.SAFE',
...                     granules=granules_dict, polygon=aoi_polygon)
>>> # Load all bands
>>> l2a_data = s2_l2a.load()
Source code in georeader/readers/S2_SAFE_reader.py
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
class S2ImageL2A(S2Image):
    """
    Sentinel-2 Level 2A (surface reflectance) image reader.

    This class extends the base S2Image class to handle Sentinel-2 Level 2A products,
    which provide surface reflectance data with atmospheric corrections applied.

    Args:
        s2folder (str): Path to the Sentinel-2 SAFE product folder.
        granules (Dict[str, str]): Dictionary mapping band names to file paths.
        polygon (Polygon): Polygon defining the area of interest in EPSG:4326.
        out_res (int): Output resolution in meters. Must be one of 10, 20, or 60. Defaults to 10.
        window_focus (Optional[rasterio.windows.Window]): Window to focus on a specific
            region of the image. Defaults to None (entire image).
        bands (Optional[List[str]]): List of bands to read. If None, the default L2A bands
            (excluding B10) will be loaded.
        metadata_msi (Optional[str]): Path to metadata file. If None, it is assumed to be
            in the SAFE folder.

    Attributes:
        mission (str): Mission identifier (e.g., 'S2A', 'S2B').
        producttype (str): Product type identifier (e.g., 'MSIL2A').
        pdgs (str): PDGS Processing Baseline number.
        relorbitnum (str): Relative Orbit number.
        tile_number_field (str): Tile Number field.
        product_discriminator (str): Product Discriminator.
        name (str): Base name of the product.
        folder (str): Path to the product folder.
        datetime (datetime): Acquisition datetime.
        metadata_msi (str): Path to the MSI metadata file.
        out_res (int): Output resolution in meters.
        bands (List[str]): List of bands to read.
        dims (Tuple[str]): Names of the dimensions ("band", "y", "x").
        fill_value_default (int): Default fill value (typically 0).
        band_check (str): Band used as template for reading.
        granule_readers (Dict[str, RasterioReader]): Dictionary of readers for each band.
        window_focus (rasterio.windows.Window): Current window focus.

    Examples:
        >>> # Initialize the S2ImageL2A reader with a data path
        >>> s2_l2a = S2ImageL2A('/path/to/S2A_MSIL2A_20170717T235959_N0205_R072_T01WCP_20170718T000256.SAFE',
        ...                     granules=granules_dict, polygon=aoi_polygon)
        >>> # Load all bands
        >>> l2a_data = s2_l2a.load()
    """

    def __init__(
        self,
        s2folder: str,
        granules: Dict[str, str],
        polygon: Polygon,
        out_res: int = 10,
        window_focus: Optional[rasterio.windows.Window] = None,
        bands: Optional[List[str]] = None,
        metadata_msi: Optional[str] = None,
    ):
        if bands is None:
            bands = BANDS_S2_L2A

        super(S2ImageL2A, self).__init__(
            s2folder=s2folder,
            granules=granules,
            polygon=polygon,
            out_res=out_res,
            bands=bands,
            window_focus=window_focus,
            metadata_msi=metadata_msi,
        )

        assert (
            self.producttype == "MSIL2A"
        ), f"Unexpected product type {self.producttype} in image {self.folder}"

s2loader(s2folder, out_res=10, bands=None, window_focus=None, granules=None, polygon=None, metadata_msi=None)

Loads a S2ImageL2A or S2ImageL1C depending on the product type

Parameters:

Name Type Description Default
s2folder str

.SAFE folder. Expected standard ESA naming convention (see s2_name_split fun)

required
out_res int

default output resolution {10, 20, 60}

10
bands Optional[List[str]]

Bands to read. Default to BANDS_S2 or BANDS_S2_L2A depending on the product type

None
window_focus Optional[Window]

window to read when creating the object

None
granules Optional[Dict[str, str]]

Dict where keys are the band names and values are paths to the band location

None
polygon Optional[Polygon]

polygon with the footprint of the object

None
metadata_msi Optional[str]

path to metadata file

None

Returns:

Type Description
Union[S2ImageL2A, S2ImageL1C]

S2Image reader

Source code in georeader/readers/S2_SAFE_reader.py
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
def s2loader(
    s2folder: str,
    out_res: int = 10,
    bands: Optional[List[str]] = None,
    window_focus: Optional[rasterio.windows.Window] = None,
    granules: Optional[Dict[str, str]] = None,
    polygon: Optional[Polygon] = None,
    metadata_msi: Optional[str] = None,
) -> Union[S2ImageL2A, S2ImageL1C]:
    """
    Loads a S2ImageL2A or S2ImageL1C depending on the product type

    Args:
        s2folder: .SAFE folder. Expected standard ESA naming convention (see s2_name_split fun)
        out_res: default output resolution {10, 20, 60}
        bands: Bands to read. Default to BANDS_S2 or BANDS_S2_L2A depending on the product type
        window_focus: window to read when creating the object
        granules: Dict where keys are the band names and values are paths to the band location
        polygon: polygon with the footprint of the object
        metadata_msi: path to metadata file

    Returns:
        S2Image reader
    """

    _, producttype_nos2, _, _, _, _, _ = s2_name_split(s2folder)

    if producttype_nos2 == "MSIL2A":
        return S2ImageL2A(
            s2folder,
            granules=granules,
            polygon=polygon,
            out_res=out_res,
            bands=bands,
            window_focus=window_focus,
            metadata_msi=metadata_msi,
        )
    elif producttype_nos2 == "MSIL1C":
        return S2ImageL1C(
            s2folder,
            granules=granules,
            polygon=polygon,
            out_res=out_res,
            bands=bands,
            window_focus=window_focus,
            metadata_msi=metadata_msi,
        )

    raise NotImplementedError(f"Don't know how to load {producttype_nos2} products")

s2_public_bucket_path(s2file, check_exists=False, mode='gcp')

Returns the expected patch in the public bucket of the S2 file

Parameters:

Name Type Description Default
s2file str

safe file (e.g. S2B_MSIL1C_20220527T030539_N0400_R075_T49SGV_20220527T051042.SAFE)

required
check_exists bool

check if the file exists in the bucket, This will not work if GOOGLE_APPLICATION_CREDENTIALS and/or GS_USER_PROJECT env variables are not set. Default to False

False
mode str

"gcp" or "rest"

'gcp'

Returns:

Type Description
str

full path to the file (e.g. gs://gcp-public-data-sentinel-2/tiles/49/S/GV/S2B_MSIL1C_20220527T030539_N0400_R075_T49SGV_20220527T051042.SAFE)

Source code in georeader/readers/S2_SAFE_reader.py
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
def s2_public_bucket_path(
    s2file: str, check_exists: bool = False, mode: str = "gcp"
) -> str:
    """
    Returns the expected patch in the public bucket of the S2 file

    Args:
        s2file: safe file (e.g.  S2B_MSIL1C_20220527T030539_N0400_R075_T49SGV_20220527T051042.SAFE)
        check_exists: check if the file exists in the bucket, This will not work if GOOGLE_APPLICATION_CREDENTIALS and/or GS_USER_PROJECT
            env variables are not set. Default to False
        mode: "gcp" or "rest"

    Returns:
        full path to the file (e.g. gs://gcp-public-data-sentinel-2/tiles/49/S/GV/S2B_MSIL1C_20220527T030539_N0400_R075_T49SGV_20220527T051042.SAFE)
    """
    (
        mission,
        producttype,
        sensing_date_str,
        pdgs,
        relorbitnum,
        tile_number_field,
        product_discriminator,
    ) = s2_name_split(s2file)
    s2file = s2file[:-1] if s2file.endswith("/") else s2file

    if not s2file.endswith(".SAFE"):
        s2file += ".SAFE"

    basename = os.path.basename(s2file)
    if mode == "gcp":
        s2folder = f"{FULL_PATH_PUBLIC_BUCKET_SENTINEL_2}tiles/{tile_number_field[:2]}/{tile_number_field[2]}/{tile_number_field[3:]}/{basename}"
    elif mode == "rest":
        s2folder = f"https://storage.googleapis.com/gcp-public-data-sentinel-2/tiles/{tile_number_field[:2]}/{tile_number_field[2]}/{tile_number_field[3:]}/{basename}"
    else:
        raise NotImplementedError(f"Mode {mode} unknown")

    if check_exists and (mode == "gcp"):
        fs = get_filesystem(s2folder)

        if not fs.exists(s2folder):
            raise FileNotFoundError(f"Sentinel-2 file not found in {s2folder}")

    return s2folder

read_srf(satellite, srf_file=SRF_FILE_DEFAULT, cache=True)

Process the spectral response function file. If the file is not provided it downloads it from https://sentinel.esa.int/web/sentinel/user-guides/sentinel-2-msi/document-library/-/asset_publisher/Wk0TKajiISaR/content/sentinel-2a-spectral-responses

This function requires the fsspec package and pandas and openpyxl for reading excel files.

Parameters:

Name Type Description Default
satellite str

satellite name (S2A, S2B or S2C)

required
srf_file str

path to the srf file

SRF_FILE_DEFAULT
cache bool

if True, the srf is cached for future calls. Default True

True

Returns:

Type Description
DataFrame

pd.DataFrame: spectral response function for each of the bands of S2

Source code in georeader/readers/S2_SAFE_reader.py
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
def read_srf(
    satellite: str, srf_file: str = SRF_FILE_DEFAULT, cache: bool = True
) -> pd.DataFrame:
    """
    Process the spectral response function file. If the file is not provided
    it downloads it from https://sentinel.esa.int/web/sentinel/user-guides/sentinel-2-msi/document-library/-/asset_publisher/Wk0TKajiISaR/content/sentinel-2a-spectral-responses

    This function requires the fsspec package and pandas and openpyxl for reading excel files.

    Args:
        satellite (str): satellite name (S2A, S2B or S2C)
        srf_file (str): path to the srf file
        cache (bool): if True, the srf is cached for future calls. Default True

    Returns:
        pd.DataFrame: spectral response function for each of the bands of S2
    """
    assert satellite in ["S2A", "S2B", "S2C"], "satellite must be S2A or S2B"

    if cache:
        global SRF_S2
        if satellite in SRF_S2:
            return SRF_S2[satellite]

    if srf_file == SRF_FILE_DEFAULT:
        # home_dir = os.path.join(os.path.expanduser('~'),".georeader")
        home_dir = os.path.join(os.path.expanduser("~"), ".georeader")
        os.makedirs(home_dir, exist_ok=True)
        srf_filename = os.path.basename(srf_file)

        # Decode the url to get the filename. Also, replace spaces with underscores
        import urllib.parse

        srf_filename = urllib.parse.unquote(srf_filename).replace(" ", "_")

        srf_file_local = os.path.join(home_dir, srf_filename)
        if not os.path.exists(srf_file_local):
            import fsspec

            with fsspec.open(srf_file, "rb") as f:
                with open(srf_file_local, "wb") as f2:
                    f2.write(f.read())
        srf_file = srf_file_local

    srf_s2 = pd.read_excel(srf_file, sheet_name=f"Spectral Responses ({satellite})")

    srf_s2 = srf_s2.set_index("SR_WL")

    # remove rows with all values zero
    any_not_cero = np.any((srf_s2 > 1e-6).values, axis=1)
    srf_s2 = srf_s2.loc[any_not_cero]

    # remove the satellite name from the columns
    srf_s2.columns = [c.replace(f"{satellite}_SR_AV_", "") for c in srf_s2.columns]
    srf_s2.columns = normalize_band_names(srf_s2.columns)

    if cache:
        SRF_S2[satellite] = srf_s2

    return srf_s2

Proba-V Reader

The Proba-V reader enables access to Proba-V Level 2A and Level 3 products. It handles:

  • Reading TOA reflectance from HDF5 files
  • Mask handling for clouds, shadows, and invalid pixels
  • Extraction of metadata and acquisition parameters

Tutorial example:

API Reference

Proba-V reader

Unnoficial Proba-V reader. This reader is based in the Proba-V user manual: https://publications.vito.be/2017-1333-probav-products-user-manual.pdf

Author: Gonzalo Mateo-García

ProbaV

Proba-V reader for handling Proba-V satellite products.

This class provides functionality to read and manipulate Proba-V satellite imagery products. It handles the specific format and metadata of Proba-V HDF5 files, supporting operations like loading radiometry data, masks, and cloud information.

Parameters:

Name Type Description Default
hdf5_file str

Path to the HDF5 file containing the Proba-V product.

required
window Optional[Window]

Optional window to focus on a specific region of the image. Defaults to None (entire image).

None
level_name str

Processing level of the product, either "LEVEL2A" or "LEVEL3". Defaults to "LEVEL3".

'LEVEL3'

Attributes:

Name Type Description
hdf5_file str

Path to the HDF5 file.

name str

Basename of the HDF5 file.

camera str

Camera ID (for LEVEL2A products).

res_name str

Resolution name identifier (e.g., '100M', '300M', '1KM').

version str

Product version.

toatoc str

Indicator of whether data is TOA (top of atmosphere) or TOC (top of canopy).

real_transform Affine

Affine transform for the full image.

real_shape Tuple[int, int]

Shape of the full image (height, width).

dtype_radiometry

Data type for radiometry data (typically np.float32).

dtype_sm

Data type for SM (status map) data.

metadata Dict[str, Any]

Dictionary with product metadata.

window_focus Window

Current window focus.

window_data Window

Window representing the full data extent.

start_date datetime

Start acquisition date and time.

end_date datetime

End acquisition date and time.

map_projection_wkt str

WKT representation of the map projection.

crs

Coordinate reference system.

level_name str

Processing level identifier.

Examples:

>>> import rasterio.windows
>>> # Initialize the ProbaV reader with a data path
>>> probav_reader = ProbaV('/path/to/probav_product.HDF5')
>>> # Load radiometry data
>>> bands = probav_reader.load_radiometry()
>>> # Get cloud mask
>>> cloud_mask = probav_reader.load_sm_cloud_mask()
>>> # Focus on a specific window
>>> window = rasterio.windows.Window(col_off=100, row_off=100, width=200, height=200)
>>> probav_reader.set_window(window)
Source code in georeader/readers/probav_image_operational.py
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
class ProbaV:
    """
    Proba-V reader for handling Proba-V satellite products.

    This class provides functionality to read and manipulate Proba-V satellite imagery products.
    It handles the specific format and metadata of Proba-V HDF5 files, supporting operations
    like loading radiometry data, masks, and cloud information.

    Args:
        hdf5_file (str): Path to the HDF5 file containing the Proba-V product.
        window (Optional[rasterio.windows.Window]): Optional window to focus on a specific
            region of the image. Defaults to None (entire image).
        level_name (str): Processing level of the product, either "LEVEL2A" or "LEVEL3".
            Defaults to "LEVEL3".

    Attributes:
        hdf5_file (str): Path to the HDF5 file.
        name (str): Basename of the HDF5 file.
        camera (str): Camera ID (for LEVEL2A products).
        res_name (str): Resolution name identifier (e.g., '100M', '300M', '1KM').
        version (str): Product version.
        toatoc (str): Indicator of whether data is TOA (top of atmosphere) or TOC (top of canopy).
        real_transform (rasterio.Affine): Affine transform for the full image.
        real_shape (Tuple[int, int]): Shape of the full image (height, width).
        dtype_radiometry: Data type for radiometry data (typically np.float32).
        dtype_sm: Data type for SM (status map) data.
        metadata (Dict[str, Any]): Dictionary with product metadata.
        window_focus (rasterio.windows.Window): Current window focus.
        window_data (rasterio.windows.Window): Window representing the full data extent.
        start_date (datetime): Start acquisition date and time.
        end_date (datetime): End acquisition date and time.
        map_projection_wkt (str): WKT representation of the map projection.
        crs: Coordinate reference system.
        level_name (str): Processing level identifier.

    Examples:
        >>> import rasterio.windows
        >>> # Initialize the ProbaV reader with a data path
        >>> probav_reader = ProbaV('/path/to/probav_product.HDF5')
        >>> # Load radiometry data
        >>> bands = probav_reader.load_radiometry()
        >>> # Get cloud mask
        >>> cloud_mask = probav_reader.load_sm_cloud_mask()
        >>> # Focus on a specific window
        >>> window = rasterio.windows.Window(col_off=100, row_off=100, width=200, height=200)
        >>> probav_reader.set_window(window)
    """

    def __init__(
        self,
        hdf5_file: str,
        window: Optional[rasterio.windows.Window] = None,
        level_name: str = "LEVEL3",
    ):
        self.hdf5_file = hdf5_file
        self.name = os.path.basename(self.hdf5_file)
        if level_name == "LEVEL2A":
            matches = re.match(
                r"PROBAV_L2A_\d{8}_\d{6}_(\d)_(\d..?M)_(V\d0\d)", self.name
            )
            if matches is not None:
                self.camera, self.res_name, self.version = matches.groups()
            self.toatoc = "TOA"
        elif level_name == "LEVEL3":
            matches = re.match(
                r"PROBAV_S1_(TO.)_.{6}_\d{8}_(\d..?M)_(V\d0\d)", self.name
            )
            if matches is not None:
                self.toatoc, self.res_name, self.version = matches.groups()
        else:
            raise NotImplementedError(f"Unknown level name {level_name}")

        try:
            with h5py.File(self.hdf5_file, "r") as input_f:
                # reference metadata: http://www.vito-eodata.be/PDF/image/PROBAV-Products_User_Manual.pdf
                valores_blue = (
                    input_f[f"{level_name}/RADIOMETRY/BLUE/{self.toatoc}"]
                    .attrs["MAPPING"][3:7]
                    .astype(np.float64)
                )
                self.real_transform = Affine(
                    a=valores_blue[2],
                    b=0,
                    c=valores_blue[0],
                    d=0,
                    e=-valores_blue[3],
                    f=valores_blue[1],
                )
                self.real_shape = input_f[
                    f"{level_name}/RADIOMETRY/BLUE/{self.toatoc}"
                ].shape
                # self.dtype_radiometry = input_f[f"{level_name}/RADIOMETRY/RED/{self.toatoc}"].dtype

                # Set to float because we're converting the image to TOA when reading (see read_radiometry function)
                self.dtype_radiometry = np.float32
                self.dtype_sm = input_f[f"{level_name}/QUALITY/SM"].dtype
                self.metadata = dict(input_f.attrs)
        except OSError as e:
            raise FileNotFoundError("Error opening file %s" % self.hdf5_file)

        if window is None:
            self.window_focus = rasterio.windows.Window(
                row_off=0,
                col_off=0,
                width=self.real_shape[1],
                height=self.real_shape[0],
            )
        else:
            self.window_focus = rasterio.windows.Window(
                row_off=0,
                col_off=0,
                width=self.real_shape[1],
                height=self.real_shape[0],
            )

        self.window_data = rasterio.windows.Window(
            row_off=0, col_off=0, width=self.real_shape[1], height=self.real_shape[0]
        )

        if "OBSERVATION_END_DATE" in self.metadata:
            self.end_date = datetime.strptime(
                " ".join(
                    self.metadata["OBSERVATION_END_DATE"].astype(str).tolist()
                    + self.metadata["OBSERVATION_END_TIME"].astype(str).tolist()
                ),
                "%Y-%m-%d %H:%M:%S",
            ).replace(tzinfo=timezone.utc)
            self.start_date = datetime.strptime(
                " ".join(
                    self.metadata["OBSERVATION_START_DATE"].astype(str).tolist()
                    + self.metadata["OBSERVATION_START_TIME"].astype(str).tolist()
                ),
                "%Y-%m-%d %H:%M:%S",
            ).replace(tzinfo=timezone.utc)
            self.map_projection_wkt = " ".join(
                self.metadata["MAP_PROJECTION_WKT"].astype(str).tolist()
            )

        # Proba-V images are lat/long
        self.crs = rasterio.crs.CRS({"init": "epsg:4326"})

        # Proba-V images have four bands
        self.level_name = level_name

    def _get_window_pad(
        self, boundless: bool = True
    ) -> Tuple[rasterio.windows.Window, Optional[List]]:
        window_read = rasterio.windows.intersection(self.window_focus, self.window_data)

        if boundless:
            _, pad_width = window_utils.get_slice_pad(
                self.window_data, self.window_focus
            )
            need_pad = any(p != 0 for p in pad_width["x"] + pad_width["y"])
            if need_pad:
                pad_list_np = []
                for k in ["y", "x"]:
                    if k in pad_width:
                        pad_list_np.append(pad_width[k])
                    else:
                        pad_list_np.append((0, 0))
            else:
                pad_list_np = None
        else:
            pad_list_np = None

        return window_read, pad_list_np

    def footprint(self, crs: Optional[str] = None) -> Polygon:
        # TODO load footprint from metadata?
        pol = window_utils.window_polygon(self.window_focus, self.transform)
        if (crs is None) or window_utils.compare_crs(self.crs, crs):
            return pol

        return window_utils.polygon_to_crs(pol, self.crs, crs)

    def valid_footprint(self, crs: Optional[str] = None) -> Polygon:
        valids = self.load_mask()
        return valids.valid_footprint(crs=crs)

    def _load_bands(
        self,
        bands_names: Union[List[str], str],
        boundless: bool = True,
        fill_value_default: Number = 0,
    ) -> geotensor.GeoTensor:
        window_read, pad_list_np = self._get_window_pad(boundless=boundless)
        slice_ = window_read.toslices()
        if isinstance(bands_names, str):
            bands_names = [bands_names]
            flatten = True
        else:
            flatten = False

        with h5py.File(self.hdf5_file, "r") as input_f:
            bands_arrs = []
            for band in bands_names:
                data = read_band_toa(input_f, band, slice_)
                if pad_list_np is not None:
                    data = np.pad(
                        data,
                        tuple(pad_list_np),
                        mode="constant",
                        constant_values=fill_value_default,
                    )

                bands_arrs.append(data)

        if boundless:
            transform = self.transform
        else:
            transform = rasterio.windows.transform(window_read, self.real_transform)

        if flatten:
            img = bands_arrs[0]
        else:
            img = np.stack(bands_arrs, axis=0)

        return geotensor.GeoTensor(
            img,
            transform=transform,
            crs=self.crs,
            fill_value_default=fill_value_default,
        )

    def save_bands(self, img: np.ndarray):
        """

        Args:
            img: (4, self.real_height, self.real_width, 4) tensor

        Returns:

        """
        assert (
            img.shape[0] == 4
        ), "Unexpected number of channels expected 4 found {}".format(img.shape)
        assert (
            img.shape[1:] == self.real_shape
        ), f"Unexpected shape expected {self.real_shape} found {img.shape[1:]}"

        # TODO save only window_focus?

        with h5py.File(self.hdf5_file, "r+") as input_f:
            for i, b in enumerate(BAND_NAMES):
                band_to_save = img[i]
                mask_band_2_save = np.ma.getmaskarray(img[i])
                band_to_save = np.clip(np.ma.filled(band_to_save, 0), 0, 2)
                band_name = f"{self.level_name}/RADIOMETRY/{b}/{self.toatoc}"
                attrs = input_f[band_name].attrs
                band_to_save *= attrs["SCALE"]
                band_to_save += attrs["OFFSET"]
                band_to_save = np.round(band_to_save).astype(np.int16)
                band_to_save[mask_band_2_save] = -1
                input_f[band_name][...] = band_to_save

    def load_radiometry(
        self, indexes: Optional[List[int]] = None, boundless: bool = True
    ) -> geotensor.GeoTensor:
        if indexes is None:
            indexes = (0, 1, 2, 3)
        bands_names = [
            f"{self.level_name}/RADIOMETRY/{BAND_NAMES[i]}/{self.toatoc}"
            for i in indexes
        ]
        return self._load_bands(
            bands_names, boundless=boundless, fill_value_default=-1 / 2000.0
        )

    def load_sm(self, boundless: bool = True) -> geotensor.GeoTensor:
        """
        Reference of values in `SM` flags.

        From [user manual](http://www.vito-eodata.be/PDF/image/PROBAV-Products_User_Manual.pdf) pag 67
        * Clear  ->    000
        * Shadow ->    001
        * Undefined -> 010
        * Cloud  ->    011
        * Ice    ->    100
        * `2**3` sea/land
        * `2**4` quality swir (0 bad 1 good)
        * `2**5` quality nir
        * `2**6` quality red
        * `2**7` quality blue
        * `2**8` coverage swir (0 no 1 yes)
        * `2**9` coverage nir
        * `2**10` coverage red
        * `2**11` coverage blue
        """
        return self._load_bands(
            f"{self.level_name}/QUALITY/SM", boundless=boundless, fill_value_default=0
        )

    def load_mask(self, boundless: bool = True) -> geotensor.GeoTensor:
        """
        Returns the valid mask (False if the pixel is out of swath or is invalid). This function loads the SM band

        Args:
            boundless (bool, optional): boundless option to load the SM band. Defaults to True.

        Returns:
            geotensor.GeoTensor: mask with the same shape as the image
        """
        valids = self.load_sm(boundless=boundless)
        valids.values = ~mask_only_sm(valids.values)
        valids.fill_value_default = False
        return valids

    def load_sm_cloud_mask(
        self, mask_undefined: bool = False, boundless: bool = True
    ) -> geotensor.GeoTensor:
        sm = self.load_sm(boundless=boundless)
        cloud_mask = sm_cloud_mask(sm.values, mask_undefined=mask_undefined)
        return geotensor.GeoTensor(
            cloud_mask, transform=self.transform, crs=self.crs, fill_value_default=0
        )

    def is_recompressed_and_chunked(self) -> bool:
        original_bands = [
            f"{self.level_name}/RADIOMETRY/{b}/{self.toatoc}" for b in BAND_NAMES
        ]
        original_bands.append(f"{self.level_name}/QUALITY/SM")
        with h5py.File(self.hdf5_file, "r") as input_:
            for b in original_bands:
                if input_[b].compression == "szip":
                    return False
                if (input_[b].chunks is None) or (input_[b].chunks[0] == 1):
                    return False
        return True

    def assert_can_be_read(self):
        original_bands = [
            f"{self.level_name}/RADIOMETRY/{b}/{self.toatoc}" for b in BAND_NAMES
        ] + [f"{self.level_name}/QUALITY/SM"]
        with h5py.File(self.hdf5_file, "a") as input_:
            for name in original_bands:
                assert is_compression_available(
                    input_[name]
                ), f"Band {name} cannot be read. Compression: {input_[name].compression}"

    def recompress_bands(
        self,
        chunks: Tuple[int, int] = (512, 512),
        replace: bool = True,
        compression_dest: str = "gzip",
    ):
        original_bands = {
            b: f"{self.level_name}/RADIOMETRY/{b}/{self.toatoc}" for b in BAND_NAMES
        }
        original_bands.update({"SM": f"{self.level_name}/QUALITY/SM"})
        copy_bands = {k: v + "_NEW" for (k, v) in original_bands.items()}
        with h5py.File(self.hdf5_file, "a") as input_:
            for b in original_bands.keys():
                assert_compression_available(input_[original_bands[b]])
                data = input_[original_bands[b]][:]
                if copy_bands[b] in input_:
                    del input_[copy_bands[b]]

                ds = input_.create_dataset(
                    copy_bands[b],
                    data=data,
                    chunks=chunks,
                    compression=compression_dest,
                )

                attrs_copy = input_[original_bands[b]].attrs
                for k, v in attrs_copy.items():
                    ds.attrs[k] = v

                if replace:
                    del input_[original_bands[b]]
                    input_[original_bands[b]] = input_[copy_bands[b]]
                    del input_[copy_bands[b]]

    @property
    def transform(self) -> Affine:
        return rasterio.windows.transform(self.window_focus, self.real_transform)

    @property
    def res(self) -> Tuple[float, float]:
        return window_utils.res(self.transform)

    @property
    def height(self) -> int:
        return self.window_focus.height

    @property
    def width(self) -> int:
        return self.window_focus.width

    @property
    def bounds(self) -> Tuple[float, float, float, float]:
        return window_utils.window_bounds(self.window_focus, self.real_transform)

    def set_window(
        self,
        window: rasterio.windows.Window,
        relative: bool = True,
        boundless: bool = True,
    ):
        if relative:
            self.window_focus = rasterio.windows.Window(
                col_off=window.col_off + self.window_focus.col_off,
                row_off=window.row_off + self.window_focus.row_off,
                height=window.height,
                width=window.width,
            )
        else:
            self.window_focus = window

        if not boundless:
            self.window_focus = rasterio.windows.intersection(
                self.window_data, self.window_focus
            )

    def __copy__(self) -> "__class__":
        return ProbaV(
            self.hdf5_file, window=self.window_focus, level_name=self.level_name
        )

    def read_from_window(
        self, window: Optional[rasterio.windows.Window] = None, boundless: bool = True
    ) -> "__class__":
        copy = self.__copy__()
        copy.set_window(window=window, boundless=boundless)

        return copy

    def __repr__(self) -> str:
        return f""" 
         File: {self.hdf5_file}
         Transform: {self.transform}
         Shape: {self.height}, {self.width}
         Resolution: {self.res}
         Bounds: {self.bounds}
         CRS: {self.crs}
         Level: {self.level_name}
         TOA/TOC: {self.toatoc}
         Resolution name : {self.res_name}
        """

load_mask(boundless=True)

Returns the valid mask (False if the pixel is out of swath or is invalid). This function loads the SM band

Parameters:

Name Type Description Default
boundless bool

boundless option to load the SM band. Defaults to True.

True

Returns:

Type Description
GeoTensor

geotensor.GeoTensor: mask with the same shape as the image

Source code in georeader/readers/probav_image_operational.py
357
358
359
360
361
362
363
364
365
366
367
368
369
370
def load_mask(self, boundless: bool = True) -> geotensor.GeoTensor:
    """
    Returns the valid mask (False if the pixel is out of swath or is invalid). This function loads the SM band

    Args:
        boundless (bool, optional): boundless option to load the SM band. Defaults to True.

    Returns:
        geotensor.GeoTensor: mask with the same shape as the image
    """
    valids = self.load_sm(boundless=boundless)
    valids.values = ~mask_only_sm(valids.values)
    valids.fill_value_default = False
    return valids

load_sm(boundless=True)

Reference of values in SM flags.

From user manual pag 67 * Clear -> 000 * Shadow -> 001 * Undefined -> 010 * Cloud -> 011 * Ice -> 100 * 2**3 sea/land * 2**4 quality swir (0 bad 1 good) * 2**5 quality nir * 2**6 quality red * 2**7 quality blue * 2**8 coverage swir (0 no 1 yes) * 2**9 coverage nir * 2**10 coverage red * 2**11 coverage blue

Source code in georeader/readers/probav_image_operational.py
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
def load_sm(self, boundless: bool = True) -> geotensor.GeoTensor:
    """
    Reference of values in `SM` flags.

    From [user manual](http://www.vito-eodata.be/PDF/image/PROBAV-Products_User_Manual.pdf) pag 67
    * Clear  ->    000
    * Shadow ->    001
    * Undefined -> 010
    * Cloud  ->    011
    * Ice    ->    100
    * `2**3` sea/land
    * `2**4` quality swir (0 bad 1 good)
    * `2**5` quality nir
    * `2**6` quality red
    * `2**7` quality blue
    * `2**8` coverage swir (0 no 1 yes)
    * `2**9` coverage nir
    * `2**10` coverage red
    * `2**11` coverage blue
    """
    return self._load_bands(
        f"{self.level_name}/QUALITY/SM", boundless=boundless, fill_value_default=0
    )

save_bands(img)

Parameters:

Name Type Description Default
img ndarray

(4, self.real_height, self.real_width, 4) tensor

required

Returns:

Source code in georeader/readers/probav_image_operational.py
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
def save_bands(self, img: np.ndarray):
    """

    Args:
        img: (4, self.real_height, self.real_width, 4) tensor

    Returns:

    """
    assert (
        img.shape[0] == 4
    ), "Unexpected number of channels expected 4 found {}".format(img.shape)
    assert (
        img.shape[1:] == self.real_shape
    ), f"Unexpected shape expected {self.real_shape} found {img.shape[1:]}"

    # TODO save only window_focus?

    with h5py.File(self.hdf5_file, "r+") as input_f:
        for i, b in enumerate(BAND_NAMES):
            band_to_save = img[i]
            mask_band_2_save = np.ma.getmaskarray(img[i])
            band_to_save = np.clip(np.ma.filled(band_to_save, 0), 0, 2)
            band_name = f"{self.level_name}/RADIOMETRY/{b}/{self.toatoc}"
            attrs = input_f[band_name].attrs
            band_to_save *= attrs["SCALE"]
            band_to_save += attrs["OFFSET"]
            band_to_save = np.round(band_to_save).astype(np.int16)
            band_to_save[mask_band_2_save] = -1
            input_f[band_name][...] = band_to_save

ProbaVRadiometry

Bases: ProbaV

A specialized ProbaV reader class focused on radiometry data.

This class extends the base ProbaV class to provide a simplified interface for working with radiometry bands from Proba-V products.

Parameters:

Name Type Description Default
hdf5_file str

Path to the HDF5 file containing the Proba-V product.

required
window Optional[Window]

Optional window to focus on a specific region of the image. Defaults to None (entire image).

None
level_name str

Processing level of the product. Defaults to "LEVEL2A".

'LEVEL2A'
indexes Optional[List[int]]

Optional list of band indices to load. If None, all four bands (0=BLUE, 1=RED, 2=NIR, 3=SWIR) will be loaded. Defaults to None.

None

Attributes:

Name Type Description
dims Tuple[str]

Names of the dimensions ("band", "y", "x").

indexes List[int]

List of band indices to load.

dtype

Data type of the radiometry data.

count int

Number of bands to be loaded.

shape Tuple[int, int, int]

Shape of the data (bands, height, width).

values ndarray

The radiometry data values.

Examples:

>>> # Initialize the ProbaVRadiometry reader with a data path
>>> probav_rad = ProbaVRadiometry('/path/to/probav_product.HDF5')
>>> # Load only RED and NIR bands
>>> probav_rad_rn = ProbaVRadiometry('/path/to/probav_product.HDF5', indexes=[1, 2])
>>> # Get the data as a GeoTensor
>>> geotensor_data = probav_rad.load()
Source code in georeader/readers/probav_image_operational.py
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
class ProbaVRadiometry(ProbaV):
    """
    A specialized ProbaV reader class focused on radiometry data.

    This class extends the base ProbaV class to provide a simplified interface
    for working with radiometry bands from Proba-V products.

    Args:
        hdf5_file (str): Path to the HDF5 file containing the Proba-V product.
        window (Optional[rasterio.windows.Window]): Optional window to focus on a specific
            region of the image. Defaults to None (entire image).
        level_name (str): Processing level of the product. Defaults to "LEVEL2A".
        indexes (Optional[List[int]]): Optional list of band indices to load. If None,
            all four bands (0=BLUE, 1=RED, 2=NIR, 3=SWIR) will be loaded. Defaults to None.

    Attributes:
        dims (Tuple[str]): Names of the dimensions ("band", "y", "x").
        indexes (List[int]): List of band indices to load.
        dtype: Data type of the radiometry data.
        count (int): Number of bands to be loaded.
        shape (Tuple[int, int, int]): Shape of the data (bands, height, width).
        values (np.ndarray): The radiometry data values.

    Examples:
        >>> # Initialize the ProbaVRadiometry reader with a data path
        >>> probav_rad = ProbaVRadiometry('/path/to/probav_product.HDF5')
        >>> # Load only RED and NIR bands
        >>> probav_rad_rn = ProbaVRadiometry('/path/to/probav_product.HDF5', indexes=[1, 2])
        >>> # Get the data as a GeoTensor
        >>> geotensor_data = probav_rad.load()
    """

    def __init__(
        self,
        hdf5_file: str,
        window: Optional[rasterio.windows.Window] = None,
        level_name: str = "LEVEL2A",
        indexes: Optional[List[int]] = None,
    ):
        super().__init__(hdf5_file=hdf5_file, window=window, level_name=level_name)
        self.dims = ("band", "y", "x")

        # let read only some bands?
        if indexes is None:
            self.indexes = [0, 1, 2, 3]
        else:
            self.indexes = indexes

        self.dtype = self.dtype_radiometry

    @property
    def count(self):
        return len(self.indexes)

    def load(self, boundless: bool = True) -> geotensor.GeoTensor:
        return self.load_radiometry(boundless=boundless, indexes=self.indexes)

    @property
    def shape(self) -> Tuple:
        return self.count, self.window_focus.height, self.window_focus.width

    @property
    def width(self) -> int:
        return self.window_focus.width

    @property
    def height(self) -> int:
        return self.window_focus.height

    @property
    def values(self) -> np.ndarray:
        return self.load_radiometry(boundless=True, indexes=self.indexes).values

    def __copy__(self) -> "__class__":
        return ProbaVRadiometry(
            self.hdf5_file,
            window=self.window_focus,
            level_name=self.level_name,
            indexes=self.indexes,
        )

ProbaVSM

Bases: ProbaV

A specialized ProbaV reader class focused on Status Map (SM) data.

This class extends the base ProbaV class to provide a simplified interface for working with the status map band from Proba-V products. The SM band contains information about the pixel quality, cloud status, etc.

Parameters:

Name Type Description Default
hdf5_file str

Path to the HDF5 file containing the Proba-V product.

required
window Optional[Window]

Optional window to focus on a specific region of the image. Defaults to None (entire image).

None
level_name str

Processing level of the product. Defaults to "LEVEL2A".

'LEVEL2A'

Attributes:

Name Type Description
dims Tuple[str]

Names of the dimensions ("y", "x").

dtype

Data type of the SM data.

shape Tuple[int, int]

Shape of the SM data (height, width).

values ndarray

The SM data values.

Examples:

>>> # Initialize the ProbaVSM reader with a data path
>>> probav_sm = ProbaVSM('/path/to/probav_product.HDF5')
>>> # Get the SM data as a GeoTensor
>>> sm_data = probav_sm.load()
>>> # Extract cloud information
>>> cloud_mask = sm_cloud_mask(sm_data.values)
Source code in georeader/readers/probav_image_operational.py
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
class ProbaVSM(ProbaV):
    """
    A specialized ProbaV reader class focused on Status Map (SM) data.

    This class extends the base ProbaV class to provide a simplified interface
    for working with the status map band from Proba-V products. The SM band
    contains information about the pixel quality, cloud status, etc.

    Args:
        hdf5_file (str): Path to the HDF5 file containing the Proba-V product.
        window (Optional[rasterio.windows.Window]): Optional window to focus on a specific
            region of the image. Defaults to None (entire image).
        level_name (str): Processing level of the product. Defaults to "LEVEL2A".

    Attributes:
        dims (Tuple[str]): Names of the dimensions ("y", "x").
        dtype: Data type of the SM data.
        shape (Tuple[int, int]): Shape of the SM data (height, width).
        values (np.ndarray): The SM data values.

    Examples:
        >>> # Initialize the ProbaVSM reader with a data path
        >>> probav_sm = ProbaVSM('/path/to/probav_product.HDF5')
        >>> # Get the SM data as a GeoTensor
        >>> sm_data = probav_sm.load()
        >>> # Extract cloud information
        >>> cloud_mask = sm_cloud_mask(sm_data.values)
    """

    def __init__(
        self,
        hdf5_file: str,
        window: Optional[rasterio.windows.Window] = None,
        level_name: str = "LEVEL2A",
    ):
        super().__init__(hdf5_file=hdf5_file, window=window, level_name=level_name)
        self.dims = ("y", "x")
        self.dtype = self.dtype_sm

    def load(self, boundless: bool = True) -> geotensor.GeoTensor:
        return self.load_sm(boundless=boundless)

    @property
    def shape(self) -> Tuple:
        return self.window_focus.height, self.window_focus.width

    @property
    def width(self) -> int:
        return self.window_focus.width

    @property
    def height(self) -> int:
        return self.window_focus.height

    @property
    def values(self) -> np.ndarray:
        return self.load_sm(boundless=True).values

    def __copy__(self) -> "__class__":
        return ProbaVSM(
            self.hdf5_file, window=self.window_focus, level_name=self.level_name
        )

SPOT-VGT Reader

The SPOT-VGT reader provides functionality for reading SPOT-VGT products. Features include:

  • HDF4 file format support
  • Handling of radiometry and quality layers
  • Cloud and shadow mask extraction

Note: See the Proba-V tutorial for similar processing workflows as both sensors share similar data structures.

API Reference

SPOT VGT reader

Unofficial reader for SPOT VGT products. The reader is based on the user manual: https://docs.terrascope.be/DataProducts/SPOT-VGT/references/SPOT_VGT_PUM_v1.3.pdf

Authors: Dan Lopez-Puigdollers, Gonzalo Mateo-García

SpotVGT

SPOT-VGT reader for handling SPOT Vegetation satellite products.

This class provides functionality to read and manipulate SPOT-VGT satellite imagery products. It handles the specific format and metadata of SPOT-VGT HDF4 files, supporting operations like loading radiometry data, masks, and cloud information.

Parameters:

Name Type Description Default
hdf4_file str

Path to the HDF4 file or directory containing the SPOT-VGT product.

required
window Optional[Window]

Optional window to focus on a specific region of the image. Defaults to None (entire image).

None

Attributes:

Name Type Description
hdf4_file str

Path to the HDF4 file.

name str

Basename of the HDF4 file.

satelliteID str

Satellite ID extracted from the filename.

station str

Station code extracted from the filename.

productID str

Product ID extracted from the filename.

year, month, day (str

Date components extracted from the filename.

segment str

Segment identifier extracted from the filename.

version str

Product version extracted from the filename.

files List[str]

List of files in the SPOT-VGT product.

files_dict Dict[str, str]

Dictionary mapping band names to file paths.

metadata Dict[str, str]

Metadata extracted from the LOG file.

real_shape Tuple[int, int]

Shape of the full image (height, width).

real_transform Affine

Affine transform for the full image.

dtype_radiometry

Data type for radiometry data (typically np.float32).

window_focus Window

Current window focus.

window_data Window

Window representing the full data extent.

start_date datetime

Start acquisition date and time.

end_date datetime

End acquisition date and time.

crs

Coordinate reference system.

toatoc str

Indicator of whether data is TOA (top of atmosphere).

res_name str

Resolution name identifier (e.g., '1KM').

level_name str

Processing level identifier.

Examples:

>>> import rasterio.windows
>>> # Initialize the SpotVGT reader with a data path
>>> spot_reader = SpotVGT('/path/to/V2KRNP____20140321F146_V003')
>>> # Load radiometry data
>>> bands = spot_reader.load_radiometry()
>>> # Get cloud mask
>>> cloud_mask = spot_reader.load_sm_cloud_mask()
>>> # Focus on a specific window
>>> window = rasterio.windows.Window(col_off=100, row_off=100, width=200, height=200)
>>> spot_reader.set_window(window)
Source code in georeader/readers/spotvgt_image_operational.py
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
class SpotVGT:
    """
    SPOT-VGT reader for handling SPOT Vegetation satellite products.

    This class provides functionality to read and manipulate SPOT-VGT satellite imagery products.
    It handles the specific format and metadata of SPOT-VGT HDF4 files, supporting operations
    like loading radiometry data, masks, and cloud information.

    Args:
        hdf4_file (str): Path to the HDF4 file or directory containing the SPOT-VGT product.
        window (Optional[rasterio.windows.Window]): Optional window to focus on a specific 
            region of the image. Defaults to None (entire image).

    Attributes:
        hdf4_file (str): Path to the HDF4 file.
        name (str): Basename of the HDF4 file.
        satelliteID (str): Satellite ID extracted from the filename.
        station (str): Station code extracted from the filename.
        productID (str): Product ID extracted from the filename.
        year, month, day (str): Date components extracted from the filename.
        segment (str): Segment identifier extracted from the filename.
        version (str): Product version extracted from the filename.
        files (List[str]): List of files in the SPOT-VGT product.
        files_dict (Dict[str, str]): Dictionary mapping band names to file paths.
        metadata (Dict[str, str]): Metadata extracted from the LOG file.
        real_shape (Tuple[int, int]): Shape of the full image (height, width).
        real_transform (rasterio.Affine): Affine transform for the full image.
        dtype_radiometry: Data type for radiometry data (typically np.float32).
        window_focus (rasterio.windows.Window): Current window focus.
        window_data (rasterio.windows.Window): Window representing the full data extent.
        start_date (dt.datetime): Start acquisition date and time.
        end_date (dt.datetime): End acquisition date and time.
        crs: Coordinate reference system.
        toatoc (str): Indicator of whether data is TOA (top of atmosphere).
        res_name (str): Resolution name identifier (e.g., '1KM').
        level_name (str): Processing level identifier.

    Examples:
        >>> import rasterio.windows
        >>> # Initialize the SpotVGT reader with a data path
        >>> spot_reader = SpotVGT('/path/to/V2KRNP____20140321F146_V003')
        >>> # Load radiometry data
        >>> bands = spot_reader.load_radiometry()
        >>> # Get cloud mask
        >>> cloud_mask = spot_reader.load_sm_cloud_mask()
        >>> # Focus on a specific window
        >>> window = rasterio.windows.Window(col_off=100, row_off=100, width=200, height=200)
        >>> spot_reader.set_window(window)
    """
    def __init__(self, hdf4_file: str, window: Optional[rasterio.windows.Window] = None):
        self.hdf4_file = hdf4_file
        self.name = os.path.basename(self.hdf4_file)
        matches = re.match(r'V(\d{1})(\w{3})(\w{1})____(\d{4})(\d{2})(\d{2})F(\w{3})_V(\d{3})', self.name)
        if matches is not None:
            (self.satelliteID, self.station, self.productID, self.year,
             self.month, self.day, self.segment, self.version) = matches.groups()
        else:
            raise FileNotFoundError("SPOT-VGT product not recognized %s" % self.hdf4_file)

        try:
            self.files = sorted([f for f in glob(os.path.join(self.hdf4_file, '*'))])
            self.files_dict = {re.match(r'V\d{12}_(\w+)',
                                        os.path.basename(self.files[i])).groups()[0]: self.files[i]
                               for i in range(len(self.files))}

            with open(self.files_dict['LOG'], "r") as f:
                self.metadata = {re.split(r'\s+', y)[0]: re.split(r'\s+', y)[1] for y in [x for x in f]}

            self.real_shape = (
                int(self.metadata["IMAGE_LOWER_RIGHT_ROW"]) - int(self.metadata["IMAGE_UPPER_LEFT_ROW"]) - 1,
                int(self.metadata["IMAGE_LOWER_RIGHT_COL"]) - int(self.metadata["IMAGE_UPPER_LEFT_COL"]) - 1)

            bbox = [
                float(self.metadata['CARTO_LOWER_LEFT_X']),
                float(self.metadata['CARTO_LOWER_LEFT_Y']),
                float(self.metadata['CARTO_UPPER_RIGHT_X']),
                float(self.metadata['CARTO_UPPER_RIGHT_Y'])
            ]
            self.real_transform = rasterio.transform.from_bounds(*bbox, width=self.real_shape[1],
                                                                 height=self.real_shape[0])

            self.dtype_radiometry = np.float32

        except OSError as e:
            raise FileNotFoundError("Error reading product %s" % self.hdf4_file)

        if window is None:
            self.window_focus = rasterio.windows.Window(row_off=0, col_off=0,
                                                        width=self.real_shape[1],
                                                        height=self.real_shape[0])
        else:
            self.window_focus = rasterio.windows.Window(row_off=0, col_off=0,
                                                        width=self.real_shape[1],
                                                        height=self.real_shape[0])

        self.window_data = rasterio.windows.Window(row_off=0, col_off=0,
                                                   width=self.real_shape[1],
                                                   height=self.real_shape[0])

        year, month, day = re.match(r'(\d{4})(\d{2})(\d{2})', self.metadata['SEGM_FIRST_DATE']).groups()
        hh, mm, ss = re.match(r'(\d{2})(\d{2})(\d{2})', self.metadata['SEGM_FIRST_TIME']).groups()

        self.start_date = dt.datetime(day=int(day), month=int(month), year=int(year),
                                      hour=int(hh), minute=int(mm), second=int(ss), tzinfo=dt.timezone.utc)

        year, month, day = re.match(r'(\d{4})(\d{2})(\d{2})', self.metadata['SEGM_LAST_DATE']).groups()
        hh, mm, ss = re.match(r'(\d{2})(\d{2})(\d{2})', self.metadata['SEGM_LAST_TIME']).groups()

        self.end_date = dt.datetime(day=int(day), month=int(month), year=int(year),
                                    hour=int(hh), minute=int(mm), second=int(ss), tzinfo=dt.timezone.utc)

        # self.map_projection_wkt

        self.toatoc = "TOA"

        self.res_name = '1KM'

        # SPOT-VGT images are lat/long
        self.crs = rasterio.crs.CRS({'init': 'epsg:4326'})

        # SPOT-VGT images have four bands
        self.level_name = "LEVEL2A"

    def _get_window_pad(self, boundless: bool = True) -> Tuple[rasterio.windows.Window, Optional[List]]:
        window_read = rasterio.windows.intersection(self.window_focus, self.window_data)

        if boundless:
            _, pad_width = window_utils.get_slice_pad(self.window_data, self.window_focus)
            need_pad = any(p != 0 for p in pad_width["x"] + pad_width["y"])
            if need_pad:
                pad_list_np = []
                for k in ["y", "x"]:
                    if k in pad_width:
                        pad_list_np.append(pad_width[k])
                    else:
                        pad_list_np.append((0, 0))
            else:
                pad_list_np = None
        else:
            pad_list_np = None

        return window_read, pad_list_np

    def footprint(self, crs:Optional[str]=None) -> Polygon:
        # TODO load footprint from metadata?
        pol = window_utils.window_polygon(self.window_focus, self.transform)
        if (crs is None) or window_utils.compare_crs(self.crs, crs):
            return pol

        return window_utils.polygon_to_crs(pol, self.crs, crs)

    def valid_footprint(self, crs:Optional[str]=None) -> Polygon:
        valids = self.load_mask()
        return valids.valid_footprint(crs=crs)        

    def _load_bands(self, bands_names: Union[List[str], str], boundless: bool = True,
                    fill_value_default: Number = 0) -> geotensor.GeoTensor:
        window_read, pad_list_np = self._get_window_pad(boundless=boundless)
        slice_ = window_read.toslices()
        if isinstance(bands_names, str):
            bands_names = [bands_names]
            flatten = True
        else:
            flatten = False

        hdf_objs = {b: SD(self.files_dict[b], SDC.READ) for b in bands_names}
        # Read dataset
        # shapes = [hdf_objs[b].datasets()["PIXEL_DATA"][1] for b in bands_names]
        # data = [hdf_objs[b].select("PIXEL_DATA")[slice_] for b in bands_names]

        bands_arrs = []
        # Original slice int32 gives an error. Cast to int
        for band in bands_names:
            data = read_band_toa(hdf_objs, band, (slice(int(slice_[0].start), int(slice_[0].stop), None),
                                                  slice(int(slice_[1].start), int(slice_[1].stop), None)))
            if pad_list_np:
                data = np.pad(data, tuple(pad_list_np), mode="constant", constant_values=fill_value_default)

            bands_arrs.append(data)

        if boundless:
            transform = self.transform
        else:
            transform = rasterio.windows.transform(window_read, self.real_transform)

        if flatten:
            img = bands_arrs[0]
        else:
            img = np.stack(bands_arrs, axis=0)

        return geotensor.GeoTensor(img, transform=transform, crs=self.crs,
                                   fill_value_default=fill_value_default)

    def load_radiometry(self, indexes: Optional[List[int]] = None, boundless: bool = True) -> geotensor.GeoTensor:
        if indexes is None:
            indexes = (0, 1, 2, 3)
        # bands_names = [f"{self.level_name}/RADIOMETRY/{BAND_NAMES[i]}/{self.toatoc}" for i in indexes]
        bands_names = [BANDS_DICT[i] for i in indexes]
        return self._load_bands(bands_names, boundless=boundless, fill_value_default=0)

    def load_sm(self, boundless: bool = True) -> geotensor.GeoTensor:
        """
        Reference of values in `SM` flags.

        From [user manual](https://docs.terrascope.be/DataProducts/SPOT-VGT/references/SPOT_VGT_PUM_v1.3.pdf) pag 46
        * Clear  ->    000
        * Shadow ->    001
        * Undefined -> 010
        * Cloud  ->    011
        * Ice    ->    100
        * `2**3` sea/land
        * `2**4` quality swir (0 bad 1 good)
        * `2**5` quality nir
        * `2**6` quality red
        * `2**7` quality blue
        """
        return self._load_bands('SM', boundless=boundless, fill_value_default=0)

    def load_mask(self, boundless: bool = True) -> geotensor.GeoTensor:
        """
        Returns the valid mask (False if the pixel is out of swath or is invalid). This function loads the SM band

        Args:
            boundless (bool, optional): boundless option to load the SM band. Defaults to True.

        Returns:
            geotensor.GeoTensor: mask with the same shape as the image
        """

        sm = self.load_sm(boundless=boundless)
        valids = sm.copy()
        invalids = mask_only_sm(sm.values)
        valids.values = ~invalids
        valids.fill_value_default = False

        return valids

    def load_sm_cloud_mask(self, mask_undefined:bool=False, boundless:bool=True) -> geotensor.GeoTensor:
        sm = self.load_sm(boundless=boundless)
        cloud_mask = sm_cloud_mask(sm.values, mask_undefined=mask_undefined)
        cloud_mask+=1
        invalids = mask_only_sm(sm.values)

        cloud_mask[invalids] = 0
        return geotensor.GeoTensor(cloud_mask, transform=self.transform, crs=self.crs, fill_value_default=0)

    @property
    def transform(self) -> Affine:
        return rasterio.windows.transform(self.window_focus, self.real_transform)

    @property
    def res(self) -> Tuple[float, float]:
        return window_utils.res(self.transform)

    @property
    def height(self) -> int:
        return self.window_focus.height

    @property
    def width(self) -> int:
        return self.window_focus.width

    @property
    def bounds(self) -> Tuple[float, float, float, float]:
        return window_utils.window_bounds(self.window_focus, self.real_transform)

    def set_window(self, window:rasterio.windows.Window, relative: bool = True, boundless: bool = True):
        if relative:
            self.window_focus = rasterio.windows.Window(col_off=window.col_off + self.window_focus.col_off,
                                                        row_off=window.row_off + self.window_focus.row_off,
                                                        height=window.height, width=window.width)
        else:
            self.window_focus = window

        if not boundless:
            self.window_focus = rasterio.windows.intersection(self.window_data, self.window_focus)

    def __copy__(self) -> '__class__':
        return SpotVGT(self.hdf4_file, window=self.window_focus)

    def read_from_window(self, window: Optional[rasterio.windows.Window] = None, boundless: bool = True) -> '__class__':
        copy = self.__copy__()
        copy.set_window(window=window, boundless=boundless)

        return copy

    def __repr__(self) -> str:
        return f""" 
         File: {self.hdf4_file}
         Transform: {self.transform}
         Shape: {self.height}, {self.width}
         Resolution: {self.res}
         Bounds: {self.bounds}
         CRS: {self.crs}
         Level: {self.level_name}
         TOA/TOC: {self.toatoc}
         Resolution name : {self.res_name}
        """

load_mask(boundless=True)

Returns the valid mask (False if the pixel is out of swath or is invalid). This function loads the SM band

Parameters:

Name Type Description Default
boundless bool

boundless option to load the SM band. Defaults to True.

True

Returns:

Type Description
GeoTensor

geotensor.GeoTensor: mask with the same shape as the image

Source code in georeader/readers/spotvgt_image_operational.py
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
def load_mask(self, boundless: bool = True) -> geotensor.GeoTensor:
    """
    Returns the valid mask (False if the pixel is out of swath or is invalid). This function loads the SM band

    Args:
        boundless (bool, optional): boundless option to load the SM band. Defaults to True.

    Returns:
        geotensor.GeoTensor: mask with the same shape as the image
    """

    sm = self.load_sm(boundless=boundless)
    valids = sm.copy()
    invalids = mask_only_sm(sm.values)
    valids.values = ~invalids
    valids.fill_value_default = False

    return valids

load_sm(boundless=True)

Reference of values in SM flags.

From user manual pag 46 * Clear -> 000 * Shadow -> 001 * Undefined -> 010 * Cloud -> 011 * Ice -> 100 * 2**3 sea/land * 2**4 quality swir (0 bad 1 good) * 2**5 quality nir * 2**6 quality red * 2**7 quality blue

Source code in georeader/readers/spotvgt_image_operational.py
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
def load_sm(self, boundless: bool = True) -> geotensor.GeoTensor:
    """
    Reference of values in `SM` flags.

    From [user manual](https://docs.terrascope.be/DataProducts/SPOT-VGT/references/SPOT_VGT_PUM_v1.3.pdf) pag 46
    * Clear  ->    000
    * Shadow ->    001
    * Undefined -> 010
    * Cloud  ->    011
    * Ice    ->    100
    * `2**3` sea/land
    * `2**4` quality swir (0 bad 1 good)
    * `2**5` quality nir
    * `2**6` quality red
    * `2**7` quality blue
    """
    return self._load_bands('SM', boundless=boundless, fill_value_default=0)

PRISMA Reader

The PRISMA reader handles data from the Italian Space Agency's hyperspectral mission, specifically working with Level 1B radiance data (not atmospherically corrected). PRISMA provides hyperspectral imaging in the 400-2500 nm spectral range, with a spectral resolution of ~12 nm.

Key features:

  • Reading L1B hyperspectral radiance data from HDF5 format files
  • Handling separate VNIR (400-1000 nm) and SWIR (1000-2500 nm) spectral ranges
  • Georeferencing functionality for non-orthorectified data using provided latitude/longitude coordinates
  • On-demand conversion from radiance (mW/m²/sr/nm) to top-of-atmosphere reflectance
  • Spectral response function integration for accurate band simulation
  • Extraction of RGB previews from specific wavelengths
  • Access to satellite and solar geometry information for radiometric calculations

Tutorial examples:

API Reference

PRISMA

PRISMA (PRecursore IperSpettrale della Missione Applicativa) image reader.

This class provides functionality to read and manipulate PRISMA satellite imagery products. It handles the specific format and metadata of PRISMA HDF5 files, supporting operations like loading specific wavelengths, RGB composites, and converting to reflectance.

Parameters:

Name Type Description Default
filename str

Path to the PRISMA HDF5 file.

required

Attributes:

Name Type Description
filename str

Path to the PRISMA file.

swir_cube_dat str

Path to SWIR cube data in HDF5 file.

vni_cube_dat str

Path to VNIR cube data in HDF5 file.

lats ndarray

Latitude values for each pixel.

lons ndarray

Longitude values for each pixel.

attributes_prisma Dict

Dictionary of PRISMA metadata attributes.

nbands_vnir int

Number of VNIR bands.

vnir_range Tuple[float, float]

Wavelength range of VNIR bands (min, max).

nbands_swir int

Number of SWIR bands.

swir_range Tuple[float, float]

Wavelength range of SWIR bands (min, max).

ltoa_swir Optional[NDArray]

SWIR radiance data, lazily loaded.

ltoa_vnir Optional[NDArray]

VNIR radiance data, lazily loaded.

wavelength_swir Optional[NDArray]

SWIR wavelengths, lazily loaded.

fwhm_swir Optional[NDArray]

SWIR FWHM (Full Width at Half Maximum), lazily loaded.

wavelength_vnir Optional[NDArray]

VNIR wavelengths, lazily loaded.

fwhm_vnir Optional[NDArray]

VNIR FWHM, lazily loaded.

vza_swir float

Viewing zenith angle for SWIR.

vza_vnir float

Viewing zenith angle for VNIR.

sza_swir float

Solar zenith angle for SWIR.

sza_vnir float

Solar zenith angle for VNIR.

time_coverage_start datetime

Start time of acquisition.

time_coverage_end datetime

End time of acquisition.

units str

Units of radiance data (mW/m2/sr/nm).

Examples:

>>> # Initialize the PRISMA reader with a data path
>>> prisma = PRISMA('/path/to/prisma_file.he5')
>>> # Load RGB bands
>>> rgb = prisma.load_rgb(as_reflectance=True)
>>> # Load specific wavelengths
>>> bands = prisma.load_wavelengths([850, 1600, 2200], as_reflectance=True)
>>> # Get image metadata
>>> print(prisma)
Source code in georeader/readers/prisma.py
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
class PRISMA:
    """
    PRISMA (PRecursore IperSpettrale della Missione Applicativa) image reader.

    This class provides functionality to read and manipulate PRISMA satellite imagery products.
    It handles the specific format and metadata of PRISMA HDF5 files, supporting operations
    like loading specific wavelengths, RGB composites, and converting to reflectance.

    Args:
        filename (str): Path to the PRISMA HDF5 file.

    Attributes:
        filename (str): Path to the PRISMA file.
        swir_cube_dat (str): Path to SWIR cube data in HDF5 file.
        vni_cube_dat (str): Path to VNIR cube data in HDF5 file.
        lats (numpy.ndarray): Latitude values for each pixel.
        lons (numpy.ndarray): Longitude values for each pixel.
        attributes_prisma (Dict): Dictionary of PRISMA metadata attributes.
        nbands_vnir (int): Number of VNIR bands.
        vnir_range (Tuple[float, float]): Wavelength range of VNIR bands (min, max).
        nbands_swir (int): Number of SWIR bands.
        swir_range (Tuple[float, float]): Wavelength range of SWIR bands (min, max).
        ltoa_swir (Optional[NDArray]): SWIR radiance data, lazily loaded.
        ltoa_vnir (Optional[NDArray]): VNIR radiance data, lazily loaded.
        wavelength_swir (Optional[NDArray]): SWIR wavelengths, lazily loaded.
        fwhm_swir (Optional[NDArray]): SWIR FWHM (Full Width at Half Maximum), lazily loaded.
        wavelength_vnir (Optional[NDArray]): VNIR wavelengths, lazily loaded.
        fwhm_vnir (Optional[NDArray]): VNIR FWHM, lazily loaded.
        vza_swir (float): Viewing zenith angle for SWIR.
        vza_vnir (float): Viewing zenith angle for VNIR.
        sza_swir (float): Solar zenith angle for SWIR.
        sza_vnir (float): Solar zenith angle for VNIR.
        time_coverage_start (datetime): Start time of acquisition.
        time_coverage_end (datetime): End time of acquisition.
        units (str): Units of radiance data (mW/m2/sr/nm).

    Examples:
        >>> # Initialize the PRISMA reader with a data path
        >>> prisma = PRISMA('/path/to/prisma_file.he5')
        >>> # Load RGB bands
        >>> rgb = prisma.load_rgb(as_reflectance=True)
        >>> # Load specific wavelengths
        >>> bands = prisma.load_wavelengths([850, 1600, 2200], as_reflectance=True)
        >>> # Get image metadata
        >>> print(prisma)
    """

    def __init__(self, filename: str) -> None:
        if not os.path.exists(filename):
            raise FileNotFoundError(f"File {filename} not found")
        self.filename = filename
        self.swir_cube_dat = SWIR_FLAG["swir_cube_dat"][True]
        self.vni_cube_dat = SWIR_FLAG["swir_cube_dat"][False]

        with h5py.File(filename, mode="r") as f:
            dset = f[HE5_COORDS["swir_lat"]]
            self.lats = np.flip(dset[:, :], axis=0)
            dset = f[HE5_COORDS["swir_lon"]]
            self.lons = np.flip(dset[:, :], axis=0)
            self.attributes_prisma = dict(f.attrs)
            sza = f.attrs["Sun_zenith_angle"]

        arr = self.attributes_prisma["List_Cw_Vnir"][
            self.attributes_prisma["List_Cw_Vnir"] > 0
        ]
        self.nbands_vnir = len(arr)
        self.vnir_range = arr.min(), arr.max()
        arr = self.attributes_prisma["List_Cw_Swir"][
            self.attributes_prisma["List_Cw_Swir"] > 0
        ]
        self.swir_range = arr.min(), arr.max()
        self.nbands_swir = len(arr)

        self.ltoa_swir: Optional[NDArray] = None
        self.ltoa_vnir: Optional[NDArray] = None
        self.wavelength_swir: Optional[NDArray] = None
        self.fwhm_swir: Optional[NDArray] = None
        self.wavelength_vnir: Optional[NDArray] = None
        self.fwhm_vnir: Optional[NDArray] = None
        self.vza_swir: float = 0
        self.vza_vnir: float = 0
        self.sza_swir: float = sza
        self.sza_vnir: float = sza

        # self.time_coverage_start = self.attributes_prisma['Product_StartTime']
        self.time_coverage_start = datetime.fromisoformat(
            self.attributes_prisma["Product_StartTime"].decode("utf-8")
        ).replace(tzinfo=timezone.utc)
        self.time_coverage_end = datetime.fromisoformat(
            self.attributes_prisma["Product_StopTime"].decode("utf-8")
        ).replace(tzinfo=timezone.utc)
        self.units = "mW/m2/sr/nm"  # same as W/m^2/SR/um

        self._footprint = griddata.footprint(self.lons, self.lats)
        self._observation_date_correction_factor: Optional[float] = None

    def footprint(self, crs: Optional[str] = None) -> GeoTensor:
        if (crs is None) or compare_crs("EPSG:4326", crs):
            return self._footprint

        return window_utils.polygon_to_crs(
            self._footprint, crs_polygon="EPSG:4326", crs_dst=crs
        )

    @property
    def observation_date_correction_factor(self) -> float:
        if self._observation_date_correction_factor is None:
            self._observation_date_correction_factor = (
                reflectance.observation_date_correction_factor(
                    date_of_acquisition=self.time_coverage_start,
                    center_coords=self.footprint("EPSG:4326").centroid.coords[0],
                )
            )
        return self._observation_date_correction_factor

    @property
    def bounds(self) -> Tuple[float, float, float, float]:
        return self._footprint.bounds

    def load_raw(self, swir_flag: bool) -> NDArray:
        """
        Load the all the data from all the wavelengths for the VNIR or SWIR range.
        This function caches the data, wavelegths and FWHM in the attributes of the class:
            * `ltoa_swir`, `wavelength_swir`, `fwhm_swir`, `vza_swir`, `sza_swir` if `swir_flag` is True
            * `ltoa_vnir`, `wavelength_vnir`, `fwhm_vnir`, `vza_vnir`, `sza_vnir` if `swir_flag` is False

        Args:
            swir_flag (bool): if True it will load the SWIR range, otherwise it will load the VNIR range

        Returns:
            NDArray: 3D array with the reflectance values (H, W, B)
                where N and M are the dimensions of the image and B is the number of bands.
        """

        if swir_flag:
            if all(
                x is not None
                for x in [
                    self.ltoa_swir,
                    self.wavelength_swir,
                    self.fwhm_swir,
                    self.vza_swir,
                    self.sza_swir,
                ]
            ):
                return self.ltoa_swir
        else:
            if all(
                x is not None
                for x in [
                    self.ltoa_vnir,
                    self.wavelength_vnir,
                    self.fwhm_vnir,
                    self.vza_vnir,
                    self.sza_vnir,
                ]
            ):
                return self.ltoa_vnir

        swir_cube_dat = SWIR_FLAG["swir_cube_dat"][swir_flag]
        swir_lab = SWIR_FLAG["swir_lab"][swir_flag]  # True: "Swir", False: "Vnir"

        with h5py.File(self.filename, "r") as f:
            dset = f[swir_cube_dat]

            ltoa_img = np.flip(np.transpose(dset[:, :, :], axes=[0, 2, 1]), axis=0)

            dset = f["/KDP_AUX/Cw_" + swir_lab + "_Matrix"]
            wvl_mat_ini = dset[:, :]

            dset = f["/KDP_AUX/Fwhm_" + swir_lab + "_Matrix"]
            fwhm_mat_ini = dset[:, :]

            sc_fac = f.attrs["ScaleFactor_" + swir_lab]

            of_fac = f.attrs["Offset_" + swir_lab]

            vza = 0.0
            sza = f.attrs["Sun_zenith_angle"]

            ltoa_img = ltoa_img / sc_fac - of_fac

        # Lambda
        wvl_mat_ini = np.flip(wvl_mat_ini, axis=1)
        li_no0 = np.where(wvl_mat_ini[100, :] > 0)[0]
        wvl_mat = np.copy(wvl_mat_ini[:, li_no0])
        wl_center_ini = np.mean(wvl_mat, axis=0)

        # FWHM
        fwhm_mat_ini = np.flip(fwhm_mat_ini, axis=1)
        fwhm_mat = np.copy(fwhm_mat_ini[:, li_no0])

        M, N, B_tot = ltoa_img.shape

        if swir_flag:
            if B_tot == len(wl_center_ini):
                ltoa_img = np.flip(ltoa_img, axis=2)
            else:
                ltoa_img = np.flip(ltoa_img[:, :, :-2], axis=2)

        else:
            if B_tot == len(wl_center_ini):
                ltoa_img = np.flip(ltoa_img, axis=2)
            else:
                ltoa_img = np.flip(ltoa_img[:, :, 3:], axis=2)  # Revisar esto(not sure)

        ltoa_img = np.transpose(ltoa_img, (1, 0, 2))
        if swir_flag:
            self.ltoa_swir = ltoa_img
            self.wavelength_swir = wvl_mat
            self.fwhm_swir = fwhm_mat
            self.vza_swir = vza
            self.sza_swir = sza
        else:
            self.ltoa_vnir = ltoa_img
            self.wavelength_vnir = wvl_mat
            self.fwhm_vnir = fwhm_mat
            self.vza_vnir = vza
            self.sza_vnir = sza

        return ltoa_img

    # def target_spectrum(self, swir_flag:bool) -> NDArray:

    #     if swir_flag:
    #         vza = self.vza_swir
    #         sza = self.sza_swir
    #         band_array = self.wavelength_swir
    #         fwhm_array = self.fwhm_swir
    #         N, M, B = self.ltoa_swir.shape
    #     else:
    #         vza = self.vza_vnir
    #         sza = self.sza_vnir
    #         band_array = self.wavelength_vnir
    #         fwhm_array = self.fwhm_vnir
    #         N, M, B = self.ltoa_vnir.shape

    #     amf = 1.0 / np.cos(vza * np.pi / 180) + 1.0 / np.cos(sza * np.pi / 180)
    #     parent_dir = os.path.dirname(os.path.dirname(__file__))
    #     file_lut_gas = os.path.join(parent_dir, LUT_FILE["name"])

    #     wvl_mod, t_gas_arr, gas_sc_arr, mr_gas_arr = read_luts(
    #         file_lut=file_lut_gas,
    #         t_arr_str=LUT_FILE["t_arr_variable"],
    #         sc_arr_str=LUT_FILE["sc_arr_variable"],
    #         mr_arr_str=LUT_FILE["mr_arr_variable"],
    #         amf=amf,
    #     )

    #     n_wvl = len(wvl_mod)
    #     mr_gas_arr = mr_gas_arr / 1000.0
    #     delta_mr_ref = 1.0

    #     k_spectre = calc_jac_rad(mr_gas_arr, n_wvl, t_gas_arr, delta_mr_ref)
    #     k_array = np.zeros((M, B))
    #     for i in range(0, M):
    #         s = generate_filter(wvl_mod, band_array[i], fwhm_array[i])
    #         k = np.dot(k_spectre, s)
    #         k_array[i] = k

    #     return k_array

    def load_wavelengths(
        self,
        wavelengths: Union[float, List[float], NDArray],
        as_reflectance: bool = True,
        raw: bool = True,
        resolution_dst=30,
        dst_crs: Optional[Any] = None,
        fill_value_default: float = -1,
    ) -> Union[GeoTensor, NDArray]:
        """
        Load the reflectance of the given wavelengths

        Args:
            wavelengths (Union[float, List[float], NDArray]): List of wavelengths to load
            as_reflectance (bool, optional): return the values as reflectance rather than radiance. Defaults to True.
                If False values will have units of W/m^2/SR/um (`self.units`)
            raw (bool, optional): if True it will return the raw values,
                if False it will return the values reprojected to the specified CRS and resolution. Defaults to True.
            resolution_dst (int, optional): if raw is False, it will reproject the values to this resolution. Defaults to 30.
            dst_crs (Optional[Any], optional): if None it will use the corresponding UTM zone.
            fill_value_default (float, optional): fill value. Defaults to -1.

        Returns:
            Union[GeoTensor, NDArray]: if raw is True it will return a NDArray with the values, otherwise it will return a GeoTensor
                with the reprojected values in its `.values` attribute.
        """

        if isinstance(wavelengths, Number):
            wavelengths = np.array([wavelengths])
        else:
            wavelengths = np.array(wavelengths)

        load_swir = any(
            [
                wvl >= self.swir_range[0] and wvl < self.swir_range[1]
                for wvl in wavelengths
            ]
        )
        load_vnir = any(
            [
                wvl >= self.vnir_range[0] and wvl < self.vnir_range[1]
                for wvl in wavelengths
            ]
        )
        if load_swir:
            self.load_raw(swir_flag=True)
            wavelength_swir_mean = np.mean(self.wavelength_swir, axis=0)
            fwhm_swir_mean = np.mean(self.fwhm_swir, axis=0)
        if load_vnir:
            self.load_raw(swir_flag=False)
            wavelength_vnir_mean = np.mean(self.wavelength_vnir, axis=0)
            fwhm_vnir_mean = np.mean(self.fwhm_vnir, axis=0)

        ltoa_img = []
        fwhm = []
        for b in range(len(wavelengths)):
            if (
                wavelengths[b] >= self.swir_range[0]
                and wavelengths[b] < self.swir_range[1]
            ):
                index_band = np.argmin(np.abs(wavelengths[b] - wavelength_swir_mean))
                fwhm.append(fwhm_swir_mean[index_band])
                img = self.ltoa_swir[..., index_band]
            else:
                index_band = np.argmin(np.abs(wavelengths[b] - wavelength_vnir_mean))
                fwhm.append(fwhm_vnir_mean[index_band])
                img = self.ltoa_vnir[..., index_band]

            ltoa_img.append(img)

        # Transpose to row major
        ltoa_img = np.transpose(np.stack(ltoa_img, axis=0), (0, 2, 1))

        if as_reflectance:
            thuiller = reflectance.load_thuillier_irradiance()
            response = reflectance.srf(wavelengths, fwhm, thuiller["Nanometer"].values)

            solar_irradiance_norm = thuiller["Radiance(mW/m2/nm)"].values.dot(
                response
            )  # mW/m$^2$/nm
            solar_irradiance_norm /= 1_000  # W/m$^2$/nm

            ltoa_img = reflectance.radiance_to_reflectance(
                ltoa_img,
                solar_irradiance_norm,
                units=self.units,
                observation_date_corr_factor=self.observation_date_correction_factor,
            )

        if raw:
            return ltoa_img

        return griddata.read_to_crs(
            np.transpose(ltoa_img, (1, 2, 0)),
            lons=self.lons,
            lats=self.lats,
            resolution_dst=resolution_dst,
            dst_crs=dst_crs,
            fill_value_default=fill_value_default,
        )

    def load_rgb(
        self, as_reflectance: bool = True, raw: bool = True
    ) -> Union[GeoTensor, NDArray]:
        return self.load_wavelengths(
            wavelengths=WAVELENGTHS_RGB, as_reflectance=as_reflectance, raw=raw
        )

    def __repr__(self) -> str:
        return f"""
        File: {self.filename}
        Bounds: {self.bounds}
        Time: {self.time_coverage_start}
        VNIR Range: {self.vnir_range} {self.nbands_vnir} bands
        SWIR Range: {self.swir_range} {self.nbands_swir} bands
        """

load_raw(swir_flag)

Load the all the data from all the wavelengths for the VNIR or SWIR range. This function caches the data, wavelegths and FWHM in the attributes of the class: * ltoa_swir, wavelength_swir, fwhm_swir, vza_swir, sza_swir if swir_flag is True * ltoa_vnir, wavelength_vnir, fwhm_vnir, vza_vnir, sza_vnir if swir_flag is False

Parameters:

Name Type Description Default
swir_flag bool

if True it will load the SWIR range, otherwise it will load the VNIR range

required

Returns:

Name Type Description
NDArray NDArray

3D array with the reflectance values (H, W, B) where N and M are the dimensions of the image and B is the number of bands.

Source code in georeader/readers/prisma.py
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
def load_raw(self, swir_flag: bool) -> NDArray:
    """
    Load the all the data from all the wavelengths for the VNIR or SWIR range.
    This function caches the data, wavelegths and FWHM in the attributes of the class:
        * `ltoa_swir`, `wavelength_swir`, `fwhm_swir`, `vza_swir`, `sza_swir` if `swir_flag` is True
        * `ltoa_vnir`, `wavelength_vnir`, `fwhm_vnir`, `vza_vnir`, `sza_vnir` if `swir_flag` is False

    Args:
        swir_flag (bool): if True it will load the SWIR range, otherwise it will load the VNIR range

    Returns:
        NDArray: 3D array with the reflectance values (H, W, B)
            where N and M are the dimensions of the image and B is the number of bands.
    """

    if swir_flag:
        if all(
            x is not None
            for x in [
                self.ltoa_swir,
                self.wavelength_swir,
                self.fwhm_swir,
                self.vza_swir,
                self.sza_swir,
            ]
        ):
            return self.ltoa_swir
    else:
        if all(
            x is not None
            for x in [
                self.ltoa_vnir,
                self.wavelength_vnir,
                self.fwhm_vnir,
                self.vza_vnir,
                self.sza_vnir,
            ]
        ):
            return self.ltoa_vnir

    swir_cube_dat = SWIR_FLAG["swir_cube_dat"][swir_flag]
    swir_lab = SWIR_FLAG["swir_lab"][swir_flag]  # True: "Swir", False: "Vnir"

    with h5py.File(self.filename, "r") as f:
        dset = f[swir_cube_dat]

        ltoa_img = np.flip(np.transpose(dset[:, :, :], axes=[0, 2, 1]), axis=0)

        dset = f["/KDP_AUX/Cw_" + swir_lab + "_Matrix"]
        wvl_mat_ini = dset[:, :]

        dset = f["/KDP_AUX/Fwhm_" + swir_lab + "_Matrix"]
        fwhm_mat_ini = dset[:, :]

        sc_fac = f.attrs["ScaleFactor_" + swir_lab]

        of_fac = f.attrs["Offset_" + swir_lab]

        vza = 0.0
        sza = f.attrs["Sun_zenith_angle"]

        ltoa_img = ltoa_img / sc_fac - of_fac

    # Lambda
    wvl_mat_ini = np.flip(wvl_mat_ini, axis=1)
    li_no0 = np.where(wvl_mat_ini[100, :] > 0)[0]
    wvl_mat = np.copy(wvl_mat_ini[:, li_no0])
    wl_center_ini = np.mean(wvl_mat, axis=0)

    # FWHM
    fwhm_mat_ini = np.flip(fwhm_mat_ini, axis=1)
    fwhm_mat = np.copy(fwhm_mat_ini[:, li_no0])

    M, N, B_tot = ltoa_img.shape

    if swir_flag:
        if B_tot == len(wl_center_ini):
            ltoa_img = np.flip(ltoa_img, axis=2)
        else:
            ltoa_img = np.flip(ltoa_img[:, :, :-2], axis=2)

    else:
        if B_tot == len(wl_center_ini):
            ltoa_img = np.flip(ltoa_img, axis=2)
        else:
            ltoa_img = np.flip(ltoa_img[:, :, 3:], axis=2)  # Revisar esto(not sure)

    ltoa_img = np.transpose(ltoa_img, (1, 0, 2))
    if swir_flag:
        self.ltoa_swir = ltoa_img
        self.wavelength_swir = wvl_mat
        self.fwhm_swir = fwhm_mat
        self.vza_swir = vza
        self.sza_swir = sza
    else:
        self.ltoa_vnir = ltoa_img
        self.wavelength_vnir = wvl_mat
        self.fwhm_vnir = fwhm_mat
        self.vza_vnir = vza
        self.sza_vnir = sza

    return ltoa_img

load_wavelengths(wavelengths, as_reflectance=True, raw=True, resolution_dst=30, dst_crs=None, fill_value_default=-1)

Load the reflectance of the given wavelengths

Parameters:

Name Type Description Default
wavelengths Union[float, List[float], NDArray]

List of wavelengths to load

required
as_reflectance bool

return the values as reflectance rather than radiance. Defaults to True. If False values will have units of W/m^2/SR/um (self.units)

True
raw bool

if True it will return the raw values, if False it will return the values reprojected to the specified CRS and resolution. Defaults to True.

True
resolution_dst int

if raw is False, it will reproject the values to this resolution. Defaults to 30.

30
dst_crs Optional[Any]

if None it will use the corresponding UTM zone.

None
fill_value_default float

fill value. Defaults to -1.

-1

Returns:

Type Description
Union[GeoTensor, NDArray]

Union[GeoTensor, NDArray]: if raw is True it will return a NDArray with the values, otherwise it will return a GeoTensor with the reprojected values in its .values attribute.

Source code in georeader/readers/prisma.py
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
def load_wavelengths(
    self,
    wavelengths: Union[float, List[float], NDArray],
    as_reflectance: bool = True,
    raw: bool = True,
    resolution_dst=30,
    dst_crs: Optional[Any] = None,
    fill_value_default: float = -1,
) -> Union[GeoTensor, NDArray]:
    """
    Load the reflectance of the given wavelengths

    Args:
        wavelengths (Union[float, List[float], NDArray]): List of wavelengths to load
        as_reflectance (bool, optional): return the values as reflectance rather than radiance. Defaults to True.
            If False values will have units of W/m^2/SR/um (`self.units`)
        raw (bool, optional): if True it will return the raw values,
            if False it will return the values reprojected to the specified CRS and resolution. Defaults to True.
        resolution_dst (int, optional): if raw is False, it will reproject the values to this resolution. Defaults to 30.
        dst_crs (Optional[Any], optional): if None it will use the corresponding UTM zone.
        fill_value_default (float, optional): fill value. Defaults to -1.

    Returns:
        Union[GeoTensor, NDArray]: if raw is True it will return a NDArray with the values, otherwise it will return a GeoTensor
            with the reprojected values in its `.values` attribute.
    """

    if isinstance(wavelengths, Number):
        wavelengths = np.array([wavelengths])
    else:
        wavelengths = np.array(wavelengths)

    load_swir = any(
        [
            wvl >= self.swir_range[0] and wvl < self.swir_range[1]
            for wvl in wavelengths
        ]
    )
    load_vnir = any(
        [
            wvl >= self.vnir_range[0] and wvl < self.vnir_range[1]
            for wvl in wavelengths
        ]
    )
    if load_swir:
        self.load_raw(swir_flag=True)
        wavelength_swir_mean = np.mean(self.wavelength_swir, axis=0)
        fwhm_swir_mean = np.mean(self.fwhm_swir, axis=0)
    if load_vnir:
        self.load_raw(swir_flag=False)
        wavelength_vnir_mean = np.mean(self.wavelength_vnir, axis=0)
        fwhm_vnir_mean = np.mean(self.fwhm_vnir, axis=0)

    ltoa_img = []
    fwhm = []
    for b in range(len(wavelengths)):
        if (
            wavelengths[b] >= self.swir_range[0]
            and wavelengths[b] < self.swir_range[1]
        ):
            index_band = np.argmin(np.abs(wavelengths[b] - wavelength_swir_mean))
            fwhm.append(fwhm_swir_mean[index_band])
            img = self.ltoa_swir[..., index_band]
        else:
            index_band = np.argmin(np.abs(wavelengths[b] - wavelength_vnir_mean))
            fwhm.append(fwhm_vnir_mean[index_band])
            img = self.ltoa_vnir[..., index_band]

        ltoa_img.append(img)

    # Transpose to row major
    ltoa_img = np.transpose(np.stack(ltoa_img, axis=0), (0, 2, 1))

    if as_reflectance:
        thuiller = reflectance.load_thuillier_irradiance()
        response = reflectance.srf(wavelengths, fwhm, thuiller["Nanometer"].values)

        solar_irradiance_norm = thuiller["Radiance(mW/m2/nm)"].values.dot(
            response
        )  # mW/m$^2$/nm
        solar_irradiance_norm /= 1_000  # W/m$^2$/nm

        ltoa_img = reflectance.radiance_to_reflectance(
            ltoa_img,
            solar_irradiance_norm,
            units=self.units,
            observation_date_corr_factor=self.observation_date_correction_factor,
        )

    if raw:
        return ltoa_img

    return griddata.read_to_crs(
        np.transpose(ltoa_img, (1, 2, 0)),
        lons=self.lons,
        lats=self.lats,
        resolution_dst=resolution_dst,
        dst_crs=dst_crs,
        fill_value_default=fill_value_default,
    )

EMIT Reader

The EMIT (Earth Surface Mineral Dust Source Investigation) reader provides access to NASA's imaging spectrometer data from the International Space Station. This reader works with Level 1B calibrated radiance data (not atmospherically corrected).

Key features:

  • Reading L1B hyperspectral radiance data from NetCDF4 format files
  • Working with the 380-2500 nm spectral range with 7.4 nm sampling
  • Irregular grid georeferencing through GLT (Geographic Lookup Table)
  • Support for the observation geometry information (solar and viewing angles)
  • Integration with L2A mask products for cloud and shadow detection
  • Quality-aware analysis with cloud, cirrus, and spacecraft flag masks
  • Conversion from radiance (μW/cm²/sr/nm) to top-of-atmosphere reflectance
  • Support for downloading data from NASA DAAC portals
  • Automatic detection and use of appropriate UTM projection

Tutorial example:

API Reference

Module to read EMIT images.

Requires: netCDF4: pip install netCDF4

Some of the functions of this module are based on the official EMIT repo: https://github.com/emit-sds/emit-utils/

EMITImage

Class to read L1B EMIT (Earth Surface Mineral Dust Source Investigation) images.

This class provides functionality to read and manipulate EMIT satellite imagery products. It handles the specific format and metadata of EMIT HDF files, supporting operations like loading radiometry data, masks, and viewing/solar angles.

This class incorporates features from the official EMIT utilities: https://github.com/emit-sds/emit-utils/

Parameters:

Name Type Description Default
filename str

Path to the EMIT .nc file.

required
glt Optional[GeoTensor]

Optional pre-loaded GLT (Geographic Lookup Table). If None, it will be loaded from the file. Defaults to None.

None
band_selection Optional[Union[int, Tuple[int, ...], slice]]

Optional band selection. Defaults to slice(None) (all bands).

slice(None)

Attributes:

Name Type Description
filename str

Path to the EMIT file.

nc_ds Dataset

netCDF4 dataset for the EMIT file.

_nc_ds_obs Optional[Dataset]

netCDF4 dataset for observation data.

_nc_ds_l2amask Optional[Dataset]

netCDF4 dataset for L2A mask.

real_transform Affine

Affine transform for the image.

time_coverage_start datetime

Start time of the acquisition.

time_coverage_end datetime

End time of the acquisition.

dtype

Data type of the radiometry data.

dims Tuple[str]

Names of dimensions ("band", "y", "x").

fill_value_default

Default fill value.

nodata

No data value.

units str

Units of the radiometry data.

glt GeoTensor

Geographic Lookup Table as a GeoTensor.

valid_glt ndarray

Boolean mask of valid GLT values.

glt_relative GeoTensor

Relative Geographic Lookup Table.

window_raw Window

Window for raw data.

bandname_dimension str

Dimension name for bands.

band_selection

Current band selection.

wavelengths ndarray

Wavelengths of the selected bands.

fwhm ndarray

Full Width at Half Maximum for each band.

Examples:

>>> from georeader.readers.emit import EMITImage, download_product
>>> link = 'https://data.lpdaac.earthdatacloud.nasa.gov/lp-prod-protected/EMITL1BRAD.001/EMIT_L1B_RAD_001_20220828T051941_2224004_006/EMIT_L1B_RAD_001_20220827T060753_2223904_013.nc'
>>> filepath = download_product(link)
>>> emit = EMITImage(filepath)
>>> # Reproject to UTM
>>> crs_utm = georeader.get_utm_epsg(emit.footprint("EPSG:4326"))
>>> emit_utm = emit.to_crs(crs_utm)
>>> # Load as reflectance
>>> reflectance = emit_utm.load(as_reflectance=True)
>>> # Get cloud mask
>>> cloud_mask = emit.nc_ds_l2amask
Source code in georeader/readers/emit.py
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
class EMITImage:
    """
    Class to read L1B EMIT (Earth Surface Mineral Dust Source Investigation) images.

    This class provides functionality to read and manipulate EMIT satellite imagery products.
    It handles the specific format and metadata of EMIT HDF files, supporting operations
    like loading radiometry data, masks, and viewing/solar angles.

    This class incorporates features from the official EMIT utilities:
    https://github.com/emit-sds/emit-utils/

    Args:
        filename (str): Path to the EMIT .nc file.
        glt (Optional[GeoTensor]): Optional pre-loaded GLT (Geographic Lookup Table).
            If None, it will be loaded from the file. Defaults to None.
        band_selection (Optional[Union[int, Tuple[int, ...], slice]]): Optional band selection.
            Defaults to slice(None) (all bands).

    Attributes:
        filename (str): Path to the EMIT file.
        nc_ds (netCDF4.Dataset): netCDF4 dataset for the EMIT file.
        _nc_ds_obs (Optional[netCDF4.Dataset]): netCDF4 dataset for observation data.
        _nc_ds_l2amask (Optional[netCDF4.Dataset]): netCDF4 dataset for L2A mask.
        real_transform (rasterio.Affine): Affine transform for the image.
        time_coverage_start (datetime): Start time of the acquisition.
        time_coverage_end (datetime): End time of the acquisition.
        dtype: Data type of the radiometry data.
        dims (Tuple[str]): Names of dimensions ("band", "y", "x").
        fill_value_default: Default fill value.
        nodata: No data value.
        units (str): Units of the radiometry data.
        glt (GeoTensor): Geographic Lookup Table as a GeoTensor.
        valid_glt (numpy.ndarray): Boolean mask of valid GLT values.
        glt_relative (GeoTensor): Relative Geographic Lookup Table.
        window_raw (rasterio.windows.Window): Window for raw data.
        bandname_dimension (str): Dimension name for bands.
        band_selection: Current band selection.
        wavelengths (numpy.ndarray): Wavelengths of the selected bands.
        fwhm (numpy.ndarray): Full Width at Half Maximum for each band.

    Examples:
        >>> from georeader.readers.emit import EMITImage, download_product
        >>> link = 'https://data.lpdaac.earthdatacloud.nasa.gov/lp-prod-protected/EMITL1BRAD.001/EMIT_L1B_RAD_001_20220828T051941_2224004_006/EMIT_L1B_RAD_001_20220827T060753_2223904_013.nc'
        >>> filepath = download_product(link)
        >>> emit = EMITImage(filepath)
        >>> # Reproject to UTM
        >>> crs_utm = georeader.get_utm_epsg(emit.footprint("EPSG:4326"))
        >>> emit_utm = emit.to_crs(crs_utm)
        >>> # Load as reflectance
        >>> reflectance = emit_utm.load(as_reflectance=True)
        >>> # Get cloud mask
        >>> cloud_mask = emit.nc_ds_l2amask
    """
    attributes_set_if_exists = ["_nc_ds_obs", "_mean_sza", "_mean_vza", 
                                "_observation_bands", "_nc_ds_l2amask", "_mask_bands", 
                                "_nc_ds", "obs_file",
                                "l2amaskfile"]
    def __init__(self, filename:str, glt:Optional[GeoTensor]=None, 
                 band_selection:Optional[Union[int, Tuple[int, ...],slice]]=slice(None)):
        self.filename = filename
        self.nc_ds = netCDF4.Dataset(self.filename, 'r', format='NETCDF4')
        self._nc_ds_obs:Optional[netCDF4.Dataset] = None
        self._nc_ds_l2amask:Optional[netCDF4.Dataset] = None
        self._observation_bands = None
        self._mask_bands = None
        self.nc_ds.set_auto_mask(False) # disable automatic masking when reading data
        # self.real_shape = (self.nc_ds['radiance'].shape[-1],) + self.nc_ds['radiance'].shape[:-1]

        self._mean_sza = None
        self._mean_vza = None
        self.obs_file:Optional[str] = None
        self.l2amaskfile:Optional[str] = None

        self.real_transform = rasterio.Affine(self.nc_ds.geotransform[1], self.nc_ds.geotransform[2], self.nc_ds.geotransform[0],
                                              self.nc_ds.geotransform[4], self.nc_ds.geotransform[5], self.nc_ds.geotransform[3])

        self.time_coverage_start = datetime.strptime(self.nc_ds.time_coverage_start, "%Y-%m-%dT%H:%M:%S%z")
        self.time_coverage_end = datetime.strptime(self.nc_ds.time_coverage_end, "%Y-%m-%dT%H:%M:%S%z")

        self.dtype = self.nc_ds['radiance'].dtype
        self.dims = ("band", "y", "x")
        self.fill_value_default = self.nc_ds['radiance']._FillValue
        self.nodata = self.nc_ds['radiance']._FillValue
        self.units = self.nc_ds["radiance"].units

        if glt is None:
            glt_arr = np.zeros((2,) + self.nc_ds.groups['location']['glt_x'].shape, dtype=np.int32)
            glt_arr[0] = np.array(self.nc_ds.groups['location']['glt_x'])
            glt_arr[1] = np.array(self.nc_ds.groups['location']['glt_y'])
            # glt_arr -= 1 # account for 1-based indexing

            # https://rasterio.readthedocs.io/en/stable/api/rasterio.crs.html
            self.glt = GeoTensor(glt_arr, transform=self.real_transform, 
                                 crs=rasterio.crs.CRS.from_wkt(self.nc_ds.spatial_ref),
                                 fill_value_default=0)
        else:
            self.glt = glt

        self.valid_glt = np.all(self.glt.values != self.glt.fill_value_default, axis=0)
        xmin, ymin, xmax, ymax = self._bounds_indexes_raw() # values are 1-based!

        # glt has the absolute indexes of the netCDF object
        # glt_relative has the relative indexes
        self.glt_relative = self.glt.copy()
        self.glt_relative.values[0, self.valid_glt] -= xmin
        self.glt_relative.values[1, self.valid_glt] -= ymin

        self.window_raw = rasterio.windows.Window(col_off=xmin-1, row_off=ymin-1, 
                                                  width=xmax-xmin+1, height=ymax-ymin+1)

        if "wavelengths" in self.nc_ds['sensor_band_parameters'].variables:
            self.bandname_dimension = "wavelengths"
        elif "radiance_wl"  in self.nc_ds['sensor_band_parameters'].variables:
            self.bandname_dimension = "radiance_wl"
        else:
            raise ValueError(f"Cannot find wavelength dimension in {list(self.nc_ds['sensor_band_parameters'].variables.keys())}")

        self.band_selection = band_selection
        self.wavelengths = self.nc_ds['sensor_band_parameters'][self.bandname_dimension][self.band_selection]
        self.fwhm = self.nc_ds['sensor_band_parameters']['fwhm'][self.band_selection]
        self._observation_date_correction_factor:Optional[float] = None

    @property
    def observation_date_correction_factor(self) -> float:
        if self._observation_date_correction_factor is None:
            self._observation_date_correction_factor = reflectance.observation_date_correction_factor(date_of_acquisition=self.time_coverage_start,
                                                                                                      center_coords=self.footprint("EPSG:4326").centroid.coords[0])
        return self._observation_date_correction_factor

    @property
    def crs(self) -> Any:
        return self.glt.crs

    @property
    def shape(self) -> Tuple:
        try:
            n_bands = len(self.wavelengths)
            return  (n_bands,) + self.glt.shape[1:]
        except Exception:
            return self.glt.shape

    @property
    def width(self) -> int:
        return self.shape[-1]

    @property
    def height(self) -> int:
        return self.shape[-2]

    @property
    def transform(self) -> rasterio.Affine:
        return self.glt.transform

    @property
    def res(self) -> Tuple[float, float]:
        return self.glt.res

    @property
    def bounds(self) -> Tuple[float, float, float, float]:
        return self.glt.bounds

    def footprint(self, crs:Optional[str]=None) -> Polygon:
        """
        Get the footprint of the image in the given CRS. If no CRS is given, the footprint is returned in the native CRS.
        This function takes into account the valid_glt mask to compute the footprint.

        Args:
            crs (Optional[str], optional): The CRS to return the footprint in. Defaults to None. 
                If None, the footprint is returned in the native CRS.

        Returns:
            Polygon: The footprint of the image in the given CRS.
        """
        if not hasattr(self, '_pol'):
            from georeader.vectorize import get_polygons
            pols = get_polygons(self.valid_glt, transform=self.transform)
            self._pol = unary_union(pols)
        if crs is not None:
            pol_crs = window_utils.polygon_to_crs(self._pol, self.crs, crs)
        else:
            pol_crs = self._pol

        pol_glt = self.glt.footprint(crs=crs)

        return pol_crs.intersection(pol_glt)

    def set_band_selection(self, band_selection:Optional[Union[int, Tuple[int, ...],slice]]=None):
        """
        Set the band selection. Band selection is absolute w.r.t self.nc_ds['radiance']

        Args:
            band_selection (Optional[Union[int, Tuple[int, ...],slice]], optional): slicing or selection of the bands. Defaults to None.

        Example:
            >>> emit_image.set_band_selection(slice(0, 3)) # will only load the three first bands
            >>> emit_image.wavelengths # will only return the wavelengths of the three first bands
            >>> emit_image.load() # will only load the three first bands
        """
        if band_selection is None:
            band_selection = slice(None)
        self.band_selection = band_selection
        self.wavelengths = self.nc_ds['sensor_band_parameters'][self.bandname_dimension][self.band_selection]
        self.fwhm = self.nc_ds['sensor_band_parameters']['fwhm'][self.band_selection]

    @ property
    def nc_ds_obs(self, obs_file:Optional[str]=None) -> netCDF4.Dataset:
        """
        Loads the observation file. In this file we have information about angles (solar and viewing),
        elevation and ilumination based on elevation and path length.

        This function downloads the observation file if it does not exist from the JPL portal.

        It caches the observation file in the object. (self.nc_ds_obs)

        Args:
            obs_file (Optional[str], optional): Path to the observation file. 
                Defaults to None. If none it will download the observation file 
                from the EMIT server.
        """
        if self._nc_ds_obs is not None:
            return self._nc_ds_obs

        if obs_file is None:
            link_obs_file = get_obs_link(self.filename)
            obs_file = os.path.join(os.path.dirname(self.filename), os.path.basename(link_obs_file))
            if not os.path.exists(obs_file):
                download_product(link_obs_file, obs_file)

        self.obs_file = obs_file
        self._nc_ds_obs = netCDF4.Dataset(obs_file)
        self._nc_ds_obs.set_auto_mask(False)
        self._observation_bands = self._nc_ds_obs['sensor_band_parameters']['observation_bands'][:]
        return self._nc_ds_obs

    @property
    def nc_ds_l2amask(self, l2amaskfile:Optional[str]=None) -> netCDF4.Dataset:
        """
        Loads the L2A mask file. In this file we have information about the cloud mask.

        This function downloads the L2A mask file if it does not exist from the JPL portal.

        It caches the L2A mask file in the object. (self.nc_ds_l2amask)

        See https://lpdaac.usgs.gov/products/emitl2arflv001/ for info about the L2A mask file.

        Args:
            l2amaskfile (Optional[str], optional): Path to the L2A mask file. 
                Defaults to None. If none it will download the L2A mask file 
                from the EMIT server.
        """
        if self._nc_ds_l2amask is not None:
            return self._nc_ds_l2amask

        if l2amaskfile is None:
            link_l2amaskfile = get_l2amask_link(self.filename)
            l2amaskfile = os.path.join(os.path.dirname(self.filename), os.path.basename(link_l2amaskfile))
            if not os.path.exists(l2amaskfile):
                download_product(link_l2amaskfile, l2amaskfile)

        self.l2amaskfile = l2amaskfile
        self._nc_ds_l2amask = netCDF4.Dataset(l2amaskfile)
        self._nc_ds_l2amask.set_auto_mask(False)
        self._mask_bands = self._nc_ds_l2amask["sensor_band_parameters"]["mask_bands"][:]
        return self._nc_ds_l2amask

    @property
    def mask_bands(self) -> np.array:
        """ Returns the mask bands -> ['Cloud flag', 'Cirrus flag', 'Water flag', 'Spacecraft Flag',
       'Dilated Cloud Flag', 'AOD550', 'H2O (g cm-2)', 'Aggregate Flag'] """
        self.nc_ds_l2amask
        return self._mask_bands

    def validmask(self, with_buffer:bool=True) -> GeoTensor:
        """
        Return the validmask mask


        Returns:
            GeoTensor: bool mask. True means that the pixel is valid.
        """

        validmask = ~self.invalid_mask_raw(with_buffer=with_buffer)

        return self.georreference(validmask,
                                  fill_value_default=False)

    def invalid_mask_raw(self, with_buffer:bool=True) -> NDArray:
        """
        Returns the non georreferenced quality mask. True means that the pixel is not valid.

        This mask is computed as the sum of the Cloud flag, Cirrus flag, Spacecraft flag and Dilated Cloud Flag.
        True means that the pixel is not valid.

        From: https://github.com/nasa/EMIT-Data-Resources/blob/main/python/how-tos/How_to_use_EMIT_Quality_data.ipynb
        and https://github.com/nasa/EMIT-Data-Resources/blob/main/python/modules/emit_tools.py#L277


        """
        band_index =  [0,1,3]
        if with_buffer:
            band_index.append(4)

        slice_y, slice_x = self.window_raw.toslices()
        mask_arr = self.nc_ds_l2amask['mask'][slice_y, slice_x, band_index]
        mask_arr = np.sum(mask_arr, axis=-1)
        mask_arr = (mask_arr >= 1)
        return mask_arr

    @property
    def percentage_clear(self) -> float:
        """
        Return the percentage of clear pixels in the image

        Returns:
            float: percentage of clear pixels
        """

        invalids = self.invalid_mask_raw(with_buffer=False)
        return 100 * (1 - np.sum(invalids) / np.prod(invalids.shape))


    def mask(self, mask_name:str="cloud_mask") -> GeoTensor:
        """
        Return the mask layer with the given name.
        Mask shall be one of self.mask_bands -> ['Cloud flag', 'Cirrus flag', 'Water flag', 'Spacecraft Flag',
       'Dilated Cloud Flag', 'AOD550', 'H2O (g cm-2)', 'Aggregate Flag']

        Args:
            mask_name (str, optional): Name of the mask. Defaults to "cloud_mask".

        Returns:
            GeoTensor: mask
        """
        band_index = self.mask_bands.tolist().index(mask_name)
        slice_y, slice_x = self.window_raw.toslices()
        mask_arr = self.nc_ds_l2amask['mask'][slice_y, slice_x, band_index]
        return self.georreference(mask_arr,
                                  fill_value_default=self.nc_ds_l2amask['mask']._FillValue)

    def water_mask(self) -> GeoTensor:
        """ Returns the water mask """
        return self.mask("Water flag")

    @property
    def observation_bands(self) -> np.array:
        """ Returns the observation bands """
        self.nc_ds_obs
        return self._observation_bands

    def observation(self, name:str) -> GeoTensor:
        """ Returns the observation with the given name """
        band_index = self.observation_bands.tolist().index(name)
        slice_y, slice_x = self.window_raw.toslices()
        obs_arr = self.nc_ds_obs['obs'][slice_y, slice_x, band_index]
        return self.georreference(obs_arr, 
                                  fill_value_default=self.nc_ds_obs['obs']._FillValue)

    def sza(self) -> GeoTensor:
        """ Return the solar zenith angle as a GeoTensor """
        return self.observation('To-sun zenith (0 to 90 degrees from zenith)')

    def vza(self) -> GeoTensor:
        """ Return the view zenith angle as a GeoTensor """
        return self.observation('To-sensor zenith (0 to 90 degrees from zenith)')

    def elevation(self) -> GeoTensor:
        obs_arr = self.nc_ds["location"]["elev"]
        slice_y, slice_x = self.window_raw.toslices()
        return self.georreference(obs_arr[slice_y, slice_x], 
                                  fill_value_default=obs_arr._FillValue)

    @property
    def mean_sza(self) -> float:
        """ Return the mean solar zenith angle """
        if self._mean_sza is not None:
            return self._mean_sza

        band_index = self.observation_bands.tolist().index('To-sun zenith (0 to 90 degrees from zenith)')
        sza_arr = self.nc_ds_obs['obs'][..., band_index]
        self._mean_sza = float(np.mean(sza_arr[sza_arr != self.nc_ds_obs['obs']._FillValue]))
        return self._mean_sza

    @property
    def mean_vza(self) -> float:
        """ Return the mean view zenith angle """
        if self._mean_vza is not None:
            return self._mean_vza
        band_index = self.observation_bands.tolist().index('To-sensor zenith (0 to 90 degrees from zenith)')
        vza_arr = self.nc_ds_obs['obs'][..., band_index]
        self._mean_vza = float(np.mean(vza_arr[vza_arr != self.nc_ds_obs['obs']._FillValue]))
        return self._mean_vza

    def __copy__(self) -> '__class__':
        out = EMITImage(self.filename, glt=self.glt.copy(), band_selection=self.band_selection)

        # copy nc_ds_obs if it exists
        for attrname in self.attributes_set_if_exists:
            if hasattr(self, attrname):
                setattr(out, attrname, getattr(self, attrname))

        return out
    def copy(self) -> '__class__':
        return self.__copy__()

    def to_crs(self, crs:Any="UTM", 
               resolution_dst_crs:Optional[Union[float, Tuple[float, float]]]=60) -> '__class__':
        """
        Reproject the image to a new crs

        Args:
            crs (Any): CRS. 

        Returns:
            EmitImage: EMIT image in the new CRS

        Example:
            >>> emit_image = EMITImage("path/to/emit_image.nc")
            >>> emit_image_utm = emit_image.to_crs(crs="UTM")
        """
        if crs == "UTM":
            footprint = self.glt.footprint("EPSG:4326")
            crs = get_utm_epsg(footprint)

        glt = read.read_to_crs(self.glt, crs, resampling=rasterio.warp.Resampling.nearest, 
                               resolution_dst_crs=resolution_dst_crs)

        out = EMITImage(self.filename, glt=glt, band_selection=self.band_selection)
        # Copy _pol attribute if it exists
        if hasattr(self, '_pol'):
            setattr(out, '_pol', window_utils.polygon_to_crs(self._pol, self.crs, crs))

        return out


    def read_from_window(self, window:Optional[rasterio.windows.Window]=None, boundless:bool=True) -> '__class__':
        glt_window = self.glt.read_from_window(window, boundless=boundless)
        out = EMITImage(self.filename, glt=glt_window, band_selection=self.band_selection)

        # copy attributes as in __copy__ method
        for attrname in self.attributes_set_if_exists:
            if hasattr(self, attrname):
                setattr(out, attrname, self.nc_ds_obs)

        return out

    def read_from_bands(self, bands:Union[int, Tuple[int, ...], slice]) -> '__class__':
        copy = self.__copy__()
        copy.set_band_selection(bands)
        return copy

    def load(self, boundless:bool=True, as_reflectance:bool=False)-> GeoTensor:
        data = self.load_raw() # (C, H, W) or (H, W)
        if as_reflectance:
            invalids = np.isnan(data) | (data == self.fill_value_default)
            thuiller = reflectance.load_thuillier_irradiance()
            response = reflectance.srf(self.wavelengths, self.fwhm, thuiller["Nanometer"].values)
            solar_irradiance_norm = thuiller["Radiance(mW/m2/nm)"].values.dot(response) / 1_000
            data = reflectance.radiance_to_reflectance(data, solar_irradiance_norm,
                                                       units=self.units,
                                                       observation_date_corr_factor=self.observation_date_correction_factor)
            data[invalids] = self.fill_value_default
        return self.georreference(data, fill_value_default=self.fill_value_default)

    def load_rgb(self, as_reflectance:bool=True) -> GeoTensor:
        bands_read = np.argmin(np.abs(WAVELENGTHS_RGB[:, np.newaxis] - self.wavelengths), axis=1).tolist()
        ei_rgb = self.read_from_bands(bands_read)
        return ei_rgb.load(boundless=True, as_reflectance=as_reflectance)

    @property
    def shape_raw(self) -> Tuple[int, int, int]:
        """ Return the shape of the raw data in (C, H, W) format """
        return (len(self.wavelengths),) + rasterio.windows.shape(self.window_raw)

    def _bounds_indexes_raw(self) -> Tuple[int, int, int, int]:
        """ Return the bounds of the raw data: (min_x, min_y, max_x, max_y) """
        return _bounds_indexes_raw(self.glt.values, self.valid_glt)


    def load_raw(self, transpose:bool=True) -> np.array:
        """
        Load the raw data, without orthorectification

        Args:
            transpose (bool, optional): Transpose the data if it has 3 dimentsions to (C, H, W) 
                Defaults to True. if False return (H, W, C)

        Returns:
            np.array: raw data (C, H, W) or (H, W)
        """

        slice_y, slice_x = self.window_raw.toslices()

        if isinstance(self.band_selection, slice):
            data = np.array(self.nc_ds['radiance'][slice_y, slice_x, self.band_selection])
        else:
            data = np.array(self.nc_ds['radiance'][slice_y, slice_x][..., self.band_selection])

        # transpose to (C, H, W)
        if transpose and (len(data.shape) == 3):
            data = np.transpose(data, axes=(2, 0, 1))

        return data


    def georreference(self, data:np.array, 
                      fill_value_default:Optional[Union[int,float]]=None) -> GeoTensor:
        """
        Georreference an image in sensor coordinates to coordinates of the current 
        georreferenced object. If you do some processing with the raw data, you can 
        georreference the raw output with this function.

        Args:
            data (np.array): raw data (C, H, W) or (H, W). 

        Returns:
            GeoTensor: georreferenced version of data (C, H', W') or (H', W')

        Example:
            >>> emit_image = EMITImage("path/to/emit_image.nc")
            >>> emit_image_rgb = emit_image.read_from_bands([35, 23, 11])
            >>> data_rgb = emit_image_rgb.load_raw() # (3, H, W)
            >>> data_rgb_ortho = emit_image.georreference(data_rgb) # (3, H', W')
        """
        return georreference(self.glt_relative, data, self.valid_glt, 
                             fill_value_default=fill_value_default)


    @property
    def values(self) -> np.array:
        # return np.zeros(self.shape, dtype=self.dtype)
        raise self.load(boundless=True).values

    def __repr__(self)->str:
        return f""" 
         File: {self.filename}
         Transform: {self.transform}
         Shape: {self.shape}
         Resolution: {self.res}
         Bounds: {self.bounds}
         CRS: {self.crs}
         units: {self.units}
        """

mask_bands: np.array property

Returns the mask bands -> ['Cloud flag', 'Cirrus flag', 'Water flag', 'Spacecraft Flag', 'Dilated Cloud Flag', 'AOD550', 'H2O (g cm-2)', 'Aggregate Flag']

mean_sza: float property

Return the mean solar zenith angle

mean_vza: float property

Return the mean view zenith angle

nc_ds_l2amask: netCDF4.Dataset property

Loads the L2A mask file. In this file we have information about the cloud mask.

This function downloads the L2A mask file if it does not exist from the JPL portal.

It caches the L2A mask file in the object. (self.nc_ds_l2amask)

See https://lpdaac.usgs.gov/products/emitl2arflv001/ for info about the L2A mask file.

Parameters:

Name Type Description Default
l2amaskfile Optional[str]

Path to the L2A mask file. Defaults to None. If none it will download the L2A mask file from the EMIT server.

required

nc_ds_obs: netCDF4.Dataset property

Loads the observation file. In this file we have information about angles (solar and viewing), elevation and ilumination based on elevation and path length.

This function downloads the observation file if it does not exist from the JPL portal.

It caches the observation file in the object. (self.nc_ds_obs)

Parameters:

Name Type Description Default
obs_file Optional[str]

Path to the observation file. Defaults to None. If none it will download the observation file from the EMIT server.

required

observation_bands: np.array property

Returns the observation bands

percentage_clear: float property

Return the percentage of clear pixels in the image

Returns:

Name Type Description
float float

percentage of clear pixels

shape_raw: Tuple[int, int, int] property

Return the shape of the raw data in (C, H, W) format

footprint(crs=None)

Get the footprint of the image in the given CRS. If no CRS is given, the footprint is returned in the native CRS. This function takes into account the valid_glt mask to compute the footprint.

Parameters:

Name Type Description Default
crs Optional[str]

The CRS to return the footprint in. Defaults to None. If None, the footprint is returned in the native CRS.

None

Returns:

Name Type Description
Polygon Polygon

The footprint of the image in the given CRS.

Source code in georeader/readers/emit.py
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
def footprint(self, crs:Optional[str]=None) -> Polygon:
    """
    Get the footprint of the image in the given CRS. If no CRS is given, the footprint is returned in the native CRS.
    This function takes into account the valid_glt mask to compute the footprint.

    Args:
        crs (Optional[str], optional): The CRS to return the footprint in. Defaults to None. 
            If None, the footprint is returned in the native CRS.

    Returns:
        Polygon: The footprint of the image in the given CRS.
    """
    if not hasattr(self, '_pol'):
        from georeader.vectorize import get_polygons
        pols = get_polygons(self.valid_glt, transform=self.transform)
        self._pol = unary_union(pols)
    if crs is not None:
        pol_crs = window_utils.polygon_to_crs(self._pol, self.crs, crs)
    else:
        pol_crs = self._pol

    pol_glt = self.glt.footprint(crs=crs)

    return pol_crs.intersection(pol_glt)

georreference(data, fill_value_default=None)

Georreference an image in sensor coordinates to coordinates of the current georreferenced object. If you do some processing with the raw data, you can georreference the raw output with this function.

Parameters:

Name Type Description Default
data array

raw data (C, H, W) or (H, W).

required

Returns:

Name Type Description
GeoTensor GeoTensor

georreferenced version of data (C, H', W') or (H', W')

Example

emit_image = EMITImage("path/to/emit_image.nc") emit_image_rgb = emit_image.read_from_bands([35, 23, 11]) data_rgb = emit_image_rgb.load_raw() # (3, H, W) data_rgb_ortho = emit_image.georreference(data_rgb) # (3, H', W')

Source code in georeader/readers/emit.py
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
def georreference(self, data:np.array, 
                  fill_value_default:Optional[Union[int,float]]=None) -> GeoTensor:
    """
    Georreference an image in sensor coordinates to coordinates of the current 
    georreferenced object. If you do some processing with the raw data, you can 
    georreference the raw output with this function.

    Args:
        data (np.array): raw data (C, H, W) or (H, W). 

    Returns:
        GeoTensor: georreferenced version of data (C, H', W') or (H', W')

    Example:
        >>> emit_image = EMITImage("path/to/emit_image.nc")
        >>> emit_image_rgb = emit_image.read_from_bands([35, 23, 11])
        >>> data_rgb = emit_image_rgb.load_raw() # (3, H, W)
        >>> data_rgb_ortho = emit_image.georreference(data_rgb) # (3, H', W')
    """
    return georreference(self.glt_relative, data, self.valid_glt, 
                         fill_value_default=fill_value_default)

invalid_mask_raw(with_buffer=True)

Returns the non georreferenced quality mask. True means that the pixel is not valid.

This mask is computed as the sum of the Cloud flag, Cirrus flag, Spacecraft flag and Dilated Cloud Flag. True means that the pixel is not valid.

From: https://github.com/nasa/EMIT-Data-Resources/blob/main/python/how-tos/How_to_use_EMIT_Quality_data.ipynb and https://github.com/nasa/EMIT-Data-Resources/blob/main/python/modules/emit_tools.py#L277

Source code in georeader/readers/emit.py
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
def invalid_mask_raw(self, with_buffer:bool=True) -> NDArray:
    """
    Returns the non georreferenced quality mask. True means that the pixel is not valid.

    This mask is computed as the sum of the Cloud flag, Cirrus flag, Spacecraft flag and Dilated Cloud Flag.
    True means that the pixel is not valid.

    From: https://github.com/nasa/EMIT-Data-Resources/blob/main/python/how-tos/How_to_use_EMIT_Quality_data.ipynb
    and https://github.com/nasa/EMIT-Data-Resources/blob/main/python/modules/emit_tools.py#L277


    """
    band_index =  [0,1,3]
    if with_buffer:
        band_index.append(4)

    slice_y, slice_x = self.window_raw.toslices()
    mask_arr = self.nc_ds_l2amask['mask'][slice_y, slice_x, band_index]
    mask_arr = np.sum(mask_arr, axis=-1)
    mask_arr = (mask_arr >= 1)
    return mask_arr

load_raw(transpose=True)

Load the raw data, without orthorectification

Parameters:

Name Type Description Default
transpose bool

Transpose the data if it has 3 dimentsions to (C, H, W) Defaults to True. if False return (H, W, C)

True

Returns:

Type Description
array

np.array: raw data (C, H, W) or (H, W)

Source code in georeader/readers/emit.py
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
def load_raw(self, transpose:bool=True) -> np.array:
    """
    Load the raw data, without orthorectification

    Args:
        transpose (bool, optional): Transpose the data if it has 3 dimentsions to (C, H, W) 
            Defaults to True. if False return (H, W, C)

    Returns:
        np.array: raw data (C, H, W) or (H, W)
    """

    slice_y, slice_x = self.window_raw.toslices()

    if isinstance(self.band_selection, slice):
        data = np.array(self.nc_ds['radiance'][slice_y, slice_x, self.band_selection])
    else:
        data = np.array(self.nc_ds['radiance'][slice_y, slice_x][..., self.band_selection])

    # transpose to (C, H, W)
    if transpose and (len(data.shape) == 3):
        data = np.transpose(data, axes=(2, 0, 1))

    return data

mask(mask_name='cloud_mask')

Return the mask layer with the given name. Mask shall be one of self.mask_bands -> ['Cloud flag', 'Cirrus flag', 'Water flag', 'Spacecraft Flag', 'Dilated Cloud Flag', 'AOD550', 'H2O (g cm-2)', 'Aggregate Flag']

Args: mask_name (str, optional): Name of the mask. Defaults to "cloud_mask".

Returns: GeoTensor: mask

Source code in georeader/readers/emit.py
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
def mask(self, mask_name:str="cloud_mask") -> GeoTensor:
    """
    Return the mask layer with the given name.
    Mask shall be one of self.mask_bands -> ['Cloud flag', 'Cirrus flag', 'Water flag', 'Spacecraft Flag',
   'Dilated Cloud Flag', 'AOD550', 'H2O (g cm-2)', 'Aggregate Flag']

    Args:
        mask_name (str, optional): Name of the mask. Defaults to "cloud_mask".

    Returns:
        GeoTensor: mask
    """
    band_index = self.mask_bands.tolist().index(mask_name)
    slice_y, slice_x = self.window_raw.toslices()
    mask_arr = self.nc_ds_l2amask['mask'][slice_y, slice_x, band_index]
    return self.georreference(mask_arr,
                              fill_value_default=self.nc_ds_l2amask['mask']._FillValue)

observation(name)

Returns the observation with the given name

Source code in georeader/readers/emit.py
604
605
606
607
608
609
610
def observation(self, name:str) -> GeoTensor:
    """ Returns the observation with the given name """
    band_index = self.observation_bands.tolist().index(name)
    slice_y, slice_x = self.window_raw.toslices()
    obs_arr = self.nc_ds_obs['obs'][slice_y, slice_x, band_index]
    return self.georreference(obs_arr, 
                              fill_value_default=self.nc_ds_obs['obs']._FillValue)

set_band_selection(band_selection=None)

Set the band selection. Band selection is absolute w.r.t self.nc_ds['radiance']

Parameters:

Name Type Description Default
band_selection Optional[Union[int, Tuple[int, ...], slice]]

slicing or selection of the bands. Defaults to None.

None
Example

emit_image.set_band_selection(slice(0, 3)) # will only load the three first bands emit_image.wavelengths # will only return the wavelengths of the three first bands emit_image.load() # will only load the three first bands

Source code in georeader/readers/emit.py
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
def set_band_selection(self, band_selection:Optional[Union[int, Tuple[int, ...],slice]]=None):
    """
    Set the band selection. Band selection is absolute w.r.t self.nc_ds['radiance']

    Args:
        band_selection (Optional[Union[int, Tuple[int, ...],slice]], optional): slicing or selection of the bands. Defaults to None.

    Example:
        >>> emit_image.set_band_selection(slice(0, 3)) # will only load the three first bands
        >>> emit_image.wavelengths # will only return the wavelengths of the three first bands
        >>> emit_image.load() # will only load the three first bands
    """
    if band_selection is None:
        band_selection = slice(None)
    self.band_selection = band_selection
    self.wavelengths = self.nc_ds['sensor_band_parameters'][self.bandname_dimension][self.band_selection]
    self.fwhm = self.nc_ds['sensor_band_parameters']['fwhm'][self.band_selection]

sza()

Return the solar zenith angle as a GeoTensor

Source code in georeader/readers/emit.py
612
613
614
def sza(self) -> GeoTensor:
    """ Return the solar zenith angle as a GeoTensor """
    return self.observation('To-sun zenith (0 to 90 degrees from zenith)')

to_crs(crs='UTM', resolution_dst_crs=60)

Reproject the image to a new crs

Parameters:

Name Type Description Default
crs Any

CRS.

'UTM'

Returns:

Name Type Description
EmitImage __class__

EMIT image in the new CRS

Example

emit_image = EMITImage("path/to/emit_image.nc") emit_image_utm = emit_image.to_crs(crs="UTM")

Source code in georeader/readers/emit.py
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
def to_crs(self, crs:Any="UTM", 
           resolution_dst_crs:Optional[Union[float, Tuple[float, float]]]=60) -> '__class__':
    """
    Reproject the image to a new crs

    Args:
        crs (Any): CRS. 

    Returns:
        EmitImage: EMIT image in the new CRS

    Example:
        >>> emit_image = EMITImage("path/to/emit_image.nc")
        >>> emit_image_utm = emit_image.to_crs(crs="UTM")
    """
    if crs == "UTM":
        footprint = self.glt.footprint("EPSG:4326")
        crs = get_utm_epsg(footprint)

    glt = read.read_to_crs(self.glt, crs, resampling=rasterio.warp.Resampling.nearest, 
                           resolution_dst_crs=resolution_dst_crs)

    out = EMITImage(self.filename, glt=glt, band_selection=self.band_selection)
    # Copy _pol attribute if it exists
    if hasattr(self, '_pol'):
        setattr(out, '_pol', window_utils.polygon_to_crs(self._pol, self.crs, crs))

    return out

validmask(with_buffer=True)

Return the validmask mask

Returns:

Name Type Description
GeoTensor GeoTensor

bool mask. True means that the pixel is valid.

Source code in georeader/readers/emit.py
527
528
529
530
531
532
533
534
535
536
537
538
539
def validmask(self, with_buffer:bool=True) -> GeoTensor:
    """
    Return the validmask mask


    Returns:
        GeoTensor: bool mask. True means that the pixel is valid.
    """

    validmask = ~self.invalid_mask_raw(with_buffer=with_buffer)

    return self.georreference(validmask,
                              fill_value_default=False)

vza()

Return the view zenith angle as a GeoTensor

Source code in georeader/readers/emit.py
616
617
618
def vza(self) -> GeoTensor:
    """ Return the view zenith angle as a GeoTensor """
    return self.observation('To-sensor zenith (0 to 90 degrees from zenith)')

water_mask()

Returns the water mask

Source code in georeader/readers/emit.py
594
595
596
def water_mask(self) -> GeoTensor:
    """ Returns the water mask """
    return self.mask("Water flag")

download_product(link_down, filename=None, display_progress_bar=True, auth=None)

Download a product from the EMIT website (https://search.earthdata.nasa.gov/search). It requires that you have an account in the NASA Earthdata portal.

This code is based on this example: https://git.earthdata.nasa.gov/projects/LPDUR/repos/daac_data_download_python/browse

Parameters:

Name Type Description Default
link_down str

link to the product

required
filename Optional[str]

filename to save the product

None
display_progress_bar bool

display tqdm progress bar

True
auth Optional[Tuple[str, str]]

tuple with user and password to download the product. If None, it will try to read the user and password from ~/.georeader/auth_emit.json

None
Example

link_down = 'https://data.lpdaac.earthdatacloud.nasa.gov/lp-prod-protected/EMITL1BRAD.001/EMIT_L1B_RAD_001_20220828T051941_2224004_006/EMIT_L1B_RAD_001_20220828T051941_2224004_006.nc' filename = download_product(link_down)

Source code in georeader/readers/emit.py
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
def download_product(link_down:str, filename:Optional[str]=None,
                     display_progress_bar:bool=True,
                     auth:Optional[Tuple[str, str]] = None) -> str:
    """
    Download a product from the EMIT website (https://search.earthdata.nasa.gov/search). 
    It requires that you have an account in the NASA Earthdata portal. 

    This code is based on this example: https://git.earthdata.nasa.gov/projects/LPDUR/repos/daac_data_download_python/browse

    Args:
        link_down: link to the product
        filename: filename to save the product
        display_progress_bar: display tqdm progress bar
        auth: tuple with user and password to download the product. If None, it will try to read the user and password from ~/.georeader/auth_emit.json 

    Example:
        >>> link_down = 'https://data.lpdaac.earthdatacloud.nasa.gov/lp-prod-protected/EMITL1BRAD.001/EMIT_L1B_RAD_001_20220828T051941_2224004_006/EMIT_L1B_RAD_001_20220828T051941_2224004_006.nc'
        >>> filename = download_product(link_down)
    """
    headers = None
    if auth is None:
        if AUTH_METHOD == "auth":
            auth = get_auth()
        elif AUTH_METHOD == "token":
            assert TOKEN is not None, "You need to set the TOKEN variable to download EMIT images"
            headers = get_headers()

    return download_product_base(link_down, filename=filename, auth=auth,
                                 headers=headers,
                                 display_progress_bar=display_progress_bar, 
                                 verify=False)

Get the link to download a product from the EMIT website. See: https://git.earthdata.nasa.gov/projects/LPDUR/repos/daac_data_download_python/browse

Parameters:

Name Type Description Default
product_path str

path to the product or filename of the product or product name with or without extension. e.g. 'EMIT_L1B_RAD_001_20220827T060753_2223904_013.nc'

required
Example

product_path = 'EMIT_L1B_RAD_001_20220827T060753_2223904_013.nc' link = get_radiance_link(product_path) 'https://data.lpdaac.earthdatacloud.nasa.gov/lp-prod-protected/EMITL1BRAD.001/EMIT_L1B_RAD_001_20220827T060753_2223904_013/EMIT_L1B_RAD_001_20220827T060753_2223904_013.nc'

Source code in georeader/readers/emit.py
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
def get_radiance_link(product_path:str) -> str:
    """
    Get the link to download a product from the EMIT website.
    See: https://git.earthdata.nasa.gov/projects/LPDUR/repos/daac_data_download_python/browse

    Args:
        product_path: path to the product or filename of the product or product name with or without extension.
            e.g. 'EMIT_L1B_RAD_001_20220827T060753_2223904_013.nc'

    Example:
        >>> product_path = 'EMIT_L1B_RAD_001_20220827T060753_2223904_013.nc'
        >>> link = get_radiance_link(product_path)
        'https://data.lpdaac.earthdatacloud.nasa.gov/lp-prod-protected/EMITL1BRAD.001/EMIT_L1B_RAD_001_20220827T060753_2223904_013/EMIT_L1B_RAD_001_20220827T060753_2223904_013.nc'
    """
    "EMIT_L1B_RAD_001_20220827T060753_2223904_013.nc"
    namefile = os.path.splitext(os.path.basename(product_path))[0]
    product_id = os.path.splitext(namefile)[0]
    content_id = product_id.split("_")
    content_id[1] = "L1B"
    content_id[2] = "RAD"
    content_id[3] = content_id[3].replace("V", "")
    product_id = "_".join(content_id)
    link = f"https://data.lpdaac.earthdatacloud.nasa.gov/lp-prod-protected/EMITL1BRAD.001/{product_id}/{product_id}.nc"
    return link

Get the link to download a product from the EMIT website. See: https://git.earthdata.nasa.gov/projects/LPDUR/repos/daac_data_download_python/browse

Parameters:

Name Type Description Default
product_path str

path to the product or filename of the product with or without extension. e.g. 'EMIT_L1B_RAD_001_20220827T060753_2223904_013.nc'

required
Example

product_path = 'EMIT_L1B_RAD_001_20220827T060753_2223904_013.nc' link = get_radiance_link(product_path) 'https://data.lpdaac.earthdatacloud.nasa.gov/lp-prod-protected/EMITL1BRAD.001/EMIT_L1B_RAD_001_20220827T060753_2223904_013/EMIT_L1B_OBS_001_20220827T060753_2223904_013.nc'

Source code in georeader/readers/emit.py
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
def get_obs_link(product_path:str) -> str:
    """
    Get the link to download a product from the EMIT website.
    See: https://git.earthdata.nasa.gov/projects/LPDUR/repos/daac_data_download_python/browse

    Args:
        product_path: path to the product or filename of the product with or without extension.
            e.g. 'EMIT_L1B_RAD_001_20220827T060753_2223904_013.nc'

    Example:
        >>> product_path = 'EMIT_L1B_RAD_001_20220827T060753_2223904_013.nc'
        >>> link = get_radiance_link(product_path)
        'https://data.lpdaac.earthdatacloud.nasa.gov/lp-prod-protected/EMITL1BRAD.001/EMIT_L1B_RAD_001_20220827T060753_2223904_013/EMIT_L1B_OBS_001_20220827T060753_2223904_013.nc'
    """
    namefile = os.path.splitext(os.path.basename(product_path))[0]

    product_id = os.path.splitext(namefile)[0]
    content_id = product_id.split("_")
    content_id[1] = "L1B"
    content_id[2] = "RAD"
    content_id[3] = content_id[3].replace("V", "")
    product_id = "_".join(content_id)

    content_id[2] = "OBS"
    namefile = "_".join(content_id)

    link = f"https://data.lpdaac.earthdatacloud.nasa.gov/lp-prod-protected/EMITL1BRAD.001/{product_id}/{namefile}.nc"
    return link

Get the link to download a product from the EMIT website. See: https://git.earthdata.nasa.gov/projects/LPDUR/repos/daac_data_download_python/browse

Parameters:

Name Type Description Default
product_path

path to the product or filename of the product with or without extension. e.g. 'EMIT_L1B_RAD_001_20220827T060753_2223904_013.nc'

required
Example

product_path = 'EMIT_L1B_RAD_001_20220827T060753_2223904_013.nc' link = get_radiance_link(product_path) 'https://data.lpdaac.earthdatacloud.nasa.gov/lp-prod-protected/EMITL2BCH4ENH.001/EMIT_L2B_CH4ENH_001_20220810T064957_2222205_033/EMIT_L2B_CH4ENH_001_20220810T064957_2222205_033.tif'

Source code in georeader/readers/emit.py
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
def get_ch4enhancement_link(tile:str) -> str:
    """
    Get the link to download a product from the EMIT website.
    See: https://git.earthdata.nasa.gov/projects/LPDUR/repos/daac_data_download_python/browse

    Args:
        product_path: path to the product or filename of the product with or without extension.
            e.g. 'EMIT_L1B_RAD_001_20220827T060753_2223904_013.nc'

    Example:
        >>> product_path = 'EMIT_L1B_RAD_001_20220827T060753_2223904_013.nc'
        >>> link = get_radiance_link(product_path)
        'https://data.lpdaac.earthdatacloud.nasa.gov/lp-prod-protected/EMITL2BCH4ENH.001/EMIT_L2B_CH4ENH_001_20220810T064957_2222205_033/EMIT_L2B_CH4ENH_001_20220810T064957_2222205_033.tif'
    """
    namefile = os.path.splitext(os.path.basename(tile))[0]

    product_id = os.path.splitext(namefile)[0]
    content_id = product_id.split("_")
    content_id[1] = "L2B"
    content_id[2] = "CH4ENH"
    content_id[3] = content_id[3].replace("V", "")
    product_id = "_".join(content_id)
    link = f"https://data.lpdaac.earthdatacloud.nasa.gov/lp-prod-protected/EMITL2BCH4ENH.001/{product_id}/{product_id}.tif"
    return link

Get the link to download a product from the EMIT website (https://search.earthdata.nasa.gov/search)

Parameters:

Name Type Description Default
tile str

path to the product or filename of the L1B product with or without extension. e.g. 'EMIT_L1B_RAD_001_20220827T060753_2223904_013.nc'

required

Returns:

Name Type Description
str str

link to the L2A mask product

Example

tile = 'EMIT_L1B_RAD_001_20220827T060753_2223904_013.nc' link = get_l2amask_link(tile) 'https://data.lpdaac.earthdatacloud.nasa.gov/lp-prod-protected/EMITL2ARFL.001/EMIT_L2A_RFL_001_20220827T060753_2223904_013/EMIT_L2A_MASK_001_20220827T060753_2223904_013.nc'

Source code in georeader/readers/emit.py
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
def get_l2amask_link(tile: str) -> str:
    """
    Get the link to download a product from the EMIT website (https://search.earthdata.nasa.gov/search)

    Args:
        tile (str): path to the product or filename of the L1B product with or without extension.
            e.g. 'EMIT_L1B_RAD_001_20220827T060753_2223904_013.nc'

    Returns:
        str: link to the L2A mask product

    Example:
        >>> tile = 'EMIT_L1B_RAD_001_20220827T060753_2223904_013.nc'
        >>> link = get_l2amask_link(tile)
        'https://data.lpdaac.earthdatacloud.nasa.gov/lp-prod-protected/EMITL2ARFL.001/EMIT_L2A_RFL_001_20220827T060753_2223904_013/EMIT_L2A_MASK_001_20220827T060753_2223904_013.nc'
    """
    namefile = os.path.splitext(os.path.basename(tile))[0]
    namefile = namefile + ".nc"

    product_id = os.path.splitext(namefile)[0]
    content_id = product_id.split("_")
    content_id[1] = "L2A"
    content_id[2] = "RFL"
    content_id[3] = content_id[3].replace("V", "")
    product_id = "_".join(content_id)

    content_id[2] = "MASK"
    namefilenew = "_".join(content_id) + ".nc"
    link = f"https://data.lpdaac.earthdatacloud.nasa.gov/lp-prod-protected/EMITL2ARFL.001/{product_id}/{namefilenew}"
    return link

valid_mask(filename, with_buffer=False, dst_crs='UTM', resolution_dst_crs=60)

Loads the valid mask from the EMIT L2AMASK file.

Parameters:

Name Type Description Default
filename str

path to the L2AMASK file. e.g. EMIT_L2A_MASK_001_20220827T060753_2223904_013.nc

required
with_buffer bool

If True, the buffer band is used to compute the valid mask. Defaults to False.

False

Returns:

Name Type Description
GeoTensor Tuple[GeoTensor, float]

valid mask

Source code in georeader/readers/emit.py
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
def valid_mask(filename:str, with_buffer:bool=False, 
               dst_crs:Optional[Any]="UTM", 
               resolution_dst_crs:Optional[Union[float, Tuple[float, float]]]=60) -> Tuple[GeoTensor, float]:
    """
    Loads the valid mask from the EMIT L2AMASK file.

    Args:
        filename (str): path to the L2AMASK file. e.g. EMIT_L2A_MASK_001_20220827T060753_2223904_013.nc
        with_buffer (bool, optional): If True, the buffer band is used to compute the valid mask. Defaults to False.

    Returns:
        GeoTensor: valid mask
    """

    nc_ds = netCDF4.Dataset(filename, 'r', format='NETCDF4')
    nc_ds.set_auto_mask(False)

    real_transform = rasterio.Affine(nc_ds.geotransform[1], nc_ds.geotransform[2], nc_ds.geotransform[0],
                                     nc_ds.geotransform[4], nc_ds.geotransform[5], nc_ds.geotransform[3])

    glt_arr = np.zeros((2,) + nc_ds.groups['location']['glt_x'].shape, dtype=np.int32)
    glt_arr[0] = np.array(nc_ds.groups['location']['glt_x'])
    glt_arr[1] = np.array(nc_ds.groups['location']['glt_y'])
    # glt_arr -= 1 # account for 1-based indexing

    # https://rasterio.readthedocs.io/en/stable/api/rasterio.crs.html
    glt = GeoTensor(glt_arr, transform=real_transform, 
                    crs=rasterio.crs.CRS.from_wkt(nc_ds.spatial_ref),
                    fill_value_default=0)

    if dst_crs is not None:
        if dst_crs == "UTM":
            footprint = glt.footprint("EPSG:4326")
            dst_crs = get_utm_epsg(footprint)

        glt = read.read_to_crs(glt, dst_crs=dst_crs, 
                               resampling=rasterio.warp.Resampling.nearest, 
                               resolution_dst_crs=resolution_dst_crs)

    valid_glt = np.all(glt.values != glt.fill_value_default, axis=0)
    xmin = np.min(glt.values[0, valid_glt])
    ymin = np.min(glt.values[1, valid_glt])

    glt_relative = glt.copy()
    glt_relative.values[0, valid_glt] -= xmin
    glt_relative.values[1, valid_glt] -= ymin
    # mask_bands = nc_ds["sensor_band_parameters"]["mask_bands"][:]

    band_index =  [0,1,3]
    if with_buffer:
        band_index.append(4)

    mask_arr = nc_ds['mask'][:, :, band_index]
    invalidmask_raw = np.sum(mask_arr, axis=-1)
    invalidmask_raw = (invalidmask_raw >= 1)

    validmask = ~invalidmask_raw

    percentage_clear = 100 * (np.sum(validmask) / np.prod(validmask.shape))

    return georreference(glt_relative, validmask, valid_glt,
                         fill_value_default=False), percentage_clear

EnMAP Reader

The EnMAP (Environmental Mapping and Analysis Program) reader processes data from the German hyperspectral satellite mission. This reader works with Level 1B radiometrically calibrated data (not atmospherically corrected) that contains radiance values in physical units.

Key features:

  • Reading L1B hyperspectral radiance data from GeoTIFF format with accompanying XML metadata
  • Working with separate VNIR (420-1000 nm) and SWIR (900-2450 nm) spectral ranges
  • Support for 228 spectral channels with 6.5 nm (VNIR) and 10 nm (SWIR) sampling
  • Integration with Rational Polynomial Coefficients (RPCs) for accurate geometric correction
  • Conversion from radiance (mW/m²/sr/nm) to top-of-atmosphere reflectance
  • Access to solar illumination and viewing geometry for radiometric calculations
  • Support for quality masks

Tutorial example:

API Reference

EnMAP

EnMAP (Environmental Mapping and Analysis Program) image reader.

This class provides functionality to read and manipulate EnMAP satellite imagery products. It handles the specific format and metadata of EnMAP files, supporting operations like loading specific wavelengths, RGB composites, and converting to reflectance.

Parameters:

Name Type Description Default
xml_file str

Path to the EnMAP XML metadata file.

required
by_folder bool

If True, files are organized by folder structure. Defaults to False.

False
window_focus Optional[Window]

Window to focus on a specific region of the image. Defaults to None (entire image).

None
fs Optional[AbstractFileSystem]

Filesystem to use for file access. Defaults to None (local filesystem).

None

Attributes:

Name Type Description
xml_file str

Path to the XML metadata file.

by_folder bool

Whether files are organized by folder structure.

swir_file str

Path to the SWIR image file.

fs AbstractFileSystem

Filesystem used for file access.

swir RasterioReader

Reader for SWIR image.

vnir RasterioReader

Reader for VNIR image.

wl_center Dict

Dictionary of center wavelengths for each band.

wl_fwhm Dict

Dictionary of FWHM (Full Width at Half Maximum) for each band.

hsf float

Mean ground elevation.

sza float

Solar zenith angle.

saa float

Solar azimuth angle.

vaa float

Viewing azimuth angle.

vza float

Viewing zenith angle.

gain_arr Dict

Dictionary of gain values for each band.

offs_arr Dict

Dictionary of offset values for each band.

rpcs_vnir Dict

Rational Polynomial Coefficients for VNIR.

rpcs_swir Dict

Rational Polynomial Coefficients for SWIR.

swir_range Tuple[float, float]

Wavelength range of SWIR bands.

vnir_range Tuple[float, float]

Wavelength range of VNIR bands.

units str

Units of radiance data (mW/m2/sr/nm).

time_coverage_start datetime

Start time of acquisition.

time_coverage_end datetime

End time of acquisition.

Examples:

>>> # Initialize the EnMAP reader with a data path
>>> enmap = EnMAP('/path/to/enmap_metadata.xml')
>>> # Load RGB bands as reflectance with RPCs applied
>>> rgb = enmap.load_rgb(as_reflectance=True, apply_rpcs=True)
>>> # Load specific wavelengths
>>> bands = enmap.load_wavelengths([850, 1600, 2200], as_reflectance=True)
>>> # Load cloud mask or other quality products
>>> clouds = enmap.load_product('QL_QUALITY_CLOUD')
Source code in georeader/readers/enmap.py
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
class EnMAP:
    """
    EnMAP (Environmental Mapping and Analysis Program) image reader.

    This class provides functionality to read and manipulate EnMAP satellite imagery products.
    It handles the specific format and metadata of EnMAP files, supporting operations
    like loading specific wavelengths, RGB composites, and converting to reflectance.

    Args:
        xml_file (str): Path to the EnMAP XML metadata file.
        by_folder (bool): If True, files are organized by folder structure.
            Defaults to False.
        window_focus (Optional[Window]): Window to focus on a specific
            region of the image. Defaults to None (entire image).
        fs (Optional[fsspec.AbstractFileSystem]): Filesystem to use for file access.
            Defaults to None (local filesystem).

    Attributes:
        xml_file (str): Path to the XML metadata file.
        by_folder (bool): Whether files are organized by folder structure.
        swir_file (str): Path to the SWIR image file.
        fs (fsspec.AbstractFileSystem): Filesystem used for file access.
        swir (RasterioReader): Reader for SWIR image.
        vnir (RasterioReader): Reader for VNIR image.
        wl_center (Dict): Dictionary of center wavelengths for each band.
        wl_fwhm (Dict): Dictionary of FWHM (Full Width at Half Maximum) for each band.
        hsf (float): Mean ground elevation.
        sza (float): Solar zenith angle.
        saa (float): Solar azimuth angle.
        vaa (float): Viewing azimuth angle.
        vza (float): Viewing zenith angle.
        gain_arr (Dict): Dictionary of gain values for each band.
        offs_arr (Dict): Dictionary of offset values for each band.
        rpcs_vnir: Rational Polynomial Coefficients for VNIR.
        rpcs_swir: Rational Polynomial Coefficients for SWIR.
        swir_range (Tuple[float, float]): Wavelength range of SWIR bands.
        vnir_range (Tuple[float, float]): Wavelength range of VNIR bands.
        units (str): Units of radiance data (mW/m2/sr/nm).
        time_coverage_start (datetime): Start time of acquisition.
        time_coverage_end (datetime): End time of acquisition.

    Examples:
        >>> # Initialize the EnMAP reader with a data path
        >>> enmap = EnMAP('/path/to/enmap_metadata.xml')
        >>> # Load RGB bands as reflectance with RPCs applied
        >>> rgb = enmap.load_rgb(as_reflectance=True, apply_rpcs=True)
        >>> # Load specific wavelengths
        >>> bands = enmap.load_wavelengths([850, 1600, 2200], as_reflectance=True)
        >>> # Load cloud mask or other quality products
        >>> clouds = enmap.load_product('QL_QUALITY_CLOUD')
    """

    def __init__(
        self,
        xml_file: str,
        by_folder: bool = False,
        window_focus: Optional[Window] = None,
        fs: Optional[fsspec.AbstractFileSystem] = None,
    ) -> None:
        self.xml_file = xml_file
        self.by_folder = by_folder
        if not self.xml_file.endswith(".xml") and not self.xml_file.endswith(".XML"):
            raise ValueError(
                f"Invalid SWIR file path {self.xml_file} must be a XML file"
            )

        if self.by_folder:
            assert (
                PRODUCT_FOLDERS["METADATA"] in self.xml_file
            ), f"Invalid SWIR file path {self.xml_file} must contain {PRODUCT_FOLDERS['METADATA']} if by folder"
            self.swir_file = (
                self.xml_file.replace(
                    PRODUCT_FOLDERS["METADATA"], PRODUCT_FOLDERS["SPECTRAL_IMAGE_SWIR"]
                )
                .replace(".XML", ".TIF")
                .replace(".xml", ".tif")
            )
        else:
            assert (
                "METADATA" in self.xml_file
            ), f"Invalid SWIR file path {self.xml_file} must contain METADATA if not by folder"
            self.swir_file = (
                self.xml_file.replace("METADATA", "SPECTRAL_IMAGE_SWIR")
                .replace(".XML", ".TIF")
                .replace(".xml", ".tif")
            )

        if not self.swir_file.endswith(".tif") and not self.swir_file.endswith(".TIF"):
            raise ValueError(
                f"Invalid SWIR file path {self.swir_file} must be a TIF file"
            )

        if self.xml_file.startswith("gs://") or self.xml_file.startswith("az://"):
            assert fs is not None, "Filesystem must be provided if using cloud storage"
            self.fs = fs
            assert fs.exists(self.xml_file), f"File {self.xml_file} does not exist"
            assert fs.exists(self.swir_file), f"File {self.swir_file} does not exist"
        else:
            self.fs = fs or fsspec.filesystem("file")
            assert os.path.exists(self.xml_file), f"File {self.xml_file} does not exist"
            assert os.path.exists(
                self.swir_file
            ), f"File {self.swir_file} does not exist"

        self.swir = RasterioReader(self.swir_file, window_focus=window_focus)

        if self.by_folder:
            self.vnir = RasterioReader(
                self.swir_file.replace(
                    PRODUCT_FOLDERS["SPECTRAL_IMAGE_SWIR"],
                    PRODUCT_FOLDERS["SPECTRAL_IMAGE_VNIR"],
                ),
                window_focus=window_focus,
            )
        else:
            self.vnir = RasterioReader(
                self.swir_file.replace("SPECTRAL_IMAGE_SWIR", "SPECTRAL_IMAGE_VNIR"),
                window_focus=window_focus,
            )

        with self.fs.open(self.xml_file) as fh:
            (
                self.wl_center,
                self.wl_fwhm,
                self.hsf,
                self.sza,
                self.saa,
                self.vaa,
                self.vza,
                self.gain_arr,
                self.offs_arr,
                startTime,
                endTime,
                self.rpcs_vnir,
                self.rpcs_swir,
            ) = read_xml(fh)

        self.swir_range = (
            self.wl_center["swir"][0] - self.wl_fwhm["swir"][0],
            self.wl_center["swir"][-1] + self.wl_fwhm["swir"][-1],
        )
        self.vnir_range = (
            self.wl_center["vnir"][0] - self.wl_fwhm["vnir"][0],
            self.wl_center["vnir"][-1] + self.wl_fwhm["vnir"][-1],
        )

        self.units = "mW/m2/sr/nm"  # == W/m^2/SR/um
        self.time_coverage_start = startTime
        self.time_coverage_end = endTime
        self._observation_date_correction_factor: Optional[float] = None

    @property
    def observation_date_correction_factor(self) -> float:
        if self._observation_date_correction_factor is None:
            self._observation_date_correction_factor = (
                reflectance.observation_date_correction_factor(
                    date_of_acquisition=self.time_coverage_start,
                    center_coords=self.footprint("EPSG:4326").centroid.coords[0],
                )
            )
        return self._observation_date_correction_factor

    @property
    def window_focus(self) -> Optional[Window]:
        return self.swir.window_focus

    @property
    def shape(self) -> tuple:
        return (
            len(self.wl_center["vnir"]) + len(self.wl_center["swir"]),
        ) + self.swir.shape[-2:]

    @property
    def transform(self):
        return self.swir.transform

    @property
    def crs(self):
        return self.swir.crs

    @property
    def res(self):
        return self.swir.res

    @property
    def width(self):
        return self.window_focus.width

    @property
    def height(self):
        return self.window_focus.height

    @property
    def bounds(self):
        return self.swir.bounds

    @property
    def fill_value_default(self):
        return self.swir.fill_value_default

    def footprint(self, crs: Optional[Any] = None) -> Any:
        return self.swir.footprint(crs=crs)

    def load_product(self, product_name: str) -> GeoTensor:
        if product_name not in PRODUCT_FOLDERS:
            raise ValueError(f"Invalid product name: {product_name}")

        if self.by_folder:
            folder = PRODUCT_FOLDERS[product_name]
            product_path = self.swir_file.replace(
                PRODUCT_FOLDERS["SPECTRAL_IMAGE_SWIR"], folder
            )

            raster_product = RasterioReader(
                product_path, window_focus=self.window_focus
            ).load()
        else:
            product_path = self.swir_file.replace("SPECTRAL_IMAGE_SWIR", product_name)
            raster_product = RasterioReader(
                product_path, window_focus=self.window_focus
            ).load()

        # Convert to radiance if SPECTRAL_IMAGE_SWIR or SPECRTAL_IMAGE_VNIR
        if product_name == "SPECTRAL_IMAGE_SWIR":
            name_coef = "swir"
        elif product_name == "SPECTRAL_IMAGE_VNIR":
            name_coef = "vnir"
        else:
            name_coef = None

        # https://github.com/GFZ/enpt/blob/main/enpt/model/images/images_sensorgeo.py#L327
        # Lλ = QCAL * GAIN + OFFSET
        # NOTE: - DLR provides gains between 2000 and 10000, so we have to DEVIDE by gains
        #       - DLR gains / offsets are provided in W/m2/sr/nm, so we have to multiply by 1000 to get
        #         mW/m2/sr/nm as needed later
        if name_coef is not None:
            gain = self.gain_arr[name_coef]
            offset = self.offs_arr[name_coef]
            invalids = raster_product.values == raster_product.fill_value_default
            raster_product.values = (
                gain[:, np.newaxis, np.newaxis] * raster_product.values
                + offset[:, np.newaxis, np.newaxis]
            ) * SC_COEFF
            raster_product.values[invalids] = self.fill_value_default

        return raster_product

    def load_wavelengths(
        self,
        wavelengths: Union[float, List[float], NDArray],
        as_reflectance: bool = True,
    ) -> Union[GeoTensor, NDArray]:
        """
        Load the reflectance of the given wavelengths

        Args:
            wavelengths (Union[float, List[float], NDArray]): List of wavelengths to load
            as_reflectance (bool, optional): return the values as reflectance rather than radiance.
                Defaults to True. If False values will have units of W/m^2/SR/um == mW/m2/sr/nm (`self.units`)

        Returns:
            Union[GeoTensor, NDArray]: GeoTensor with the values in reflectance or radiance units.

        Raises:
            ValueError: If any wavelength is outside the sensor's range.
        """
        if isinstance(wavelengths, Number):
            wavelengths = np.array([wavelengths])
        else:
            wavelengths = np.array(wavelengths)

        # Check all wavelengths are within the range of the sensor
        if any(
            [
                wvl < self.vnir_range[0] or wvl > self.swir_range[1]
                for wvl in wavelengths
            ]
        ):
            raise ValueError(
                f"Invalid wavelength range, must be between {self.vnir_range[0]} and {self.swir_range[1]}"
            )

        wavelengths_loaded = []
        fwhm = []
        ltoa_img = []
        for b in range(len(wavelengths)):
            if (
                wavelengths[b] >= self.swir_range[0]
                and wavelengths[b] < self.swir_range[1]
            ):
                index_band = np.argmin(np.abs(wavelengths[b] - self.wl_center["swir"]))
                fwhm.append(self.wl_fwhm["swir"][index_band])
                wavelengths_loaded.append(self.wl_center["swir"][index_band])
                rst = self.swir.isel({"band": [index_band]}).load().squeeze()
                invalids = (rst.values == rst.fill_value_default) | np.isnan(rst.values)

                # Convert to radiance
                gain = self.gain_arr["swir"][index_band]
                offset = self.offs_arr["swir"][index_band]
                img = (gain * rst.values + offset) * SC_COEFF
                img[invalids] = self.fill_value_default
            else:
                index_band = np.argmin(np.abs(wavelengths[b] - self.wl_center["vnir"]))
                fwhm.append(self.wl_fwhm["vnir"][index_band])
                wavelengths_loaded.append(self.wl_center["vnir"][index_band])
                rst = self.vnir.isel({"band": [index_band]}).load().squeeze()
                invalids = (rst.values == rst.fill_value_default) | np.isnan(rst.values)

                # Convert to radiance
                gain = self.gain_arr["vnir"][index_band]
                offset = self.offs_arr["vnir"][index_band]
                img = (gain * rst.values + offset) * SC_COEFF
                img[invalids] = self.fill_value_default

            ltoa_img.append(img)

        ltoa_img = GeoTensor(
            np.stack(ltoa_img, axis=0),
            transform=self.transform,
            crs=self.crs,
            fill_value_default=self.fill_value_default,
        )

        if as_reflectance:
            thuiller = reflectance.load_thuillier_irradiance()
            response = reflectance.srf(
                wavelengths_loaded, fwhm, thuiller["Nanometer"].values
            )

            solar_irradiance_norm = thuiller["Radiance(mW/m2/nm)"].values.dot(
                response
            )  # mW/m$^2$/SR/nm
            solar_irradiance_norm /= 1_000  # W/m$^2$/nm

            # Divide by 10 to convert from mW/m^2/SR/nm to µW /cm²/SR/nm
            ltoa_img = reflectance.radiance_to_reflectance(
                ltoa_img,
                solar_irradiance_norm,
                units=self.units,
                observation_date_corr_factor=self.observation_date_correction_factor,
            )

        return ltoa_img

    def load_rgb(
        self,
        as_reflectance: bool = True,
        apply_rpcs: bool = True,
        dst_crs: str = "EPSG:4326",
        resolution_dst_crs: Optional[Union[float, Tuple[float, float]]] = None,
    ) -> GeoTensor:
        """
        Load RGB image from VNIR bands. Converts radiance to TOA reflectance if as_reflectance is True
        otherwise it will return the radiance values in W/m^2/SR/um == mW/m2/sr/nm (`self.units`)

        Args:
            as_reflectance (bool, optional): Convert radiance to TOA reflectance. Defaults to True.
            apply_rpcs (bool, optional): Apply RPCs to the image. Defaults to True.
            dst_crs (str, optional): Destination CRS. Defaults to "EPSG:4326".
            resolution_dst_crs (Optional[Union[float, Tuple[float, float]]], optional):
                Resolution of the destination CRS. Defaults to None.
        Returns:
            GeoTensor: with the RGB image
        """
        rgb = self.load_wavelengths(WAVELENGTHS_RGB, as_reflectance=as_reflectance)
        if apply_rpcs:
            return read.read_rpcs(
                rgb.values,
                rpcs=self.rpcs_vnir,
                dst_crs=dst_crs,
                resolution_dst_crs=resolution_dst_crs,
                fill_value_default=rgb.fill_value_default,
            )
        elif dst_crs is not None:
            return read.read_to_crs(
                rgb, resolution_dst_crs=resolution_dst_crs, dst_crs=dst_crs
            )

        return rgb

    def load(self) -> GeoTensor:
        swir = self.load_product("SPECTRAL_IMAGE_SWIR")
        # vnir = self.load_product('SPECTRAL_IMAGE_VNIR')

        return swir

    def __repr__(self) -> str:
        return f"""
        File: {self.xml_file}
        Bounds: {self.bounds}
        Time: {self.time_coverage_start}
        Spatial shape (height, width): {self.height, self.width}
        VNIR Range: {self.vnir_range} nbands: {len(self.wl_center['vnir'])} 
        SWIR Range: {self.swir_range} nbands: {len(self.wl_center['swir'])}
        """

load_rgb(as_reflectance=True, apply_rpcs=True, dst_crs='EPSG:4326', resolution_dst_crs=None)

Load RGB image from VNIR bands. Converts radiance to TOA reflectance if as_reflectance is True otherwise it will return the radiance values in W/m^2/SR/um == mW/m2/sr/nm (self.units)

Parameters:

Name Type Description Default
as_reflectance bool

Convert radiance to TOA reflectance. Defaults to True.

True
apply_rpcs bool

Apply RPCs to the image. Defaults to True.

True
dst_crs str

Destination CRS. Defaults to "EPSG:4326".

'EPSG:4326'
resolution_dst_crs Optional[Union[float, Tuple[float, float]]]

Resolution of the destination CRS. Defaults to None.

None

Returns: GeoTensor: with the RGB image

Source code in georeader/readers/enmap.py
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
def load_rgb(
    self,
    as_reflectance: bool = True,
    apply_rpcs: bool = True,
    dst_crs: str = "EPSG:4326",
    resolution_dst_crs: Optional[Union[float, Tuple[float, float]]] = None,
) -> GeoTensor:
    """
    Load RGB image from VNIR bands. Converts radiance to TOA reflectance if as_reflectance is True
    otherwise it will return the radiance values in W/m^2/SR/um == mW/m2/sr/nm (`self.units`)

    Args:
        as_reflectance (bool, optional): Convert radiance to TOA reflectance. Defaults to True.
        apply_rpcs (bool, optional): Apply RPCs to the image. Defaults to True.
        dst_crs (str, optional): Destination CRS. Defaults to "EPSG:4326".
        resolution_dst_crs (Optional[Union[float, Tuple[float, float]]], optional):
            Resolution of the destination CRS. Defaults to None.
    Returns:
        GeoTensor: with the RGB image
    """
    rgb = self.load_wavelengths(WAVELENGTHS_RGB, as_reflectance=as_reflectance)
    if apply_rpcs:
        return read.read_rpcs(
            rgb.values,
            rpcs=self.rpcs_vnir,
            dst_crs=dst_crs,
            resolution_dst_crs=resolution_dst_crs,
            fill_value_default=rgb.fill_value_default,
        )
    elif dst_crs is not None:
        return read.read_to_crs(
            rgb, resolution_dst_crs=resolution_dst_crs, dst_crs=dst_crs
        )

    return rgb

load_wavelengths(wavelengths, as_reflectance=True)

Load the reflectance of the given wavelengths

Parameters:

Name Type Description Default
wavelengths Union[float, List[float], NDArray]

List of wavelengths to load

required
as_reflectance bool

return the values as reflectance rather than radiance. Defaults to True. If False values will have units of W/m^2/SR/um == mW/m2/sr/nm (self.units)

True

Returns:

Type Description
Union[GeoTensor, NDArray]

Union[GeoTensor, NDArray]: GeoTensor with the values in reflectance or radiance units.

Raises:

Type Description
ValueError

If any wavelength is outside the sensor's range.

Source code in georeader/readers/enmap.py
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
def load_wavelengths(
    self,
    wavelengths: Union[float, List[float], NDArray],
    as_reflectance: bool = True,
) -> Union[GeoTensor, NDArray]:
    """
    Load the reflectance of the given wavelengths

    Args:
        wavelengths (Union[float, List[float], NDArray]): List of wavelengths to load
        as_reflectance (bool, optional): return the values as reflectance rather than radiance.
            Defaults to True. If False values will have units of W/m^2/SR/um == mW/m2/sr/nm (`self.units`)

    Returns:
        Union[GeoTensor, NDArray]: GeoTensor with the values in reflectance or radiance units.

    Raises:
        ValueError: If any wavelength is outside the sensor's range.
    """
    if isinstance(wavelengths, Number):
        wavelengths = np.array([wavelengths])
    else:
        wavelengths = np.array(wavelengths)

    # Check all wavelengths are within the range of the sensor
    if any(
        [
            wvl < self.vnir_range[0] or wvl > self.swir_range[1]
            for wvl in wavelengths
        ]
    ):
        raise ValueError(
            f"Invalid wavelength range, must be between {self.vnir_range[0]} and {self.swir_range[1]}"
        )

    wavelengths_loaded = []
    fwhm = []
    ltoa_img = []
    for b in range(len(wavelengths)):
        if (
            wavelengths[b] >= self.swir_range[0]
            and wavelengths[b] < self.swir_range[1]
        ):
            index_band = np.argmin(np.abs(wavelengths[b] - self.wl_center["swir"]))
            fwhm.append(self.wl_fwhm["swir"][index_band])
            wavelengths_loaded.append(self.wl_center["swir"][index_band])
            rst = self.swir.isel({"band": [index_band]}).load().squeeze()
            invalids = (rst.values == rst.fill_value_default) | np.isnan(rst.values)

            # Convert to radiance
            gain = self.gain_arr["swir"][index_band]
            offset = self.offs_arr["swir"][index_band]
            img = (gain * rst.values + offset) * SC_COEFF
            img[invalids] = self.fill_value_default
        else:
            index_band = np.argmin(np.abs(wavelengths[b] - self.wl_center["vnir"]))
            fwhm.append(self.wl_fwhm["vnir"][index_band])
            wavelengths_loaded.append(self.wl_center["vnir"][index_band])
            rst = self.vnir.isel({"band": [index_band]}).load().squeeze()
            invalids = (rst.values == rst.fill_value_default) | np.isnan(rst.values)

            # Convert to radiance
            gain = self.gain_arr["vnir"][index_band]
            offset = self.offs_arr["vnir"][index_band]
            img = (gain * rst.values + offset) * SC_COEFF
            img[invalids] = self.fill_value_default

        ltoa_img.append(img)

    ltoa_img = GeoTensor(
        np.stack(ltoa_img, axis=0),
        transform=self.transform,
        crs=self.crs,
        fill_value_default=self.fill_value_default,
    )

    if as_reflectance:
        thuiller = reflectance.load_thuillier_irradiance()
        response = reflectance.srf(
            wavelengths_loaded, fwhm, thuiller["Nanometer"].values
        )

        solar_irradiance_norm = thuiller["Radiance(mW/m2/nm)"].values.dot(
            response
        )  # mW/m$^2$/SR/nm
        solar_irradiance_norm /= 1_000  # W/m$^2$/nm

        # Divide by 10 to convert from mW/m^2/SR/nm to µW /cm²/SR/nm
        ltoa_img = reflectance.radiance_to_reflectance(
            ltoa_img,
            solar_irradiance_norm,
            units=self.units,
            observation_date_corr_factor=self.observation_date_correction_factor,
        )

    return ltoa_img