Source code for simudo.io.output_writer

import csv
import logging
import warnings
import operator as OP
from contextlib import closing
from functools import reduce

from cached_property import cached_property
import numpy as np

import dolfin

from ..io import h5yaml
from ..io.csv import LineCutCsvPlot
from ..util import SetattrInitMixin

[docs]class ZeroAreaWarning(RuntimeWarning): pass
[docs]class OutputWriter(SetattrInitMixin): '''Write output data extracted from a solution object. The output can be any of a plot on a mesh, on a 1D linecut, or in a csv file containing data from multiple solutions during a parameter sweep. filename_prefix: a prefix to any filenames that will be saved. plot_mesh: Save data on the original mesh. plot_1d: Extract data along a 1D line cut and save to a csv file. ''' _meta_csv = None @cached_property def meta_extractors(self): return []
[docs] def format_parameter(self, solution, parameter_value): return '{:.14g}'.format(parameter_value)
[docs] def get_plot_prefix(self, solution, parameter_value): return (self.filename_prefix + ' parameter={} csvplot'.format( self.format_parameter(solution, parameter_value)))
[docs] def get_iv_prefix(self, solution, parameter_value): return self.filename_prefix
[docs] def write_output(self, solution, parameter_value): meta = {} meta['parameter_value'] = dict(value=parameter_value) for extractor in self.meta_extractors: extractor( solution=solution, parameter_value=parameter_value, output_writer=self, meta=meta).call() if self.plot_1d: plot_prefix = self.get_plot_prefix(solution, parameter_value) with closing(LineCutCsvPlot( plot_prefix + '.csv', None)) as plotter: solution_plot_1d(plotter, solution, 0) h5yaml.dump(meta, plot_prefix + '.plot_meta.yaml') meta_writer = self.get_meta_csv_file( self.filename_prefix + ' meta.csv', meta) meta_writer.add_row(meta)
[docs] def get_meta_csv_file(self, filename, meta): if self._meta_csv is None: self._meta_csv = MetaCSVWriter(filename, meta) return self._meta_csv
[docs]class MetaCSVWriter(SetattrInitMixin): def __init__(self, filename, meta): columns = [] for k, d in meta.items(): u = d.get('units', '') typ = d.get('type', 'float') columns.append(k) self.columns = columns self.iv_file = iv_file = open(filename, 'wt') self.iv_writer = wr = csv.writer(iv_file) wr.writerow(columns) wr.writerow([meta[c].get('units', '') for c in columns])
[docs] def add_row(self, meta): self.iv_writer.writerow([meta[c]['value'] for c in self.columns]) self.iv_file.flush()
def _ensure_dict(d, k): v = d.get(k, None) if v is None: v = d[k] = {} return v
[docs]class MetaExtractorBandInfo(SetattrInitMixin): prefix = 'band_info:'
[docs] def call(self): for k, band in self.solution.pdd.bands.items(): pre = "{}{}:".format(self.prefix, band.name) self.meta[pre+"sign"] = dict( value=band.sign)
[docs]class MetaExtractorIntegrals(SetattrInitMixin): ''' Attributes ---------- prefix: str String to add to quantites. facets: dict Facet regions where to extract quantities. cells: dict Cell regions where to extract quantities. solution: Solution object. parameter_value: Parameter value. meta: Metadata dictionary to write to. ''' prefix = ''
[docs] def call(self): mu = self.pdd.mesh_util for k, b in self.pdd.bands.items(): self.add_surface_flux( 'avg:current_{}'.format(k), b.j, average=True, units='mA/cm^2') self.add_surface_flux( 'tot:current_{}'.format(k), b.j, average=False, units='mA') self.add_volume_total( 'avg:g_{}'.format(k), b.g, average=True, units='cm^-3/s') self.add_volume_total( 'tot:g_{}'.format(k), b.g, average=False, units='1/s')
@cached_property def pdd(self): return self.solution.pdd
[docs] def add_quantity(self, name, location_name, value, units): self.meta[''.join(( self.prefix, name, ':', location_name))] = dict( value=value, units=units)
[docs] def add_surface_total( self, k, expr_ds, expr_dS, internal=True, external=True, average=False, units=None): mu = self.pdd.mesh_util for reg_name, reg in self.facets.items(): ds, dS = mu.region_ds_and_dS(reg) dsS = ds + dS value = mu.assemble(ds*expr_ds + dS*expr_dS) if average: area = mu.assemble(dsS.abs()*mu.Constant(1.0)) if area == 0: value = (value * np.nan) / area.units warnings.warn(ZeroAreaWarning( "facet region {!r} has zero area".format(reg_name))) else: value = value / area self.add_quantity(k, reg_name, value.m_as(units), units)
[docs] def add_surface_flux(self, k, expr, **kwargs): mu = self.pdd.mesh_util self.add_surface_total( k, expr_ds=mu.dot(expr, mu.n), expr_dS=mu.dot(mu.avg(expr), mu.pside(mu.n)), **kwargs)
[docs] def add_volume_total(self, k, expr, average=False, units=None): mu = self.pdd.mesh_util for reg_name, cregion in self.cells.items(): dx = mu.region_dx(cregion) value = mu.assemble(dx*expr) if average: value = value / mu.assemble(dx*mu.Constant(1.0)) self.add_quantity(k, reg_name, value.m_as(units), units)
[docs]def solution_plot_1d(plotter, s, timestep, solver=None): plotter.new(timestep) pdd = s.pdd mesh = pdd.mesh_util.mesh po = pdd.poisson ur = s.unit_registry mu = pdd.mesh_util CG1 = mu.space.CG1 DG0 = mu.space.DG0 DG1 = mu.space.DG1 DG2 = mu.space.DG2 VCG1 = mu.space.vCG1 add = plotter.add Vunit = ur.V Eunit = ur.V/ur.mesh_unit eV = ur.eV conc = 1/ur.cm**3 econc = ur.elementary_charge*conc junit = ur.mA/ur.cm**2 Iunit = 1/ur.cm**2/ur.s αunit = 1/ur.cm gunit = conc/ur.s if 1: add('E', Eunit, po.E, VCG1) add('phi', ur.V, po.phi, DG1) add('thmeq_phi', ur.V, po.thermal_equilibrium_phi, DG1) add('rho', econc, po.rho, DG1) add('static_rho', econc, po.static_rho, DG1) if 1: jays = [] for k, band in pdd.bands.items(): add('u_'+k, conc, band.u, DG1) add('thmeq_u_'+k, conc, band.thermal_equilibrium_u, DG1) add('qfl_'+k, eV, band.qfl, DG2) add('g_'+k, conc/ur.s, band.g, DG1) for procname, proc in pdd.electro_optical_processes.items(): add('g_{}_{}'.format(procname, k), conc/ur.s, proc.get_generation(band), DG1) add('j_'+k, junit, band.j, VCG1) add('mobility_'+k, ur('cm^2/V/s'), band.mobility, DG1) jays.append(band.j) if hasattr(band, 'energy_level'): E = band.energy_level ephi = po.phi*ur.elementary_charge add('E_' +k, eV, E, DG1) add('Ephi_'+k, eV, E - ephi, DG1) del E, ephi if hasattr(band, 'mixedqfl_base_w'): add('w_{}_base'.format(k), eV, band.mixedqfl_base_w, DG2) add('w_{}_delta'.format(k), eV, band.mixedqfl_delta_w, DG2) add('j_tot', junit, reduce(OP.add, jays), VCG1) omu = s.optical.mesh_util oCG1 = omu.space.CG1 if 1: for k, o in s.optical.fields.items(): add('Phi_'+k, ur('1/cm^2/s'), o.Phi, oCG1) add('opt_g_'+k, ur('1/cm^3/s'), o.g, oCG1) add('opt_alpha_'+k, ur('1/cm'), o.alpha, oCG1)
[docs]def get_contact_currents(solution, contact_name): '''Integrate current flux across the area of a contact. Returns the electron and hole currents as Pint quantities.''' sl = solution # components of electron and hole currents normal to mesh facets jvn = sl['/pde/dot'](sl['/CB/j'], dolfin.FacetNormal(sl['/mesh'])) jvp = sl['/pde/dot'](sl['/VB/j'], dolfin.FacetNormal(sl['/mesh'])) int_over_surf = sl['/pde/integrate_over_surface'] hole_current = int_over_surf(jvp, sl['/geometry/facet_regions/' + contact_name + '/ds']) elec_current = int_over_surf(jvn, sl['/geometry/facet_regions/' + contact_name + '/ds']) return elec_current, hole_current
''' # this is broken, don't use class WriteIVFile(object): def __init__(self, filename, solution): from csv import writer iv_file = open(filename, 'w') self.iv_writer = writer(iv_file) # This is ugly... is there a better way? self.contacts = [n.split('/')[-1] for n in solution['/poisson/V_contact_bc_values'].keys()] column_names = [] column_units = [] for c in self.contacts: cname = c column_names.extend([cname + '_voltage', cname + '_elec_current', cname + '_hole_current']) if len(column_units) == 0: units = ['# V', 'A', 'A'] else: units = ['V', 'A', 'A'] column_units.extend(units) self.rate_quantities = ['r_cv', 'r_ci', 'r_iv', 'og_cv', 'og_ci', 'og_iv'] column_names.extend(self.rate_quantities) column_units.extend(['1/s']*len(self.rate_quantities)) self.iv_writer.writerow(column_names) self.iv_writer.writerow(column_units) def write_row(self, solution): sl = solution u = sl['/unit_registry'] avg_over_surf = sl['/pde/average_over_surface'] row = [] for c in self.contacts: cname = c.split('/')[-1] ec, hc = [i.m_as(u.A) for i in get_contact_currents(solution, cname)] v = avg_over_surf(sl['/common/' + cname + '_bias'], sl['/geometry/facet_regions/' + cname + '/ds']).m_as(u.V) row.extend([v, ec, hc]) for r in self.rate_quantities: row.append(sl['/pde/assemble'](sl['/pde/dx']*sl['/strbg/' + r]).m_as(1./u.s)) self.iv_writer.writerow(row) '''