import struct import socket import hashlib import logging from time import sleep logger = logging.getLogger('pyhpfeeds') OP_ERROR = 0 OP_INFO = 1 OP_AUTH = 2 OP_PUBLISH = 3 OP_SUBSCRIBE = 4 BUFSIZ = 16384 __all__ = ["new", "FeedException"] def msghdr(op, data): return struct.pack('!iB', 5+len(data), op) + data def msgpublish(ident, chan, data): # if isinstance(data, str): # data = data.encode('latin1') return msghdr(OP_PUBLISH, struct.pack('!B', len(ident)) + ident + struct.pack('!B', len(chan)) + chan + data) def msgsubscribe(ident, chan): return msghdr(OP_SUBSCRIBE, struct.pack('!B', len(ident)) + ident + chan) def msgauth(rand, ident, secret): hash = hashlib.sha1(rand+secret).digest() return msghdr(OP_AUTH, struct.pack('!B', len(ident)) + ident + hash) class FeedUnpack(object): def __init__(self): self.buf = bytearray() def __iter__(self): return self def next(self): return self.unpack() def feed(self, data): self.buf.extend(data) def unpack(self): if len(self.buf) < 5: raise StopIteration('No message.') ml, opcode = struct.unpack('!iB', buffer(self.buf,0,5)) if len(self.buf) < ml: raise StopIteration('No message.') data = bytearray(buffer(self.buf, 5, ml-5)) del self.buf[:ml] return opcode, data class FeedException(Exception): pass class HPC(object): def __init__(self, host, port, ident, secret, timeout=3, reconnect=False, sleepwait=20): self.host, self.port = host, port self.ident, self.secret = ident, secret self.timeout = timeout self.reconnect = reconnect self.sleepwait = sleepwait self.brokername = 'unknown' self.connected = False self.stopped = False self.unpacker = FeedUnpack() self.connect() def connect(self): logger.info('connecting to {0}:{1}'.format(self.host, self.port)) self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.s.settimeout(self.timeout) try: self.s.connect((self.host, self.port)) except: raise FeedException('Could not connect to broker.') self.connected = True try: d = self.s.recv(BUFSIZ) except socket.timeout: raise FeedException('Connection receive timeout.') self.unpacker.feed(d) for opcode, data in self.unpacker: if opcode == OP_INFO: rest = buffer(data, 0) name, rest = rest[1:1+ord(rest[0])], buffer(rest, 1+ord(rest[0])) rand = str(rest) logger.debug('info message name: {0}, rand: {1}'.format(name, repr(rand))) self.brokername = name self.s.send(msgauth(rand, self.ident, self.secret)) break else: raise FeedException('Expected info message at this point.') self.s.settimeout(None) def _run(self, message_callback, error_callback): while not self.stopped: while self.connected: d = self.s.recv(BUFSIZ) if not d: self.connected = False break self.unpacker.feed(d) for opcode, data in self.unpacker: if opcode == OP_PUBLISH: rest = buffer(data, 0) ident, rest = rest[1:1+ord(rest[0])], buffer(rest, 1+ord(rest[0])) chan, content = rest[1:1+ord(rest[0])], buffer(rest, 1+ord(rest[0])) message_callback(str(ident), str(chan), content) elif opcode == OP_ERROR: error_callback(data) if self.stopped: break if self.stopped: break self.connect() def run(self, message_callback, error_callback): if not self.reconnect: self._run(message_callback, error_callback) else: while True: self._run(message_callback, error_callback) # reconnect now we've failed sleep(self.sleepwait) while True: try: self.connect() break except FeedException: sleep(self.sleepwait) def subscribe(self, chaninfo): if type(chaninfo) == str: chaninfo = [chaninfo,] for c in chaninfo: self.s.send(msgsubscribe(self.ident, c)) def publish(self, chaninfo, data): if type(chaninfo) == str: chaninfo = [chaninfo,] for c in chaninfo: self.s.send(msgpublish(self.ident, c, data)) def stop(self): self.stopped = True def close(self): try: self.s.close() except: logger.warn('Socket exception when closing.') def new(host=None, port=10000, ident=None, secret=None, reconnect=True, sleepwait=20): return HPC(host, port, ident, secret, reconnect) .