Rewrote intcode in iterator style

master
Tibor Bizjak 2023-03-15 18:20:23 +01:00
parent dc91635224
commit 196399df32
7 changed files with 226 additions and 312 deletions

View File

@ -1,13 +1,12 @@
#!/usr/bin/env python3
from intcode import Interpreter, SingletonIO
from intcode import preproc
from lib import last
def preproc(puzzle_input):
return list(map(int, puzzle_input.split(',')))
def partI(program):
return Interpreter(program, SingletonIO).run(1)
def partI(prog):
return last(prog(1))
def partII(program):
return Interpreter(program, SingletonIO).run(5)
def partII(prog):
return last(prog(5))

View File

@ -1,28 +1,15 @@
#!/usr/bin/env python3
from intcode import Interpreter, SingletonIO
from intcode import preproc
from itertools import permutations
from lib import memoize
from lib import memoize, last
stack_size = 5
fst_amp_input = 0
def preproc(puzzle_input):
program = list(map(int, puzzle_input.split(',')))
def make_amp(phase):
amp = Interpreter(program, SingletonIO)
amp.eval(phase)
return amp
return make_amp
def partI(make_amp):
@memoize
def run_amp(phase, amp_in):
amp = make_amp(phase)
return amp.run(amp_in)
def partI(amp):
run_amp = memoize(lambda *args : last(amp(*args)))
phase_range = range(stack_size)
best = 0
@ -34,18 +21,18 @@ def partI(make_amp):
best = max(amp_in, best)
return best
def partII(make_amp):
def partII(amp):
phase_range = range(5, 10)
best = 0
for perm in permutations(phase_range):
amps = list(map(make_amp, perm))
for prev, amp in zip(amps, amps[1:]):
amp.pipe_from(prev)
head, *tail = map(amp, perm)
amp_chain = sum(tail, head)
amp_chain.append_input(fst_amp_input)
amp.write(fst_amp_input)
for amp_in in amp:
amp.write(amp_in)
for amp_in in amp_chain:
amp_chain.write_input(amp_in)
best = max(amp_in, best)
return best

View File

@ -1,15 +1,14 @@
#!/usr/bin/env python3
from intcode import Interpreter, SingletonIO
from intcode import preproc
from lib import last
def preproc(puzzle_input):
return list(map(int, puzzle_input.split(',')))
def partI(program):
return Interpreter(program, SingletonIO).run(1)
def partI(executable):
return last(executable(1))
def partII(program):
return Interpreter(program, SingletonIO).run(2)
def partII(executable):
return last(executable(2))
from main import Tests

View File

@ -1,7 +1,8 @@
#!/usr/bin/env python3
from intcode import Interpreter
import intcode
from collections import defaultdict
from itertools import cycle
BLACK = 0
WHITE = 1
@ -41,46 +42,36 @@ class Canvas(defaultdict):
return unicode(self).encode("utf-8")
class robotIO:
def __init__(self, canvas=Canvas(color)):
self.canvas = canvas
self.dir = UP
self.pos = 0, 0
self.paint = True
def pop_input(self):
return self.canvas[self.pos]
def append_output(self, x):
if self.paint:
self.canvas[self.pos] = x
else:
self.dir = left(self.dir) if x == LEFT else right(self.dir)
self.pos = add(self.pos, self.dir)
self.paint = not self.paint
def flush(self):
return self.canvas
class Robot:
def __init__(self, program, canvas=Canvas(color)):
self.canvas = canvas
self.program = program
def __init__(self, program):
self.pos = (0, 0)
self.dir = UP
def paint(self):
comp = Interpreter(self.program, robotIO)
comp.IO.canvas = self.canvas
return comp.run()
self.comp = intcode.emulator(program)
def paint(self, canvas):
self.comp.write_input(pop_input=lambda : canvas[self.pos])
paint_flags = cycle((True, False))
for pf, x in zip(paint_flags, self.comp):
if pf:
canvas[self.pos] = x
else:
self.dir = left(self.dir) if x == LEFT else right(self.dir)
self.pos = add(self.pos, self.dir)
def preproc(puzzle_input):
return list(map(int, puzzle_input.split(',')))
program = intcode.parse(puzzle_input)
paint = lambda c : Robot(program).paint(c)
return paint
def partI(program):
canvas = Robot(program).paint()
def partI(paint):
canvas = Canvas(color)
paint(canvas)
return len(canvas)
def partII(program):
def partII(paint):
canvas = Canvas(color)
canvas[(0, 0)] = WHITE
canvas = Robot(program, canvas).paint()
paint(canvas)
return '\n' + str(canvas)

