from collections import defaultdict, deque from enum import Enum from inspect import signature import operator class WaitForInput(Exception): pass class Halted(Exception): pass class PipeError(Exception): pass class defaultlist(list): """Default list class. Allows writing and reading out of bounds.""" def __init__(self, lst, val_factory): super().__init__(lst) self.val_factory = val_factory def __setitem__(self, i, x): for _ in range((i - len(self) + 1)): self.append(self.val_factory()) super().__setitem__(i, x) def __getitem__(self, i): if i >= len(self): return self.val_factory() return super().__getitem__(i) class Emulator(object): def __init__(self, program, get_input, put_output): self.in_f = get_input self.out_f = put_output self.memory = defaultlist(program[:], val_factory = lambda : 0) self.i = 0 self.rel_base = 0 def _of_operator(operator): """Make opcode from operator.""" def opcode(self, p1, p2, p3): r = operator(self.memory[p1], self.memory[p2]) self.memory[p3] = r self.i += 4 return opcode def _jump_when(flag): def opcode(self, p1, p2): if (self.memory[p1] > 0) == flag: self.i = self.memory[p2] else: self.i += 3 return opcode def _get_input(self, p1): x = self.in_f() self.memory[p1] = x self.i += 2 def _put_output(self, p1): self.i += 2 self.out_f(self.memory[p1]) def _adjust_base(self, p1): self.rel_base += self.memory[p1] self.i += 2 opcodes = {1 : _of_operator(operator.add), 2 : _of_operator(operator.mul), 3 : _get_input, 4 : _put_output, 5 : _jump_when(True), 6 : _jump_when(False), 7 : _of_operator(operator.lt), 8 : _of_operator(operator.eq), 9 : _adjust_base, } def __next__(self): state = self op = str(self.memory[state.i]) if op == '99': raise StopIteration par_modes, op = op[:-2][::-1], int(op[-2:]) opcode = self.opcodes[op] parnum = len(signature(opcode).parameters) - 1 par_modes = par_modes + '0'*(parnum - len(par_modes)) par_modes = map(int, par_modes) pars = [] for pn, mode in enumerate(par_modes, start=1): p = state.i + pn if mode == 0: p = state.memory[p] elif mode == 2: p = state.rel_base + state.memory[p] pars.append(p) opcode(state, *pars) def __iter__(self): return self def run(self): if self.memory[self.i] == 99: raise Halted return deque(self, maxlen=0) class Singleton(object): def __init__(self, x=None): self.x = x def append(self, y): self.x = y def pop(self, _): x = self.x if x == None: raise IndexError("pop from empty singleton") self.x = None return x def __eq__(self, other): return self.x == other def __str__(self): return str(self.x) def format(self): if self.x == None: return None return int(self.x) def copy(self): return self def makeIO(in_buff_class, out_buff_class): class IO(object): def __init__(self, in_buff = in_buff_class()): self.in_buff = in_buff self.out_buff = out_buff_class() def pop_input(self): if self.in_buff == in_buff_class(): raise WaitForInput return self.in_buff.pop(0) def append_output(self, x): self.out_buff.append(x) def copy(self): new = IO() new.in_buff = self.in_buff.copy() new.out_buff = self.out_buff.copy() return new def flush(self): o = self.out_buff self.out_buff = out_buff_class() try: return o.format() except AttributeError: return o def write(self, input): if self.in_buff != in_buff_class(): raise IntcodeError("writing to nonempty input") self.in_buff = in_buff_class(input) return IO StackIO = makeIO(list, list) SingletonIO = makeIO(Singleton, Singleton) class Interpreter(object): def __init__(self, program, IO_class=StackIO): self.IO = IO_class() self.comp = Emulator(program, self.IO.pop_input, self.IO.append_output) def __iter__(self): while True: try: self.comp.run() break except WaitForInput: yield self.IO.flush() yield self.IO.flush() def write(self, in_buff): self.IO.write(in_buff) def run(self, in_buff=None): if in_buff != None: self.write(in_buff) self.comp.run() return self.IO.flush() def eval(self, in_buff): self.write(in_buff) try: self.comp.run() except WaitForInput: pass return self.IO.flush() def pipe_from(self, other): class Output(Exception): pass def raise_output(o): raise Output(o) other.comp.out_f = raise_output def get_input(): try: other.run() raise PipeError except Output as o: return int(str(o)) self.comp.in_f = get_input self.IO.write = other.IO.write def copy(self): memory = self.memory.copy() IO = IO.copy() return Emulator(memory, IO) class Display(defaultdict): def __init__(self, encoding): super(Display, self).__init__(int) self.encoding = encoding def __str__(self): xs, ys = zip(*self.keys()) X, Y = max(xs)+1, max(ys)+1 return '\n'.join(''.join(self.encoding[self[(x, y)]] for x in range(X)) for y in range(Y)) class DisplayIO(object): def __init__(self, display): self.display = display self.block = tuple() def input(self): return NotImplemented def wait_for_input(self): return NotImplemented def output(self, x): self.block += (x,) if len(self.block) == 3: self.display[self.block[:2]] = self.block[2] self.block = tuple()