advent-of-code-2019/intcode.py

249 lines
7.2 KiB
Python

from inspect import signature
from lib import defaultlist
import operator
from itertools import chain
# Custom Exceptions -------------------------------------------------------
class WaitForInput(Exception):
"""Used for interrupting emulator execution."""
pass
class Halted(Exception):
"""Used when calling halted emulator."""
pass
class PipeError(Exception):
"""Used when b in pipe a | b needs input and a is halted."""
pass
class InputWriteError(Exception):
"""Raised when overwriting non-empty input buffer"""
# Emulator ----------------------------------------------------------------
def format_input_args(wfunc):
"""
Adds structure to arguments for input functions.
Creates an input iterator by chaining args, an optional input iterator
and an iterator constructed from a pop_input function.
"""
def decorated(self, *args, input_iter=iter([]), pop_input=lambda : None):
input_iter = chain(iter(args), input_iter, iter(pop_input, None))
return wfunc(self, input_iter)
return decorated
class emulator():
"""
Intcode emulator. The emulator is a generator yielding output.
"""
def __init__(self, program):
"""
The emulator state is composed of memory, current pointer and relative base.
Input is fed from an iterator. This iterator is empty by default. Use
write/append to add overwrite/append to the input iterator.
"""
self.memory = defaultlist(program.copy(), val_factory = lambda : 0)
self.i = 0
self.rel_base = 0
self.input_iter = iter([])
@format_input_args
def append_input(self, input_iter):
"""Appends input_iter to input iterator"""
self.input_iter = chain(self.input_iter, input_iter)
@format_input_args
def write_input(self, input_iter):
"""
Overwrites input iterator. Raises InputWriteError if input iterator is
not exhausted.
"""
try:
next(self.input_iter)
raise InputWriteError
except StopIteration:
self.input_iter = input_iter
# Opcode functions -----------------------------
def _of_operator(operator):
"""Make opcode from operator."""
def opcode(self, p1, p2, p3):
r = operator(self.memory[p1], self.memory[p2])
self.memory[p3] = r
self.i += 4
return opcode
def _jump_when(flag):
"""Returns jump when flag opcode"""
def opcode(self, p1, p2):
if (self.memory[p1] > 0) == flag:
self.i = self.memory[p2]
else:
self.i += 3
return opcode
def _get_input(self, p1):
"""Calls next on self.input_iterator. Raises WaitForInput if exhausted"""
try:
x = next(self.input_iter)
except StopIteration:
raise WaitForInput
self.memory[p1] = x
self.i += 2
def _put_output(self, p1):
"""Returns output. Only opcode that doesn't return None"""
self.i += 2
return self.memory[p1]
def _adjust_base(self, p1):
"""Adjusts relative base."""
self.rel_base += self.memory[p1]
self.i += 2
# Opcode codes ------------------------------
opcodes = {1 : _of_operator(operator.add),
2 : _of_operator(operator.mul),
3 : _get_input,
4 : _put_output,
5 : _jump_when(True),
6 : _jump_when(False),
7 : _of_operator(operator.lt),
8 : _of_operator(operator.eq),
9 : _adjust_base,
}
HALT = 99
def next_opcode(self):
"""
Calls next opcode and returns it's return value. Raises Halted
when program halts
"""
op = self.memory[self.i]
if op == self.HALT:
raise Halted
op = str(op)
par_modes, op = op[:-2][::-1], int(op[-2:])
opcode = self.opcodes[op]
parnum = len(signature(opcode).parameters) - 1
par_modes = par_modes + '0'*(parnum - len(par_modes))
par_modes = map(int, par_modes)
pars = []
for pn, mode in enumerate(par_modes, start=1):
p = self.i + pn
if mode == 0:
p = self.memory[p]
elif mode == 2:
p = self.rel_base + self.memory[p]
pars.append(p)
return opcode(self, *pars)
def __next__(self):
"""Executes until ouput is returned. Stops iteration when halted."""
try:
r = self.next_opcode()
while r == None:
r = self.next_opcode()
return r
except Halted:
raise StopIteration
def __iter__(self):
"""Return iterator"""
return self
def pipe_from(self, other):
"""Lazy pipe. Chains other to selfs input_iter."""
def handle():
try:
return next(other)
except StopIteration:
raise PipeError
self.append_input(pop_input = handle)
self.write_input = other.write_input
self.append_input = other.append_input
def __add__(self, other):
"""Pipes self to other and return other."""
other.pipe_from(self)
return other
@format_input_args
def copy(self, input_iter_copy):
"""
Copies emulator instance. The input iterator must be copied and provided
by the caller.
"""
new = emulator(self.memory)
new.i = self.i
new.rel_base = self.rel_base
new.input_iter = input_iter_copy
return new
# Repl class -------------------------------------------------------
class REPL:
"""Read-eval-print loop class."""
def __init__(self, emul, decode=list, encode=list):
"""Takes an emulator and decode/enocde functions"""
self.emul = emul
self.encode = encode
self.decode = decode
def __next__(self):
"""
Iterates over emulator output until WaitForInput is risen or the
emulator halts. Returns decoded output.
"""
out = []
try:
done = True
for o in self.emul:
done = False
out.append(o)
if done:
raise StopIteration
except WaitForInput:
pass
return self.decode(out)
def __iter__(self):
return self
def write(self, i):
"""Encodes input and writes it to emulator"""
self.emul.write_input(input_iter = self.encode(i))
def copy(self):
"""Copies repl instance."""
return REPL(self.emul.copy(), self.decode, self.encode)
# Useful functions --------------------------------------------------
def parse(s):
"""Parses program from string"""
return list(map(int, s.rstrip().split(',')))
def compile(program):
"""Binds program to emulator and returns it as a function taking input."""
def executable(*args, **kwargs):
emul = emulator(program)
emul.write_input(*args, **kwargs)
return emul
return executable
def preproc(s):
"""Prepoccessing function used by most programs using intcode"""
return compile(parse(s))