View File

@ -1,4 +1,6 @@
from intcode import Interpreter, WaitForInput
#!/usr/bin/env python3
import intcode
NORTH = 1
SOUTH = 2
@ -22,52 +24,29 @@ ENCODING = {WALL : '#',
OXY : 'O'
}
def move(pos, dir):
if dir == NORTH:
def move(pos, d):
if d == NORTH:
return pos[0], pos[1] + 1
if dir == SOUTH:
if d == SOUTH:
return pos[0], pos[1] - 1
if dir == EAST:
if d == EAST:
return pos[0] + 1, pos[1]
if dir == WEST:
if d == WEST:
return pos[0] - 1, pos[1]
return NotImplemented
class DroidIO:
def __init__(self):
self._input = None
self._output = None
def pop_input(self):
i = self._input
if i == None:
raise WaitForInput
self._input = None
return i
def append_output(self, x):
self._output = x
def flush(self):
o = self._output
self._output = None
return o
def write(self, x):
self._input = x
class Droid(Interpreter):
class Droid:
def __init__(self, program):
super().__init__(program, DroidIO)
self.pos = 0, 0
self.comp = intcode.emulator(program)
def move(self, dir):
out = self.eval(dir)
def move(self, d):
self.comp.write_input(d)
out = next(self.comp)
if out != WALL:
self.pos = move(self.pos, dir)
self.pos = move(self.pos, d)
return out
class Map(dict):
def __init__(self, encoding):
super().__init__()
@ -87,39 +66,39 @@ class Map(dict):
r += '\n'
return r
def map_space(droid, map=None):
if map == None:
map = Map(ENCODING)
for dir in DIRS:
new_pos = move(droid.pos, dir)
if new_pos in map:
def map_space(droid, space_map=None):
if space_map == None:
space_map = Map(ENCODING)
for d in DIRS:
new_pos = move(droid.pos, d)
if new_pos in space_map:
continue
object = droid.move(dir)
map[new_pos] = object
if object == WALL:
obj = droid.move(d)
space_map[new_pos] = obj
if obj == WALL:
continue
map_space(droid, map)
droid.move(OPPOSITE[dir])
return map
map_space(droid, space_map)
droid.move(OPPOSITE[d])
return space_map
def shortest_paths(map, pos, visited=None):
def shortest_paths(space_map, pos, visited=None):
if visited == None:
visited = {pos : 0}
path = visited[pos] + 1
for dir in DIRS:
new = move(pos, dir)
if map[new] == WALL or (new in visited and visited[new] <= path):
for d in DIRS:
new = move(pos, d)
if space_map[new] == WALL or (new in visited and visited[new] <= path):
continue
visited[new] = path
shortest_paths(map, new, visited)
shortest_paths(space_map, new, visited)
return visited
def preproc(puzzle_input):
program = list(map(int, puzzle_input.split(',')))
program = intcode.parse(puzzle_input)
droid = Droid(program)
area_map = map_space(droid)
x, y = tuple(filter(lambda x: x[1]==OXY, area_map.items()))[0][0]
path_map = shortest_paths(area_map, (x, y))
space_map = map_space(droid)
x, y = tuple(filter(lambda x: x[1]==OXY, space_map.items()))[0][0]
path_map = shortest_paths(space_map, (x, y))
return path_map
def partI(path_map):

View File

