from __future__ import absolute_import
import numpy as nm
from sfepy.base.base import output, assert_, get_default, Struct
from sfepy.homogenization.coefs_base import CoefOne, \
TCorrectorsViaPressureEVP, CoefFMSym, CoefFMOne, CorrMiniApp
from sfepy.discrete.fem.meshio import HDF5MeshIO
from sfepy.solvers.ts import TimeStepper
from six.moves import range
[docs]class PressureRHSVector( CorrMiniApp ):
def __call__( self, problem = None, data = None ):
problem = get_default( problem, self.problem )
problem.select_variables( self.variables )
problem.set_equations( self.equations )
problem.select_bcs( ebc_names = self.ebcs, epbc_names = self.epbcs )
state = problem.create_state()
state.apply_ebc()
vec = eval_term_op( state, list(self.equations.values())[0],
problem, dw_mode = 'vector' )
## print vec.max(), vec.min()
return vec
[docs]class TCorrectorsRSViaPressureEVP( TCorrectorsViaPressureEVP ):
[docs] def get_save_name_base( self ):
return self.save_name + '_%d%d'
[docs] def get_dump_name_base( self ):
return self.dump_name + '_%d%d'
def __call__( self, problem = None, data = None ):
"""data: corrs_rs, evp"""
problem = get_default( problem, self.problem )
ts = problem.get_time_solver().ts
corrs, evp = [data[ii] for ii in self.requires]
assert_( evp.ebcs == self.ebcs )
assert_( evp.epbcs == self.epbcs )
dim = problem.get_dim()
filenames = nm.zeros( (dim, dim), dtype = nm.object )
self.setup_equations(self.equations)
solve = self.compute_correctors
for ir in range( dim ):
for ic in range( dim ):
filenames[ir,ic] = self.get_dump_name() % (ir,ic)
savename = self.get_save_name() % (ir,ic)
solve(evp, -1.0, corrs.states[ir,ic], ts,
filenames[ir,ic], savename)
if self.check:
self.setup_equations(self.verify_equations)
self.init_solvers(problem)
output( 'verifying correctors %s...' % self.name )
verify = self.verify_correctors
ok = True
for ir in range( dim ):
for ic in range( dim ):
oo = verify(-1.0, corrs.states[ir,ic], filenames[ir,ic])
ok = ok and oo
output( '...done, ok: %s' % ok )
return Struct( name = self.name,
filenames = filenames )
[docs]class TCorrectorsPressureViaPressureEVP( TCorrectorsViaPressureEVP ):
[docs] def get_save_name_base( self ):
return self.save_name
[docs] def get_dump_name_base( self ):
return self.dump_name
def __call__( self, problem = None, data = None, save_hook = None ):
"""data: corrs_pressure, evp, optionally vec_g"""
problem = get_default( problem, self.problem )
ts = problem.get_time_solver().ts
corrs, evp = [data[ii] for ii in self.requires[:2]]
if len(self.requires) == 3:
vec_g = data[self.requires[2]]
else:
vec_g = None
assert_( evp.ebcs == self.ebcs )
assert_( evp.epbcs == self.epbcs )
filename = self.get_dump_name()
savename = self.get_save_name()
self.setup_equations(self.equations)
solve = self.compute_correctors
solve(evp, 1.0, corrs.state, ts, filename, savename, vec_g=vec_g)
if self.check:
self.setup_equations(self.verify_equations)
self.init_solvers(problem)
output( 'verifying correctors %s...' % self.name )
verify = self.verify_correctors
ok = verify(1.0, corrs.state, filename)
output( '...done, ok: %s' % ok )
return Struct( name = self.name,
filename = filename )
[docs]class RBiotCoef( CoefFMSym ):
"""Homogenized fading memory Biot-like coefficient."""
[docs] def get_filename( self, data, ir, ic ):
tcorrs = data[self.requires[1]]
self.ir, self.ic = ir, ic
return tcorrs.filename
[docs] def get_variables( self, problem, io, step, data, mode ):
if mode == 'col':
return
else:
pis = data[self.requires[0]]
step_data = io.read_data( step )
# omega.
var_name = self.variables[0]
c_name = problem.variables[var_name].primary_var_name
yield var_name, step_data[c_name].data
var_name = self.variables[1]
yield var_name, pis.states[self.ir,self.ic]
var_name = self.variables[2]
c_name = problem.variables[var_name].primary_var_name
pc = step_data['d'+c_name].data
if self.ir == self.ic:
yield (var_name, pc)
else:
yield (var_name, nm.zeros_like(pc))
[docs]class GBarCoef( CoefOne ):
"""
Asymptotic Barenblatt coefficient.
data = [p^{\infty}]
Note:
solving "dw_diffusion.i1.Y3( m.K, qc, pc ) = 0" solve, in fact
"C p^{\infty} = \hat{C} \hat{\pi}" with the result "\hat{p^{\infty}}",
where the rhs comes from E(P)BC.
- it is preferable to computing directly by
"\hat{p^{\infty}} = \hat{C^-1 \strip(\hat{C} \hat{\pi})}", as it checks
explicitly the residual.
"""
def __call__( self, volume, problem = None, data = None ):
expression, region_name = self.expression
problem = get_default( problem, self.problem )
problem.select_variables( self.variables )
problem.set_equations( {'eq' : expression} )
problem.time_update( conf_ebc = {}, conf_epbc = {}, conf_lcbc = {} )
pi_inf = data[self.requires[0]]
coef = nm.zeros( (1,), dtype = nm.float64 )
vec = eval_term_op( pi_inf.state, expression,
problem, new_geometries = False, dw_mode = 'vector' )
reg = problem.domain.regions[region_name]
field = problem.variables[self.variables[1]].field
nods = field.get_dofs_in_region(reg, merge=True)
coef[0] = vec[nods].sum()
coef /= volume
return coef
[docs]def eval_boundary_diff_vel_grad( problem, uc, pc, equation, region_name,
pi = None ):
get_state = problem.variables.get_state_part_view
problem.set_equations( {'eq_2' : equation} )
state = problem.create_state()
state.set_full(uc, 'uc')
state.set_full(pc, 'pc')
if pi is not None:
problem.variables['Pi'].set_data(pi)
problem.time_update( conf_ebc = {}, conf_epbc = {} )
state.apply_ebc()
aux = problem.get_evaluator().eval_residual(state())
pc = get_state( aux, 'pc', True )
pc = problem.variables.make_full_vec( pc, 'pc', 0 )
field = problem.variables['pc'].field
reg = problem.domain.regions[region_name]
nods = field.get_dofs_in_region(reg, merge=True)
val = pc[nods].sum()
# assert nm.all( problem.variables.di.ptr == problem.variables.adi.ptr )
# problem.time_update() # Restore EBC.
return val
[docs]class GPlusCoef( CoefFMOne ):
[docs] def get_filename( self, data ):
tcorrs = data[self.requires[0]]
return tcorrs.filename
def __call__( self, volume, problem = None, data = None ):
expression, region_name, aux_eq = self.expression
problem = get_default( problem, self.problem )
problem.select_variables( self.variables )
aux = self.get_filename( data )
io = HDF5MeshIO( self.get_filename( data ) )
ts = TimeStepper( *io.read_time_stepper() )
coef = nm.zeros( (ts.n_step, 1), dtype = nm.float64 )
for step, time in ts:
step_data = io.read_data( step )
var_name = self.variables[0]
c_name = problem.variables[var_name].primary_var_name
omega = step_data[c_name].data
problem.variables[var_name].set_data(omega)
val1 = eval_term_op( None, expression,
problem, call_mode = 'd_eval' )
pc = step_data[self.variables[3]].data
val2 = eval_boundary_diff_vel_grad( problem, omega, pc, aux_eq,
region_name )
coef[step] = val1 + val2
coef /= volume
return coef