ticemodelvec2t.py - pism - [fork] customized build of PISM, the parallel ice sheet model (tillflux branch)
(HTM) git clone git://src.adamsgaard.dk/pism
(DIR) Log
(DIR) Files
(DIR) Refs
(DIR) LICENSE
---
ticemodelvec2t.py (10196B)
---
1 # run this using "python -m nose icemodelvec2t.py -m test_name" to run only one of the
2 # tests
3 import unittest
4 import numpy
5 import os
6 import PISM
7
8 ctx = PISM.Context()
9
10 # use the 360-day calendar so that the period of 1 year has a clear meaning
11 ctx.ctx.time().init_calendar("360_day")
12
13 # suppress all output
14 ctx.log.set_threshold(1)
15
16 def compare(v, value):
17 with PISM.vec.Access(nocomm=v):
18 numpy.testing.assert_almost_equal(v[0, 0], value)
19
20 class ForcingInput(unittest.TestCase):
21
22 def setUp(self):
23 "Prepare an input file with an interesting time-dependent field."
24 self.filename = "input.nc"
25 self.empty = "empty.nc"
26 self.one_record = "one_record.nc"
27 self.no_time = "no_time.nc"
28 self.no_bounds = "no_time_bounds.nc"
29 self.time_order = "time_order.nc"
30
31 self.period = 1
32
33 M = 3
34 self.grid = PISM.IceGrid.Shallow(ctx.ctx, 1, 1, 0, 0, M, M, PISM.CELL_CORNER,
35 PISM.NOT_PERIODIC)
36
37 v = PISM.IceModelVec2S(self.grid, "v", PISM.WITHOUT_GHOSTS)
38
39 units = "days since 1-1-1"
40 N = 12
41 dt = 30.0 # days (floating point)
42 t = numpy.r_[0:N] * dt
43 self.tb = numpy.r_[0:N + 1] * dt
44 self.f = numpy.sin(2.0 * numpy.pi * t / (N * dt))
45
46 def write_data(filename, use_bounds=True, forward=True):
47 bounds = PISM.TimeBoundsMetadata("time_bounds", "time", ctx.unit_system)
48 bounds.set_string("units", units)
49
50 output = PISM.util.prepare_output(filename, append_time=False)
51 output.write_attribute("time", "units", units)
52
53 if use_bounds:
54 output.write_attribute("time", "bounds", "time_bounds")
55
56 if forward:
57 order = range(N)
58 else:
59 order = range(N - 1, -1, -1)
60
61 for k in order:
62 PISM.append_time(output, "time", self.tb[k+1])
63
64 if use_bounds:
65 PISM.write_time_bounds(output, bounds, k, (self.tb[k], self.tb[k + 1]))
66
67 v.set(self.f[k])
68 v.write(output)
69
70 # regular forcing file
71 write_data(self.filename, use_bounds=True)
72
73 # file without time bounds
74 write_data(self.no_bounds, use_bounds=False)
75
76 # file with invalid time order
77 write_data(self.time_order, forward=False)
78
79 # empty file
80 PISM.util.prepare_output(self.empty)
81
82 # file with one record
83 output = PISM.util.prepare_output(self.one_record)
84 v.set(self.f[-1])
85 v.write(output)
86
87 # file without a time dimension
88 v.set(self.f[-1])
89 v.set_time_independent(True)
90 v.dump(self.no_time)
91
92 def tearDown(self):
93 "Remove files created by setUp()"
94 files = [self.filename, self.empty, self.one_record,
95 self.no_time, self.no_bounds, self.time_order]
96 for f in files:
97 os.remove(f)
98
99 def forcing(self, filename, buffer_size=12, periodic=False):
100 "Allocate and initialize forcing"
101 input_file = PISM.File(ctx.com, self.filename, PISM.PISM_NETCDF3, PISM.PISM_READONLY)
102 forcing = PISM.IceModelVec2T.ForcingField(self.grid, input_file, "v", "",
103 buffer_size, 52, periodic)
104 forcing.metadata().set_string("long_name", "test field")
105
106 if periodic:
107 forcing.init(filename, self.period, 0)
108 else:
109 forcing.init(filename, 0, 0)
110
111 return forcing
112
113 def test_missing_file(self):
114 "Missing input file"
115 try:
116 forcing = self.forcing(self.empty)
117 assert 0, "initialized with an empty file"
118 except RuntimeError:
119 pass
120
121 def test_average(self):
122 "Time average using average(t, dt)"
123 forcing = self.forcing(self.filename)
124
125 # second month
126 N = 1
127 t = self.tb[N] * 86400 + 1
128 dt = forcing.max_timestep(t).value()
129 forcing.update(t, dt)
130 forcing.average(t, dt)
131
132 compare(forcing, self.f[N])
133
134 # average with a zero length
135 forcing.average(t, 0.0)
136
137 compare(forcing, self.f[N])
138
139
140 # average() with only one record
141 forcing = self.forcing(self.filename, buffer_size=1)
142 forcing.update(0, 1)
143 forcing.average(0, 1)
144
145 compare(forcing, self.f[0])
146
147 def test_extrapolation_left(self):
148 "Extrapolation on the left"
149 forcing = self.forcing(self.filename)
150
151 t = self.tb[0] * 86400 - 1
152 dt = self.tb[1] * 86400 + 1
153 forcing.update(0, dt)
154 forcing.interp(t)
155
156 compare(forcing, self.f[0])
157
158 def test_extrapolation_right(self):
159 "Extrapolation on the right"
160 forcing = self.forcing(self.filename)
161
162 t = self.tb[-1] * 86400
163 forcing.update(0, t)
164 forcing.interp(t + 1)
165
166 compare(forcing, self.f[-1])
167
168 def test_interp_1(self):
169 "Interpolation using interp(t)"
170 forcing = self.forcing(self.filename)
171
172 N = 1
173 t = self.tb[N] * 86400 + 1
174 forcing.update(0, t)
175 forcing.interp(t)
176
177 compare(forcing, self.f[N])
178
179 def test_interp_2(self):
180 "Interpolation using init_interpolation(ts) and interp(i, j)"
181 forcing = self.forcing(self.filename)
182 N = 12
183 dt = 30.0 # days (floating point)
184 ts = numpy.r_[0:N] * dt + 0.5 * dt
185 ts = [PISM.util.convert(T, "days", "seconds") for T in ts]
186
187 forcing.update(0, self.tb[-1])
188 forcing.init_interpolation(ts)
189
190 with PISM.vec.Access(nocomm=forcing):
191 numpy.testing.assert_almost_equal(forcing.interp(0, 0), self.f)
192
193 def test_one_record(self):
194 "Input file with only one time record"
195 forcing = self.forcing(self.one_record)
196
197 self.check_forcing(forcing, self.f[-1], 0, 1)
198
199 def test_no_time_dimension(self):
200 "Forcing without a time dimension"
201 forcing = self.forcing(self.no_time)
202
203 self.check_forcing(forcing, self.f[-1], 0, 1)
204
205 def test_constant_field(self):
206 "Field initialized using init_constant()"
207 f = 100.0
208 forcing = PISM.IceModelVec2T(self.grid, "v", 1, 1)
209 forcing.init_constant(f)
210
211 self.check_forcing(forcing, f, 0, 1)
212
213 def check_forcing(self, forcing, value, t, dt):
214 forcing.update(t, dt)
215 forcing.average(t, dt)
216 compare(forcing, value)
217
218 def test_large_update_interval(self):
219 "Test update() calls with an update interval past the last available time"
220 forcing = self.forcing(self.filename)
221
222 forcing.update(1e12, 1e12+1)
223 forcing.init_interpolation([1e12])
224
225 with PISM.vec.Access(nocomm=forcing):
226 numpy.testing.assert_almost_equal(forcing.interp(0, 0), self.f[-1])
227
228 def test_buffer_too_small(self):
229 "Reading periodic data that does not fit in the buffer"
230 # Note: IceModelVec2T::init() will throw RuntimeError if the buffer is too small
231 # to hold all records of a periodic forcing field. This will never happen if this
232 # IceModelVec2T was allocated using IceModelVec2T::ForcingField because it ensures
233 # that the buffer is big enough.
234 forcing = self.forcing(self.filename, buffer_size=2, periodic=False)
235 try:
236 forcing.init(self.filename, self.period, 0)
237 assert False, "Failed to stop because the buffer size is too small"
238 except RuntimeError:
239 pass
240
241 def test_time_step_too_long(self):
242 "update() call with a time step that is too long"
243
244 N = 2
245 forcing = self.forcing(self.filename, buffer_size=N)
246
247 try:
248 dt = (N + 1) * 30 * 86400
249 forcing.update(0, dt)
250 assert False, "Failed to catch a time step that is too long"
251 except RuntimeError:
252 pass
253
254 def test_no_time_bounds(self):
255 "Forcing without time bounds"
256 try:
257 forcing = self.forcing(self.no_bounds)
258 assert False, "Loaded forcing without time bounds"
259 except RuntimeError:
260 pass
261
262 def test_decreasing_time(self):
263 "Invalid (decreasing) time"
264 try:
265 forcing = self.forcing(self.time_order)
266 assert False, "Loaded forcing with a decreasing time dimension"
267 except RuntimeError:
268 pass
269
270 def test_multiple_steps(self):
271 "miltiple update() calls with a small buffer, discarding records"
272 # In this test it is crucial that the time step (dt below) is long enough to use
273 # more than one time record. This triggers the code that discards records that are
274 # no longer needed but keeps the ones that are still of use, reducing the amount
275 # of data we have to read from a file.
276
277 forcing = self.forcing(self.filename, buffer_size=3)
278
279 def check(month):
280 t = self.tb[month] * 86400 + 1
281 dt = 2 * 30 * 86400 # long-ish time step (2 months)
282 forcing.update(t, dt)
283 forcing.interp(t)
284
285 compare(forcing, self.f[month])
286
287 # second month
288 check(1)
289 # fourth month
290 check(3)
291
292 def test_max_timestep(self):
293 "Maximum time step"
294 forcing = self.forcing(self.filename, buffer_size=1)
295
296 # time bounds in seconds
297 tb = self.tb * 86400
298
299 assert forcing.max_timestep(0).value() == tb[1] - tb[0]
300 assert forcing.max_timestep(tb[-1]).infinite()
301 assert forcing.max_timestep(tb[1] - 0.5).value() == tb[2] - tb[1]
302 assert forcing.max_timestep(tb[-1] - 0.5).infinite()
303
304 def test_periodic(self):
305 "Using periodic data"
306 forcing = self.forcing(self.filename, periodic=True)
307
308 # a year and a half
309 t = numpy.arange(18 + 1) * 30 * 86400.0
310
311 forcing.update(0, t[-1])
312
313 forcing.init_interpolation(t)
314
315 with PISM.vec.Access(nocomm=forcing):
316 numpy.testing.assert_almost_equal(forcing.interp(0, 0),
317 numpy.r_[self.f, self.f[0:(6 + 1)]])