#!/usr/bin/python
"""
Provides several Dataloader objects which open different kinds of data files
- typically acquired at different sources (i.e. beamlines at various
synchrotrons) - and crunch them into the same shape and form.
The output form is an :class:`argparse.Namespace` object like this::
Namespace(data,
xscale,
yscale,
zscale,
angles,
theta,
phi,
E_b,
hv)
Where the entries are as follows:
====== ========================================================================
data np.array of shape (z,y,x); this has to be a 3D array even for 2D data
- z=1 in that case. x, y and z are the lengths of the x-, y- and
z-scales, respectively. The convention (z,y,x) is used over (x,y,z)
as a consequence of matplotlib.pcolormesh transposing the data when
plotting.
xscale np.array of shape(x); the x axis corresponding to the data.
yscale np.array of shape(y); the y axis corresponding to the data.
zscale np.array of shape(z); the z axis corresponding to the data.
angles 1D np.array; corresponding angles on the momentum axis of the
analyzer. Depending on the beamline (analyzer slit orientation) this
is expressed as theta or tilt. Usually coincides with either of x-,
y- and zscales.
theta float or 1D np.array; the value of theta (or tilt in rotated analyzer
slit orientation). Is mostly used for angle-to-k conversion.
phi float; value of the azimuthal angle phi. Mostly used for angle-to-k
conversion.
E_b float; typical binding energy for the electrons represented in the
data. In principle there is not just a single binding energy but as
this is only used in angle-to-k conversion, where the typical
variations of the order <10eV doen't matter it suffices to give an
average or maximum value.
hv float or 1D np.array; the used photon energy in the scan(s). In Case
of a hv-scan, this obviously coincides with one of the x-, y- or
z-scales.
====== ========================================================================
Note that any change in the output structure has consequences for all
programs and routines that receive from a dataloader (which is pretty much
everything in this module) and previously pickled files.
"""
import ast
import os
import pickle
import re
import zipfile
from argparse import Namespace
from errno import ENOENT
from warnings import catch_warnings, simplefilter, warn
import h5py
import numpy as np
import astropy.io.fits as pyfits
from igor import binarywave
# Fcn to build the x, y (, z) ranges (maybe outsource this fcn definition)
[docs]def start_step_n(start, step, n) :
"""
Return an array that starts at value `start` and goes `n`
steps of `step`.
"""
end = start + n*step
return np.linspace(start, end, n)
[docs]class Dataloader() :
"""
Base dataloader class (interface) from which others inherit some
methods (specifically the ``__repr__()`` function).
The `date` attribute should indicate the last date that this specific
dataloader worked properly for files of its type (as beamline filetypes
may vary with time).
"""
name = 'Base'
date = ''
def __init__(self, *args, **kwargs) :
pass
def __repr__(self, *args, **kwargs) :
return '<class Dataloader_{}>'.format(self.name)
[docs] def print_m(self, *messages) :
""" Print message to console, adding the dataloader name. """
s = '[Dataloader {}]'.format(self.name)
print(s, *messages)
[docs]class Dataloader_Pickle(Dataloader) :
"""
Load data that has been saved using python's `pickle` module. ARPES
pickle files are assumed to just contain the data namespace the way it
would be returned by any Dataloader of this module.
"""
name = 'Pickle'
[docs] def load_data(self, filename) :
# Open the file and get a handle for it
with open(filename, 'rb') as f :
filedata = pickle.load(f)
return filedata
[docs]class Dataloader_i05(Dataloader) :
"""
Dataloader object for the i05 beamline at the Diamond Light Source.
"""
name = 'i05'
[docs] def load_data(self, filename) :
# Read file with h5py reader
infile = h5py.File(filename, 'r')
data = np.array(infile['/entry1/analyser/data']).T
angles = np.array(infile['/entry1/analyser/angles'])
energies = np.array(infile['/entry1/analyser/energies'])
hv = np.array(infile['/entry1/instrument/monochromator/energy'])
zscale = energies
yscale = angles
# Find which xscale is appropriate
#"""
#sapolar : map
#salong : combined x & y scan along beam
#saperp : combined x & y scan perpendicular to beam
#"""
#for z_name in ['salong', 'saperp'] :
# index = '/entry1/analyser/{}'.format(z_name)
# try :
# zscale = np.array(infile[index])
# except KeyError :
# continue
# Check if we have a scan
if data.shape[2] == 1 :
xscale = energies
zscale = np.array([0])
data = data.T
else :
# Otherwise, extract third dimension from scan command
command = infile['entry1/scan_command'][()]
# Special case for 'pathgroup'
if command.split()[1] == 'pathgroup' :
self.print_m('is pathgroup')
# Extract points from a ([polar, x, y], [polar, x, y], ...)
# tuple
points = command.split('(')[-1].split(')')[0]
tuples = points.split('[')[1:]
xscale = []
for t in tuples :
point = t.split(',')[0]
xscale.append(float(point))
xscale = np.array(xscale)
# Now, if this was a scan with varying centre_energy, the
# zscale contains a list of energies...
# for now, just take the first one
# zscale = zscale[0]
# Special case for 'scangroup'
elif command.split()[1] == 'scan_group' :
self.print_m('is scan_group')
# Extract points from a ([polar, x, y], [polar, x, y], ...)
# tuple
points = command.split('((')[-1].split('))')[0]
points = '((' + points + '))'
xscale = np.array(ast.literal_eval(points))[:,0]
# Now, if this was a scan with varying centre_energy, the
# zscale contains a list of energies...
# for now, just take the first one
zscale = zscale[0]
# "Normal" case
else :
start_stop_step = command.split()[2:5]
start, stop, step = [float(s) for s in start_stop_step]
xscale = np.arange(start, stop+0.5*step, step)
# What we usually call theta is tilt in this beamline
theta = infile['entry1/instrument/manipulator/satilt'][0]
phi = infile['entry1/instrument/manipulator/sapolar'][0]
# Take the mean of the given binding energies as an estimate
try :
E_b = -np.mean(infile['entry1/analyser/binding_energies'])
except KeyError :
E_b = -np.mean(infile['entry1/analyser/energies'])
res = Namespace(
data = data,
xscale = xscale,
yscale = yscale,
zscale = zscale,
angles = angles,
theta = theta,
phi = phi,
E_b = E_b,
hv = hv
)
return res
[docs]class Dataloader_ALS(Dataloader) :
"""
Object that allows loading and saving of ARPES data from the MAESTRO
beamline at ALS, Berkely, in the newer .h5 format
Organization of the ALS h5 file (June, 2018)::
/-0D_Data
| |
| +-Cryostat_A
| +-Cryostat_B
| +-Cryostat_C
| +-Cryostat_D
| +-I0_NEXAFS
| +-IG_NEXAFS
| +-X <--- only present for xy scans (probably)
| +-Y <--- "
| +-Sorensen Program <--- only present for dosing scans
| +-time
|
+-1D_Data
| |
| +-Swept_SpectraN <--- Not always present
|
+-2D_Data
| |
| +-Swept_SpectraN <--- Usual location of data. There can be
| several 'Swept_SpectraN', each with an
| increasing value of N. The relevant data
| seems to be in the highest numbered
| Swept_Spectra.
|
+-Comments
| |
| +-PreScan
|
+-Headers
|
+-Beamline
| |
| +-[...]
| +-EPU_POL <--- Polarization (Integer encoded)
| +-BL_E <--- Beamline energy (hv)
| +-[...]
|
+-Computer
+-DAQ_Swept
| |
| +-[...]
| +-SSPE_0 <--- Pass energy (eV)
| +-[...]
| |
+-FileFormat
+-Low_Level_Scan
+-Main
+-Motors_Logical
+-Motors_Logical_Offset
+-Motors_Physical
+-Motors_Sample <--- Contains sample coordinates (xyz & angles)
| |
| +-[...]
| +-SMOTOR3 <--- Theta
| +-SMOTOR5 <--- Phi
| +-[...]
|
+-Motors_Sample_Offset
+-Notebook
"""
name = 'ALS'
[docs] def get(self, group, field_name) :
"""
Return the value of property *field_name* from h5File group *group*.
*field_name* must be a bytestring (e.g. ``b'some_string'``)!
This also returns a bytes object, remember to cast it correctly.
Returns *None* if *field_name* was not found in *group*.
"""
for entry in group[()] :
if entry[1].strip() == field_name :
return entry[2]
[docs] def load_data(self, filename) :
h5file = h5py.File(filename, 'r')
# The relevant data seems to be the highest numbered "Swept_SpectraN"
# entry in "2D_Data". Find the highest N:
Ns = [int(spectrum[-1]) for spectrum in list(h5file['2D_Data'])]
i = np.argmax(Ns)
path = '2D_Data/Swept_Spectra{}'.format(Ns[i])
# Convert the data to a numpy array
self.print_m('Converting data to numpy array (this may take a '
'while)...')
data = np.array(h5file[path])
# Get hv
hv = self.get(h5file['Headers/Beamline/'], b'BL_E')
hv = float(hv)
# Build x-, y- and zscales
# TODO Detect cuts (i.e. factual 2D Datasets)
nz, ny, nx = data.shape
# xscale is the outer loop (if present)
x_group = h5file['Headers/Low_Level_Scan']
x_start = self.get(x_group, b'ST_0_0')
x_end = self.get(x_group, b'EN_0_0')
if x_start is not None :
xscale = np.linspace(float(x_start), float(x_end), nx)
else :
xscale = np.arange(nx)
# y- and zscale are the axis of one spectrum (i.e. pixels and energies)
attributes = h5file[path].attrs
y_start, z_start = attributes['scaleOffset']
y_step, z_step = attributes['scaleDelta']
yscale = start_step_n(y_start, y_step, ny)
zscale = start_step_n(z_start, z_step, nz)
# zscale, yscale, xscale = [np.arange(s) for s in shape]
# TODO pixel to angle conversion
yscale *= 0.193/2
# Get theta and phi
theta = float(self.get(h5file['Headers/Motors_Sample'], b'SMOTOR3'))
phi = float(self.get(h5file['Headers/Motors_Sample'], b'SMOTOR5'))
res = Namespace(data=data,
xscale=xscale,
yscale=yscale,
zscale=zscale,
angles=yscale,
theta=theta,
phi=phi,
E_b=0,
hv=hv)
return res
[docs]class Dataloader_ALS_fits(Dataloader) :
"""
Object that allows loading and saving of ARPES data from the MAESTRO
beamline at ALS, Berkely, which is in .fits format.
"""
name = 'ALS .fits'
# A factor required to reach a reasonable result for the k space
# coordinates
k_stretch = 1.05
# Scanmode aliases
CUT = 'null'
MAP = 'Slit Defl'
HV = 'mono_eV'
DOPING = 'Sorensen Program'
def __init__(self, work_func=4) :
# Assign a value for the work function
self.work_func = work_func
[docs] def load_data(self, filename) :
# Open the file
hdulist = pyfits.open(filename)
# Access the BinTableHDU
self.bintable = hdulist[1]
# Try to fix FITS standard violations
self.bintable.verify('fix')
# Get the header to extract metadata from
header = hdulist[0].header
# Find out what scan mode we're dealing with
scanmode = header['NM_0_0']
self.print_m('Scanmode: ', scanmode)
# Find out whether we are in fixed or swept (dithered) mode which has
# an influence on how the data is stored in the fits file.
swept = True if ('SSSW0' in header) else False
# Case normal cut
if scanmode == self.CUT :
data = self.load_cut()
# Case FSM
elif scanmode == self.MAP :
data = self.load_map(swept)
# Case hv scan
elif scanmode == self.HV :
data = self.load_hv_scan()
# Case doping scan
elif scanmode == self.DOPING :
#data = self.load_doping_scan()
data = self.load_hv_scan()
else :
raise(IndexError('Couldn\'t determine scan type'))
nz, ny, nx = data.shape
# Determine whether the file uses the SS or SF prefix for metadata
if 'SSX0_0' in header :
pre = 'SS'
elif 'SFX0_0' in header :
pre = 'SF'
else :
raise(IndexError('Neither SSX0_0 nor SFX0_0 appear in header.'))
# Starting pixel (energy scale)
e0 = header[pre+'X0_0']
# Ending pixel (energy scale)
e1 = header[pre+'X1_0']
# Pixels per eV
ppeV = header[pre+'PEV_0']
# Zero pixel (=Fermi level?)
fermi_level = header[pre+'KE0_0']
# Create the energy (y) scale
energy_binning = 2
energies_in_px = np.arange(e0, e1, energy_binning)
energies = (energies_in_px - fermi_level)/ppeV
# Starting and ending pixels (angle or ky scale)
# DEPRECATED
#y0 = header[pre+'Y0_0']
#y1 = header[pre+'Y1_0']
# Use this arbitrary seeming conversion factor (found in Denys'
# script) to get from pixels to angles
#deg_per_px = 0.193 / 2
deg_per_px = 0.193 * self.k_stretch
#angle_binning = 3
angle_binning = 2
#angles_in_px = np.arange(y0, y1, angle_binning)
#n_y = int((y1-y0)/angle_binning)
#angles_in_px = np.arange(0, n_y, 1)
angles_in_px = np.arange(0, ny, 1)
# NOTE
# Actually, I would think that we have to multiply by
# `angle_binning` to get the right result. That leads to
# unreasonable results, however, and division just gives a
# perfect looking output. Maybe the factor 0.193/2 from ALS has
# changed with the introduction of binning?
#angles = angles_in_px * deg_per_px / angle_binning
angles = angles_in_px * deg_per_px / angle_binning
# Case cut
if scanmode == self.CUT :
# NOTE x and y scale may need to be swapped
yscale = energies
xscale = angles
zscale = None
# Case map
elif scanmode == self.MAP :
x0 = header['ST_0_0']
x1 = header['EN_0_0']
#n_x = header['N_0_0']
#xscale = np.linspace(x0, x1, n_x) * self.k_stretch
xscale = np.linspace(x0, x1, nx) #* self.k_stretch
yscale = angles
zscale = energies
# Case hv scan
elif scanmode == self.HV :
z0 = header['ST_0_0']
z1 = header['EN_0_0']
#nz = header['N_0_0']
angles_in_px = np.arange(0, nx, 1)
angles = angles_in_px * deg_per_px / angle_binning
xscale = angles
yscale = energies
zscale = np.linspace(z0, z1, nz)
# Case doping
elif scanmode == self.DOPING :
z0 = header['ST_0_0']
z1 = header['EN_0_0']
# Not sure how the sqrt2 comes in
angles_in_px = np.arange(0, nx, 1) * np.sqrt(2)
angles = angles_in_px * deg_per_px / angle_binning
xscale = angles
# For some reason the energy determination from file fails here...
yscale = np.arange(ny, dtype=float)
zscale = np.linspace(z0, z1, nz)
# For the binding energy, just take a min value as its variations
# are small compared to the photon energy
E_b = energies.min()
# Extract some additional metadata (mostly for angles->k conversion)
# TODO Special cases for different scan types
# Get relevant metadata from header
theta = header['LMOTOR3']
# beta in LMOTOR4
phi = header['LMOTOR5']
# The photon energy may vary in the case that this is a hv_scan
hv = header['MONO_E']
# In recent ALS data the above determined x, y and z scales did not
# match the actual shape of the data...
# self.print_m(*data.shape)
# self.print_m(nx, ny, nz)
scales = [zscale, yscale, xscale]
for i,scale in enumerate(scales) :
try :
length = len(scale)
except Exception :
self.print_m('length problem')
continue
# self.print_m(length)
n = data.shape[i]
if length != n :
self.print_m(('Shape mismatch in dim {}: {} != {}. Setting ' +
'scale to arange.').format(i, length, n))
scales[i] = np.arange(n)
zscale, yscale, xscale = scales
# NOTE angles==xscale and E_b can be extracted from yscale, thus it
# is not really necessary to pass them on here.
res = Namespace(
data = data,
xscale = xscale,
yscale = yscale,
zscale = zscale,
angles = angles,
theta = theta,
phi = phi,
E_b = E_b,
hv = hv
)
return res
[docs] def load_cut(self) :
""" Read data from a 'cut', which is just one slice of energy vs k. """
# Access the actual data which is in the last element of the last
# element of the bin table...
fields = self.bintable.data[-1]
data = fields[-1]
# Reshape towards GUI expectations
x, y = data.shape
data = data.reshape(1, x, y)
return data
[docs] def load_map(self, swept) :
"""
Read data from a 'map', i.e. several energy vs k slices and bring
them in the right shape for the gui, which is
(energy, k_parallel, k_perpendicular)
"""
bt_data = self.bintable.data
n_slices = len(bt_data)
first_slice = bt_data[0][-1]
# The shape of the returned array must be (energy, kx, ky)
# (n_slices corresponds to ky)
if swept :
n_energy, n_kx = first_slice.shape
data = np.zeros([n_energy, n_kx, n_slices])
for i in range(n_slices) :
this_slice = bt_data[i][-1]
data[:,:,i] = this_slice
else :
n_kx, n_energy = first_slice.shape
data = np.zeros([n_energy, n_kx, n_slices])
for i in range(n_slices) :
this_slice = bt_data[i][-1]
# Reshape the slice to (energy, kx)
this_slice = this_slice.transpose()
data[:,:,i] = this_slice
# Convert to numpy array
data = np.array(data)
return data
[docs] def load_hv_scan(self) :
"""
Read data from a hv scan, i.e. a series of energy vs k cuts
(shape (n_kx, n_energy), each belonging to a different photon energy
hv. The returned shape in this case must be
(photon_energies, energy, k)
The same is used for doping scans, where the output shape is
(doping, energy, k)
"""
bt_data = self.bintable.data
n_hv = len(bt_data)
# = header['N_0_0']
first_cut = bt_data[0][-1]
n_kx, n_energy = first_cut.shape # should all be accessible from header
data = np.zeros([n_hv, n_kx, n_energy])
for i in range(n_hv) :
this_slice = bt_data[i][-1]
data[i,:,:] = this_slice
# Convert to numpy array
data = np.array(data)
return data
[docs]class Dataloader_SIS(Dataloader) :
"""
Object that allows loading and saving of ARPES data from the SIS
beamline at PSI which is in hd5 format.
"""
name = 'SIS'
# Number of cuts that need to be present to assume the data as a map
# instead of a series of cuts
min_cuts_for_map = 10
def __init__(self, filename=None) :
pass
[docs] def load_data(self, filename) :
"""
Extract and return the actual 'data', i.e. the recorded map/cut.
Also return labels which provide some indications what the data means.
"""
# Note: x and y are a bit confusing here as the hd5 file has a
# different notion of zero'th and first dimension as numpy and then
# later pcolormesh introduces yet another layer of confusion. The way
# it is written now, though hard to read, turns out to do the right
# thing and leads to readable code from after this point.
if filename.endswith('h5') :
return self.load_h5(filename)
elif filename.endswith('zip') :
return self.load_zip(filename)
else :
raise NotImplementedError('File suffix not supported.')
[docs] def load_zip(self, filename) :
""" Load and store a deflector mode file from SIS-ULTRA. """
# Prepare metadata key-value pairs for the different metadata files
# and their expected types
keys1 = [
('width', 'n_energy', int),
('height', 'n_x', int),
('depth', 'n_y', int),
('first_full', 'first_energy', int),
('last_full', 'last_energy', int),
('widthoffset', 'start_energy', float),
('widthdelta', 'step_energy', float),
('heightoffset', 'start_x', float),
('heightdelta', 'step_x', float),
('depthoffset', 'start_y', float),
('depthdelta', 'step_y', float)
]
keys2 = [('Excitation Energy', 'hv', float)]
# Load the zipfile
with zipfile.ZipFile(filename, 'r') as z :
# Get the created filename from the viewer
with z.open('viewer.ini') as viewer :
file_id = self.read_viewer(viewer)
# Get most metadata from a metadata file
with z.open('Spectrum_' + file_id + '.ini') as metadata_file :
M = self.read_metadata(keys1, metadata_file)
# Get additional metadata from a second metadata file...
with z.open(file_id + '.ini') as metadata_file2 :
M2 = self.read_metadata(keys2, metadata_file2)
# Extract the binary data from the zipfile
with z.open('Spectrum_' + file_id + '.bin') as f :
data_flat = np.frombuffer(f.read(), dtype='float32')
# Put the data back into its actual shape
data = np.reshape(data_flat, (int(M.n_y), int(M.n_x), int(M.n_energy)))
# Cut off unswept region
data = data[:,:,M.first_energy:M.last_energy+1]
# Put into shape (energy, other angle, angle along analyzer)
data = np.moveaxis(data, 2, 0)
# Create axes
xscale = start_step_n(M.start_x, M.step_x, M.n_x)
yscale = start_step_n(M.start_y, M.step_y, M.n_y)
energies = start_step_n(M.start_energy, M.step_energy, M.n_energy)
energies = energies[M.first_energy:M.last_energy+1]
res = Namespace(
data = data,
xscale = xscale,
yscale = yscale,
zscale = energies,
hv = M2.hv
)
return res
[docs] def read_viewer(self, viewer) :
""" Extract the file ID from a SIS-ULTRA deflector mode output file. """
for line in viewer.readlines() :
l = line.decode('UTF-8')
if l.startswith('name') :
# Make sure to split off unwanted whitespace
return l.split('=')[1].split()[0]
[docs] def load_h5(self, filename) :
""" Load and store the full h5 file and extract relevant information. """
# Load the hdf5 file
self.datfile = h5py.File(filename, 'r')
# Extract the actual dataset and some metadata
h5_data = self.datfile['Electron Analyzer/Image Data']
attributes = h5_data.attrs
# Convert to array and make 3 dimensional if necessary
shape = h5_data.shape
# Access data chunk-wise, which is much faster.
# This improvement has been contributed by Wojtek Pudelko and makes data
# loading from SIS Ultra orders of magnitude faster!
if len(shape) == 3:
data = np.zeros(shape)
for i in range(shape[2]):
data[:, :, i] = h5_data[:, :, i]
else:
data = np.array(h5_data)
# How the data needs to be arranged depends on the scan type: cut,
# map, hv scan or a sequence of cuts
# Case cut
if len(shape) == 2 :
x = shape[0]
y = shape[1]
# Make data 3D
data = data.reshape(1, x, y)
# N_E = y
N_E = 1
# Extract the limits
xlims = attributes['Axis1.Scale']
ylims = attributes['Axis0.Scale']
# elims = ylims
elims = [1, 1]
# shape[2] should hold the number of cuts. If it is reasonably large,
# we have a map. Otherwise just a sequence of cuts.
# Case map
elif shape[2] > self.min_cuts_for_map :
self.print_m('Is a map?')
x = shape[1]
y = shape[2]
N_E = shape[0]
# Extract the limits
xlims = attributes['Axis2.Scale']
ylims = attributes['Axis1.Scale']
elims = attributes['Axis0.Scale']
# Case sequence of cuts
else :
x = shape[0]
y = shape[1]
N_E = y
z = shape[2]
# Reshape data
#new_data = np.zeros([z, x, y])
#for i in range(z) :
# cut = data[:,:,i]
# new_data[i] = cut
#data = new_data
data = np.rollaxis(data, 2, 0)
# Extract the limits
xlims = attributes['Axis1.Scale']
ylims = attributes['Axis0.Scale']
elims = ylims
# Construct x, y and energy scale (x/ylims[1] contains the step size)
xscale = start_step_n(*xlims, y)
yscale = start_step_n(*ylims, x)
energies = start_step_n(*elims, N_E)
# xscale = self.make_scale(xlims, y)
# yscale = self.make_scale(ylims, x)
# energies = self.make_scale(elims, N_E)
# Extract some data for ang2k conversion
metadata = self.datfile['Other Instruments']
theta = metadata['Theta'][0]
#theta = metadata['Tilt'][0]
phi = metadata['Phi'][0]
hv = attributes['Excitation Energy (eV)']
angles = xscale
E_b = min(energies)
res = Namespace(
data = data,
xscale = xscale,
yscale = yscale,
zscale = energies,
angles = angles,
theta = theta,
phi = phi,
E_b = E_b,
hv = hv
)
return res
# def make_scale(self, limits, nstep) :
# """
# Helper function to construct numbers starting from limits[0]
# and going in steps of limits[1] for nstep steps.
# """
# start = limits[0]
# step = limits[1]
# end = start + (nstep+1)*step
# return np.linspace(start, end, nstep)
[docs]class Dataloader_ADRESS(Dataloader) :
""" ADRESS beamline at SLS, PSI. """
name = 'ADRESS'
[docs] def load_data(self, filename) :
h5file = h5py.File(filename, 'r')
# The actual data is in the field: 'Matrix'
matrix = h5file['Matrix']
# The scales can be extracted from the matrix' attributes
scalings = matrix.attrs['IGORWaveScaling']
units = matrix.attrs['IGORWaveUnits']
info = matrix.attrs['IGORWaveNote']
# Convert `units` and `info`, which is a bytestring of ASCII
# characters, to lists of strings
metadata = info.decode('ASCII').split('\n')
units = [b.decode('ASCII') for b in units[0]]
# Put the data into a numpy array and convert to float
data = np.array(matrix, dtype=float)
shape = data.shape
# 'IGORWaveUnits' contains a list of the form
# ['', 'degree', 'eV', units[3]]. The first three elements should
# always be the same, but the third one may vary or not even exist.
# Use this to determine the scan type.
# Convert to np.array bring it in the shape the gui expects, which is
# [energy/hv/1, kparallel/"/kparallel, kperpendicular/"/energy] and
# prepare the x,y,z # scales
# Note: `scalings` is a 3/4 by 2 array where every line contains a
# (step, start) pair
if len(units) == 3 :
# Case cut
# Make data 3-dimensional by adding an empty dimension
data = data.reshape(1, shape[0], shape[1])
data = np.rollaxis(data, 2, 1)
# Shape has changed
shape = data.shape
xstep, xstart = scalings[1]
ystep, ystart = scalings[2]
zscale = None
else :
# Case map or hv scan (or...?)
data = np.rollaxis(data, 1, 0)
# Shape has changed
shape = data.shape
xstep, xstart = scalings[3]
ystep, ystart = scalings[1]
zstep, zstart = scalings[2]
zscale = start_step_n(zstart, zstep, shape[0])
# Binned data may contain some sort of scaling
xscale = start_step_n(xstart, xstep, shape[2])
yscale = start_step_n(ystart, ystep, shape[1])
def convert_raw(raw) :
"""
Try to convert a string which is expected to be either just a
decimal number or a MATLAB-like range expression
(START:STEP:STOP) to either a float or an np.array.
"""
if ':' in raw :
# raw is of the form start:step:end
start, step, end = [float(n) for n in raw.split(':')]
res = np.arange(start, end*step, step)
else :
res = float(raw)
return res
hv_raw = metadata[0].split('=')[-1]
hv = convert_raw(hv_raw)
theta_raw = metadata[8].split('=')[-1]
theta = convert_raw(theta_raw)
phi_raw = metadata[10].split('=')[-1]
phi = convert_raw(phi_raw)
angles = xscale
# For the binding energy just take the minimum of the energies
E_b = yscale.min()
res = Namespace(
data = data,
xscale = xscale,
yscale = yscale,
zscale = zscale,
angles = angles,
theta = theta,
phi = phi,
E_b = E_b,
hv = hv
)
return res
[docs]class Dataloader_CASSIOPEE(Dataloader) :
""" CASSIOPEE beamline at SOLEIL synchrotron, Paris. """
name = 'CASSIOPEE'
date = '18.07.2018'
# Possible scantypes
HV = 'hv'
FSM = 'FSM'
[docs] def load_data(self, filename) :
"""
Single cuts are stored as two files: One file contians the data and
the other the metadata. Maps, hv scans and other *external
loop*-scans are stored as a directory containing these two files for
each cut/step of the external loop. Thus, this dataloader
distinguishes between directories and single files and changes its
behaviour accordingly.
"""
if os.path.isfile(filename) :
return self.load_from_file(filename)
else :
if not filename.endswith('/') : filename += '/'
return self.load_from_dir(filename)
[docs] def load_from_dir(self, dirname) :
"""
Load 3D data from a directory as it is output by the IGOR macro used
at CASSIOPEE. The dir is assumed to contain two files for each cut::
BASENAME_INDEX_i.txt -> beamline related metadata
BASENAME_INDEX_ROI1_.txt -> data and analyzer related metadata
To be more precise, the assumptions made on the filenames in the
directory are:
* the INDEX is surrounded by underscores (`_`) and appears after
the first underscore.
* the string ``ROI`` appears in the data filename.
"""
# Get the all filenames in the dir
all_filenames = os.listdir(dirname)
# Remove all non-data files
filenames = []
for name in all_filenames :
if 'ROI' in name :
filenames.append(name)
# Get metadata from first file in list
skip, energy, angles = self.get_metadata(dirname+filenames[0])
# Get the data from each cut separately. This happens in the order
# they appear in os.listdir() which is usually not what we want -> a
# reordering is necessary later.
unordered = {}
i_min = np.inf
i_max = -np.inf
for name in filenames :
# Keep track of the min and max indices in the directory
i = int(name.split('_')[1])
if i < i_min : i_min = i
if i > i_max : i_max = i
# Get the data of cut i
this_cut = np.loadtxt(dirname+name, skiprows=skip+1)[:,1:]
unordered.update({i: this_cut})
# Properly rearrange the cuts
data = []
for i in range(i_min, i_max+1) :
data.append(unordered[i])
data = np.array(data)
# Get the z-axis from the metadata files
scantype, outer_loop, hv = self.get_outer_loop(dirname, filenames)
self.print_m('Scantype: {}'.format(scantype))
if scantype == self.HV :
yscale = energy
zscale = outer_loop
hv = outer_loop
elif scantype == self.FSM :
yscale = outer_loop
zscale = energy
# For a map, we expect output of the form (Energy, k_para,
# k_perp), currently it is (tilt, energy, theta) -> reshape
# with moveaxis
data = np.moveaxis(data, 0, 1)
else :
yscale = energy
zscale = np.arange(data.shape[0])
xscale = angles
res = Namespace(
data = data,
xscale = xscale,
yscale = yscale,
zscale = zscale,
angles = angles,
theta = 1,
phi = 1,
E_b = 0,
hv = hv)
return res
[docs] def load_from_file(self, filename) :
"""
Load just a single cut. However, at CASSIOPEE they output .ibw files
if the cut does not belong to a scan...
"""
if filename.endswith('.ibw') :
return self.load_from_ibw(filename)
else :
return self.load_from_txt(filename)
[docs] def load_from_ibw(self, filename) :
"""
Load scan data from an IGOR binary wave file. Luckily someone has
already written an interface for this (the python `igor` package).
"""
wave = binarywave.load(filename)['wave']
data = np.array([wave['wData']])
# The `header` contains some metadata
header = wave['wave_header']
nDim = header['nDim']
steps = header['sfA']
starts = header['sfB']
# Construct the x and y scales from start, stop and n
yscale = start_step_n(starts[0], steps[0], nDim[0])
xscale = start_step_n(starts[1], steps[1], nDim[1])
# Convert `note`, which is a bytestring of ASCII characters that
# contains some metadata, to a list of strings
note = wave['note']
note = note.decode('ASCII').split('\r')
# Now the extraction fun begins. Most lines are of the form
# `Some-kind-of-name=some-value`
metadata = dict()
for line in note :
# Split at '='. If it fails, we are not in a line that contains
# useful information
try :
name, val = line.split('=')
except ValueError :
continue
# Put the pair in a dictionary for later access
metadata.update({name: val})
# NOTE Unreliable hv
hv = metadata['Excitation Energy']
res = Namespace(
data = data,
xscale = xscale,
yscale = yscale,
zscale = None,
angles = xscale,
theta = 0,
phi = 0,
E_b = 0,
hv = hv)
return res
[docs] def load_from_txt(self, filename) :
i, energy, angles = self.get_metadata(filename)
self.print_m('Loading from txt')
data0 = np.loadtxt(filename, skiprows=i+1)
# The first column in the datafile contains the angles
angles_from_data = data0[:,0]
data = np.array([data0[:,1:]])
res = Namespace(
data = data,
xscale = angles,
yscale = energy,
zscale = None,
angles = angles,
theta = 1,
phi = 1,
E_b = 0,
hv = 1)
return res
[docs] def get_outer_loop(self, dirname, filenames) :
"""
Try to determine the scantype and the corresponding z-axis scale from
the additional metadata textfiles. These follow the assumptions made
in :meth:`self.load_from_dir
<arpys.dataloaders.Dataloader_CASSIOPEE.load_from_dir>`.
Additionally, the MONOCHROMATOR section must come before the
UNDULATOR section as in both sections we have a key `hv` but only the
former makes sense.
Return a string for the scantype, the extracted z-scale and the value
for hv for non-hv-scans (scantype, zscale, hvs[0]) or (None,
None, hvs[0]) in case of failure.
"""
# Step 1) Extract metadata from metadata file
# Prepare containers
indices, xs, ys, zs, thetas, phis, tilts, hvs = ([], [], [], [], [],
[], [], [])
containers = [indices, xs, ys, zs, thetas, phis, tilts, hvs]
for name in filenames :
# Get the index of the file
index = int(name.split('_')[1])
# Build the metadata-filename by substituting the ROI part with i
metafile = re.sub(r'_ROI.?_', '_i', name)
# The values are separated from the names by a `:`
splitchar = ':'
# Read in the file
with open(dirname + metafile, 'r') as f :
for line in f.readlines() :
if line.startswith('x (mm)') :
x = float(line.split(splitchar)[-1])
elif line.startswith('y (mm)') :
y = float(line.split(splitchar)[-1])
elif line.startswith('z (mm)') :
z = float(line.split(splitchar)[-1])
elif line.startswith('theta (deg)') :
theta = float(line.split(splitchar)[-1])
elif line.startswith('phi (deg)') :
phi = float(line.split(splitchar)[-1])
elif line.startswith('tilt (deg)') :
tilt = float(line.split(splitchar)[-1])
elif line.startswith('hv (eV)') :
hv = float(line.split(splitchar)[-1])
elif line.startswith('UNDULATOR') :
break
# NOTE The order of this list has to match the order of the
# containers
values = [index, x, y, z, theta, phi, tilt, hv]
for i,container in enumerate(containers) :
container.append(values[i])
# Step 2) Check which parameters vary to determine scantype
if hvs[1] != hvs[0] :
scantype = self.HV
zscale = hvs
elif thetas[1] != thetas[0] :
scantype = self.FSM
zscale = thetas
else :
scantype = None
zscale = None
# Step 3) Put zscale in order and return
if zscale is not None :
zscale = np.array(zscale)[np.argsort(indices)]
return scantype, zscale, hvs[0]
# +-------+ #
# | Tools | # ==================================================================
# +-------+ #
# List containing all reasonably defined dataloaders
all_dls = [
Dataloader_SIS,
Dataloader_ADRESS,
Dataloader_i05,
Dataloader_CASSIOPEE,
Dataloader_ALS,
Dataloader_ALS_fits,
Dataloader_Pickle
]
# Function to try all dataloaders in all_dls
[docs]def load_data(filename, exclude=None, suppress_warnings=False) :
"""
Try to load some dataset *filename* by iterating through `all_dls`
and appliyng the respective dataloader's load_data method. If it works:
great. If not, try with the next dataloader.
Collects and prints all raised exceptions in case that no dataloader
succeeded.
"""
# Sanity check: does the given path even exist in the filesystem?
if not os.path.exists(filename) :
raise FileNotFoundError(ENOENT, os.strerror(ENOENT), filename)
# If only a single string is given as exclude, pack it into a list
if exclude is not None and type(exclude)==str :
exclude = [exclude]
# Keep track of all exceptions in case no loader succeeds
exceptions = dict()
# Suppress warnings
with catch_warnings() :
if suppress_warnings :
simplefilter('ignore')
for dataloader in all_dls :
# Instantiate a dataloader object
dl = dataloader()
# Skip to the next if this dl is excluded (continue brings us
# back to the top of the loop, starting with the next element)
if exclude is not None and dl.name in exclude :
continue
# Try loading the data
try :
namespace = dl.load_data(filename)
except Exception as e :
# Temporarily store the exception
exceptions.update({dl : e})
# Try the next dl
continue
# Reaching this point must mean we succeeded. Print warnings from
# this dataloader, if any occurred
print('[arpys]Loaded data with {}.'.format(dl))
try :
print(dl, ': ', exceptions[dl])
except KeyError :
pass
return namespace
# Reaching this point means something went wrong. Print all exceptions.
for dl in exceptions :
print('[arpys]', dl)
e = exceptions[dl]
print('[arpys]Exception {}: {}'.format(type(e), e))
raise Exception('Could not load data {}.'.format(filename))
# Function to create a python pickle file from a data namespace
[docs]def dump(D, filename, force=False) :
""" Wrapper for :func:`pickle.dump`. Does not overwrite if a file of
the given name already exists, unless *force* is True.
**Parameters**
======== ==================================================================
D python object to be stored.
filename str; name of the output file to create.
force boolean; if True, overwrite existing file.
======== ==================================================================
"""
# Check if file already exists
if not force and os.path.isfile(filename) :
question = 'File <{}> exists. Overwrite it? (y/N)'.format(filename)
answer = input(question)
# If the answer is anything but a clear affirmative, stop here
if answer.lower() not in ['y', 'yes'] :
return
with open(filename, 'wb') as f :
pickle.dump(D, f)
message = 'Wrote to file <{}>.'.format(filename)
print(message)
[docs]def load_pickle(filename) :
""" Shorthand for loading python objects stored in pickle files.
**Parameters**
======== ==================================================================
filename str; name of file to load.
======== ==================================================================
"""
with open(filename, 'rb') as f :
return pickle.load(f)
[docs]def update_namespace(D, *attributes) :
""" Add arbitrary attributes to a :class:`Namespace <argparse.Namespace>`.
**Parameters**
========== ================================================================
D argparse.Namespace; the namespace holding the data and
metadata. The format is the same as what is returned by a
dataloader.
attributes tuples or len(2) lists; (name, value) pairs of the attributes
to add. Where `name` is a str and value any python object.
========== ================================================================
"""
for name, attribute in attributes :
D.__dict__.update( {name: attribute} )
[docs]def add_attributes(filename, *attributes) :
""" Add arbitrary attributes to an argparse.Namespace that is stored as a
python pickle file. Simply opens the file, updates the namespace with
:func:`update_namespace <arpys.dataloaders.update_namespace>` and writes
back to file.
**Parameters**
========== ================================================================
filename str; name of the file to update.
attributes tuples or len(2) lists; (name, value) pairs of the attributes
to add. Where `name` is a str and value any python object.
========== ================================================================
"""
dataloader = Dataloader_Pickle()
D = dataloader.load_data(filename)
update_namespace(D, *attributes)
dump(D, filename, force=True)
# +---------+ #
# | Testing | # ================================================================
# +---------+ #
if __name__ == '__main__' :
D = Dataloader_SIS().load_data('/home/kevin/qmap/experiments/2020_09_SIS/PrFeP_3P/PrFeP_3P_0001.zip')
print(D.data.shape)
print(D.xscale.shape)