# Python Workbench prototype # Simple derived components for testing # (C)Sky Coyote, June 2007 # Import required code import pwb from PyQt4 import QtCore, QtGui from numpy import * import pyfits import plots import controls # Create uninitialized Qt QApplication app = None # Simple data source component class Signal(pwb.Component): def __init__(self, name, data): pwb.Component.__init__(self, name) # Make 2 copies of input list self.data = data[:] self.data2 = data[:] def run(self): if self.done(): return if len(self.outputs) < 1: return # Send 1 element from data list out to all connections e = self.data.pop(0) for i in range(len(self.outputs)): self.send(i, e) def done(self): # Done if no more data to send return len(self.data) < 1 def describe(self): pwb.Component.describe(self) print "data = " + repr(self.data) print "data2 = " + repr(self.data2) def reset(self): pwb.Component.reset(self) # Reset data from second copy self.data = self.data2[:] # Simple data consumer component class Display(pwb.Component): def __init__(self, name): pwb.Component.__init__(self, name) self.data = [] def run(self): if self.done(): return for i in self.inputs: if len(i['queue']) > 0: # Get datum from every input and accumulate in list e = i['queue'].pop(0) print self.name + " got " + repr(e) + " from " + i['component'].name self.data.append(e) def done(self): # Done if no more inputs for i in self.inputs: if len(i['queue']) > 0: return False return True def describe(self): pwb.Component.describe(self) print "data = " + repr(self.data) def reset(self): pwb.Component.reset(self) self.data = [] # Data consumer with attached Qt plot window class Display2(pwb.Component): def __init__(self, name): pwb.Component.__init__(self, name) self.data = [] # Create Qt QApplication if does not yet exist global app if app == None: app = QtGui.QApplication([]) # Create a plot window showing current component data self.window = plots.PlotWindow(self.name, self.data) self.window.show() def run(self): if self.done(): return for i in self.inputs: if len(i['queue']) > 0: e = i['queue'].pop(0) print self.name + " got " + repr(e) + " from " + i['component'].name self.data.append(e) # Scroll the data if >= 100 points if len(self.data) > 100: self.data = self.data[-100:] # Update the plot window to show the current data self.window.setData(self.data) # Allow QApplication to process events (like drawing) global app app.processEvents() def done(self): for i in self.inputs: if len(i['queue']) > 0: return False return True def describe(self): pwb.Component.describe(self) print "data = " + repr(self.data) print "window = " + repr(self.window) def reset(self): pwb.Component.reset(self) self.data = [] self.window.setData(self.data) global app app.processEvents() # Add all inputs and send to all outputs class Add(pwb.Component): def run(self): if self.done(): return # Wait for data on all inputs for i in self.inputs: if len(i['queue']) < 1: return sum = 0 for i in self.inputs: # Add together a datum from all inputs e = i['queue'].pop(0) print self.name + " got " + repr(e) + " from " + i['component'].name sum += e # Send sum to all outputs for i in range(len(self.outputs)): self.send(i, sum) def done(self): # Done if no more inputs for i in self.inputs: if len(i['queue']) > 0: return False return True # Add all inputs and send to all outputs, # without waiting for data on all inputs first class Add2(pwb.Component): def run(self): if self.done(): return sum = 0 for i in self.inputs: if len(i['queue']) > 0: # Add together data from any available inputs e = i['queue'].pop(0) print self.name + " got " + repr(e) + " from " + i['component'].name sum += e for i in range(len(self.outputs)): self.send(i, sum) def done(self): # Done if no more inputs for i in self.inputs: if len(i['queue']) > 0: return False return True class Subtract(pwb.Component): def run(self): if self.done(): return for i in self.inputs: if len(i['queue']) < 1: return difference = 0 for index, i in enumerate(self.inputs): e = i['queue'].pop(0) print self.name + " got " + repr(e) + " from " + i['component'].name if index == 0: difference = e else: difference -= e for i in range(len(self.outputs)): self.send(i, difference) def done(self): for i in self.inputs: if len(i['queue']) > 0: return False return True class Multiply(pwb.Component): def run(self): if self.done(): return for i in self.inputs: if len(i['queue']) < 1: return product = 1 for i in self.inputs: e = i['queue'].pop(0) print self.name + " got " + repr(e) + " from " + i['component'].name product *= e for i in range(len(self.outputs)): self.send(i, product) def done(self): for i in self.inputs: if len(i['queue']) > 0: return False return True class Divide(pwb.Component): def run(self): if self.done(): return for i in self.inputs: if len(i['queue']) < 1: return quotient = 1 for index, i in enumerate(self.inputs): e = i['queue'].pop(0) print self.name + " got " + repr(e) + " from " + i['component'].name if index == 0: quotient = e else: quotient /= e for i in range(len(self.outputs)): self.send(i, quotient) def done(self): for i in self.inputs: if len(i['queue']) > 0: return False return True # Simple Euler integration of 1 input class Integrate(pwb.Component): def __init__(self, name): pwb.Component.__init__(self, name) self.sum = 0 self.dt = 0.1 self.damping = 1.0 def run(self): if self.done(): return if len(self.inputs) > 0: i = self.inputs[0] if len(i['queue']) > 0: # Get input from first connection only e = i['queue'].pop(0) print self.name + " got " + repr(e) + " from " + i['component'].name # Accumulate self.sum += e * self.dt # Apply decay to state self.sum *= self.damping for i in range(len(self.outputs)): self.send(i, self.sum) # Never ending def done(self): return False def describe(self): pwb.Component.describe(self) print "sum = " + repr(self.sum) print "damping = " + repr(self.damping) def reset(self): pwb.Component.reset(self) self.sum = 0 # Additive inverse of 1 input class Minus(pwb.Component): def run(self): if self.done(): return if len(self.inputs) > 0: i = self.inputs[0] if len(i['queue']) > 0: # Get input from first connection only e = i['queue'].pop(0) print self.name + " got " + repr(e) + " from " + i['component'].name # Send inverse to all outputs for o in range(len(self.outputs)): self.send(o, -e) # Never ending def done(self): return False # Get user input from a slider class Slider(pwb.Component): def __init__(self, name): pwb.Component.__init__(self, name) self.value = 1.0 # Create app if necessary and then slider window global app if app == None: app = QtGui.QApplication([]) self.window = controls.SliderWindow(self.name, self.handler) self.window.show() # Handler to receive slider value when changed def handler(self, tag, newValue): self.value = newValue print self.name + ": new value = " + repr(newValue) def run(self): if self.done(): return # Process slider drawing global app app.processEvents() # Send value to all outputs for o in range(len(self.outputs)): self.send(o, self.value) def done(self): return False def describe(self): pwb.Component.describe(self) print "value = " + repr(self.value) print "window = " + repr(self.window) def reset(self): pwb.Component.reset(self) self.value = 1 class Slider2(Slider): def __init__(self, name, n=1): pwb.Component.__init__(self, name) self.value = 1 # Create window, but not app # Set number of sliders desired self.window = controls.SliderWindow(self.name, self.handler, n) self.window.show() def handler(self, tag, newValue): self.value = newValue print self.name + ": tag = " + repr(tag) + " value = " + repr(newValue) # Send both slider tag and value to all outputs for o in range(len(self.outputs)): self.send(o, (tag, self.value)) # Get user input from 1 or more buttons class Button(pwb.Component): def __init__(self, name, bnames): pwb.Component.__init__(self, name) # Reset value self.value = -1 # Create app and window global app if app == None: app = QtGui.QApplication([]) self.window = controls.ButtonWindow(self.name, bnames, self.handler) self.window.show() # Get tag and value from buttons def handler(self, tag, value): self.value = tag print self.name + ": new value = " + repr(self.value) def run(self): if self.done(): return # Allow app to draw global app app.processEvents() # Send value to all outputs for o in range(len(self.outputs)): self.send(o, self.value) def done(self): return False def describe(self): pwb.Component.describe(self) print "value = " + repr(self.value) print "window = " + repr(self.window) def reset(self): pwb.Component.reset(self) self.value = -1 # Display an image with a circular overlay class ImageCircle(pwb.Component): def __init__(self, name, data, min=0.0, max=0.0, x=100.0, y=100.0, r=100.0): pwb.Component.__init__(self, name) self.x = x self.y = y self.r = r # Create app and window global app if app == None: app = QtGui.QApplication([]) self.window = plots.ImageCircleWindow(self.name, data, min, max, self.x, self.y, self.r) self.window.show() def run(self): if self.done(): return # Input 0 = x if len(self.inputs) > 0 and len(self.inputs[0]['queue']) > 0: self.x = self.inputs[0]['queue'].pop(0) * self.window.pixmap.width() # Input 1 = y if len(self.inputs) > 1 and len(self.inputs[1]['queue']) > 0: self.y = (1.0 - self.inputs[1]['queue'].pop(0)) * self.window.pixmap.height() # Input 2 = radius if len(self.inputs) > 2 and len(self.inputs[2]['queue']) > 0: self.r = self.inputs[2]['queue'].pop(0) * sqrt(self.window.pixmap.width() * \ self.window.pixmap.width() + self.window.pixmap.height() * \ self.window.pixmap.height()) / 2.0 # Set variables and update display if self.x != self.window.x or self.y != self.window.y or self.r != self.window.r: self.window.x = self.x self.window.y = self.y self.window.r = self.r self.window.update() global app app.processEvents() def done(self): return False def describe(self): pwb.Component.describe(self) print "x = " + repr(self.x) print "y = " + repr(self.y) print "r = " + repr(self.r) print "window = " + repr(self.window) def reset(self): pwb.Component.reset(self) self.x = 100 self.y = 100 self.r = 100 self.window.x = self.x self.window.y = self.y self.window.r = self.r self.window.update() global app app.processEvents() class ImageCircle2(pwb.Component): def __init__(self, name, data, min=0.0, max=0.0, x=100.0, y=100.0, r=100.0): pwb.Component.__init__(self, name) self.x = x self.y = y self.r = r # Create window, but not app self.window = plots.ImageCircleWindow(self.name, data, min, max, self.x, self.y, self.r) self.window.show() def run(self): if self.done(): return if len(self.inputs) > 0 and len(self.inputs[0]['queue']) > 0: # Get slider tag and value from one input e = self.inputs[0]['queue'].pop(0) tag = e[0] value = e[1] # Set x if tag == 0: self.x = value * self.window.pixmap.width() # Set y elif tag == 1: self.y = (1.0 - value) * self.window.pixmap.height() # Set radius elif tag == 2: self.r = value * sqrt(self.window.pixmap.width() * \ self.window.pixmap.width() + self.window.pixmap.height() * \ self.window.pixmap.height()) / 2.0 if self.x != self.window.x or self.y != self.window.y or self.r != self.window.r: # Update display self.window.x = self.x self.window.y = self.y self.window.r = self.r self.window.update() # Overload 'receive' function from Component class to put data directly into # queue rather than intermediate buffer def receive(self, input, data): for i in self.inputs: if i['component'] == input: i['queue'].append(data) self.run() def reset(self): pwb.Component.reset(self) self.x = 100 self.y = 100 self.r = 100 self.window.x = self.x self.window.y = self.y self.window.r = self.r self.window.update() if __name__ == "__main__": c = Signal('signal', range(10)) c = Display('display') c = Display2('display2') c = Add('add') c = Add2('add2') c = Subtract('subtract') c = Multiply('multiply') c = Divide('divide') c = Integrate('integrate') c = Minus('minus') c = Slider('slider') c = Slider2('slider2') c = Button('button', ['button']) c = ImageCircle('image', pyfits.getdata('images/c1230.en.fits'), 0, 0, 201.0, 400.0, 336.0) c = ImageCircle2('image2', pyfits.getdata('images/c1230.en.fits'), 0, 0, 201.0, 400.0, 336.0) pwb.describe() app.exec_()