Some style changes to intcode. Moved opcodes.py to intcode.py
parent
31d860b227
commit
8e83278b02
236
intcode.py
236
intcode.py
|
@ -1,66 +1,207 @@
|
|||
from collections import defaultdict
|
||||
from opcodes import run_op, CONTINUE, HALT, WAIT
|
||||
from enum import Enum
|
||||
from inspect import signature
|
||||
import operator
|
||||
|
||||
class StackIO:
|
||||
def __init__(self):
|
||||
self.input_stack = []
|
||||
self.output_stack = []
|
||||
class IntcodeError(Exception):
|
||||
pass
|
||||
|
||||
def input(self):
|
||||
return self.input_stack.pop(0)
|
||||
class OpState(Enum):
|
||||
"""Execution state class."""
|
||||
CONTINUE = 0
|
||||
HALT = 1
|
||||
WAIT = 2
|
||||
|
||||
def output(self, val):
|
||||
self.output_stack.append(val)
|
||||
class defaultlist(list):
|
||||
"""Default list class. Allows writing and reading out of bounds."""
|
||||
def __init__(self, lst, val_factory):
|
||||
super().__init__(lst)
|
||||
self.val_factory = val_factory
|
||||
|
||||
def wait_for_input(self):
|
||||
return len(self.input_stack) == 0
|
||||
def __setitem__(self, i, x):
|
||||
for _ in range((i - len(self) + 1)):
|
||||
self.append(self.val_factory())
|
||||
super().__setitem__(i, x)
|
||||
|
||||
def pop_out(self):
|
||||
return self.output_stack.pop(0)
|
||||
def __getitem__(self, i):
|
||||
if i >= len(self):
|
||||
return self.val_factory()
|
||||
return super().__getitem__(i)
|
||||
|
||||
class Memory:
|
||||
def __init__(self, program=[]):
|
||||
self.memory = program[:]
|
||||
class Singleton(object):
|
||||
def __init__(self, x=None):
|
||||
self.x = x
|
||||
|
||||
def write(self, i, val):
|
||||
self.memory += [0] * (i - len(self.memory) + 1)
|
||||
self.memory[i] = val
|
||||
def append(self, y):
|
||||
self.x = y
|
||||
|
||||
def read(self, i):
|
||||
if i >= len(self.memory):
|
||||
return 0
|
||||
return self.memory[i]
|
||||
def pop(self, _):
|
||||
x = self.x
|
||||
if x == None:
|
||||
raise IndexError("pop from empty singleton")
|
||||
self.x = None
|
||||
return x
|
||||
|
||||
def __eq__(self, other):
|
||||
return self.x == other
|
||||
|
||||
def __str__(self):
|
||||
return str(self.x)
|
||||
|
||||
def format(self):
|
||||
if self.x == None:
|
||||
return None
|
||||
return int(self.x)
|
||||
|
||||
def copy(self):
|
||||
return Memory(self.memory[:])
|
||||
return self
|
||||
|
||||
class Computer(object):
|
||||
def __init__(self, program, IO, i=0, rel_base=0):
|
||||
self.memory = Memory(program)
|
||||
self.IO = IO
|
||||
def makeIO(in_buff_class, out_buff_class):
|
||||
class IO(object):
|
||||
def __init__(self, in_buff = in_buff_class()):
|
||||
self.in_buff = in_buff
|
||||
self.out_buff = out_buff_class()
|
||||
|
||||
def pop_input(self):
|
||||
if self.in_buff == in_buff_class():
|
||||
return None
|
||||
return self.in_buff.pop(0)
|
||||
|
||||
def append_output(self, x):
|
||||
self.out_buff.append(x)
|
||||
|
||||
def copy(self):
|
||||
new = IO()
|
||||
new.in_buff = self.in_buff.copy()
|
||||
new.out_buff = self.out_buff.copy()
|
||||
return new
|
||||
|
||||
def flush(self):
|
||||
o = self.out_buff
|
||||
self.out_buff = out_buff_class()
|
||||
try:
|
||||
return o.format()
|
||||
except AttributeError:
|
||||
return o
|
||||
|
||||
def write(self, input):
|
||||
if self.in_buff != in_buff_class():
|
||||
raise IntcodeError("writing to nonempty input")
|
||||
self.in_buff = in_buff_class(input)
|
||||
|
||||
return IO
|
||||
|
||||
StackIO = makeIO(list, list)
|
||||
SingletonIO = makeIO(Singleton, Singleton)
|
||||
|
||||
class Opcode:
|
||||
def of_operator(operator):
|
||||
"""Make opcode from operator."""
|
||||
def opcode(state, p1, p2, p3):
|
||||
r = operator(state.memory[p1], state.memory[p2])
|
||||
state.memory[p3] = r
|
||||
state.i += 4
|
||||
return OpState.CONTINUE
|
||||
return opcode
|
||||
|
||||
def jump_when(flag):
|
||||
def opcode(state, p1, p2):
|
||||
if (state.memory[p1] > 0) == flag:
|
||||
state.i = state.memory[p2]
|
||||
else:
|
||||
state.i += 3
|
||||
return OpState.CONTINUE
|
||||
return opcode
|
||||
|
||||
def adjust_base(state, p1):
|
||||
state.rel_base += state.memory[p1]
|
||||
state.i += 2
|
||||
return OpState.CONTINUE
|
||||
|
||||
def get_input(state, p1):
|
||||
x = state.IO.pop_input()
|
||||
if x == None:
|
||||
return OpState.WAIT
|
||||
state.memory[p1] = x
|
||||
state.i += 2
|
||||
return OpState.CONTINUE
|
||||
|
||||
def put_output(state, p1):
|
||||
state.IO.append_output(state.memory[p1])
|
||||
state.i += 2
|
||||
return OpState.CONTINUE
|
||||
|
||||
halt = lambda _ : OpState.HALT
|
||||
|
||||
ops = {1 : of_operator(operator.add),
|
||||
2 : of_operator(operator.mul),
|
||||
3 : get_input,
|
||||
4 : put_output,
|
||||
5 : jump_when(True),
|
||||
6 : jump_when(False),
|
||||
7 : of_operator(operator.lt),
|
||||
8 : of_operator(operator.eq),
|
||||
9 : adjust_base,
|
||||
99 : halt
|
||||
}
|
||||
|
||||
def parse(op):
|
||||
op = str(op)
|
||||
par_modes, op = op[:-2][::-1], int(op[-2:])
|
||||
|
||||
opcode = Opcode.ops[op]
|
||||
parnum = len(signature(opcode).parameters) - 1
|
||||
par_modes = par_modes + '0'*(parnum - len(par_modes))
|
||||
|
||||
return opcode, map(int, par_modes)
|
||||
|
||||
def run(state):
|
||||
opcode, par_modes = Opcode.parse(state.memory[state.i])
|
||||
pars = []
|
||||
for pn, mode in enumerate(par_modes, start=1):
|
||||
p = state.i + pn
|
||||
if mode == 0:
|
||||
p = state.memory[p]
|
||||
elif mode == 2:
|
||||
p = state.rel_base + state.memory[p]
|
||||
pars.append(p)
|
||||
op_state = opcode(state, *pars)
|
||||
return op_state
|
||||
|
||||
|
||||
|
||||
class Interpreter(object):
|
||||
def __init__(self, program, IO_class=StackIO, i=0, rel_base=0):
|
||||
self.memory = defaultlist(program[:], val_factory = lambda : 0)
|
||||
self.IO = IO_class()
|
||||
self.i = i
|
||||
self.rel_base = rel_base
|
||||
|
||||
def run(self):
|
||||
for exe_state in self:
|
||||
if exe_state != CONTINUE:
|
||||
break
|
||||
return exe_state
|
||||
|
||||
def __iter__(self):
|
||||
return self
|
||||
|
||||
|
||||
def __next__(self):
|
||||
return self.run_opcode()
|
||||
if Opcode.run(self) == OpState.HALT:
|
||||
raise StopIteration
|
||||
while Opcode.run(self) == OpState.CONTINUE:
|
||||
continue
|
||||
return self.IO.flush()
|
||||
|
||||
def write(self, in_buff):
|
||||
self.IO.write(in_buff)
|
||||
|
||||
def run(self, in_buff=None):
|
||||
if in_buff != None:
|
||||
self.write(in_buff)
|
||||
out = next(self)
|
||||
if Opcode.run(self) != OpState.HALT:
|
||||
raise IntcodeError("expecting input")
|
||||
return out
|
||||
|
||||
def copy(self):
|
||||
return Computer(self.memory.memory[:], self.IO, self.i, self.rel_base)
|
||||
|
||||
class IntComp(Computer):
|
||||
run_opcode = run_op
|
||||
|
||||
def copy(self):
|
||||
return IntComp(self.memory.memory[:], self.IO, self.i, self.rel_base)
|
||||
memory = self.memory.copy()
|
||||
IO = IO.copy()
|
||||
return Emulator(memory, IO)
|
||||
|
||||
|
||||
class Display(defaultdict):
|
||||
|
@ -76,7 +217,6 @@ class Display(defaultdict):
|
|||
for y in range(Y))
|
||||
|
||||
|
||||
|
||||
class DisplayIO(object):
|
||||
def __init__(self, display):
|
||||
self.display = display
|
||||
|
@ -93,11 +233,3 @@ class DisplayIO(object):
|
|||
if len(self.block) == 3:
|
||||
self.display[self.block[:2]] = self.block[2]
|
||||
self.block = tuple()
|
||||
|
||||
def output(program, input=[], computer=IntComp):
|
||||
IO = StackIO()
|
||||
IO.input_stack = input[:]
|
||||
comp = computer(program, IO)
|
||||
if comp.run() != HALT:
|
||||
return False
|
||||
return IO.output_stack
|
||||
|
|
Loading…
Reference in New Issue