Package sparked :: Package hardware :: Module serial
[hide private]
[frames] | no frames]

Source Code for Module sparked.hardware.serial

  1  # Copyright (c) 2010 Arjan Scherpenisse 
  2  # See LICENSE for details. 
  3   
  4  """ 
  5  Everything serial ports. 
  6   
  7  Autodetection of plugged in serialport devices and protocol probing. 
  8  """ 
  9   
 10   
 11  from zope.interface import Interface, Attribute 
 12   
 13  from twisted.internet import protocol, defer, reactor 
 14  from twisted.internet.serialport import SerialPort 
 15   
 16  from sparked.hardware import hal 
 17  from sparked import events 
 18   
 19   
class SerialPortMonitor(hal.HardwareMonitor):
    """
    Hardware monitor bound to the "serial" subsystem.

    Each instance publishes through the module-level C{serialEvents}
    dispatcher, which it exposes as its C{events} attribute.
    """

    # Subsystem identifier for this monitor.
    subsystem = "serial"

    def __init__(self):
        # NOTE(review): the base-class initializer is not invoked here;
        # confirm that hal.HardwareMonitor does not require it.
        # serialEvents is looked up at call time, so it may be defined
        # further down in the module.
        self.events = serialEvents


# Single dispatcher instance shared by every SerialPortMonitor.
serialEvents = events.EventDispatcher()
""" Event dispatcher for serial events """


class IProtocolProbe(Interface):
    """
    Interface for protocol classes that can be recognised by probing a
    serial port: a request to send, and a way to judge the reply.
    """

    probeRequest = Attribute("""
        What to send to the serial port on connection.
        """)

    probeResponse = Attribute("""
        What to receive from the serial port. When this is a string,
        the serialport is expected to return exactly this.

        If this attribute is callable as a static method, a boolean
        return value determines whether the probe is successful or
        not. If None is returned, there is not enough data yet and the
        function will be called again when data arrives again.
        """)


class SerialProbeFactory(object):
    """
    Placeholder class; it defines no attributes or behaviour of its own.
    """


class SerialProbe(object):
    """
    Tries a list of candidate protocols, one after another, on a single
    serial device until one of them answers its probe correctly.

    Candidates are registered with L{addCandidate} and tried in the
    order they were added once L{start} is called.
    """

    def __init__(self, device):
        """
        @param device: the serial device to open for every candidate;
            passed unchanged to C{SerialPort}.
        """
        self.candidates = []
        self.device = device

    def addCandidate(self, proto, baudrate=9600):
        """
        Register a protocol class to be probed at the given baudrate.

        @raise Exception: if C{proto} does not implement
            L{IProtocolProbe}.
        """
        if not IProtocolProbe.implementedBy(proto):
            raise Exception("%s should implement IProtocolProbe" % proto)

        self.candidates.append((proto, baudrate))

    def start(self):
        """
        Begin probing.

        @return: a deferred that fires with C{(probe, baudrate)} for the
            first candidate whose probe succeeds, or errbacks with
            C{Exception("Probing failed")} when every candidate failed
            or none were registered.
        """
        self.deferred = defer.Deferred()
        self._next()
        return self.deferred

    def _next(self):
        """
        Take the next candidate off the list and probe it; fail the
        overall deferred when the list is exhausted.
        """
        if not self.candidates:
            # Covers both "all candidates failed" and "start() called
            # without any candidate" (previously an IndexError).
            self.deferred.errback(Exception("Probing failed"))
            return

        probe, baudrate = self.candidates.pop(0)

        # Written so it parses on Python 2 and 3 with identical output.
        print("Trying %s" % (probe,))
        proto = SerialProbeProtocol(probe)
        try:
            SerialPort(proto, self.device, reactor, baudrate=baudrate)
        except Exception:
            # "port not open" -- treat any failure to open the port as a
            # failed probe, but let KeyboardInterrupt/SystemExit through.
            self._probeResult(False, probe, baudrate)
            return

        proto.d.addCallback(self._probeResult, probe, baudrate)

    def _probeResult(self, r, probe, baudrate):
        """
        Handle the outcome of one candidate: C{r is True} finishes the
        probing with that candidate, anything else moves on to the next.
        """
        if r is True:
            self.deferred.callback((probe, baudrate))
            return

        # _next() reports the overall failure once nothing is left.
        self._next()


class SerialProbeProtocol(protocol.Protocol):
    """
    Protocol that runs a single probe on an opened serial port.

    On connection it writes the probe's C{probeRequest} and then checks
    incoming data against C{probeResponse}. The verdict is delivered
    exactly once through the deferred C{self.d}: the value of the
    response check, or C{False} when the timeout expires first.
    """

    def __init__(self, probe, timeout=0.5):
        """
        @param probe: object providing C{probeRequest} and
            C{probeResponse} (see L{IProtocolProbe}).
        @param timeout: seconds to wait after connecting before the
            probe is reported as failed.
        """
        self.probe = probe
        self.timeout = timeout
        self.d = defer.Deferred()
        # Defined up front so dataReceived()/response() never hit a
        # missing attribute if called before connectionMade().
        self.data = ""
        self.timer = None

    def connectionMade(self):
        """
        Send the probe request and arm the timeout.
        """
        self.data = ""
        self.transport.write(self.probe.probeRequest)
        self.timer = reactor.callLater(self.timeout, self.response, False)

    def dataReceived(self, data):
        """
        Accumulate received data and evaluate it against the expected
        response; report a verdict as soon as one is available.
        """
        if self.d.called:
            # A verdict was already delivered; late bytes are ignored.
            return

        self.data += data
        expected = self.probe.probeResponse

        if callable(expected):
            retval = expected(self.data)
        elif len(self.data) < len(expected):
            # Not enough data yet to compare against the fixed string.
            retval = None
        else:
            retval = self.data.startswith(expected)

        if retval is None:
            # Inconclusive; wait for more data (or the timeout).
            return

        self.response(retval)

    def response(self, val):
        """
        Deliver the verdict C{val}, stop the timeout and close the port.

        Calls after the first are ignored, so data arriving after a
        verdict or a racing timeout cannot fire the deferred twice.
        """
        if self.d.called:
            return
        if self.timer is not None and self.timer.active():
            self.timer.cancel()
        self.transport.loseConnection()
        self.d.callback(val)
