Skip to content

Commit

Permalink
Code update
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 715462263
  • Loading branch information
ilopezgp authored and The swirl_dynamics Authors committed Jan 14, 2025
1 parent 6fcbb50 commit 1737a1a
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@

from collections.abc import Iterable
import functools
import typing
from typing import TypeVar

from absl import app
from absl import flags
import apache_beam as beam
import numpy as np
from swirl_dynamics.projects.probabilistic_diffusion.downscaling.gcm_wrf import beam_utils
import xarray as xr
import xarray_beam as xbeam
from zarr.google import gfile_store
Expand Down Expand Up @@ -94,30 +94,6 @@ def _impose_data_selection(ds: xr.Dataset) -> xr.Dataset:
return ds.sel({k: v for k, v in selection.items() if k in ds.dims})


# TODO: Move this function to a common library.
def _get_climatology_mean(
climatology: xr.Dataset, variables: list[str], **sel_kwargs
) -> xr.Dataset:
"""Returns the climatological mean of the given variables.
The climatology dataset is assumed to have been produced through
the weatherbench2 compute_climatology.py script,
(https://github.com/google-research/weatherbench2/blob/main/scripts/compute_climatology.py)
and statistics `mean`, and `std`. The convention is that the climatological
means do not have a suffix, and standard deviations have a `_std` suffix.
Args:
climatology: The climatology dataset.
variables: The variables to extract from the climatology.
**sel_kwargs: Additional selection criteria for the variables.
Returns:
The climatological mean of the given variables.
"""
climatology_mean = climatology[variables]
return typing.cast(xr.Dataset, climatology_mean.sel(**sel_kwargs).compute())


def _combine_stats(
*,
mean: Dataset,
Expand Down Expand Up @@ -169,7 +145,7 @@ def compute_moments_chunk(
drop=True,
)
variables = [str(key) for key in dataset.keys()]
clim_mean = _get_climatology_mean(clim, variables, **clim_sel)
clim_mean = beam_utils.get_climatology_mean(clim, variables, **clim_sel)

dataset = dataset - clim_mean
count = dataset.notnull() if skipna else xr.ones_like(dataset)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
from absl import flags
import apache_beam as beam
import numpy as np
from swirl_dynamics.projects.probabilistic_diffusion.downscaling.gcm_wrf import beam_utils
import xarray as xr
import xarray_beam as xbeam

Expand Down Expand Up @@ -170,54 +171,6 @@
RUNNER = flags.DEFINE_string('runner', None, 'beam.runners.Runner')


def _get_climatology_mean(
climatology: xr.Dataset, variables: list[str], **sel_kwargs
) -> xr.Dataset:
"""Returns the climatological mean of the given variables.
The climatology dataset is assumed to have been produced through
the weatherbench2 compute_climatology.py script,
(https://github.com/google-research/weatherbench2/blob/main/scripts/compute_climatology.py)
and statistics `mean`, and `std`. The convention is that the climatological
means do not have a suffix, and standard deviations have a `_std` suffix.
Args:
climatology: The climatology dataset.
variables: The variables to extract from the climatology.
**sel_kwargs: Additional selection criteria for the variables.
Returns:
The climatological mean of the given variables.
"""
climatology_mean = climatology[variables]
return typing.cast(xr.Dataset, climatology_mean.sel(**sel_kwargs).compute())


def _get_climatology_std(
climatology: xr.Dataset, variables: list[str], **sel_kwargs
) -> xr.Dataset:
"""Returns the climatological standard deviation of the given variables.
The climatology dataset is assumed to have been produced through
the weatherbench2 compute_climatology.py script, and statistics
`mean`, and `std`. The convention is that the climatological means do not
have a suffix, and standard deviations have a `_std` suffix.
Args:
climatology: The climatology dataset.
variables: The variables to extract from the climatology.
**sel_kwargs: Additional selection criteria for the variables.
Returns:
The climatological standard deviation of the given variables.
"""
clim_std_dict = {key + '_std': key for key in variables} # pytype: disable=unsupported-operands
climatology_std = climatology[list(clim_std_dict.keys())].rename(
clim_std_dict
)
return typing.cast(xr.Dataset, climatology_std.sel(**sel_kwargs).compute())


def _staresdm_on_chunks(
source: xr.Dataset,
*,
Expand Down Expand Up @@ -272,18 +225,24 @@ def _staresdm_on_chunks(
)

# Static input low-resolution climatology.
input_clim_mean = _get_climatology_mean(input_clim, variables, **sel)
input_clim_std = _get_climatology_std(input_clim, variables, **sel)
input_clim_mean = beam_utils.get_climatology_mean(
input_clim, variables, **sel
)
input_clim_std = beam_utils.get_climatology_std(input_clim, variables, **sel)
# Dynamic input low-resolution climatology.
input_dynamic_clim_mean = _get_climatology_mean(
input_dynamic_clim_mean = beam_utils.get_climatology_mean(
input_dynamic_clim, variables, **sel
)
input_dynamic_clim_std = _get_climatology_std(
input_dynamic_clim_std = beam_utils.get_climatology_std(
input_dynamic_clim, variables, **sel
)
# Target high-resolution climatology.
target_clim_mean = _get_climatology_mean(target_clim, variables, **sel)
target_clim_std = _get_climatology_std(target_clim, variables, **sel)
target_clim_mean = beam_utils.get_climatology_mean(
target_clim, variables, **sel
)
target_clim_std = beam_utils.get_climatology_std(
target_clim, variables, **sel
)

staresdm = staresdm + target_clim_mean
staresdm = staresdm + (input_dynamic_clim_mean - input_clim_mean)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# Copyright 2024 The swirl_dynamics Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Utils for beam pipelines."""

import typing

import xarray as xr


def get_climatology_mean(
climatology: xr.Dataset, variables: list[str], **sel_kwargs
) -> xr.Dataset:
"""Returns the climatological mean of the given variables.
The climatology dataset is assumed to have been produced through
the weatherbench2 compute_climatology.py script,
(https://github.com/google-research/weatherbench2/blob/main/scripts/compute_climatology.py)
and statistics `mean`, and `std`. The convention is that the climatological
means do not have a suffix, and standard deviations have a `_std` suffix.
Args:
climatology: The climatology dataset.
variables: The variables to extract from the climatology.
**sel_kwargs: Additional selection criteria for the variables.
Returns:
The climatological mean of the given variables.
"""
climatology_mean = climatology[variables]
return typing.cast(xr.Dataset, climatology_mean.sel(**sel_kwargs).compute())


def get_climatology_std(
climatology: xr.Dataset, variables: list[str], **sel_kwargs
) -> xr.Dataset:
"""Returns the climatological standard deviation of the given variables.
The climatology dataset is assumed to have been produced through
the weatherbench2 compute_climatology.py script, and statistics
`mean`, and `std`. The convention is that the climatological means do not
have a suffix, and standard deviations have a `_std` suffix.
Args:
climatology: The climatology dataset.
variables: The variables to extract from the climatology.
**sel_kwargs: Additional selection criteria for the variables.
Returns:
The climatological standard deviation of the given variables.
"""
clim_std_dict = {key + '_std': key for key in variables} # pytype: disable=unsupported-operands
climatology_std = climatology[list(clim_std_dict.keys())].rename(
clim_std_dict
)
return typing.cast(xr.Dataset, climatology_std.sel(**sel_kwargs).compute())

0 comments on commit 1737a1a

Please sign in to comment.