Source code for simudo.io.output_writer
import csv
import logging
import warnings
import operator as OP
from contextlib import closing
from functools import reduce
import os
from cached_property import cached_property
import numpy as np
import dolfin
from ..io import h5yaml
from ..io.xdmf import XdmfPlot
from ..io.csv import LineCutCsvPlot
from ..util import SetattrInitMixin
from ..mesh import CellRegions
[docs]class OutputWriter(SetattrInitMixin):
'''Write output data extracted from a solution object.
The output can be any of a plot on a mesh, on a 1D linecut,
or in a csv file containing data from multiple solutions
during a parameter sweep.
parameter_name: label for the parameter being swept
filename_prefix: a prefix to any filenames that will be saved.
plot_mesh -- Save data on the original mesh. (default: False)
plot_1d -- Extract data along a 1D line cut and save to a csv file. (default: False)
plot_iv -- Save a csv file with terminal voltages, currents and other extracted
data. (default: True)
stepper -- (optional) give access to the stepper. Allows plotting quantities
related to solution such as error metric (du).
'''
_meta_csv = None
stepper = None
plot_iv = True
plot_1d = False
plot_mesh = False
parameter_name = 'parameter'
line_cut_resolution = 5001
@cached_property
def meta_extractors(self):
return []
[docs] def format_parameter(self, solution, parameter_value):
return '{:.14g}'.format(parameter_value)
[docs] def get_plot_prefix(self, solution, parameter_value):
return (self.filename_prefix +
'_{}={}'.format(self.parameter_name,
self.format_parameter(solution, parameter_value)))
[docs] def write_output(self, solution, parameter_value):
if os.path.dirname(self.filename_prefix) != '':
os.makedirs(os.path.dirname(self.filename_prefix), exist_ok=True)
meta = {}
meta['sweep_parameter:{}'.format(self.parameter_name)] = dict(value=parameter_value)
for extractor in self.meta_extractors:
extractor(
solution=solution, parameter_value=parameter_value,
output_writer=self, meta=meta).call()
if self.plot_mesh:
for ext in ['.xdmf', '_checkpoint.xdmf', '.h5', '_checkpoint.h5']:
try:
os.remove(self.get_plot_prefix(solution, parameter_value)+ext)
except FileNotFoundError:
pass
with closing(XdmfPlot(self.get_plot_prefix(
solution, parameter_value) + '.xdmf', None)) as mesh_plotter:
solution_plot(mesh_plotter, solution, 0, stepper=self.stepper)
with closing(XdmfPlot(self.get_plot_prefix(
solution, parameter_value) + '_checkpoint.xdmf',
None, checkpoint=True)) as mesh_plotter:
solution_plot(mesh_plotter, solution, 0, stepper=self.stepper)
if self.plot_1d:
plot_prefix = self.get_plot_prefix(solution, parameter_value)
with closing(LineCutCsvPlot(
plot_prefix + '.csv', None,
resolution=self.line_cut_resolution,
)) as plotter:
solution_plot(plotter, solution, 0, stepper=self.stepper)
h5yaml.dump(meta, plot_prefix + '.plot_meta.yaml')
if self.plot_iv:
meta_writer = self.get_meta_csv_file(
self.filename_prefix + '_{}.csv'.format(self.parameter_name), meta)
meta_writer.add_row(meta)
[docs] def get_meta_csv_file(self, filename, meta):
if self._meta_csv is None:
self._meta_csv = MetaCSVWriter(filename, meta)
return self._meta_csv
[docs]class MetaCSVWriter(SetattrInitMixin):
def __init__(self, filename, meta):
columns = []
for k, d in meta.items():
u = d.get('units', '')
typ = d.get('type', 'float')
columns.append(k)
self.columns = columns
self.file = open(filename, 'wt')
self.writer = csv.writer(self.file)
self.writer.writerow(columns)
# add a '#' at the beginning of the units line so it can be
# treated as a comment by numpy.genfromtxt(), pandas.read_csv() etc.
units = [meta[c].get('units', '') for c in columns]
units[0] = '# ' + units[0]
self.writer.writerow(units)
[docs] def add_row(self, meta):
self.writer.writerow([meta[c]['value'] for c in self.columns])
self.file.flush()
def _ensure_dict(d, k):
v = d.get(k, None)
if v is None:
v = d[k] = {}
return v
[docs]class MetaExtractorBandInfo(SetattrInitMixin):
prefix = 'band_info:'
[docs] def call(self):
for k, band in self.solution.pdd.bands.items():
pre = "{}{}:".format(self.prefix, band.name)
self.meta[pre+"sign"] = dict(
value=band.sign)
[docs]class MetaExtractorIntegrals(SetattrInitMixin):
'''
Attributes
----------
prefix: str
String to add to quantites.
facets: dict
Facet regions where to extract quantities.
cells: dict
Cell regions where to extract quantities.
solution:
Solution object.
parameter_value:
Parameter value.
meta:
Metadata dictionary to write to.
'''
prefix = ''
[docs] def call(self):
mu = self.pdd.mesh_util
for k, b in self.pdd.bands.items():
self.add_surface_flux(
'avg:current_{}'.format(k), b.j, average=True,
units='mA/cm^2')
self.add_surface_flux(
'tot:current_{}'.format(k), b.j, average=False,
units='mA')
self.add_volume_total(
'avg:g_{}'.format(k), b.g, average=True,
units='cm^-3/s')
self.add_volume_total(
'tot:g_{}'.format(k), b.g, average=False,
units='1/s')
for procname, proc in self.pdd.electro_optical_processes.items():
self.add_volume_total('tot:g_{}_{}'.format(procname, k),
proc.get_generation(b), average=False, units='1/s' )
for fieldname, field in self.solution.optical.fields.items():
self.add_volume_total('tot:gother_{}'.format(fieldname),
field.g, average=False, units='1/s')
self.add_volume_total('tot:gabs_{}'.format(fieldname),
field.alpha*field.Phi, average=False, units='1/s')
@cached_property
def pdd(self):
return self.solution.pdd
[docs] def add_quantity(self, name, location_name, value, units):
self.meta[''.join((
self.prefix, name, ':', location_name))] = dict(
value=value, units=units)
[docs] def add_surface_total(
self, k, expr_ds, expr_dS, internal=True, external=True,
average=False, units=None):
mu = self.pdd.mesh_util
for reg_name, reg in self.facets.items():
ds, dS = mu.region_ds_and_dS(reg)
dsS = ds + dS
value = mu.assemble(ds*expr_ds + dS*expr_dS)
if average:
area = mu.assemble(dsS.abs()*mu.Constant(1.0))
if area == 0:
value = (value * np.nan) / area.units
warnings.warn(ZeroAreaWarning(
"facet region {!r} has zero area".format(reg_name)))
else:
value = value / area
self.add_quantity(k, reg_name, value.m_as(units), units)
[docs] def add_surface_flux(self, k, expr, **kwargs):
mu = self.pdd.mesh_util
self.add_surface_total(
k,
expr_ds=mu.dot(expr, mu.n),
expr_dS=mu.dot(mu.avg(expr), mu.pside(mu.n)), **kwargs)
[docs] def add_volume_total(self, k, expr, average=False, units=None):
mu = self.pdd.mesh_util
for reg_name, cregion in self.cells.items():
dx = mu.region_dx(cregion)
value = mu.assemble(dx*expr)
if average:
value = value / mu.assemble(dx*mu.Constant(1.0))
self.add_quantity(k, reg_name, value.m_as(units), units)
[docs]def solution_plot(plotter, s, timestep, solver=None, stepper=None):
plotter.new(timestep)
pdd = s.pdd
mesh = pdd.mesh_util.mesh
mesh_data = pdd.mesh_data
po = pdd.poisson
ur = s.unit_registry
mu = pdd.mesh_util
CG1 = mu.space.CG1
DG0 = mu.space.DG0
DG1 = mu.space.DG1
DG2 = mu.space.DG2
VCG1 = mu.space.vCG1
add = plotter.add
Vunit = ur.V
Eunit = ur.V/ur.mesh_unit
eV = ur.eV
conc = 1/ur.cm**3
econc = ur.elementary_charge*conc
junit = ur.mA/ur.cm**2
fluxunit = 1/ur.cm**2/ur.s
alphaunit = 1/ur.cm
gunit = conc/ur.s
add('E', Eunit, po.E, VCG1)
add('phi', Vunit, po.phi, DG1)
add('thmeq_phi', ur.V, po.thermal_equilibrium_phi, DG1)
add('rho', econc, po.rho, DG1)
add('static_rho', econc, po.static_rho, DG1)
if hasattr(pdd, 'XMoleFraction'):
add('XMole', ur.dimensionless, pdd.XMoleFraction, DG1)
jays = []
for k, band in pdd.bands.items():
extent = band.extent
add('u_'+k, conc, band.u * extent, DG1)
add('thmeq_u_'+k, conc, band.thermal_equilibrium_u * extent, DG1)
add('qfl_'+k, eV, band.qfl * extent, DG2)
add('g_'+k, gunit, band.g * extent, DG1)
for procname, proc in pdd.electro_optical_processes.items():
add('g_{}_{}'.format(procname, k), gunit,
proc.get_generation(band), DG1)
add('j_'+k, junit, band.j, VCG1)
jays.append(band.j)
add('mobility_'+k, ur('cm^2/V/s'), band.mobility * extent, DG1)
if hasattr(band, 'energy_level'):
E = band.energy_level
ephi = po.phi*ur.elementary_charge
add('E_' +k, eV, E * extent, DG1)
add('Ephi_'+k, eV, E - ephi * extent, DG1)
del E, ephi
if hasattr(band, 'mixedqfl_base_w'):
add('w_{}_base'.format(k), eV, band.mixedqfl_base_w * extent, DG2)
add('w_{}_delta'.format(k), eV, band.mixedqfl_delta_w * extent, DG2)
if hasattr(band, 'number_of_states'):
add('number_of_states_'+k, conc, band.number_of_states, DG2)
if hasattr(band, 'effective_density_of_states'):
add('effective_density_of_states_'+k, conc, band.effective_density_of_states, DG2)
add('j_tot', junit, reduce(OP.add, jays), VCG1)
omu = s.optical.mesh_util
oDG1 = omu.space.DG1
for k, o in s.optical.fields.items():
add('opt_Phi_'+k, fluxunit, o.Phi, oDG1)
add('opt_gother_'+k, gunit, o.g, oDG1)
add('opt_alpha_'+k, alphaunit, o.alpha, oDG1)
add('opt_gabs_'+k, gunit, o.alpha*o.Phi, oDG1)
if stepper is not None:
split_du = pdd.mixed_function_helper.solution_mixed_space.split(stepper.du)
# These units should match the trial units in poisson_drift_diffusion1.py1
add('du_E', Eunit, split_du['poisson/E'], VCG1)
add('du_phi', Vunit, split_du['poisson/phi'], DG1)
for k, b in pdd.bands.items():
add(f'du_{k}_delta_w', eV, split_du[f'{k}/delta_w'], DG1 )
add(f'du_{k}_j', ur.A / ur.mesh_unit**2, split_du[f'{k}/j'], VCG1)