Source code for utils.nodes.tempestextremes_utils.node_utils

#!/usr/bin/env python
import os
import shutil
import subprocess
import xarray as xr
import numpy as np
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm
from utils.general.nci_utils import get_GADI_ERA5_filename
from utils.general.date_utils import generate_datetimes_months
from utils.general.file_utils import write_to_filelist,create_directory,read_filelist


[docs] def create_Node_dirstruct(runpath,casename): #### Create the case directory #### casedir=os.path.join(runpath,casename) create_directory(casedir) #### Create the detectBlobs directory #### inputdir=os.path.join(runpath,casename,'input') create_directory(inputdir) #### Create the detectNodes directory #### detectNodesdir=os.path.join(runpath,casename,'detectNodes') create_directory(detectNodesdir) #### Create the detectNodes directory #### stitchNodesdir=os.path.join(runpath,casename,'stitchNodes') create_directory(stitchNodesdir) #### Create the detectNodes directory #### nodeFileComposedir=os.path.join(runpath,casename,'nodeFileCompose') create_directory(nodeFileComposedir) #### Create the detectNodes directory #### logsdir=os.path.join(runpath,casename,'logs') create_directory(logsdir) return casedir,inputdir,detectNodesdir,stitchNodesdir,logsdir
[docs] def run_detectNodes(input_filelist, detect_filelist, mpi_np=1, searchby="min", detect_var="msl", merge_dist=6.0, bounds=None, closedcontour_commands="msl,200.0,5.5,0;_DIFF(z(300millibars),z(500millibars)),-58.8,6.5,1.0", output_commands="msl,min,0;_VECMAG(u10,v10),max,2.0;zs,min,0", timeinterval="6hr", lonname="longitude",latname="latitude", logdir="./log/", regional=False, quiet=False, out_command_only=False): """ Detect and track minimum based on TempestExtremes. TC detection is based on warm-core criterion from Zarzycki and Ullrich (2017) https://agupubs.onlinelibrary.wiley.com/doi/10.1002/2016GL071606 Parameters ---------- input_filelist : str String with a path to the textfile containing the input data required. detect_filelist : str String with a path to the textfile containing the names of the detectNode output. mpi_np : int, optional Number of cores used in the calculation, given to mpi command. Defaults to 1. searchby : {"min", "max"}, optional Type of extremum to search for. Defaults to "min". detect_var : str, optional String with the variable to detect (must match in the input netcdf file). Defaults to "msl". merge_dist : float, optional Distance for merging nodes. Defaults to 6.0. bounds : list (N=4), optional A list containing the bounds of a bounding box to do detection in the form ``[minlon,maxlon,minlat,maxlat]``. Defaults to None. closedcontour_commands : str, optional String with the closed contour commands. Should be of the form ``<var,op,threshold,dist>`` with commands separated by a ";". Defaults to "msl,200.0,5.5,0;_DIFF(z(300millibars),z(500millibars)),-58.8,6.5,1.0". output_commands : str, optional String with the output commands. Should be of the form ``<var,op,dist>`` with commands separated by a ";". Defaults to "msl,min,0;_VECMAG(u10,v10),max,2.0;zs,min,0". timeinterval : str, optional String with the time interval (e.g. "6hr"). Defaults to "6hr". lonname : str, optional String with the longitude variable name, as in the input netcdfs. Defaults to "longitude". latname : str, optional String with the latitude variable name, as in the input netcdfs. Defaults to "latitude". logdir : str, optional String with the path for logfile output. Defaults to "./log/". regional : bool, optional If True, tells TE that it is expecting a regional grid without periodic boundaries. Defaults to False. quiet : bool, optional If True, progress information is suppressed. Defaults to False. out_command_only : bool, optional If True, will not run the TE command but instead with output the command for terminal use. Defaults to False. Returns ------- tuple of (str, str) A tuple containing stdout and stderr from the `DetectNodes` subprocess if `quiet` is False, otherwise nothing. Notes ----- This function wraps the TempestExtremes `DetectNodes` command. Ensure the `TEMPESTEXTREMESDIR` environment variable is set. Examples -------- >>> # Assuming you have input and detect filelists ready >>> # stdout, stderr = run_detectNodes("input_files.txt", "detect_output.txt", mpi_np=4) >>> # print("DetectNodes stdout:", stdout) """ # DetectNode command detectNode_command = ["mpirun", "-np", f"{int(mpi_np)}", f"{os.environ['TEMPESTEXTREMESDIR']}/DetectNodes", "--in_data_list",f"{input_filelist}", "--out_file_list", f"{detect_filelist}", f"--searchby{searchby}",f"{detect_var}", "--closedcontourcmd",f"{closedcontour_commands}", "--mergedist",f"{merge_dist}", "--outputcmd",f"{output_commands}", "--timefilter",f"{timeinterval}", "--latname",f"{latname}", "--lonname",f"{lonname}", "--logdir",f"{logdir}", ] if regional: detectNode_command=detectNode_command+["--regional"] if bounds is not None: detectNode_command=detectNode_command+["--minlon",f"{bounds[0]}","--maxlon",f"{bounds[1]}","--minlat",f"{bounds[2]}","--maxlat",f"{bounds[3]}"] ### Here I prepare the command to printed out ready for the terminal as I need to add in some "" for some variables printed_command=detectNode_command.copy() indx=np.where(np.array(printed_command)==f"--searchby{searchby}")[0][0]+1 printed_command[indx]=[f'"{s}"' for s in [printed_command[indx]]][0] indx=np.where(np.array(printed_command)=='--closedcontourcmd')[0][0]+1 printed_command[indx]=[f'"{s}"' for s in [printed_command[indx]]][0] indx=np.where(np.array(printed_command)=='--outputcmd')[0][0]+1 printed_command[indx]=[f'"{s}"' for s in [printed_command[indx]]][0] indx=np.where(np.array(printed_command)=='--timefilter')[0][0]+1 printed_command[indx]=[f'"{s}"' for s in [printed_command[indx]]][0] print(*printed_command) if not out_command_only: print('Running command ...') detectNode_process = subprocess.Popen(detectNode_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) # Wait for the process to complete and capture output stdout, stderr = detectNode_process.communicate() #path,_=os.path.split(input_filelist) outfile=logdir+'/detectNodes_outlog.txt' with open(outfile, 'w') as file: file.write(stdout) outfile=logdir+'/detectNodes_errlog.txt' with open(outfile, 'w') as file: file.write(stderr) if not quiet: return stdout, stderr
[docs] def run_stitchNodes(input_filelist, stitch_file, mpi_np=1, output_filefmt="csv", in_fmt_commands="lon,lat,msl", range_dist=8.0, minim_time="54h", maxgap_time="24h", min_endpoint_dist=12.0, threshold_condition="wind,>=,10.0,10;lat,<=,50.0,10;lat,>=,-50.0,10;zs,<,150,10", quiet=False, out_command_only=False): """ input_filelist --- text file contain a list of DetectNode output files stitch_file --- a single text file with stitched nodes output_filefmt --- output format (gfdl|csv|csvnohead) range_dist --- the distance between two nodes in two consecutive time steps adjust this parameter according to timeinterval in run_detectNodes at least >= merge_dist in run_detectNodes minim_time --- the minimum time detected nodes must sustain maxgap_time --- the maximum gap within the track threshold_var --- threshold variable threshold_op --- threshold operation (>|<|==|!=) threshold_val --- threshold value threshold_time --- the numbe of timesteps threshold must be satisfied """ # StitchNode command stitchNode_command = ["mpirun", "-np", f"{int(mpi_np)}", f"{os.environ['TEMPESTEXTREMESDIR']}/StitchNodes", "--in_list",f"{input_filelist}", "--in_fmt",f"{in_fmt_commands}", "--range",f"{range_dist}", "--mintime",f"{minim_time}", "--maxgap",f"{maxgap_time}", "--threshold",f"{threshold_condition}", "--min_endpoint_dist",f"{min_endpoint_dist}", "--out_file_format",f"{output_filefmt}", "--out", f"{stitch_file}" ] print(*stitchNode_command) if not out_command_only: stitchNode_process = subprocess.Popen(stitchNode_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) # Wait for the process to complete and capture output stdout, stderr = stitchNode_process.communicate() path,_=os.path.split(input_filelist) outfile=path+'/stitchNodes_outlog.txt' with open(outfile, 'w') as file: file.write(stdout) outfile=path+'/stitchNodes_errlog.txt' with open(outfile, 'w') as file: file.write(stderr) if not quiet: return stdout, stderr
[docs] def run_nodeCompose(stitch_file, compose_file, in_fmt, level_type, grid_type='XY', variables=['u','v'], lonname="longitude",latname="latitude", dx=0.5, resx=11, quiet=False, out_command_only=False): """ Parameters ---------- stitch_file : dtype str String with a path to the csv file of the stitch node output data. compose_file : dtype str String with a path to the netcdf file of the compose node output data. level_type : dtype str level type of era5 data ['pressure-levels', 'single-levels', 'potential-temperature'] grid_type : dtype str grid type of the output data default='XY' variables : list (N=n) a list containing names of variables in the form of ['u', 'v'] lonname : dtype str String with the longitude variable name, as in the input netcdfs latname : dtype str String with the latitude variable name, as in the input netcdfs dx: dtype float Horizontal grid spacing of the XY grid resx: dtype int Number of grid ceels in each coordiante on the XY grid quiet : bool *Optional*, default ``False``. If ``True``, progress information is suppressed. out_command_only : bool *Optional*, default ``False``. If ``True``, will not run the TE command but instead with output the command for terminal use. """ # create a "time" column df = pd.read_csv(stitch_file, skipinitialspace=True) df['time'] = pd.to_datetime({ 'year': df.year, 'month': df.month, 'day': df.day, 'hour': df.hour }) # extract start and end date sta_date = df.time.min() end_date = df.time.max() months_list = generate_datetimes_months(sta_date,end_date,interval=1) # input filelist inputlist = os.path.dirname(compose_file) + os.sep + 'inputlist.txt' with open(inputlist, 'w') as in_file: for m in months_list: files = [] for var in variables: file = get_GADI_ERA5_filename(var,m,stream='hourly',level_type=level_type) files.append(file) in_file.write(";".join(files) + "\n") # --var if level_type == 'single-levels': varin_str = ",".join([f'{v}' for v in variables]) if level_type == 'pressure-levels' or level_type == 'potential-temperature': varin_str = ",".join([f'{v}(:)' for v in variables]) varout_str = ",".join([f'{v}' for v in variables]) composeNode_command = [f"{os.environ['TEMPESTEXTREMESDIR']}/NodeFileCompose", "--in_nodefile",f"{stitch_file}", "--in_nodefile_type", "SN", "--in_fmt",f"{in_fmt}", "--snapshots", "--in_data_list",f"{inputlist}", "--dx",f"{dx}", "--resx",f"{resx}", "--out_grid",f"{grid_type}", "--out_data",f"{compose_file}", "--latname",f"{latname}", "--lonname",f"{lonname}", "--var",f"{varin_str}", "--varout",f"{varout_str}", ] print(*composeNode_command) if not out_command_only: composeNode_process = subprocess.Popen(composeNode_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) # Wait for the process to complete and capture output stdout, stderr = composeNode_process.communicate() path,_=os.path.split(compose_file) outfile=path+'/composeNode_outlog.txt' with open(outfile, 'w') as file: file.write(stdout) outfile=path+'/composeNode_errlog.txt' with open(outfile, 'w') as file: file.write(stderr) if not quiet: return stdout, stderr os.remove(inputlist)
[docs] def assign_lev(ds, level_type): if level_type == 'pressure-levels': level_range = np.array([ 1, 2, 3, 5, 7, 10, 20, 30, 50, 70, 100, 125, 150, 175, 200, 225, 250, 300, 350, 400, 450, 500, 550, 600, 650, 700, 750, 775, 800, 825, 850, 875, 900, 925, 950, 975, 1000]) if level_type == 'potential-temperature': level_range = np.array([265, 275, 285, 300, 315, 320, 330, 350, 370, 395, 430, 475, 530, 600, 700, 850]) ds_lev = ds.assign_coords(level=('level', level_range)) return ds_lev