from AVCommon.logger import logging import copy import threading from AVCommon import config import command from mq import MQStar import traceback class ProtocolClient: """ Protocol, client side. When the command is received, it's executed and the result resent to the server. """ def __init__(self, mq, vm, timeout=0): self.mq = mq self.vm = vm self.timeout = 0 self.on_error = "SKIP" # ["SKIP", "CONTINUE", "STOP"] assert (isinstance(vm, str)) assert mq def _execute_command(self, cmd): try: ret = cmd.execute(self.vm, cmd.args) logging.debug("cmd.execute ret: %s" % str(ret)) cmd.success, cmd.result = ret except Exception, e: logging.exception("ERROR:_execute_command") cmd.success = False cmd.result = e assert isinstance(cmd.success, bool) self.send_answer(cmd) return cmd def _meta(self, cmd): if config.verbose: logging.debug("PROTO S executing meta") ret = cmd.execute(self.vm, self, cmd.args) cmd.success, cmd.result = ret #logging.debug("PROTO S error: %s" % self.error) assert isinstance(cmd.success, bool) self.send_answer(cmd) return cmd # client side def receive_command(self): assert (isinstance(self.vm, str)) #logging.debug("PROTO receiveCommand %s" % (self.client)) msg = self.mq.receive_client(self.vm, blocking=True, timeout=self.timeout) if config.verbose: logging.debug("PROTO C receive_command %s, %s" % (self.vm, msg)) cmd = command.unserialize(msg) cmd.vm = self.vm if cmd.side == "meta": return self._meta(cmd) else: return self._execute_command(cmd) def send_answer(self, reply): if config.verbose: logging.debug("PROTO C send_answer %s" % reply) self.mq.send_server(self.vm, reply.serialize()) class Protocol(ProtocolClient): """ A protocol implements the server behavior.""" procedure = None last_command = None pool = 0 def __init__(self, dispatcher, vm, procedure=None, timeout=0, id=0): ProtocolClient.__init__(self, dispatcher.mq, vm, timeout) self.dispatcher = dispatcher self.vm = vm self.procedure = copy.deepcopy(procedure) self.sent_commands = [] assert (isinstance(vm, str)) assert dispatcher self.add_vm(vm) self.id = id self.error = False self.errors = [] def add_vm(self, vm): self.mq.add_client(vm) # server side def _send_command_mq(self, cmd): accept = cmd.on_init(self, cmd.args) if accept == True: if config.verbose: logging.debug("sending command to client") self.mq.send_client(self.vm, cmd.serialize()) elif accept == None: logging.debug("don't send command to client") cmd.success = True cmd.result = "blocked by on_init" self.send_answer(cmd) else: logging.debug("error sending command to client") cmd.success = False cmd.result = "blocked by on_init with error" self.send_answer(cmd) def _execute_command_server(self, cmd): try: ret = cmd.execute(self.vm, self, cmd.args) logging.debug("%s, cmd.execute ret: %s" % (self.vm, str(ret))) cmd.success, cmd.result = ret except Exception, e: logging.exception("ERROR:_execute_command") cmd.success = False cmd.result = e assert isinstance(cmd.success, bool) self.send_answer(cmd) return cmd def _execute(self, cmd, blocking=False): #logging.debug("PROTO S executing server") t = threading.Thread(target=self._execute_command_server, args=(cmd,)) t.start() if blocking: t.join() def send_next_kind(self, kind): while(True): if not self.procedure: self.last_command = None return False self.last_command = self.procedure.next_command() name = self.last_command.name if name == kind: break if name.startswith("STOP") or name.startswith("REPORT"): break logging.debug("skipping to the next %s: %s" % (kind, self.last_command.name)) #return self.send_command(copy.deepcopy(self.last_command)) return self.send_command(self.last_command) def send_next_call(self): return self.send_next_kind("CALL") def send_next_proc(self): return self.send_next_kind("END_CALL") def send_next_command(self): if not self.procedure: self.last_command = None return False self.last_command = self.procedure.next_command() #return self.send_command(copy.deepcopy(self.last_command)) return self.send_command(self.last_command) def send_command(self, cmd): self.sent_commands.append(cmd) if config.verbose: logging.debug("PROTO S send_command: %s" % str(cmd)) #cmd = command.unserialize(cmd) cmd.reset(self.vm) try: if cmd.side == "client": self._send_command_mq(cmd) elif cmd.side == "server": self._execute(cmd) elif cmd.side == "meta": self._meta(cmd) return True except Exception, ex: cmd.success = False cmd.result = str(ex) logging.exception("Error sending command %s: %s" % (cmd, ex)) return False def receive_answer(self, vm, cmd): """ returns a command with name, success and payload """ #msg = self.mq.receiveClient(self, client) if not self.sent_commands: raise RuntimeError("no sent commands") sent_command = self.sent_commands[0] cmd.reset(vm) if config.verbose: logging.debug("PROTO S manage_answer %s: %s" % (vm, cmd)) if cmd.success != None and cmd.name == sent_command.name: if config.verbose: logging.debug("PROTO S we got the expected answer") if cmd.success == False: self.error = True self.errors.append(str(cmd)) cmd.on_answer(vm, cmd.success, cmd.result) self.sent_commands.pop(0) else: if config.verbose: logging.debug("PROTO S ignoring unexpected answer: ") #logging.debug("cmd.timestamp == sent_command.timestamp: %s" % cmd.timestamp == sent_command.timestamp) cmd.success = None return cmd .