249 lines
7.2 KiB
Python
249 lines
7.2 KiB
Python
from inspect import signature
|
|
from lib import defaultlist
|
|
import operator
|
|
from itertools import chain
|
|
|
|
# Custom Exceptions -------------------------------------------------------
|
|
class WaitForInput(Exception):
|
|
"""Used for interrupting emulator execution."""
|
|
pass
|
|
|
|
class Halted(Exception):
|
|
"""Used when calling halted emulator."""
|
|
pass
|
|
|
|
class PipeError(Exception):
|
|
"""Used when b in pipe a | b needs input and a is halted."""
|
|
pass
|
|
|
|
class InputWriteError(Exception):
|
|
"""Raised when overwriting non-empty input buffer"""
|
|
|
|
|
|
# Emulator ----------------------------------------------------------------
|
|
def format_input_args(wfunc):
|
|
"""
|
|
Adds structure to arguments for input functions.
|
|
Creates an input iterator by chaining args, an optional input iterator
|
|
and an iterator constructed from a pop_input function.
|
|
"""
|
|
def decorated(self, *args, input_iter=iter([]), pop_input=lambda : None):
|
|
input_iter = chain(iter(args), input_iter, iter(pop_input, None))
|
|
return wfunc(self, input_iter)
|
|
return decorated
|
|
|
|
|
|
class emulator():
|
|
"""
|
|
Intcode emulator. The emulator is a generator yielding output.
|
|
"""
|
|
|
|
def __init__(self, program):
|
|
"""
|
|
The emulator state is composed of memory, current pointer and relative base.
|
|
Input is fed from an iterator. This iterator is empty by default. Use
|
|
write/append to add overwrite/append to the input iterator.
|
|
"""
|
|
self.memory = defaultlist(program.copy(), val_factory = lambda : 0)
|
|
self.i = 0
|
|
self.rel_base = 0
|
|
|
|
self.input_iter = iter([])
|
|
|
|
@format_input_args
|
|
def append_input(self, input_iter):
|
|
"""Appends input_iter to input iterator"""
|
|
self.input_iter = chain(self.input_iter, input_iter)
|
|
|
|
@format_input_args
|
|
def write_input(self, input_iter):
|
|
"""
|
|
Overwrites input iterator. Raises InputWriteError if input iterator is
|
|
not exhausted.
|
|
"""
|
|
try:
|
|
next(self.input_iter)
|
|
raise InputWriteError
|
|
except StopIteration:
|
|
self.input_iter = input_iter
|
|
|
|
|
|
# Opcode functions -----------------------------
|
|
def _of_operator(operator):
|
|
"""Make opcode from operator."""
|
|
def opcode(self, p1, p2, p3):
|
|
r = operator(self.memory[p1], self.memory[p2])
|
|
self.memory[p3] = r
|
|
self.i += 4
|
|
return opcode
|
|
|
|
def _jump_when(flag):
|
|
"""Returns jump when flag opcode"""
|
|
def opcode(self, p1, p2):
|
|
if (self.memory[p1] > 0) == flag:
|
|
self.i = self.memory[p2]
|
|
else:
|
|
self.i += 3
|
|
return opcode
|
|
|
|
def _get_input(self, p1):
|
|
"""Calls next on self.input_iterator. Raises WaitForInput if exhausted"""
|
|
try:
|
|
x = next(self.input_iter)
|
|
except StopIteration:
|
|
raise WaitForInput
|
|
self.memory[p1] = x
|
|
self.i += 2
|
|
|
|
def _put_output(self, p1):
|
|
"""Returns output. Only opcode that doesn't return None"""
|
|
self.i += 2
|
|
return self.memory[p1]
|
|
|
|
def _adjust_base(self, p1):
|
|
"""Adjusts relative base."""
|
|
self.rel_base += self.memory[p1]
|
|
self.i += 2
|
|
|
|
# Opcode codes ------------------------------
|
|
opcodes = {1 : _of_operator(operator.add),
|
|
2 : _of_operator(operator.mul),
|
|
3 : _get_input,
|
|
4 : _put_output,
|
|
5 : _jump_when(True),
|
|
6 : _jump_when(False),
|
|
7 : _of_operator(operator.lt),
|
|
8 : _of_operator(operator.eq),
|
|
9 : _adjust_base,
|
|
}
|
|
|
|
HALT = 99
|
|
|
|
|
|
def next_opcode(self):
|
|
"""
|
|
Calls next opcode and returns it's return value. Raises Halted
|
|
when program halts
|
|
"""
|
|
op = self.memory[self.i]
|
|
if op == self.HALT:
|
|
raise Halted
|
|
|
|
op = str(op)
|
|
par_modes, op = op[:-2][::-1], int(op[-2:])
|
|
|
|
opcode = self.opcodes[op]
|
|
parnum = len(signature(opcode).parameters) - 1
|
|
par_modes = par_modes + '0'*(parnum - len(par_modes))
|
|
par_modes = map(int, par_modes)
|
|
|
|
pars = []
|
|
for pn, mode in enumerate(par_modes, start=1):
|
|
p = self.i + pn
|
|
if mode == 0:
|
|
p = self.memory[p]
|
|
elif mode == 2:
|
|
p = self.rel_base + self.memory[p]
|
|
pars.append(p)
|
|
return opcode(self, *pars)
|
|
|
|
def __next__(self):
|
|
"""Executes until ouput is returned. Stops iteration when halted."""
|
|
try:
|
|
r = self.next_opcode()
|
|
while r == None:
|
|
r = self.next_opcode()
|
|
return r
|
|
except Halted:
|
|
raise StopIteration
|
|
|
|
def __iter__(self):
|
|
"""Return iterator"""
|
|
return self
|
|
|
|
def pipe_from(self, other):
|
|
"""Lazy pipe. Chains other to selfs input_iter."""
|
|
def handle():
|
|
try:
|
|
return next(other)
|
|
except StopIteration:
|
|
raise PipeError
|
|
self.append_input(pop_input = handle)
|
|
self.write_input = other.write_input
|
|
self.append_input = other.append_input
|
|
|
|
def __add__(self, other):
|
|
"""Pipes self to other and return other."""
|
|
other.pipe_from(self)
|
|
return other
|
|
|
|
@format_input_args
|
|
def copy(self, input_iter_copy):
|
|
"""
|
|
Copies emulator instance. The input iterator must be copied and provided
|
|
by the caller.
|
|
"""
|
|
new = emulator(self.memory)
|
|
new.i = self.i
|
|
new.rel_base = self.rel_base
|
|
|
|
new.input_iter = input_iter_copy
|
|
return new
|
|
|
|
# Repl class -------------------------------------------------------
|
|
class REPL:
|
|
"""Read-eval-print loop class."""
|
|
def __init__(self, emul, decode=list, encode=list):
|
|
"""Takes an emulator and decode/enocde functions"""
|
|
self.emul = emul
|
|
self.encode = encode
|
|
self.decode = decode
|
|
|
|
def __next__(self):
|
|
"""
|
|
Iterates over emulator output until WaitForInput is risen or the
|
|
emulator halts. Returns decoded output.
|
|
"""
|
|
out = []
|
|
try:
|
|
done = True
|
|
for o in self.emul:
|
|
done = False
|
|
out.append(o)
|
|
if done:
|
|
raise StopIteration
|
|
except WaitForInput:
|
|
pass
|
|
return self.decode(out)
|
|
|
|
def __iter__(self):
|
|
return self
|
|
|
|
def write(self, i):
|
|
"""Encodes input and writes it to emulator"""
|
|
self.emul.write_input(input_iter = self.encode(i))
|
|
|
|
def copy(self):
|
|
"""Copies repl instance."""
|
|
return REPL(self.emul.copy(), self.decode, self.encode)
|
|
|
|
# Useful functions --------------------------------------------------
|
|
|
|
def parse(s):
|
|
"""Parses program from string"""
|
|
return list(map(int, s.rstrip().split(',')))
|
|
|
|
|
|
def compile(program):
|
|
"""Binds program to emulator and returns it as a function taking input."""
|
|
def executable(*args, **kwargs):
|
|
emul = emulator(program)
|
|
emul.write_input(*args, **kwargs)
|
|
return emul
|
|
return executable
|
|
|
|
def preproc(s):
|
|
"""Prepoccessing function used by most programs using intcode"""
|
|
return compile(parse(s))
|
|
|