xafsManager/EXAFS_Monitor/monitor_gui.py
2019-12-17 18:04:59 +01:00

425 lines
17 KiB
Python

import zmq
import os, sys
import numpy as np
from scipy.optimize import curve_fit
from scipy.interpolate import LSQUnivariateSpline
from scipy.ndimage import gaussian_filter1d
from scipy.integrate import simps
import pyqtgraph as pg
from pyqtgraph.Qt import QtCore, QtGui
from time import time
FPS_LIMIT = 10
EXAFS_SLOWDOWN_FACTOR = 10
class ZeroMQ_Listener(QtCore.QObject):
message = QtCore.pyqtSignal(list)
def __init__(self):
QtCore.QObject.__init__(self)
self.context = zmq.Context()
self.socket = self.context.socket(zmq.SUB)
self.socket.connect ("tcp://127.0.0.1:6400")
self.socket.setsockopt_string(zmq.SUBSCRIBE, "")
self.running = True
def loop(self):
buf = []
t0 = time()
while self.running:
try:
string = self.socket.recv()
buf.append(string)
if time() - t0 > 1./FPS_LIMIT:
self.message.emit(buf[:])
buf = []
t0 = time()
except:
pass
def finish(self):
self.socket.close()
self.context.term()
class EXAFS_Extractor(QtCore.QObject):
message = QtCore.pyqtSignal(list)
def __init__(self):
QtCore.QObject.__init__(self)
self.running = True
self.pre1 = -150
self.pre2 = -50
self.post1 = 50
def victoreen(self, e, c ,d):
f = 1.23986e4/e
return c*f**3 - d*f**4
def find_e0_idx(self, e, mu):
d = gaussian_filter1d(np.gradient(mu) / np.gradient(e), 3)
return np.argmax(d), d
def pre_edge(self, e, idx, mu):
idx1 = np.abs(e[idx] + self.pre1 - e).argmin()
idx2 = np.abs(e[idx] + self.pre2 - e).argmin()
popt, pcov = curve_fit(self.victoreen, e[idx1:idx2], mu[idx1:idx2])
return self.victoreen(e, popt[0], popt[1])
def post_edge(self, e, idx, mu, pre, kw = 2):
# k indexes starts from e0
f = 0.262467 # magic number for calculation of k
k = np.sqrt(f * (e[idx:]-e[idx]))
kmin_idx = np.abs(e[idx] + self.post1 - e).argmin() - idx
fun_fit = (mu[idx:] - pre[idx:]) * k**kw
n_knots = int(2.0*(k.max() - k.min() ) / np.pi) + 1
knots = np.linspace(k[kmin_idx], k[-2], n_knots)
spl = LSQUnivariateSpline(k, fun_fit, knots)
post = spl(k[kmin_idx:])/(k[kmin_idx:])**kw
edge_step = post[0]
norm = mu - pre
norm[idx + kmin_idx:] += -post + edge_step
norm /= edge_step
chi = norm[idx:] - 1
return k, gaussian_filter1d(chi*k*k, 3)
def hann_win(self, k, kmin, kmax, dk):
win = np.empty_like(k)
for i in range (len(k)):
if (k[i] < (kmin - 0.5*dk)):
win[i] = 0.0
elif (k[i] >= (kmin - 0.5*dk)) and (k[i] <= (kmin + 0.5*dk)):
win[i] = 0.5*(1.0 - np.cos(np.pi*(k[i] - kmin + 0.5*dk)/dk ) )
elif (k[i] > (kmin + 0.5*dk)) and (k[i] < (kmax - 0.5*dk)):
win[i] = 1.0
elif (k[i] >= (kmax - 0.5*dk)) and (k[i] <= (kmax + 0.5*dk)):
win[i] = 0.5*(1.0 + np.cos(np.pi*(k[i] - kmax + 0.5*dk)/dk ) )
elif (k[i] >(kmax + 0.5*dk)):
win[i] = 0.0
return win
def make_ft(self, k, chi, r):
ft_im = np.empty_like(r)
ft_re = np.empty_like(r)
ft_mag = np.empty_like(r)
win = self.hann_win(k, 2.0, max(k) - 1., 1.0)
for i in range (len(r)):
ft_im[i] = simps (np.sin(2.0*k*r[i])*chi*win, k)
ft_re[i] = simps (np.cos(2.0*k*r[i])*chi*win, k)
ft_mag[i] = 1.0/np.sqrt(np.pi)*np.sqrt((ft_im[i]*ft_im[i] + ft_re[i]*ft_re[i]))
return ft_mag
def calculate(self, list_data):
e, mu1, mu2 = np.array(list_data[0]), np.array(list_data[1]), np.array(list_data[2])
e0_idx1, e0_idx2, deriv1, deriv2 = None, None, [], []
try:
e0_idx1, deriv1 = self.find_e0_idx(e, mu1)
pre_edge1 = self.pre_edge(e, e0_idx1, mu1)
k1, chi1 = self.post_edge(e, e0_idx1, mu1, pre_edge1)
except:
k1, chi1 = [], []
try:
e0_idx2, deriv2 = self.find_e0_idx(e, mu2)
pre_edge2 = self.pre_edge(e, e0_idx2, mu2)
k2, chi2 = self.post_edge(e, e0_idx2, mu2, pre_edge2)
except:
k2, chi2 = [], []
r1, ft_mag1, r2, ft_mag2 = [], [], [], []
if len(k1) > 0:
try:
r1 = np.arange(0.0, 6.0, 0.02)
ft_mag1 = self.make_ft(k1, chi1, r1)
except:
r1, ft_mag1 = [], []
if len(k2) > 0:
try:
r2 = np.arange(0.0, 6.0, 0.02)
ft_mag2 = self.make_ft(k2, chi2, r2)
except:
r2, ft_mag2 = [], []
self.message.emit([e0_idx1, deriv1, k1, chi1, r1, ft_mag1, e0_idx2, deriv2, k2, chi2, r2, ft_mag2, e])
class MonitorWidget(QtGui.QWidget):
message_data = QtCore.pyqtSignal(list)
def __init__(self, app):
QtGui.QWidget.__init__(self)
self.app = app
self.thread = QtCore.QThread()
self.zeromq_listener = ZeroMQ_Listener()
self.zeromq_listener.moveToThread(self.thread)
self.thread.started.connect(self.zeromq_listener.loop)
self.zeromq_listener.message.connect(self.signal_received)
QtCore.QTimer.singleShot(0, self.thread.start)
self.thread2 = QtCore.QThread()
self.exafs_extractor = EXAFS_Extractor()
self.exafs_extractor.moveToThread(self.thread2)
self.exafs_extractor.message.connect(self.update_exafs)
self.message_data.connect(self.exafs_extractor.calculate)
QtCore.QTimer.singleShot(1, self.thread2.start)
self.layout = QtGui.QGridLayout()
self.pw1 = pg.PlotWidget(title='I0 [counts] / Energy[eV]')
self.pw2 = pg.PlotWidget(title='I1 [counts] / Energy[eV]')
self.pw3 = pg.PlotWidget(title='I2 [counts] / Energy[eV]')
self.pw4 = pg.PlotWidget(title='IP [counts] / Energy[eV]')
self.hbl = QtGui.QHBoxLayout()
self.flabel = QtGui.QLabel("N/A")
self.flabel.setStyleSheet("color: #fff")
self.fbtn = QtGui.QPushButton("Open data folder")
self.fbtn.clicked.connect(self.open_folder)
self.fbtn.setSizePolicy(QtGui.QSizePolicy.Maximum, QtGui.QSizePolicy.Maximum)
self.fbtn.setStyleSheet('''
QPushButton{
color: #777; font-family:'Arial';
font-weight: bold;
font-size:16px;
border: 2px solid #777;
border-radius: 10px;
min-width: 200px;
padding: 7px;
outline: none;
}'''
)
self.viewlabel = QtGui.QLabel("Currents")
self.viewlabel.setAlignment(QtCore.Qt.AlignCenter)
self.viewlabel.setStyleSheet('''QLabel{
color: #fff; font-family:'Arial';
font-weight: bold;
font-size:16px;
min-width: 200px;
max-width: 200px;
outline: none;
}'''
)
self.btn = QtGui.QPushButton("Switch to XANES")
self.btn.clicked.connect(self.switchView)
self.view = 0
self.btn.setSizePolicy(QtGui.QSizePolicy.Maximum, QtGui.QSizePolicy.Maximum)
self.btn.setStyleSheet('''
QPushButton{
color: #777; font-family:'Arial';
font-weight: bold;
font-size:16px;
border: 2px solid #777;
border-radius: 10px;
min-width: 200px;
padding: 7px;
outline: none;
}'''
)
self.hbl.addWidget(self.fbtn)
self.hbl.addWidget(self.flabel)
self.hbl.addWidget(self.viewlabel)
self.hbl.addWidget(self.btn)
self.datafilename = None
self.layout.addLayout(self.hbl, 0, 0, 1, 2)
self.layout.addWidget(self.pw1, 1, 0)
self.layout.addWidget(self.pw2, 1, 1)
self.layout.addWidget(self.pw3, 2, 0)
self.layout.addWidget(self.pw4, 2, 1)
self.setLayout(self.layout)
self.app = app
self.counter = 0
self.energy = []
self.denergy = []
self.data_i0 = []
self.data_i1 = []
self.data_i2 = []
self.data_ip = []
self.mu = []
self.muf = []
self.dmu = []
self.dmuf = []
self.k = []
self.kf = []
self.chi = []
self.chif = []
self.r = []
self.ft_mag = []
self.rf = []
self.ft_magf = []
self.e0i = None
self.e0if = None
self.curve1 = self.pw1.plot(self.energy, self.data_i0, pen = "b")
self.curve2 = self.pw2.plot(self.energy, self.data_i1, pen = "g")
self.curve3 = self.pw3.plot(self.energy, self.data_i2, pen = "r")
self.curve4 = self.pw4.plot(self.energy, self.data_ip, pen = "c")
self.pen_orchid = pg.mkPen(color=(230, 170, 250))
self.pen_lime = pg.mkPen(color=(190, 250, 0))
self.text2 = pg.TextItem("", anchor=(0.5, 2))
self.arrow2 = pg.ArrowItem(angle=-90)
self.text4 = pg.TextItem("", anchor=(0.5, 2))
self.arrow4 = pg.ArrowItem(angle=-90)
def signal_received(self, message_buf):
for message in message_buf:
if message == "test":
pass
elif message == "clear":
self.counter = 0
self.energy = []
self.denergy = []
self.data_i0 = []
self.data_i1 = []
self.data_i2 = []
self.data_ip = []
self.mu = []
self.muf = []
self.dmu = []
self.dmuf = []
self.k = []
self.chi = []
self.kf = []
self.chif = []
self.r = []
self.rf = []
self.ft_mag = []
self.ft_magf = []
self.e0i = None
self.e0if = None
self.updatePlot()
elif (message[-4:] == ".dat") or (message[-4:] == ".nxs"):
self.flabel.setText(message)
self.datafilename = message
else:
data = [float(v) for v in message.split()]
self.energy.append(data[0])
self.data_i0.append(data[1])
self.data_i1.append(data[2])
self.data_i2.append(data[3])
self.data_ip.append(data[4])
self.mu.append(np.log(data[1]/data[2]))
self.muf.append(data[4]/data[1])
self.updatePlot()
if not self.counter%EXAFS_SLOWDOWN_FACTOR:
self.message_data.emit([self.energy, self.mu, self.muf])
self.counter += 1
self.app.processEvents()
def switchView(self):
self.view = (self.view + 1)%3
if self.view == 0:
self.viewlabel.setText("Currents")
self.btn.setText("Switch to XANES")
self.pw1.setTitle("I0 [counts] / Energy[eV]")
self.pw2.setTitle("I1 [counts] / Energy[eV]")
self.pw3.setTitle("I2 [counts] / Energy[eV]")
self.pw4.setTitle("IP [counts] / Energy[eV]")
self.pw2.removeItem(self.text2)
self.pw2.removeItem(self.arrow2)
self.pw4.removeItem(self.text4)
self.pw4.removeItem(self.arrow4)
elif self.view == 1:
self.viewlabel.setText("XANES")
self.btn.setText("Switch to EXAFS")
self.pw1.setTitle("mu trans. / Energy[eV]")
self.pw2.setTitle("deriv. mu trans. / Energy[eV]")
self.pw3.setTitle("mu fluo. / Energy[eV]")
self.pw4.setTitle("deriv. mu fluo. / Energy[eV]")
elif self.view == 2:
self.viewlabel.setText("EXAFS")
self.btn.setText("Switch to Currents")
self.pw1.setTitle(u"\u03C7" + "(k)*k^2 trans. / k[" + u"\u212B" + "^-1]")
self.pw2.setTitle("| FT(" + u"\u03C7" + "(k)*k^2 trans." + ") | / R[" + u"\u212B" + "]")
self.pw3.setTitle(u"\u03C7" + "(k)*k^2 fluo. / k[" + u"\u212B" + "^-1]")
self.pw4.setTitle("| FT(" + u"\u03C7" + "(k)*k^2 fluo." + ") | / R[" + u"\u212B" + "]")
self.pw2.removeItem(self.text2)
self.pw2.removeItem(self.arrow2)
self.pw4.removeItem(self.text4)
self.pw4.removeItem(self.arrow4)
#https://github.com/pyqtgraph/pyqtgraph/issues/821
self.pw1.getAxis('left').textWidth = 0
self.pw1.getAxis('left')._updateWidth()
self.pw2.getAxis('left').textWidth = 0
self.pw2.getAxis('left')._updateWidth()
self.pw3.getAxis('left').textWidth = 0
self.pw3.getAxis('left')._updateWidth()
self.pw4.getAxis('left').textWidth = 0
self.pw4.getAxis('left')._updateWidth()
self.pw1.enableAutoRange()
self.pw2.enableAutoRange()
self.pw3.enableAutoRange()
self.pw4.enableAutoRange()
self.updatePlot()
def updatePlot(self):
if self.view == 0:
self.curve1.setData(self.energy, self.data_i0)
self.curve1.setPen("b")
self.curve2.setData(self.energy, self.data_i1)
self.curve2.setPen("g")
self.curve3.setData(self.energy, self.data_i2)
self.curve3.setPen("r")
self.curve4.setData(self.energy, self.data_ip)
self.curve4.setPen("c")
elif self.view == 1:
self.curve1.setData(self.energy, self.mu)
self.curve1.setPen(self.pen_lime)
self.curve2.setData(self.denergy, self.dmu)
self.curve2.setPen(self.pen_lime)
self.pw2.removeItem(self.text2)
self.pw2.removeItem(self.arrow2)
if self.e0i is not None:
self.text2.setText("%.2f"%self.denergy[self.e0i])
self.text2.setPos(self.denergy[self.e0i], self.dmu[self.e0i])
self.arrow2.setPos(self.denergy[self.e0i], self.dmu[self.e0i])
self.pw2.addItem(self.text2)
self.pw2.addItem(self.arrow2)
self.pw4.removeItem(self.text4)
self.pw4.removeItem(self.arrow4)
self.curve3.setData(self.energy, self.muf)
self.curve3.setPen(self.pen_orchid)
self.curve4.setData(self.denergy, self.dmuf)
self.curve4.setPen(self.pen_orchid)
if self.e0if is not None:
self.text4.setText("%.2f"%self.denergy[self.e0if])
self.text4.setPos(self.denergy[self.e0if], self.dmuf[self.e0if])
self.arrow4.setPos(self.denergy[self.e0if], self.dmuf[self.e0if])
self.pw4.addItem(self.text4)
self.pw4.addItem(self.arrow4)
elif self.view == 2:
self.curve1.setData(self.k, self.chi)
self.curve1.setPen(self.pen_lime)
self.curve2.setData(self.r, self.ft_mag)
self.curve2.setPen(self.pen_lime)
self.curve3.setData(self.kf, self.chif)
self.curve3.setPen(self.pen_orchid)
self.curve4.setData(self.rf, self.ft_magf)
self.curve4.setPen(self.pen_orchid)
def update_exafs(self, data_list):
self.e0i, self.dmu = data_list[0], data_list[1][:]
self.k, self.chi = data_list[2][:], data_list[3][:]
self.r, self.ft_mag = data_list[4][:], data_list[5][:]
self.e0if, self.dmuf = data_list[6], data_list[7][:]
self.kf, self.chif = data_list[8][:], data_list[9][:]
self.rf, self.ft_magf = data_list[10][:], data_list[11][:]
self.denergy = data_list[12][:]
self.updatePlot()
def open_folder(self):
if self.datafilename:
os.system('xdg-open "%s"' %os.path.dirname(self.datafilename))
def closeEvent(self, event):
self.zeromq_listener.running = False
self.zeromq_listener.finish()
self.thread.quit()
self.thread.wait()
self.thread2.quit()
self.thread2.wait()
self.app.processEvents()
if __name__ == '__main__':
app = QtGui.QApplication([])
mw = MonitorWidget(app)
mw.setStyleSheet("background-color: #000")
mw.setWindowTitle("Data monitor")
mw.show()
sys.exit(app.exec_())