Admin Panel

import os
from amitools.vamos.path import VolumeManager, resolve_sys_path
from amitools.vamos.cfgcore import ConfigDict


def path_volume_resolve_sys_path_test(tmpdir):
  rsp = resolve_sys_path
  p = str(tmpdir)
  assert rsp(p) == p
  # user home
  assert rsp("~") == os.path.expanduser("~")
  # env var
  os.environ["TEST_PATH"] = p
  assert rsp("${TEST_PATH}") == p
  assert rsp("${TEST_PATH}/bla") == os.path.join(p, "bla")


def path_volume_add_del_test(tmpdir):
  v = VolumeManager()
  assert v.setup()
  assert v.get_all_names() == []
  my_path = str(tmpdir.mkdir("bla"))
  no_path = str(tmpdir.join("hugo"))
  # ok
  vol = v.add_volume("My:" + my_path)
  assert vol
  assert v.get_all_names() == ['My']
  assert v.is_volume('MY')
  assert vol.is_setup
  assert vol.get_path() == my_path
  assert v.add_volume("foo:" + my_path)
  assert v.get_all_names() == ['My', 'foo']
  # duplicate path mapping
  assert not v.add_volume("foo:" + my_path)
  # duplicate path name
  assert not v.add_volume("my:" + no_path)
  # invalid path
  assert not v.add_volume("foo:" + no_path)
  # ok
  assert v.del_volume("my")
  assert not vol.is_setup
  assert v.get_all_names() == ['foo']
  # invalid name
  assert not v.del_volume("baz")
  # shutdown
  v.shutdown()


def path_volume_add_local_test(tmpdir):
  vols_dir = str(tmpdir.join("volumes"))
  v = VolumeManager(vols_base_dir=vols_dir)
  v.setup()
  # without create
  assert not v.add_volume("My")
  # with create
  vol = v.add_volume("My?create")
  assert vol
  # check for vol dir
  vol_path = os.path.join(vols_dir, "My")
  assert os.path.isdir(vol_path)
  assert vol.get_path() == vol_path
  # create multiple
  vols = v.add_volumes(["foo?create", "bar?create"])
  assert vols
  for vol in vols:
    vol_path = os.path.join(vols_dir, vol.get_name())
    assert os.path.isdir(vol_path)
    assert vol.get_path() == vol_path
  # shutdown
  v.shutdown()


def path_volume_create_rel_sys_path_test(tmpdir):
  v = VolumeManager()
  org = tmpdir.mkdir("bla")
  my_path = str(org)
  # ok
  vol = v.add_volume("My:" + my_path)
  assert vol
  # single path
  path = vol.create_rel_sys_path("bla")
  assert path == str(org.join("bla"))
  assert os.path.isdir(path)
  # multi path
  path = vol.create_rel_sys_path(["foo", "bar"])
  assert path == str(org.join("foo").join("bar"))
  assert os.path.isdir(path)


def path_volume_sys_to_ami_test(tmpdir):
  v = VolumeManager()
  mp = tmpdir.mkdir("bla")
  my_path = str(mp)
  no_path = str(tmpdir.join("hugo"))
  mp2 = mp.mkdir("blub")
  my_path2 = str(mp2)
  assert v.add_volume("My:" + my_path)
  assert v.add_volume("nested:" + my_path2)
  # exisitng path
  s2a = v.sys_to_ami_path
  assert s2a(my_path) == 'My:'
  assert s2a(str(mp.join("foo"))) == 'My:foo'
  # expect nested path
  assert s2a(my_path2) == 'nested:'
  assert s2a(str(mp2.join("bla/blub"))) == "nested:bla/blub"
  # non existing
  assert s2a(str(tmpdir)) is None
  # not abosulte
  assert s2a("bla") is None


def path_volume_ami_to_sys_test(tmpdir):
  v = VolumeManager()
  mp = tmpdir.mkdir("bla")
  my_path = str(mp)
  mp2 = mp.mkdir("Foo").mkdir("BAR").mkdir("baZ")
  # case insensitive file system?
  ci_fs = os.path.exists(os.path.join(my_path, "foo"))
  sub_path = str(mp2)
  assert v.add_volume("My:" + my_path)
  # base path
  a2s = v.ami_to_sys_path
  assert a2s("my:") == my_path
  assert a2s("my:unkown/PATH") == os.path.join(my_path, "unkown", "PATH")
  # follow along case of path in sys fs
  assert a2s("my:foo/bar/baz") == sub_path
  # fast mode on case insensitive fs does not adjust ami path
  if ci_fs:
    assert a2s("my:foo", True) == os.path.join(my_path, "foo")
  else:
    assert a2s("my:foo", True) == os.path.join(my_path, "Foo")


def path_volume_cfg_test(tmpdir):
  my_path = str(tmpdir.mkdir("bla"))
  v = VolumeManager()
  cfg = ConfigDict({
      'volumes': [
          'my:' + my_path
      ]
  })
  assert v.parse_config(cfg)
  assert v.get_all_names() == ['my']
  assert v.is_volume('MY')


def path_volume_create_test(tmpdir):
  v = VolumeManager(str(tmpdir))
  assert v.setup()
  spec = "my:" + str(tmpdir) + "/bla"
  # dir does not exist -> can't create
  assert not v.add_volume(spec)
  # create
  assert v.add_volume(spec + "?create")
  # check
  assert tmpdir.join("bla").check(dir=1)
  # shutdown
  v.shutdown()


def path_volume_temp_test(tmpdir):
  v = VolumeManager(str(tmpdir))
  assert v.setup()
  spec = "my:" + str(tmpdir)
  # dir does exist -> no temp possible
  assert not v.add_volume(spec + "?temp")
  # create temp
  spec += "/bla"
  assert v.add_volume(spec + "?temp")
  # check that temp dir exists
  assert tmpdir.join("bla").check(dir=1)
  # shutdown
  v.shutdown()
  # now temp is gone
  assert not tmpdir.join("bla").check()