Admin Panel
from amitools.vamos.machine import Machine
from amitools.vamos.mem import MemoryAlloc
from amitools.vamos.schedule import Scheduler, Task, Stack
from amitools.vamos.machine.opcodes import *
from amitools.vamos.machine.regs import *
def setup():
machine = Machine()
sched = Scheduler(machine)
alloc = MemoryAlloc.for_machine(machine)
return machine, sched, alloc
def create_task(alloc, pc, start_regs=None, return_regs=None, name=None):
if name is None:
name = "task"
stack = Stack.alloc(alloc, 4096)
task = Task(name, pc, stack, start_regs, return_regs)
return task
def schedule_scheduler_simple_task_test():
machine, sched, alloc = setup()
mem = alloc.get_mem()
pc = machine.get_scratch_begin()
mem.w16(pc, op_nop)
mem.w16(pc+2, op_rts)
task = create_task(alloc, pc, {REG_D0: 42})
assert sched.add_task(task)
assert sched.schedule() is None
assert sched.get_num_tasks() == 0
assert task.get_result_regs() == {REG_D0: 42}
assert alloc.is_all_free()
machine.cleanup()
def schedule_scheduler_cb_test():
tasks = []
def cb(task):
tasks.append(task)
machine, sched, alloc = setup()
sched.set_cur_task_callback(cb)
mem = alloc.get_mem()
pc = machine.get_scratch_begin()
mem.w16(pc, op_nop)
mem.w16(pc+2, op_rts)
task = create_task(alloc, pc, {REG_D0: 42})
assert sched.add_task(task)
assert sched.schedule() is None
assert sched.get_num_tasks() == 0
assert task.get_result_regs() == {REG_D0: 42}
assert alloc.is_all_free()
machine.cleanup()
assert tasks == [task, None]
def schedule_scheduler_recursive_add_test():
tasks = []
def cb(task):
tasks.append(task)
machine, sched, alloc = setup()
sched.set_cur_task_callback(cb)
mem = alloc.get_mem()
pc = machine.get_scratch_begin()
pc2 = pc + 100
mem.w16(pc2, op_nop)
mem.w16(pc2+2, op_rts)
task2 = create_task(alloc, pc2, {REG_D0: 23})
def trap(op, pc):
assert sched.add_task(task2)
addr = machine.setup_quick_trap(trap)
mem.w16(pc, op_jmp)
mem.w32(pc+2, addr)
mem.w16(pc+6, op_rts)
task = create_task(alloc, pc, {REG_D0: 42})
assert sched.add_task(task)
assert sched.schedule() is None
assert sched.get_num_tasks() == 0
assert task.get_result_regs() == {REG_D0: 42}
assert task2.get_result_regs() == {REG_D0: 23}
assert alloc.is_all_free()
machine.cleanup()
assert tasks == [task, task2, task, None]
def schedule_scheduler_sub_task_test():
tasks = []
def cb(task):
tasks.append(task)
machine, sched, alloc = setup()
sched.set_cur_task_callback(cb)
mem = alloc.get_mem()
pc = machine.get_scratch_begin()
pc2 = pc + 100
mem.w16(pc2, op_nop)
mem.w16(pc2+2, op_rts)
task2 = create_task(alloc, pc2, {REG_D0: 23}, name="sub")
def trap(op, pc):
assert sched.run_sub_task(task2)
addr = machine.setup_quick_trap(trap)
mem.w16(pc, op_jmp)
mem.w32(pc+2, addr)
mem.w16(pc+6, op_rts)
task = create_task(alloc, pc, {REG_D0: 42})
assert sched.add_task(task)
assert sched.schedule() is None
assert sched.get_num_tasks() == 0
assert task.get_result_regs() == {REG_D0: 42}
assert task2.get_result_regs() == {REG_D0: 23}
assert alloc.is_all_free()
machine.cleanup()
assert tasks == [task, None]