import numpy as np
from osgeo import gdal
import os, glob
import tempfile,shutil
from tqdm import tqdm
from polsartools.utils.utils import time_it, mlook_arr
# from polsartools.utils.io_utils import write_T3, write_C3
from polsartools.preprocess.convert_S2 import convert_S
gdal.UseExceptions()
def load_a4(input_file, output_file, calfac_linear,mat=None,
driver='GTiff',
cog=False, ovr=[2, 4, 8, 16], comp=False,
block_size=1000):
with open(input_file, mode='rb') as fp:
# 2. Extract Metadata for dimensions
fp.seek(236)
nline = int(fp.read(8))
fp.seek(248)
npixel = int(fp.read(8))
prefix_len = 800
nrec_bytes = prefix_len + (npixel * 8)
row_width_floats = nrec_bytes // 4
pixel_start_idx = prefix_len // 4
# print(f"Image Dimensions: {nline} lines x {npixel} pixels")
# print(f"Processing in blocks of {block_size} lines...")
# 3. Initialize GDAL Output File
if driver =='ENVI':
driver = gdal.GetDriverByName('ENVI')
dataset = driver.Create(output_file, npixel, nline, 1, gdal.GDT_CFloat32)
else:
driver = gdal.GetDriverByName("GTiff")
options = ['BIGTIFF=IF_SAFER']
if comp:
# options += ['COMPRESS=DEFLATE', 'PREDICTOR=2', 'ZLEVEL=9']
options += ['COMPRESS=LZW']
if cog:
options += ['TILED=YES', 'BLOCKXSIZE=512', 'BLOCKYSIZE=512']
dataset = driver.Create(
output_file,
npixel, nline,
1,
gdal.GDT_CFloat32,
options
)
out_band = dataset.GetRasterBand(1)
# 4. Block-by-Block Loop
fp.seek(720) # Skip CEOS header
print(f"Extracting single-look elements...")
for start_line in tqdm(range(0, nline, block_size)):
# Calculate how many lines to read (don't overflow the end)
current_block_height = min(block_size, nline - start_line)
# Read block of bytes
raw_bytes = fp.read(nrec_bytes * current_block_height)
if not raw_bytes:
break
# Convert to numpy array
data = np.frombuffer(raw_bytes, dtype='>f4').reshape(current_block_height, row_width_floats)
# Extract Pixels (strip 800-byte prefix)
pixel_data = data[:, pixel_start_idx:]
# Convert to Complex and Apply Calibration + Absolute Value
# Process: Real/Imag -> Complex -> Magnitude -> Scale
# real = pixel_data[:, ::2]
# imag = pixel_data[:, 1::2]
slc = (pixel_data[:, ::2] + 1j*pixel_data[:, 1::2]).astype(np.float64)* calfac_linear
# Logic: abs(complex) * calfac = sqrt(re^2 + im^2) * calfac
# We do this in one step to save memory
# mag_block = np.sqrt(real**2 + imag**2).astype(np.float32) * calfac_linear
# 5. Write block to disk
# WriteArray(array, x_offset, y_offset)
out_band.WriteArray(slc, 0, start_line)
# if start_line % (block_size * 5) == 0:
# print(f"Progress: {start_line}/{nline} lines processed.")
if cog:
dataset.BuildOverviews("NEAREST", ovr)
# Finalize
dataset.FlushCache()
dataset = None
if mat == 'S2' or mat == 'Sxy':
print(f"Saved file: {output_file}")
def write_a2_rst(out_file,data,
driver='GTiff', out_dtype=gdal.GDT_CFloat32,
mat=None,
cog=False, ovr=[2, 4, 8, 16], comp=False
):
if driver =='ENVI':
# Create GDAL dataset
driver = gdal.GetDriverByName(driver)
dataset = driver.Create(
out_file,
data.shape[1],
data.shape[0],
1,
out_dtype
)
else:
driver = gdal.GetDriverByName("GTiff")
options = ['BIGTIFF=IF_SAFER']
if comp:
# options += ['COMPRESS=DEFLATE', 'PREDICTOR=2', 'ZLEVEL=9']
options += ['COMPRESS=LZW']
if cog:
options += ['TILED=YES', 'BLOCKXSIZE=512', 'BLOCKYSIZE=512']
dataset = driver.Create(
out_file,
data.shape[1],
data.shape[0],
1,
out_dtype,
options
)
dataset.GetRasterBand(1).WriteArray(data)
# outdata.GetRasterBand(1).SetNoDataValue(0)##if you want these values transparent
dataset.FlushCache() ##saves to disk!!
if cog:
dataset.BuildOverviews("NEAREST", ovr)
dataset = None
if mat == 'S2' or mat == 'Sxy':
print(f"Saved file: {out_file}")
#################################################################################################
[docs]
@time_it
def import_alos4_uwd_l11(in_dir,mat='C2', azlks=25,rglks=5,
fmt='tif', cog=False,ovr = [2, 4, 8, 16],comp=False,
out_dir=None,
cf_dB=-83):
"""
Extracts the C2 matrix elements (C11, C22, and C12) from ALOS-4 Ultra Wide Beam Dual-Pol (UWD) CEOS data
and saves them into respective binary files.
Example:
--------
>>> import_alos4_uwd_l11("path_to_folder", azlks=25, rglks=5)
This will extract the C2 matrix elements from the ALOS-2 Wide Beam Dual-Pol data
in the specified folder and save them in the 'C2' directory.
Parameters:
-----------
in_dir : str
The path to the folder containing the ALOS-2 Wide Beam Dual-Pol CEOS data files.
mat : str, optional (default = 'S2' or 'Sxy)
Type of matrix to extract. Valid options: 'Sxy','C2', 'T2'.
azlks : int, optional (default=25)
The number of azimuth looks for multi-looking.
rglks : int, optional (default=5)
The number of range looks for multi-looking.
swath : int, optional (default=1)
The swath number [1,2,3,4,5].
fmt : {'tif', 'bin'}, optional (default='tif')
Output format:
- 'tif': GeoTIFF
- 'bin': Raw binary format
cog : bool, optional (default=False)
If True, outputs will be saved as Cloud Optimized GeoTIFFs with internal tiling and overviews.
ovr : list of int, optional (default=[2, 4, 8, 16])
Overview levels for COG generation. Ignored if cog=False.
comp : bool, optional (default=False)
If True, applies LZW compression to GeoTIFF outputs.
out_dir : str or None, optional (default=None)
Directory to save output files. If None, a folder named after the matrix type will be created
in the same location as the input file.
cf_dB : float, optional (default=-83)
The calibration factor in dB used to scale the raw radar data. It is applied to
the HH and HV polarization data before matrix computation.
Returns:
--------
None
The function does not return any value. Instead, it creates a folder named `C2`
(if not already present) and saves the following binary files:
- `C11.bin`: Contains the C11 matrix elements.
- `C22.bin`: Contains the C22 matrix elements.
- `C12_real.bin`: Contains the real part of the C12 matrix.
- `C12_imag.bin`: Contains the imaginary part of the C12 matrix.
Raises:
-------
FileNotFoundError
If the required ALOS-4 data files (e.g., `IMG-HH` and `IMG-HV`) cannot be found in the specified folder.
"""
valid_dual_pol = ['Sxy', 'C2', 'T2']
valid_matrices = valid_dual_pol
if mat not in valid_matrices:
raise ValueError(f"Invalid matrix type '{mat}'. \n Supported types are:\n"
f" Dual-pol: {sorted(valid_dual_pol)}")
temp_dir = None
ext = 'bin' if fmt == 'bin' else 'tif'
driver = 'ENVI' if fmt == 'bin' else None
# Final output directory
if out_dir is None:
final_out_dir = os.path.join(in_dir, mat)
else:
final_out_dir = os.path.join(out_dir, mat)
os.makedirs(final_out_dir, exist_ok=True)
# Intermediate output directory
if mat in ['Sxy']:
base_out_dir = final_out_dir
else:
temp_dir = tempfile.mkdtemp(prefix='temp_S2_')
base_out_dir = temp_dir
hh_file = glob.glob(os.path.join(in_dir,f'IMG-HH-*')) [0]
hv_file = glob.glob(os.path.join(in_dir,f'IMG-HV-*')) [0]
calfac_linear = np.sqrt(10 ** ((cf_dB) / 10))
load_a4(hh_file, os.path.join(base_out_dir, f's11.{ext}'),
calfac_linear, mat=mat, driver=driver, cog=cog, ovr=ovr, comp=comp,
block_size=1000)
load_a4(hv_file, os.path.join(base_out_dir, f's12.{ext}'),
calfac_linear, mat=mat, driver=driver, cog=cog, ovr=ovr, comp=comp,
block_size=1000)
# Matrix conversion if needed
if mat in ['C2', 'T2']:
convert_S(base_out_dir, mat=mat, azlks=azlks, rglks=rglks, cf=1,
fmt=fmt, out_dir=final_out_dir, cog=cog, ovr=ovr, comp=comp)
# Clean up temp directory
if temp_dir:
try:
shutil.rmtree(temp_dir)
except Exception as e:
print(f"Warning: Could not delete temporary directory {temp_dir}: {e}")