@ -1,43 +1,74 @@
from collections import defaultdict, deque
from enum import Enum
from inspect import signature
from lib import defaultlist
import operator
from itertools import chain
# Custom Exceptions -------------------------------------------------------
class WaitForInput(Exception):
"""Used for interrupting emulator execution."""
pass
class Halted(Exception):
"""Used when calling halted emulator."""
pass
class PipeError(Exception):
"""Used when b in pipe a | b needs input and a is halted."""
pass
class defaultlist(list):
"""Default list class. Allows writing and reading out of bounds."""
def __init__(self, lst, val_factory):
super().__init__(lst)
self.val_factory = val_factory
def __setitem__(self, i, x):
for _ in range((i - len(self) + 1)):
self.append(self.val_factory())
super().__setitem__(i, x)
def __getitem__(self, i):
if i >= len(self):
return self.val_factory()
return super().__getitem__(i)
class InputWriteError(Exception):
"""Raised when overwriting non-empty input buffer"""
class Emulator(object):
def __init__(self, program, get_input, put_output):
self.in_f = get_input
self.out_f = put_output
# Emulator ----------------------------------------------------------------
def format_input_args(wfunc):
"""
Adds structure to arguments for input functions.
Creates an input iterator by chaining args, an optional input iterator
and an iterator constructed from a pop_input function.
"""
def decorated(self, *args, input_iter=iter([]), pop_input=lambda : None):
input_iter = chain(iter(args), input_iter, iter(pop_input, None))
return wfunc(self, input_iter)
return decorated
class emulator():
"""
Intcode emulator. The emulator is a generator yielding output.
"""
def __init__(self, program):
"""
The emulator state is composed of memory, current pointer and relative base.
Input is fed from an iterator. This iterator is empty by default. Use
write/append to add overwrite/append to the input iterator.
"""
self.memory = defaultlist(program[:], val_factory = lambda : 0)
self.i = 0
self.rel_base = 0
self.input_iter = iter([])
@format_input_args
def append_input(self, input_iter):
"""Appends input_iter to input iterator"""
self.input_iter = chain(self.input_iter, input_iter)
@format_input_args
def write_input(self, input_iter):
"""
Overwrites input iterator. Raises InputWriteError if input iterator is
not exhausted.
"""
try:
next(self.input_iter)
raise InputWriteError
except StopIteration:
self.input_iter = input_iter
# Opcode functions -----------------------------
def _of_operator(operator):
"""Make opcode from operator."""
def opcode(self, p1, p2, p3):
@ -47,6 +78,7 @@ class Emulator(object):
return opcode
def _jump_when(flag):
"""Returns jump when flag opcode"""
def opcode(self, p1, p2):
if (self.memory[p1] > 0) == flag:
self.i = self.memory[p2]
@ -55,18 +87,25 @@ class Emulator(object):
return opcode
def _get_input(self, p1):
x = self.in_f()
"""Calls next on self.input_iterator. Raises WaitForInput if exhausted"""
try:
x = next(self.input_iter)
except StopIteration:
raise WaitForInput
self.memory[p1] = x
self.i += 2
def _put_output(self, p1):
"""Returns output. Only opcode that doesn't return None"""
self.i += 2
self.out_f(self.memory[p1])
return self.memory[p1]
def _adjust_base(self, p1):
"""Adjusts relative base."""
self.rel_base += self.memory[p1]
self.i += 2
# Opcode codes ------------------------------
opcodes = {1 : _of_operator(operator.add),
2 : _of_operator(operator.mul),
3 : _get_input,
@ -78,13 +117,19 @@ class Emulator(object):
9 : _adjust_base,
}
def __next__(self):
state = self
op = str(self.memory[state.i])
HALT = 99
if op == '99':
raise StopIteration
def next_opcode(self):
"""
Calls next opcode and returns it's return value. Raises Halted
when program halts
"""
op = self.memory[self.i]
if op == self.HALT:
raise Halted
op = str(op)
par_modes, op = op[:-2][::-1], int(op[-2:])
opcode = self.opcodes[op]
@ -94,170 +139,60 @@ class Emulator(object):
pars = []
for pn, mode in enumerate(par_modes, start=1):
p = state.i + pn
p = self.i + pn
if mode == 0:
p = state.memory[p]
p = self.memory[p]
elif mode == 2:
p = state.rel_base + state.memory[p]
p = self.rel_base + self.memory[p]
pars.append(p)
opcode(state, *pars)
return opcode(self, *pars)
def __iter__(self):
return self
def run(self):
if self.memory[self.i] == 99:
raise Halted
return deque(self, maxlen=0)
class Singleton(object):
def __init__(self, x=None):
self.x = x
def append(self, y):
self.x = y
def pop(self, _):
x = self.x
if x == None:
raise IndexError("pop from empty singleton")
self.x = None
return x
def __eq__(self, other):
return self.x == other
def __str__(self):
return str(self.x)
def format(self):
if self.x == None:
return None
return int(self.x)
def copy(self):
return self
def makeIO(in_buff_class, out_buff_class):
class IO(object):
def __init__(self, in_buff = in_buff_class()):
self.in_buff = in_buff
self.out_buff = out_buff_class()
def pop_input(self):
if self.in_buff == in_buff_class():
raise WaitForInput
return self.in_buff.pop(0)
def append_output(self, x):
self.out_buff.append(x)
def copy(self):
new = IO()
new.in_buff = self.in_buff.copy()
new.out_buff = self.out_buff.copy()
return new
def flush(self):
o = self.out_buff
self.out_buff = out_buff_class()
try:
return o.format()
except AttributeError:
return o
def write(self, input):
if self.in_buff != in_buff_class():
raise IntcodeError("writing to nonempty input")
self.in_buff = in_buff_class(input)
return IO
StackIO = makeIO(list, list)
SingletonIO = makeIO(Singleton, Singleton)
class Interpreter(object):
def __init__(self, program, IO_class=StackIO):
self.IO = IO_class()
self.comp = Emulator(program, self.IO.pop_input, self.IO.append_output)
def __iter__(self):
while True:
try:
self.comp.run()
break
except WaitForInput:
yield self.IO.flush()
yield self.IO.flush()
def write(self, in_buff):
self.IO.write(in_buff)
def run(self, in_buff=None):
if in_buff != None:
self.write(in_buff)
self.comp.run()
return self.IO.flush()
def eval(self, in_buff):
self.write(in_buff)
def __next__(self):
"""Executes until ouput is returned. Stops iteration when halted."""
try:
self.comp.run()
except WaitForInput:
pass
return self.IO.flush()
r = self.next_opcode()
while r == None:
r = self.next_opcode()
return r
except Halted:
raise StopIteration
def __iter__(self):
"""Return iterator"""
return self
def pipe_from(self, other):
class Output(Exception):
pass
def raise_output(o):
raise Output(o)
other.comp.out_f = raise_output
def get_input():
"""Lazy pipe. Chains other to selfs input_iter."""
def handle():
try:
other.run()
return next(other)
except StopIteration:
raise PipeError
except Output as o:
return int(str(o))
self.comp.in_f = get_input
self.IO.write = other.IO.write
self.append_input(pop_input = handle)
self.write_input = other.write_input
self.append_input = other.append_input
def __add__(self, other):
"""Pipes self to other and return other."""
other.pipe_from(self)
return other
# Useful functions --------------------------------------------------
def parse(s):
"""Parses program from string"""
return list(map(int, s.rstrip().split(',')))
def copy(self):
memory = self.memory.copy()
IO = IO.copy()
return Emulator(memory, IO)
def compile(program):
"""Binds program to emulator and returns it as a function taking input."""
def executable(*args, **kwargs):
emul = emulator(program)
emul.write_input(*args, **kwargs)
return emul
return executable
def preproc(s):
"""Prepoccessing function used by most programs using intcode"""
return compile(parse(s))
class Display(defaultdict):
def __init__(self, encoding):
super(Display, self).__init__(int)
self.encoding = encoding
def __str__(self):
xs, ys = zip(*self.keys())
X, Y = max(xs)+1, max(ys)+1
return '\n'.join(''.join(self.encoding[self[(x, y)]]
for x in range(X))
for y in range(Y))
class DisplayIO(object):
def __init__(self, display):
self.display = display
self.block = tuple()
def input(self):
return NotImplemented
def wait_for_input(self):
return NotImplemented
def output(self, x):
self.block += (x,)
if len(self.block) == 3:
self.display[self.block[:2]] = self.block[2]
self.block = tuple()

24
lib.py
View File

@ -1,5 +1,29 @@
#!/usr/bin/env python3
import itertools
from collections import deque
# Iterator recipes ----------------------------------------
def last(iterator):
return deque(iterator, maxlen=1)[0]
class defaultlist(list):
"""Default list class. Allows writing and reading out of bounds."""
def __init__(self, lst, val_factory):
super().__init__(lst)
self.val_factory = val_factory
def __setitem__(self, i, x):
for _ in range((i - len(self) + 1)):
self.append(self.val_factory())
super().__setitem__(i, x)
def __getitem__(self, i):
if i >= len(self):
return self.val_factory()
return super().__getitem__(i)
def memoize(f):
cache = dict()
def memf(*args):