# This file is part of Lisien, a framework for life simulation games.
# Copyright (c) Zachary Spector, public@zacharyspector.com
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Database connectors to persist Lisien's state to disk
The base class, :class:`AbstractDatabaseConnector`, defines the API that
database connectors need to use. Most databases should implement
:class:`ThreadedDatabaseConnector`, which does background processing in a
subthread.
The connectors also have import/export code to save and load the whole
database. One concrete class, :class:`PythonDatabaseConnector`, is provided
for when that's all the persistence you want. The other,
:class:`NullDatabaseConnector`, can't import, export, or persist anything at
all; it's not much good for anything but tests.
"""
from __future__ import annotations
import inspect
import os
import sys
from abc import ABC, abstractmethod
from ast import literal_eval
from collections import UserDict, defaultdict, deque
from contextlib import contextmanager
from functools import cached_property, partial, partialmethod, wraps
from io import IOBase, StringIO
from itertools import filterfalse, starmap
from pathlib import Path
from queue import Queue
from threading import Lock, Thread
from types import EllipsisType, FunctionType, MethodType
from typing import (
IO,
TYPE_CHECKING,
Any,
Callable,
ClassVar,
Iterable,
Iterator,
Literal,
Mapping,
MutableMapping,
MutableSet,
Optional,
Set,
TypeVar,
get_args,
get_type_hints,
)
import networkx as nx
from attr import Factory
from attrs import define, field
from tblib import Traceback
if TYPE_CHECKING:
from xml.etree.ElementTree import Element, ElementTree
from xml.etree.ElementTree import indent as indent_tree
from xml.etree.ElementTree import parse
else:
try:
from lxml.etree import Element, ElementTree
from lxml.etree import indent as indent_tree
from lxml.etree import parse
except ImportError:
from xml.etree.ElementTree import (
ElementTree,
Element,
indent as indent_tree,
parse,
)
import lisien.types
from ..exc import KeyframeError
from ..facade import EngineFacade
from ..types import (
AbstractEngine,
ActionFuncName,
ActionRowType,
AssignmentRowListType,
AssignmentRowType,
Branch,
BranchRowType,
CharacterRulesHandledRowType,
CharDict,
CharName,
CharRulebookRowType,
EdgeKeyframe,
EdgeRowType,
EdgeValRowType,
EternalKey,
FuncName,
GraphEdgeValKeyframe,
GraphNodeValKeyframe,
GraphRowType,
GraphTypeStr,
GraphValKeyframe,
GraphValRowType,
Key,
Keyframe,
KeyframeExtensionRowType,
KeyframeGraphRowType,
KeyHint,
LoadedCharWindow,
LoadedDict,
NodeKeyframe,
NodeName,
NodeRowType,
NodeRulebookRowType,
NodeRulesHandledRowType,
NodeValRowType,
PickierDefaultDict,
Plan,
PlanTicksRowType,
PortalRulebookRowType,
PortalRulesHandledRowType,
PrereqFuncName,
PrereqRowType,
RuleBig,
RuleBigRowType,
RulebookName,
RulebookPriority,
RulebookRowType,
RulebooksKeyframe,
RuleFuncName,
RuleKeyframe,
RuleName,
RuleNeighborhood,
RuleNeighborhoodRowType,
RuleRowType,
Stat,
StatDict,
ThingRowType,
Tick,
Time,
TimeWindow,
TriggerFuncName,
TriggerRowType,
Turn,
TurnRowType,
UnitRowType,
UnitRulesHandledRowType,
UniversalKey,
UniversalKeyframe,
UniversalRowType,
Value,
ValueHint,
deannotate,
root_type,
sort_set,
)
from ..util import ILLEGAL_CHARACTER_NAMES, garbage
from ..window import (
AssignmentTimeDict,
BranchingTimeListDict,
LinearTimeListDict,
)
from ..wrap import DictWrapper, ListWrapper, SetWrapper
SCHEMAVER_B = b"\xb6_lisien_schema_version"
SCHEMA_VERSION = 2
SCHEMA_VERSION_B = SCHEMA_VERSION.to_bytes(1, "little")
XML_SCHEMA_VERSION = 2
class GlobalKeyValueStore(UserDict):
"""A dict-like object that keeps its contents in a table.
Mostly this is for holding the current branch and revision.
"""
def __init__(self, qe: AbstractDatabaseConnector, data: dict):
self.qe = qe
super().__init__()
self.data = data
def __getitem__(
self, k: Key
) -> Value | DictWrapper | ListWrapper | SetWrapper:
ret = super().__getitem__(k)
if ret is ...:
raise KeyError(k)
if isinstance(ret, dict):
return DictWrapper(
lambda: super().__getitem__(k),
self,
k,
)
elif isinstance(ret, list):
return ListWrapper(
lambda: super().__getitem__(k),
self,
k,
)
elif isinstance(ret, set):
return SetWrapper(
lambda: super().__getitem__(k),
self,
k,
)
return ret
def __setitem__(self, k: Key, v: Value):
if hasattr(v, "unwrap"):
v = v.unwrap()
self.qe.global_set(k, v)
super().__setitem__(k, v)
def __delitem__(self, k: Key):
super().__delitem__(k)
self.qe.global_del(k)
def __copy__(self) -> dict[EternalKey, Value]:
return {
EternalKey(k): Value(v.unwrap() if hasattr(v, "unwrap") else v)
for (k, v) in self.items()
}
@define
class ConnectionLooper(ABC):
connector: AbstractDatabaseConnector
_initialized: bool = field(init=False, default=False)
existence_lock: Lock = field(init=False, factory=Lock)
@existence_lock.validator
def _validate_existence_lock(self, attr, lock: Lock):
lock.acquire()
@cached_property
def inq(self) -> Queue:
return self.connector._inq
@cached_property
def outq(self) -> Queue:
return self.connector._outq
@cached_property
def lock(self):
return Lock()
@cached_property
def logger(self):
from logging import getLogger
return getLogger("lisien." + self.__class__.__name__)
def dispatch_special_instruction(self, inst):
raise TypeError(
"Don't know what to do with this type of instruction",
type(inst),
inst,
)
@abstractmethod
def call(self, name: str, *args, **kwargs): ...
@abstractmethod
def call_many(self, name: str, largs: list[tuple | dict]) -> Iterable: ...
def convert_response(self, resp):
return resp
def _dispatch_instruction(self, *inst):
match inst:
case ("one", cmd):
return self.call(cmd)
case ("one", cmd, args):
return self.call(cmd, *args)
case ("one", cmd, args, kwargs):
return self.call(cmd, *args, **kwargs)
case ("many", cmd, argls):
return self.call_many(cmd, argls)
case (cmd, args, kwargs):
return self.call(cmd, *args, **kwargs)
case (cmd, args):
return self.call(cmd, *args)
case (cmd,):
return self.call(cmd)
case _:
raise TypeError("Invalid instruction", inst)
def dispatch_instruction(self, *inst):
return self.convert_response(self._dispatch_instruction(*inst))
def dispatch_silent_instruction(self, *args) -> None:
self._dispatch_instruction(*args)
def begin(self):
"""Do anything needed to connect to the database and start a transaction
Might be nothing, if it's a file-based local database, such as ParquetDB.
"""
def run(self):
self.begin()
inq = self.inq
outq = self.outq
def send(output):
inq.task_done()
outq.put(output)
while (inst := inq.get()) != "close":
if inst == "commit":
self.commit()
send("committed")
elif inst == "initdb":
send(self.initdb())
elif not isinstance(inst, tuple):
try:
send(self.dispatch_special_instruction(inst))
except BaseException as exc:
send(exc)
continue
elif inst[0] == "silent":
try:
self.dispatch_silent_instruction(*inst[1:])
inq.task_done()
continue
except BaseException as exc:
message = f"{self.__class__.__name__}: Got exception while silenced: {exc}"
print(message, file=sys.stderr)
self.logger.error(message, exc_info=exc)
continue
elif inst[0] == "echo":
if len(inst) == 1:
send("echo")
continue
else:
send(inst[1])
continue
else:
try:
send(self.dispatch_instruction(*inst))
except BaseException as exc:
send(exc)
continue
self.close()
inq.task_done()
@abstractmethod
def initdb(self):
pass
@abstractmethod
def commit(self):
pass
@abstractmethod
def close(self):
pass
_ARGS = TypeVar("_ARGS")
_RET = TypeVar("_RET")
def mutexed(
func: Callable[[_ARGS, ...], _RET],
) -> Callable[[_ARGS, ...], _RET]:
"""Decorator for when an entire method's body holds a mutex lock
Should be applied to a method. Expects that the method's class has another
method, `mutex()`, that is a context manager, which holds the lock.
"""
@wraps(func)
def mutexy(self, *args, **kwargs):
with self.mutex():
return func(self, *args, **kwargs)
return mutexy
@define
class Batch(list):
"""A list of tuples to be serialized with a given function and sent to the database
Construct these with the :func:`batched` decorator.
"""
connector: AbstractDatabaseConnector
table: str
key_len: int
inc_rec_counter: bool
per_character: bool
@staticmethod
def _convert_sig(
serialize_record: Callable[[Value, ...], bytes],
) -> inspect.Signature:
return inspect.signature(serialize_record)
signature: inspect.Signature = field(converter=_convert_sig)
cached_properties: ClassVar[dict[str, cached_property]] = {}
"""`cached_property` objects produced by `@batched`"""
serializers: ClassVar[dict[str, Callable[[Value], bytes]]] = {}
"""Serialization functions decorated by `@batched`"""
validate: ClassVar[bool] = True
"""Whether to check that records added to the batch are correctly typed tuples"""
def cull(self, condition: Callable[..., bool]) -> None:
"""Remove records matching a condition from the batch
Records are unpacked before being passed into the condition function.
"""
datta = list(self)
self.clear()
self.extend(
filterfalse(
partial(self._call_with_unpacked_tuple, condition), datta
)
)
@cached_property
def deserialize(self) -> Callable[[tuple], tuple]:
sig = self.signature
unpack = self.connector.unpack
ret_annot = sig.return_annotation
if isinstance(ret_annot, str):
ret_annot = eval(ret_annot)
types_on_disk = get_args(ret_annot)
# I'd like a more informative return type
def deserialize(rec: types_on_disk) -> tuple:
assert len(rec) == len(types_on_disk)
return tuple(
unpack(item) if type_on_disk is bytes else item
for (item, type_on_disk) in zip(rec, types_on_disk)
)
return deserialize
@cached_property
def get_lists(self) -> Callable[[dict, Branch, Queue], tuple]:
if self.per_character:
sig = self.signature
char_index = -1
args = list(sig.parameters.keys())
for char_index, arg in enumerate(args):
param = sig.parameters[arg]
if isinstance(param.annotation, str):
annot = eval(param.annotation)
else:
annot = param.annotation
if annot is CharName:
break
else:
raise TypeError("per_character was set, but no CharName")
def get_lists(ret: dict, branch: Branch, outq: Queue) -> tuple:
while isinstance(got := outq.get(), list):
for rec in got:
if isinstance(rec, dict):
rec = tuple(rec[arg] for arg in args)
else:
rec = (branch, *rec)
charn = self.connector.unpack(rec[char_index])
try:
ret[charn][self.table].append(
self.deserialize(rec)
)
except TypeError as ex:
raise TypeError(*ex.args, self.table, rec) from ex
outq.task_done()
return got
else:
def get_lists(ret: dict, branch: Branch, outq: Queue) -> tuple:
while isinstance(got := outq.get(), list):
for rec in got:
if isinstance(rec, dict):
rec = tuple(
rec[arg]
for arg in self.signature.parameters.keys()
)
else:
rec = (branch, *rec)
try:
ret[self.table].append(self.deserialize(rec))
except TypeError as ex:
raise TypeError(*ex.args, self.table, rec) from ex
outq.task_done()
return got
return get_lists
@cached_property
def window_getter(
self,
) -> Callable[
[
ThreadedDatabaseConnector,
dict,
Branch,
Turn,
Tick,
Turn | None,
Tick | None,
],
None,
]:
table = self.table
get_lists = self.get_lists
def get_a_window(
self: ThreadedDatabaseConnector,
ret: dict,
branch: Branch,
turn_from: Turn,
tick_from: Tick,
turn_to: Turn | None,
tick_to: Tick | None,
) -> None:
outq: Queue = self._outq
if (got := outq.get()) != (
"begin",
table,
branch,
turn_from,
tick_from,
turn_to,
tick_to,
):
raise RuntimeError("Expected beginning of " + table, got)
outq.task_done()
got = get_lists(ret, branch, outq)
if got != (
"end",
table,
branch,
turn_from,
tick_from,
turn_to,
tick_to,
):
raise RuntimeError("Expected end of " + table, got)
outq.task_done()
return get_a_window
@staticmethod
def _call_with_unpacked_tuple(func, tup):
return func(*tup)
def _validate(self, t: tuple):
if not isinstance(t, tuple):
raise TypeError("Can only batch tuples")
if len(t) != len(self.signature.parameters):
raise TypeError(
f"Need a tuple of length {len(self.signature.parameters)}, not {len(t)}"
)
for i, (param, value) in enumerate(
zip(self.signature.parameters.values(), t)
):
annot = param.annotation
if not isinstance(value, tuple(map(root_type, deannotate(annot)))):
raise TypeError(
f"While validating {self.table}: "
f"Tuple element {i} is of type {type(value)};"
f" should be {param.annotation}",
value,
)
def __setitem__(self, i: int, v):
if self.validate:
self._validate(v)
super().__setitem__(i, v)
def insert(self, i: int, v):
if self.validate:
self._validate(v)
super().insert(i, v)
def append(self, v):
if self.validate:
self._validate(v)
super().append(v)
def serialize_record(self, *args) -> tuple:
sig = self.signature
if len(sig.parameters) != len(args):
raise TypeError("Wrong record length", sig.parameters, args)
rett = sig.return_annotation
if isinstance(rett, str):
rett = eval(rett)
if hasattr(rett, "__value__"):
rett = rett.__value__
if rett is Time:
rett = (Branch, Turn, Tick)
else:
rett = get_args(rett)
if len(rett) != len(args):
raise TypeError(
"Wrong record length", sig.return_annotation, rett, args
)
pack = self.connector.pack
out = []
for arg, typ in zip(args, rett):
if isinstance(typ, str):
typ = eval(typ)
typ = root_type(typ)
if typ is bytes:
out.append(pack(arg))
elif not isinstance(arg, typ):
raise TypeError("Wrong type", arg, typ)
else:
out.append(arg)
return tuple(out)
def __call__(self):
if not self:
return 0
if self.key_len:
deduplicated = {
rec[: self.key_len]: rec[self.key_len :] for rec in self
}
records = starmap(
self.serialize_record,
((*key, *value) for (key, value) in deduplicated.items()),
)
else:
records = starmap(self.serialize_record, self)
data = list(records)
argnames = list(self.signature.parameters.keys())
if self.key_len:
self.connector.delete_many_silent(
self.table,
[
dict(zip(argnames[: self.key_len], datum))
for datum in {rec[: self.key_len] for rec in data}
],
)
self.connector.insert_many_silent(
self.table, [dict(zip(argnames, datum)) for datum in data]
)
n = len(data)
self.clear()
if self.inc_rec_counter:
self.connector._increc(n)
return n
def batched(
table: str,
serialize_record: Callable | None = None,
*,
key_len: int = 0,
inc_rec_counter: bool = True,
per_character: bool = False,
) -> partial | cached_property:
"""Decorator for serializers that operate on batches of records
Needs at least the name of the ``table`` the batch will be inserted into.
The type annotations on the decorated function will be used to generate the
schema for at least SQL and ParquetDB databases, and preferably every other.
The decorated function will never be called; it's just convenient to use
function signatures and annotations to express de/serialization.
:param key_len: How long the primary key is. Used to delete records matching
those in the batch.
:param inc_rec_counter: Whether to count these records toward the number
needed to trigger an automatic keyframe snap.
:param per_character: Whether to group the records by the character they
are about when loading. Default ``False``. Won't work unless the return
type has a ``CharName`` annotation.
"""
if serialize_record is None:
return partial(
batched,
table,
key_len=key_len,
inc_rec_counter=inc_rec_counter,
per_character=per_character,
)
Batch.serializers[table] = serialize_record
serialized_tuple_type = get_type_hints(serialize_record)["return"]
def the_batch(
self,
) -> Batch[serialized_tuple_type]:
if self is None:
mth = partial(serialize_record, EngineFacade(None))
else:
mth = MethodType(serialize_record, self)
return Batch(
self,
table,
key_len,
inc_rec_counter,
per_character,
mth,
)
return Batch.cached_properties.setdefault(
table, cached_property(the_batch)
)
def _fake_pack_or_unpack(obj):
return obj
[docs]
@define(eq=False)
class AbstractDatabaseConnector(ABC):
"""The interface between Lisien and wherever it's storing its data
Currently, there are three implementations (and
:class:`NullDatabaseConnector`, which does nothing, suitable only for tests).
This module contains the concrete class :class:`PythonDatabaseConnector`,
which stores all of Lisien's data as ordinary Python objects, to be
exported via :meth:`to_xml` and later imported by :meth:`load_xml`.
You should probably use the :meth:`lisien.engine.Engine.export`
and :meth:`lisien.engine.Engine.from_archive` methods instead. Lisien's
XML files are very large, and don't contain the game's code, but the
archives produced by :meth:`lisien.engine.Engine.export` are compressed
well, and include the code.
To more conveniently persist the state of the game between play sessions,
the modules :mod:`lisien.sql` and :mod:`lisien.pqdb` implement connectors
for SQL and ParquetDB databases, respectively. It shouldn't be necessary
to import them directly, as :class:`lisien.engine.Engine` will select
the appropriate connector class itself.
"""
db_type: ClassVar[str]
engine: AbstractEngine
@abstractmethod
def pack(
self,
obj: Key
| KeyHint
| EternalKey
| UniversalKey
| Stat
| ValueHint
| Value,
) -> bytes: ...
@abstractmethod
def unpack(self, b: bytes) -> Value: ...
@property
def keyframe_interval(self) -> int | None:
return self.engine.keyframe_interval
@property
def clear(self) -> bool:
return self.engine.clear
def kf_interval_override(self):
return self.engine._detect_kf_interval_override()
def snap_keyframe(self) -> None:
return self.engine.snap_keyframe(silent=True)
_initialized: bool = field(init=False, default=False)
_kf_interval_overridden: bool = field(init=False, default=False)
tree: ElementTree = field(init=False, eq=False)
@cached_property
def _records(self) -> int:
return 0
@cached_property
def _new_keyframe_times(self) -> set[Time]:
return set()
@cached_property
def all_rules(self) -> set[RuleName]:
return set()
@cached_property
def eternal(
self,
) -> MutableMapping[EternalKey | KeyHint, Value | ValueHint]:
return {
"branch": "trunk",
"turn": 0,
"tick": 0,
"language": "eng",
"trunk": "trunk",
"_lisien_schema_version": SCHEMA_VERSION,
}
@cached_property
def _lock(self) -> Lock:
return Lock()
@contextmanager
def mutex(self):
with self._lock:
yield
def dump_everything(self) -> dict[str, list[tuple]]:
"""Return the whole database in a Python dictionary.
You should probably use ``to_xml`` instead, but this could be helpful
for debugging, or if you have your own ideas about serialization.
"""
self.flush()
def entuple(rec):
if isinstance(rec, str):
return (rec,)
return tuple(rec)
return {
table: list(
map(entuple, getattr(self, f"{table}_dump")()),
)
for table in Batch.cached_properties
}
def __getstate__(
self,
) -> tuple[
dict[str, list[tuple]],
bool,
]:
return (
self.dump_everything(),
self._initialized,
)
def load_everything(self, state: dict[str, list[tuple]]):
for table, data in state.items():
prop = Batch.cached_properties[table]
batch = getattr(self, prop.attrname)
batch.extend(data)
self.flush()
def __setstate__(
self,
state: tuple[
dict[str, list[tuple]],
bool,
],
):
(
data,
self._initialized,
) = state
self.load_everything(data)
def __enter__(self) -> AbstractDatabaseConnector:
return self
def __exit__(self) -> None:
self.close()
@batched(
"global",
key_len=1,
inc_rec_counter=False,
)
def _eternal2set(
self, key: EternalKey, value: Value
) -> tuple[bytes, bytes]: ...
@batched(
"branches",
key_len=1,
inc_rec_counter=False,
)
def _branches2set(
self,
branch: Branch,
parent: Branch | None,
parent_turn: Turn,
parent_tick: Tick,
end_turn: Turn,
end_tick: Tick,
) -> tuple[Branch, Branch | None, Turn, Tick, Turn, Tick]: ...
@batched("turns", key_len=2)
def _turns2set(
self, branch: Branch, turn: Turn, end_tick: Tick, plan_end_tick: Tick
) -> tuple[Branch, Turn, Tick, Tick]: ...
@batched(
"turns_completed",
key_len=1,
)
def _turns_completed_to_set(
self, branch: Branch, turn: Turn
) -> tuple[Branch, Turn]: ...
def complete_turn(
self, branch: Branch, turn: Turn, discard_rules: bool = False
) -> None:
self._turns_completed_to_set.append((branch, turn))
if discard_rules:
self._char_rules_handled.clear()
self._unit_rules_handled_to_set.clear()
self._char_thing_rules_handled.clear()
self._char_place_rules_handled.clear()
self._char_portal_rules_handled.clear()
self._node_rules_handled_to_set.clear()
self._portal_rules_handled_to_set.clear()
@batched("plan_ticks", inc_rec_counter=False)
def _planticks2set(
self, plan: Plan, branch: Branch, turn: Turn, tick: Tick
) -> tuple[Plan, Branch, Turn, Tick]: ...
@batched("bookmarks", key_len=1, inc_rec_counter=False)
def _bookmarks2set(
self, key: Key, branch: Branch, turn: Turn, tick: Tick
) -> tuple[bytes, Branch, Turn, Tick]: ...
def set_bookmark(
self, key: Key, branch: Branch, turn: Turn, tick: Tick
) -> None:
self._bookmarks2set.append((key, branch, turn, tick))
@abstractmethod
def del_bookmark(self, key: Key) -> None: ...
@batched("universals", key_len=4)
def _universals2set(
self,
branch: Branch,
turn: Turn,
tick: Tick,
key: UniversalKey,
value: Value,
) -> tuple[Branch, Turn, Tick, bytes, bytes]: ...
@batched("rules", key_len=1)
def _rules2set(self, rule: RuleName) -> tuple[str]:
return (rule,)
@batched("rule_triggers", key_len=4)
def _triggers2set(
self,
branch: Branch,
turn: Turn,
tick: Tick,
rule: RuleName,
triggers: list[TriggerFuncName],
) -> tuple[Branch, Turn, Tick, RuleName, bytes]: ...
@batched("rule_prereqs", key_len=4)
def _prereqs2set(
self,
branch: Branch,
turn: Turn,
tick: Tick,
rule: RuleName,
prereqs: list[PrereqFuncName],
) -> tuple[Branch, Turn, Tick, RuleName, bytes]: ...
@batched("rule_actions", key_len=4)
def _actions2set(
self,
branch: Branch,
turn: Turn,
tick: Tick,
rule: RuleName,
actions: list[ActionFuncName],
) -> tuple[Branch, Turn, Tick, RuleName, bytes]: ...
@batched(
"rule_neighborhood",
key_len=4,
)
def _neighbors2set(
self,
branch: Branch,
turn: Turn,
tick: Tick,
rule: RuleName,
neighborhood: RuleNeighborhood,
) -> tuple[Branch, Turn, Tick, RuleName, RuleNeighborhood]: ...
@batched("rule_big", key_len=4)
def _big2set(
self,
branch: Branch,
turn: Turn,
tick: Tick,
rule: RuleName,
big: RuleBig,
) -> tuple[Branch, Turn, Tick, RuleName, RuleBig]: ...
@batched("rulebooks", key_len=4)
def _rulebooks2set(
self,
branch: Branch,
turn: Turn,
tick: Tick,
rulebook: RulebookName,
rules: list[RuleName] = (),
priority: RulebookPriority = 0.0,
) -> tuple[Branch, Turn, Tick, bytes, bytes, RulebookPriority]: ...
@batched("graphs", key_len=4)
def _graphs2set(
self,
branch: Branch,
turn: Turn,
tick: Tick,
graph: CharName,
type: GraphTypeStr,
) -> tuple[Branch, Turn, Tick, bytes, GraphTypeStr]: ...
@batched(
"character_rulebook",
key_len=4,
per_character=True,
)
def _character_rulebook_to_set(
self,
branch: Branch,
turn: Turn,
tick: Tick,
character: CharName,
rulebook: RulebookName,
) -> tuple[Branch, Turn, Tick, bytes, bytes]: ...
@batched("unit_rulebook", key_len=4, per_character=True)
def _unit_rulebook_to_set(
self,
branch: Branch,
turn: Turn,
tick: Tick,
character: CharName,
rulebook: RulebookName,
) -> tuple[Branch, Turn, Tick, bytes, bytes]: ...
@batched(
"character_thing_rulebook",
key_len=4,
per_character=True,
)
def _character_thing_rulebook_to_set(
self,
branch: Branch,
turn: Turn,
tick: Tick,
character: CharName,
rulebook: RulebookName,
) -> tuple[Branch, Turn, Tick, bytes, bytes]: ...
@batched(
"character_place_rulebook",
key_len=4,
per_character=True,
)
def _character_place_rulebook_to_set(
self,
branch: Branch,
turn: Turn,
tick: Tick,
character: CharName,
rulebook: RulebookName,
) -> tuple[Branch, Turn, Tick, bytes, bytes]: ...
@batched(
"character_portal_rulebook",
key_len=4,
per_character=True,
)
def _character_portal_rulebook_to_set(
self,
branch: Branch,
turn: Turn,
tick: Tick,
character: CharName,
rulebook: RulebookName,
) -> tuple[Branch, Turn, Tick, bytes, bytes]: ...
@batched("node_rulebook", key_len=5, per_character=True)
def _noderb2set(
self,
branch: Branch,
turn: Turn,
tick: Tick,
character: CharName,
node: NodeName,
rulebook: RulebookName,
) -> tuple[Branch, Turn, Tick, bytes, bytes, bytes]: ...
@batched(
"portal_rulebook",
key_len=6,
per_character=True,
)
def _portrb2set(
self,
branch: Branch,
turn: Turn,
tick: Tick,
character: CharName,
orig: NodeName,
dest: NodeName,
rulebook: RulebookName,
) -> tuple[Branch, Turn, Tick, bytes, bytes, bytes, bytes]: ...
@batched("nodes", key_len=5, per_character=True)
def _nodes2set(
self,
branch: Branch,
turn: Turn,
tick: Tick,
graph: CharName,
node: NodeName,
extant: bool,
) -> tuple[Branch, Turn, Tick, bytes, bytes, bool]: ...
@abstractmethod
def nodes_del_time(self, branch: Branch, turn: Turn, tick: Tick) -> None:
self._nodes2set.cull(
lambda b, r, t, *_: (b, r, t) == (branch, turn, tick)
)
@batched("edges", key_len=6, per_character=True)
def _edges2set(
self,
branch: Branch,
turn: Turn,
tick: Tick,
graph: CharName,
orig: NodeName,
dest: NodeName,
extant: bool,
) -> tuple[Branch, Turn, Tick, bytes, bytes, bytes, bool]: ...
@abstractmethod
def edges_del_time(self, branch: Branch, turn: Turn, tick: Tick) -> None:
self._edges2set.cull(
lambda b, r, t, *_: (b, r, t) == (branch, turn, tick)
)
@batched("node_val", key_len=6, per_character=True)
def _nodevals2set(
self,
branch: Branch,
turn: Turn,
tick: Tick,
graph: CharName,
node: NodeName,
key: Stat,
value: Value,
) -> tuple[Branch, Turn, Tick, bytes, bytes, bytes, bytes]: ...
@abstractmethod
def node_val_del_time(
self, branch: Branch, turn: Turn, tick: Tick
) -> None:
self._nodevals2set.cull(
lambda g, n, k, b, r, t, v: (b, r, t) == (branch, turn, tick)
)
@batched("edge_val", key_len=7, per_character=True)
def _edgevals2set(
self,
branch: Branch,
turn: Turn,
tick: Tick,
graph: CharName,
orig: NodeName,
dest: NodeName,
key: Stat,
value: Value,
) -> tuple[Branch, Turn, Tick, bytes, bytes, bytes, bytes, bytes]: ...
@abstractmethod
def edge_val_del_time(
self, branch: Branch, turn: Turn, tick: Tick
) -> None:
self._edgevals2set.cull(
lambda b, r, t, *_: (b, r, t) == (branch, turn, tick)
)
@batched("graph_val", key_len=5, per_character=True)
def _graphvals2set(
self,
branch: Branch,
turn: Turn,
tick: Tick,
graph: CharName,
key: Stat,
value: Value,
) -> tuple[Branch, Turn, Tick, bytes, bytes, bytes]: ...
@abstractmethod
def graph_val_del_time(
self, branch: Branch, turn: Turn, tick: Tick
) -> None:
self._graphvals2set.cull(
lambda g, k, b, r, t, v: (b, r, t) == (branch, turn, tick)
)
@batched(
"keyframes",
key_len=3,
inc_rec_counter=False,
)
def _new_keyframes(self, branch: Branch, turn: Turn, tick: Tick) -> Time:
return branch, turn, tick
@batched(
"keyframes_graphs",
key_len=4,
inc_rec_counter=False,
)
def _new_keyframes_graphs(
self,
branch: Branch,
turn: Turn,
tick: Tick,
graph: CharName,
nodes: NodeKeyframe,
edges: EdgeKeyframe,
graph_val: GraphValKeyframe,
) -> tuple[Branch, Turn, Tick, bytes, bytes, bytes, bytes]: ...
@batched(
"keyframe_extensions",
key_len=3,
inc_rec_counter=False,
)
def _new_keyframe_extensions(
self,
branch: Branch,
turn: Turn,
tick: Tick,
universal: UniversalKeyframe,
rule: RuleKeyframe,
rulebook: RulebooksKeyframe,
) -> tuple[Branch, Turn, Tick, bytes, bytes, bytes]: ...
@batched("character_rules_handled", key_len=5, inc_rec_counter=False)
def _char_rules_handled(
self,
branch: Branch,
turn: Turn,
character: CharName,
rulebook: RulebookName,
rule: RuleName,
tick: Tick,
) -> tuple[
Branch,
Turn,
bytes,
bytes,
RuleName,
Tick,
]: ...
@batched("unit_rules_handled", key_len=7, inc_rec_counter=False)
def _unit_rules_handled_to_set(
self,
branch: Branch,
turn: Turn,
character: CharName,
graph: CharName,
unit: CharName,
rulebook: RulebookName,
rule: RuleName,
tick: Tick,
) -> tuple[Branch, Turn, bytes, bytes, bytes, bytes, RuleName, Tick]: ...
@batched("character_thing_rules_handled", key_len=6, inc_rec_counter=False)
def _char_thing_rules_handled(
self,
branch: Branch,
turn: Turn,
character: CharName,
thing: NodeName,
rulebook: RulebookName,
rule: RuleName,
tick: Tick,
) -> tuple[Branch, Turn, bytes, bytes, bytes, RuleName, Tick]: ...
@batched("character_place_rules_handled", key_len=6, inc_rec_counter=False)
def _char_place_rules_handled(
self,
branch: Branch,
turn: Turn,
character: CharName,
place: NodeName,
rulebook: RulebookName,
rule: RuleName,
tick: Tick,
) -> tuple[Branch, Turn, bytes, bytes, bytes, RuleName, Tick]: ...
@batched(
"character_portal_rules_handled", key_len=7, inc_rec_counter=False
)
def _char_portal_rules_handled(
self,
branch: Branch,
turn: Turn,
character: CharName,
orig: NodeName,
dest: NodeName,
rulebook: RulebookName,
rule: RuleName,
tick: Tick,
) -> tuple[Branch, Turn, bytes, bytes, bytes, bytes, RuleName, Tick]: ...
@batched("node_rules_handled", key_len=6, inc_rec_counter=False)
def _node_rules_handled_to_set(
self,
branch: Branch,
turn: Turn,
character: CharName,
node: NodeName,
rulebook: RulebookName,
rule: RuleName,
tick: Tick,
) -> tuple[Branch, Turn, bytes, bytes, bytes, RuleName, Tick]: ...
@batched("portal_rules_handled", key_len=7, inc_rec_counter=False)
def _portal_rules_handled_to_set(
self,
branch: Branch,
turn: Turn,
character: CharName,
orig: NodeName,
dest: NodeName,
rulebook: RulebookName,
rule: RuleName,
tick: Tick,
) -> tuple[Branch, Turn, bytes, bytes, bytes, bytes, RuleName, Tick]: ...
@batched("units", key_len=6, per_character=True)
def _unitness(
self,
branch: Branch,
turn: Turn,
tick: Tick,
character_graph: CharName,
unit_graph: CharName,
unit_node: NodeName,
is_unit: bool,
) -> tuple[Branch, Turn, Tick, bytes, bytes, bytes, bool]: ...
@batched("things", key_len=5, per_character=True)
def _things2set(
self,
branch: Branch,
turn: Turn,
tick: Tick,
character: CharName,
thing: NodeName,
location: NodeName | EllipsisType,
) -> tuple[Branch, Turn, Tick, bytes, bytes, bytes]: ...
@abstractmethod
def universal_get(
self, key: UniversalKey, branch: Branch, turn: Turn, tick: Tick
) -> Value: ...
def universal_set(
self,
key: UniversalKey,
branch: Branch,
turn: Turn,
tick: Tick,
value: Value | type(...),
) -> None:
self._universals2set.append((branch, turn, tick, key, value))
def universal_del(
self, key: UniversalKey, branch: Branch, turn: Turn, tick: Tick
) -> None:
self.universal_set(key, branch, turn, tick, ...)
def exist_node(
self,
graph: CharName,
node: NodeName,
branch: Branch,
turn: Turn,
tick: Tick,
extant: bool,
) -> None:
self._nodes2set.append((branch, turn, tick, graph, node, extant))
@cached_property
def _all_keyframe_times(self):
return set()
def keyframe_insert(self, branch: Branch, turn: Turn, tick: Tick) -> None:
self._new_keyframes.append((branch, turn, tick))
self._all_keyframe_times.add((branch, turn, tick))
def keyframe_graph_insert(
self,
graph: CharName,
branch: Branch,
turn: Turn,
tick: Tick,
nodes: NodeKeyframe,
edges: EdgeKeyframe,
graph_val: CharDict,
) -> None:
self._new_keyframes_graphs.append(
(branch, turn, tick, graph, nodes, edges, graph_val)
)
def keyframe_extension_insert(
self,
branch: Branch,
turn: Turn,
tick: Tick,
universal: UniversalKeyframe,
rule: RuleKeyframe,
rulebook: RulebooksKeyframe,
):
self._new_keyframe_extensions.append(
(
branch,
turn,
tick,
universal,
rule,
rulebook,
)
)
def node_val_set(
self,
graph: CharName,
node: NodeName,
key: Stat,
branch: Branch,
turn: Turn,
tick: Tick,
value: Value,
):
self._nodevals2set.append(
(branch, turn, tick, graph, node, key, value)
)
def edge_val_set(
self,
graph: CharName,
orig: NodeName,
dest: NodeName,
key: Stat,
branch: Branch,
turn: Turn,
tick: Tick,
value: Value,
) -> None:
self._edgevals2set.append(
(branch, turn, tick, graph, orig, dest, key, value)
)
def plans_insert(
self, plan: Plan, branch: Branch, turn: Turn, tick: Tick
) -> None:
self._planticks2set.append((plan, branch, turn, tick))
self._upd_plan_times(plan, branch, turn, tick)
self._upd_plan_ticks(plan, branch, turn, tick)
def _upd_plan_ticks(
self, plan: Plan, branch: Branch, turn: Turn, tick: Tick
):
if plan in self._plan_ticks:
if branch in self._plan_ticks[plan]:
self._plan_ticks[plan][branch].insert_time(turn, tick)
else:
self._plan_ticks[plan][branch] = LinearTimeListDict(
{turn: [tick]}
)
else:
self._plan_ticks[plan] = BranchingTimeListDict(
{branch: {turn: [tick]}}
)
def _upd_plan_times(
self, plan: Plan, branch: Branch, turn: Turn, tick: Tick
):
if plan in self._plan_times:
self._plan_times[plan].add((branch, turn, tick))
else:
self._plan_times[plan] = {(branch, turn, tick)}
def plans_insert_many(
self, many: list[tuple[Plan, Branch, Turn, Tick]]
) -> None:
self._planticks2set.extend(many)
for tup in many:
self._upd_plan_times(*tup)
self._upd_plan_ticks(*tup)
@garbage
def flush(self):
"""Put all pending changes into the SQL transaction, or write to disk."""
if (wat := self.echo("ready")) != "ready":
raise RuntimeError("Not ready to flush", wat)
self._flush()
if (wat := self.echo("flushed")) != "flushed":
raise RuntimeError("Failed flush", wat)
@mutexed
def _flush(self):
for table, serializer in Batch.serializers.items():
batch = getattr(self, serializer.__name__)
if not isinstance(batch, Batch):
raise TypeError(
"Batch was overwritten", table, serializer.__name__, batch
)
batch()
@cached_property
def logger(self):
from logging import getLogger
return getLogger("lisien." + self.__class__.__name__)
def log(self, level, msg, *args):
self.logger.log(level, msg, *args)
def debug(self, msg, *args):
self.logger.debug(msg, *args)
def info(self, msg, *args):
self.logger.info(msg, *args)
def warning(self, msg, *args):
self.logger.warning(msg, *args)
def error(self, msg, *args):
self.logger.error(msg, *args)
def critical(self, msg, *args):
self.logger.critical(msg, *args)
@abstractmethod
def echo(self, *args): ...
@abstractmethod
def call(self, query_name: str, *args, **kwargs): ...
@abstractmethod
def call_silent(self, query_name: str, *args, **kwargs): ...
@abstractmethod
def call_many(self, query_name: str, args: list) -> None: ...
@abstractmethod
def call_many_silent(self, query_name: str, args: list) -> None: ...
@abstractmethod
def insert_many(self, table_name: str, args: list[dict]) -> None: ...
@abstractmethod
def insert_many_silent(
self, table_name: str, args: list[dict]
) -> None: ...
@abstractmethod
def delete_many_silent(
self, table_name: str, args: list[dict]
) -> None: ...
@abstractmethod
def get_keyframe_extensions(
self, branch: Branch, turn: Turn, tick: Tick
) -> tuple[UniversalKeyframe, RuleKeyframe, RulebooksKeyframe]:
pass
@abstractmethod
def keyframes_dump(self) -> Iterator[Time]:
pass
@abstractmethod
def delete_keyframe(self, branch: Branch, turn: Turn, tick: Tick) -> None:
pass
def graphs_insert(
self,
graph: CharName,
branch: Branch,
turn: Turn,
tick: Tick,
type: GraphTypeStr,
) -> None:
self._graphs2set.append((branch, turn, tick, graph, type))
def graph_val_set(
self,
graph: CharName,
key: Stat,
branch: Branch,
turn: Turn,
tick: Tick,
val: Value,
) -> None:
self._graphvals2set.append((branch, turn, tick, graph, key, val))
def exist_edge(
self,
graph: CharName,
orig: NodeName,
dest: NodeName,
branch: Branch,
turn: Turn,
tick: Tick,
extant: bool,
) -> None:
self._edges2set.append((branch, turn, tick, graph, orig, dest, extant))
@abstractmethod
def keyframes_graphs(
self,
) -> Iterator[tuple[CharName, Branch, Turn, Tick]]:
pass
@abstractmethod
def have_branch(self, branch: Branch) -> bool:
pass
@abstractmethod
def branches_dump(
self,
) -> Iterator[BranchRowType]:
pass
@abstractmethod
def global_get(self, key: Key) -> Value:
pass
@abstractmethod
def global_dump(self) -> Iterator[tuple[EternalKey, Value]]:
pass
@abstractmethod
def get_branch(self) -> Branch:
pass
@abstractmethod
def get_turn(self) -> Turn:
pass
@abstractmethod
def get_tick(self) -> Tick:
pass
def global_set(self, key: EternalKey, value: Value):
self._eternal2set.append((key, value))
def global_del(self, key: EternalKey) -> None:
self._eternal2set.append((key, ...))
def set_branch(
self,
branch: Branch,
parent: Branch,
parent_turn: Turn,
parent_tick: Tick,
end_turn: Turn,
end_tick: Tick,
) -> None:
self._branches2set.append(
(branch, parent, parent_turn, parent_tick, end_turn, end_tick)
)
def set_turn(
self, branch: Branch, turn: Turn, end_tick: Tick, plan_end_tick: Tick
) -> None:
self._turns2set.append((branch, turn, end_tick, plan_end_tick))
@abstractmethod
def turns_dump(self) -> Iterator[TurnRowType]:
pass
@abstractmethod
def graph_val_dump(self) -> Iterator[GraphValRowType]:
pass
@abstractmethod
def graphs_types(
self,
branch: Branch,
turn_from: Turn,
tick_from: Tick,
turn_to: Optional[Turn] = None,
tick_to: Optional[Tick] = None,
) -> Iterator[GraphRowType]:
pass
@abstractmethod
def graphs_dump(
self,
) -> Iterator[GraphRowType]:
pass
@abstractmethod
def nodes_dump(self) -> Iterator[NodeRowType]:
pass
@abstractmethod
def node_val_dump(self) -> Iterator[NodeValRowType]:
pass
@abstractmethod
def edges_dump(self) -> Iterator[EdgeRowType]:
pass
@abstractmethod
def edge_val_dump(self) -> Iterator[EdgeValRowType]:
pass
@abstractmethod
def plan_ticks_dump(self) -> Iterator[PlanTicksRowType]:
pass
@abstractmethod
def commit(self) -> None:
pass
@abstractmethod
def close(self) -> None:
pass
@abstractmethod
def truncate_all(self) -> None:
pass
_infixes2load = [
"graphs",
"nodes",
"edges",
"graph_val",
"node_val",
"edge_val",
"things",
"units",
"character_rulebook",
"unit_rulebook",
"character_thing_rulebook",
"character_place_rulebook",
"character_portal_rulebook",
"node_rulebook",
"portal_rulebook",
"universals",
"rulebooks",
"rule_triggers",
"rule_prereqs",
"rule_actions",
"rule_neighborhood",
"rule_big",
"character_rules_handled",
"unit_rules_handled",
"character_thing_rules_handled",
"character_place_rules_handled",
"character_portal_rules_handled",
"node_rules_handled",
"portal_rules_handled",
]
def _increc(self, n: int = 1):
"""Snap a keyframe, if the keyframe interval has passed.
But the engine can override this behavior when it'd be impractical,
such as during a rule's execution. This defers the keyframe snap
until next we get a falsy result from the override function.
Not to be called directly. Instead, use a batch, likely created via
the ``@batch`` decorator.
"""
if n == 0:
return
if n < 0:
raise ValueError("Don't reduce the count of written records")
self._records += n
if not self._initialized:
return
override: bool | None = self.kf_interval_override()
if override:
self._kf_interval_overridden = True
return
elif self.keyframe_interval is not None and (
self._kf_interval_overridden
or self._records % self.keyframe_interval == 0
):
self.snap_keyframe()
self._kf_interval_overridden = False
@abstractmethod
def get_all_keyframe_graphs(
self, branch: Branch, turn: Turn, tick: Tick
) -> Iterator[tuple[CharName, NodeKeyframe, EdgeKeyframe, StatDict]]:
pass
def get_keyframe(self, branch: Branch, turn: Turn, tick: Tick) -> Keyframe:
universal_kf, rule_kf, rulebook_kf = self.get_keyframe_extensions(
branch, turn, tick
)
kf: Keyframe = {
"universal": universal_kf,
"rulebook": rulebook_kf,
**rule_kf,
}
for (
char,
node_val,
edge_val,
graph_val,
) in self.get_all_keyframe_graphs(branch, turn, tick):
if "node_val" in kf:
kf["node_val"][char] = node_val
else:
kf["node_val"] = {char: node_val}
if "edge_val" in kf:
kf["edge_val"][char] = edge_val
else:
kf["edge_val"] = {char: edge_val}
if "graph_val" in kf:
kf["graph_val"][char] = graph_val
else:
kf["graph_val"] = {char: graph_val}
return kf
@abstractmethod
def keyframes_graphs_dump(
self,
) -> Iterator[KeyframeGraphRowType]: ...
@abstractmethod
def keyframe_extensions_dump(
self,
) -> Iterator[KeyframeExtensionRowType]: ...
@abstractmethod
def universals_dump(
self,
) -> Iterator[UniversalRowType]:
pass
@abstractmethod
def rulebooks_dump(
self,
) -> Iterator[RulebookRowType]:
pass
@abstractmethod
def rules_dump(self) -> Iterator[RuleName]:
pass
@abstractmethod
def rule_triggers_dump(
self,
) -> Iterator[TriggerRowType]:
pass
@abstractmethod
def rule_prereqs_dump(
self,
) -> Iterator[PrereqRowType]:
pass
@abstractmethod
def rule_actions_dump(
self,
) -> Iterator[ActionRowType]:
pass
@abstractmethod
def rule_neighborhood_dump(
self,
) -> Iterator[RuleNeighborhoodRowType]:
pass
@abstractmethod
def rule_big_dump(
self,
) -> Iterator[RuleBigRowType]: ...
@abstractmethod
def node_rulebook_dump(
self,
) -> Iterator[NodeRulebookRowType]:
pass
@abstractmethod
def portal_rulebook_dump(
self,
) -> Iterator[PortalRulebookRowType]:
pass
@abstractmethod
def character_rulebook_dump(
self,
) -> Iterator[CharRulebookRowType]:
pass
@abstractmethod
def unit_rulebook_dump(
self,
) -> Iterator[CharRulebookRowType]:
pass
@abstractmethod
def character_thing_rulebook_dump(
self,
) -> Iterator[CharRulebookRowType]:
pass
@abstractmethod
def character_place_rulebook_dump(
self,
) -> Iterator[CharRulebookRowType]:
pass
@abstractmethod
def character_portal_rulebook_dump(
self,
) -> Iterator[CharRulebookRowType]:
pass
@abstractmethod
def character_rules_handled_dump(
self,
) -> Iterator[CharacterRulesHandledRowType]:
pass
@abstractmethod
def unit_rules_handled_dump(
self,
) -> Iterator[UnitRulesHandledRowType]:
pass
@abstractmethod
def character_thing_rules_handled_dump(
self,
) -> Iterator[NodeRulesHandledRowType]:
pass
@abstractmethod
def character_place_rules_handled_dump(
self,
) -> Iterator[NodeRulesHandledRowType]:
pass
@abstractmethod
def character_portal_rules_handled_dump(
self,
) -> Iterator[PortalRulesHandledRowType]:
pass
@abstractmethod
def node_rules_handled_dump(
self,
) -> Iterator[NodeRulesHandledRowType]:
pass
@abstractmethod
def portal_rules_handled_dump(
self,
) -> Iterator[PortalRulesHandledRowType]:
pass
@abstractmethod
def things_dump(
self,
) -> Iterator[tuple[CharName, NodeName, Branch, Turn, Tick, NodeName]]:
pass
@abstractmethod
def units_dump(
self,
) -> Iterator[
tuple[CharName, CharName, NodeName, Branch, Turn, Tick, bool]
]:
pass
@abstractmethod
def count_all_table(self, tbl: str) -> int:
pass
def set_rule_triggers(
self,
rule: RuleName,
branch: Branch,
turn: Turn,
tick: Tick,
triggers: list[TriggerFuncName],
):
if rule in self.all_rules:
self._triggers2set.append((branch, turn, tick, rule, triggers))
else:
self.create_rule(rule, branch, turn, tick, triggers=triggers)
def set_rule_prereqs(
self,
rule: RuleName,
branch: Branch,
turn: Turn,
tick: Tick,
prereqs: list[PrereqFuncName],
):
if rule in self.all_rules:
self._prereqs2set.append((branch, turn, tick, rule, prereqs))
else:
self.create_rule(rule, branch, turn, tick, prereqs=prereqs)
def set_rule_actions(
self,
rule: RuleName,
branch: Branch,
turn: Turn,
tick: Tick,
actions: list[ActionFuncName],
):
if rule in self.all_rules:
self._actions2set.append((branch, turn, tick, rule, actions))
else:
self.create_rule(rule, branch, turn, tick, actions=actions)
def set_rule_neighborhood(
self,
rule: RuleName,
branch: Branch,
turn: Turn,
tick: Tick,
neighborhood: RuleNeighborhood,
):
if rule in self.all_rules:
self._neighbors2set.append(
(branch, turn, tick, rule, neighborhood)
)
else:
self.create_rule(
rule, branch, turn, tick, neighborhood=neighborhood
)
def set_rule_big(
self,
rule: RuleName,
branch: Branch,
turn: Turn,
tick: Tick,
big: RuleBig,
) -> None:
if rule in self.all_rules:
self._big2set.append((branch, turn, tick, rule, big))
else:
self.create_rule(rule, branch, turn, tick, big=big)
def create_rule(
self,
rule: RuleName,
branch: Branch,
turn: Turn,
tick: Tick,
triggers: Iterable[TriggerFuncName] = (),
prereqs: Iterable[PrereqFuncName] = (),
actions: Iterable[ActionFuncName] = (),
neighborhood: RuleNeighborhood = None,
big: RuleBig = False,
) -> None:
self._triggers2set.append((branch, turn, tick, rule, list(triggers)))
self._prereqs2set.append((branch, turn, tick, rule, list(prereqs)))
self._actions2set.append((branch, turn, tick, rule, list(actions)))
self._neighbors2set.append((branch, turn, tick, rule, neighborhood))
self._big2set.append((branch, turn, tick, rule, big))
self._rules2set.append((rule,))
self.all_rules.add(rule)
def set_rulebook(
self,
name: RulebookName,
branch: Branch,
turn: Turn,
tick: Tick,
rules: Optional[list[RuleName]] = None,
prio: RulebookPriority = 0.0,
):
self._rulebooks2set.append(
(branch, turn, tick, name, rules or [], prio)
)
def set_character_rulebook(
self,
char: CharName,
branch: Branch,
turn: Turn,
tick: Tick,
rb: RulebookName,
):
self._character_rulebook_to_set.append((branch, turn, tick, char, rb))
def set_unit_rulebook(
self,
char: CharName,
branch: Branch,
turn: Turn,
tick: Tick,
rb: RulebookName,
):
self._unit_rulebook_to_set.append((branch, turn, tick, char, rb))
def set_character_thing_rulebook(
self,
char: CharName,
branch: Branch,
turn: Turn,
tick: Tick,
rb: RulebookName,
):
self._character_thing_rulebook_to_set.append(
(branch, turn, tick, char, rb)
)
def set_character_place_rulebook(
self,
char: CharName,
branch: Branch,
turn: Turn,
tick: Tick,
rb: RulebookName,
):
self._character_place_rulebook_to_set.append(
(branch, turn, tick, char, rb)
)
def set_character_portal_rulebook(
self,
char: CharName,
branch: Branch,
turn: Turn,
tick: Tick,
rb: RulebookName,
):
self._character_portal_rulebook_to_set.append(
(branch, turn, tick, char, rb)
)
def rulebook_set(
self,
rulebook: RulebookName,
branch: Branch,
turn: Turn,
tick: Tick,
rules: list[RuleName],
priority: RulebookPriority,
):
self._rulebooks2set.append(
(branch, turn, tick, rulebook, rules, priority)
)
def set_node_rulebook(
self,
character: CharName,
node: NodeName,
branch: Branch,
turn: Turn,
tick: Tick,
rulebook: RulebookName,
):
self._noderb2set.append(
(branch, turn, tick, character, node, rulebook)
)
def set_portal_rulebook(
self,
character: CharName,
orig: NodeName,
dest: NodeName,
branch: Branch,
turn: Turn,
tick: Tick,
rulebook: RulebookName,
):
self._portrb2set.append(
(branch, turn, tick, character, orig, dest, rulebook)
)
def handled_character_rule(
self,
character: CharName,
rulebook: RulebookName,
rule: RuleName,
branch: Branch,
turn: Turn,
tick: Tick,
):
self._char_rules_handled.append(
(branch, turn, character, rulebook, rule, tick)
)
def handled_unit_rule(
self,
character: CharName,
rulebook: RulebookName,
rule: RuleName,
graph: CharName,
unit: NodeName,
branch: Branch,
turn: Turn,
tick: Tick,
):
self._unit_rules_handled_to_set.append(
(branch, turn, character, graph, unit, rulebook, rule, tick)
)
def handled_character_thing_rule(
self,
character: CharName,
rulebook: RulebookName,
rule: RuleName,
thing: NodeName,
branch: Branch,
turn: Turn,
tick: Tick,
):
self._char_thing_rules_handled.append(
(branch, turn, character, thing, rulebook, rule, tick)
)
def handled_character_place_rule(
self,
character: CharName,
rulebook: RulebookName,
rule: RuleName,
place: NodeName,
branch: Branch,
turn: Turn,
tick: Tick,
):
self._char_place_rules_handled.append(
(branch, turn, character, place, rulebook, rule, tick)
)
def handled_character_portal_rule(
self,
character: CharName,
rulebook: RulebookName,
rule: RuleName,
orig: NodeName,
dest: NodeName,
branch: Branch,
turn: Turn,
tick: Tick,
):
self._char_portal_rules_handled.append(
(branch, turn, character, orig, dest, rulebook, rule, tick)
)
def handled_node_rule(
self,
character: CharName,
node: NodeName,
rulebook: RulebookName,
rule: RuleName,
branch: Branch,
turn: Turn,
tick: Tick,
):
self._node_rules_handled_to_set.append(
(branch, turn, character, node, rulebook, rule, tick)
)
def handled_portal_rule(
self,
character: CharName,
orig: NodeName,
dest: NodeName,
rulebook: RulebookName,
rule: RuleName,
branch: Branch,
turn: Turn,
tick: Tick,
):
self._portal_rules_handled_to_set.append(
(branch, turn, character, orig, dest, rulebook, rule, tick)
)
def set_thing_loc(
self,
character: CharName,
thing: NodeName,
branch: Branch,
turn: Turn,
tick: Tick,
loc: NodeName | EllipsisType,
):
self._things2set.append((branch, turn, tick, character, thing, loc))
@abstractmethod
def things_del_time(self, branch: Branch, turn: Turn, tick: Tick):
self._things2set.cull(
lambda b, r, t, *_: (b, r, t) == (branch, turn, tick)
)
def unit_set(
self,
character: CharName,
graph: CharName,
node: NodeName,
branch: Branch,
turn: Turn,
tick: Tick,
is_unit: bool,
):
self._unitness.append(
(branch, turn, tick, character, graph, node, is_unit)
)
@abstractmethod
def turns_completed_dump(self) -> Iterator[tuple[Branch, Turn]]:
pass
@abstractmethod
def bookmarks_dump(self) -> Iterator[tuple[Key, Time]]: ...
@abstractmethod
def _load_windows_into(self, ret: dict, windows: list[TimeWindow]): ...
@staticmethod
def empty_char() -> LoadedCharWindow:
nodes_l: list[NodeRowType] = []
edges_l: list[EdgeRowType] = []
graph_val_l: list[GraphValRowType] = []
node_val_l: list[NodeValRowType] = []
edge_val_l: list[EdgeValRowType] = []
things_l: list[ThingRowType] = []
units_l: list[UnitRowType] = []
character_rulebook_l: list[CharRulebookRowType] = []
unit_rulebook_l: list[CharRulebookRowType] = []
char_thing_rulebook_l: list[CharRulebookRowType] = []
char_place_rulebook_l: list[CharRulebookRowType] = []
char_portal_rulebook_l: list[CharRulebookRowType] = []
node_rulebook_l: list[NodeRulebookRowType] = []
portal_rulebook_l: list[PortalRulebookRowType] = []
return {
"nodes": nodes_l,
"edges": edges_l,
"graph_val": graph_val_l,
"node_val": node_val_l,
"edge_val": edge_val_l,
"things": things_l,
"units": units_l,
"character_rulebook": character_rulebook_l,
"unit_rulebook": unit_rulebook_l,
"character_thing_rulebook": char_thing_rulebook_l,
"character_place_rulebook": char_place_rulebook_l,
"character_portal_rulebook": char_portal_rulebook_l,
"node_rulebook": node_rulebook_l,
"portal_rulebook": portal_rulebook_l,
}
def load_windows(self, windows: list[TimeWindow]) -> LoadedDict:
self.debug(f"load_windows({windows})")
ret: LoadedDict = defaultdict(self.empty_char)
ret["universals"]: list[UniversalRowType] = []
ret["rule_triggers"]: list[RuleRowType] = []
ret["rule_prereqs"]: list[RuleRowType] = []
ret["rule_actions"]: list[RuleRowType] = []
ret["rule_neighborhood"]: list[RuleRowType] = []
ret["rule_big"]: list[RuleRowType] = []
ret["rulebooks"]: list[RulebookRowType] = []
ret["character_rules_handled"]: list[CharacterRulesHandledRowType] = []
ret["unit_rules_handled"]: list[UnitRulesHandledRowType] = []
ret["character_thing_rules_handled"]: list[
NodeRulesHandledRowType
] = []
ret["character_place_rules_handled"]: list[
NodeRulesHandledRowType
] = []
ret["character_portal_rules_handled"]: list[
PortalRulesHandledRowType
] = []
ret["node_rules_handled"]: list[NodeRulesHandledRowType] = []
ret["portal_rules_handled"]: list[PortalRulesHandledRowType] = []
ret["graphs"]: list[GraphRowType] = []
self.flush()
self._load_windows_into(ret, windows)
self.debug(f"finished loading windows {windows}")
for k, v in ret.items():
if isinstance(k, bytes):
raise TypeError("Character name not unpacked", k)
elif isinstance(k, str) and k.endswith("handled"):
# The rules-handled tables have the tick at the end because it's
# not in the primary key
v.sort(key=lambda t: (t[0], t[1], t[-1], *t[2:-1]))
elif isinstance(v, list):
v.sort()
elif isinstance(v, dict):
for kk, vv in v.items():
vv.sort()
else:
raise TypeError("Bad loaded dictionary", v)
return dict(ret)
@cached_property
def _plan_ticks(self) -> dict[Plan, BranchingTimeListDict]:
return PickierDefaultDict(int, BranchingTimeListDict)
def to_etree(self, name: str) -> ElementTree:
self.tree = ElementTree(Element("lisien"))
root = self.tree.getroot()
self.commit()
eternals = dict(self.eternal.items())
root.set(
"db-schema-version", str(eternals.pop("_lisien_schema_version"))
)
root.set("xml-schema-version", str(XML_SCHEMA_VERSION))
root.set("trunk", str(eternals.pop("trunk")))
root.set("branch", str(eternals.pop("branch")))
root.set("turn", str(eternals.pop("turn")))
root.set("tick", str(eternals.pop("tick")))
if "language" in eternals:
root.set("language", str(eternals.pop("language")))
for k in sort_set(eternals.keys()):
el = Element("dict-item", key=repr(k))
root.append(el)
el.append(self._value_to_xml_el(eternals[k]))
trunks = set()
branches_d = {}
branch_descendants = {}
turn_end_plan_d: dict[Branch, dict[Turn, tuple[Tick, Tick]]] = {}
branch_elements = {}
playtrees: dict[Branch, Element] = {}
turns_completed_d: dict[Branch, Turn] = dict(
self.turns_completed_dump()
)
keyframe_times: set[Time] = set(self.keyframes_dump())
for (
branch,
turn,
last_real_tick,
last_planned_tick,
) in self.turns_dump():
if branch in turn_end_plan_d:
turn_end_plan_d[branch][turn] = (
last_real_tick,
last_planned_tick,
)
else:
turn_end_plan_d[branch] = {
turn: (last_real_tick, last_planned_tick)
}
branch2do = deque(sorted(self.branches_dump()))
while branch2do:
(
branch,
parent,
parent_turn,
parent_tick,
end_turn,
end_tick,
) = branch2do.popleft()
branches_d[branch] = (
parent,
parent_turn,
parent_tick,
end_turn,
end_tick,
)
if parent is None:
trunks.add(branch)
playtree = Element("playtree", trunk=branch)
if name is not None:
playtree.set("game", name)
playtrees[branch] = playtree
branch_element = branch_elements[branch] = Element(
"branch",
{
"name": branch,
"start-turn": "0",
"start-tick": "0",
"end-turn": str(end_turn),
"end-tick": str(end_tick),
},
)
if branch in turns_completed_d:
branch_element.set(
"last-turn-completed", str(turns_completed_d[branch])
)
root.append(playtree)
playtree.append(branch_element)
else:
if parent in branch_descendants:
branch_descendants[parent].add(branch)
else:
branch_descendants[parent] = {branch}
if parent in branch_elements:
branch_el = Element(
"branch",
{
"name": branch,
"parent": parent,
"start-turn": str(parent_turn),
"start-tick": str(parent_tick),
"end-turn": str(end_turn),
"end-tick": str(end_tick),
},
)
if branch in turns_completed_d:
branch_el.set(
"last-turn-completed",
str(turns_completed_d[branch]),
)
branch_elements[parent].append(branch_el)
else:
branch2do.append(
(
branch,
parent,
parent_turn,
parent_tick,
end_turn,
end_tick,
)
)
def recurse_branch(b: Branch):
parent, turn_from, tick_from, turn_to, tick_to = branches_d[b]
if b in turn_end_plan_d:
turn_to, tick_to = max(
[
(turn_to, tick_to),
*(
(r, t)
for r, (_, t) in turn_end_plan_d[branch].items()
),
]
)
data = self.load_windows(
[(b, turn_from, tick_from, turn_to, tick_to)]
)
self._fill_branch_element(
branch_elements[b],
turn_end_plan_d[b],
keyframe_times,
data,
)
if b in branch_descendants:
for desc in sorted(branch_descendants[b], key=branches_d.get):
recurse_branch(desc)
for trunk in trunks:
recurse_branch(trunk)
return self.tree
@classmethod
def _value_to_xml_el(cls, value: Value | dict[Key, Value]) -> Element:
if value is ...:
return Element("Ellipsis")
elif value is None:
return Element("None")
elif isinstance(value, bool):
return Element("bool", value="true" if value else "false")
elif isinstance(value, int):
return Element("int", value=str(value))
elif isinstance(value, float):
return Element("float", value=str(value))
elif isinstance(value, str):
return Element("str", value=value)
elif isinstance(value, lisien.types.DiGraph):
# Since entity names are restricted to what we can use for dict
# keys and also serialize to msgpack, I don't think there's any name
# an entity can have that can't be repr'd
return Element("character", name=repr(value.name))
elif isinstance(value, lisien.types.Node):
return Element(
"node",
character=repr(value.character.name),
name=repr(value.name),
)
elif isinstance(value, lisien.types.Edge):
return Element(
"portal",
character=repr(value.character.name),
origin=repr(value.orig),
destination=repr(value.dest),
)
elif isinstance(value, nx.Graph):
return nx.readwrite.GraphMLWriter(value).myElement
elif isinstance(value, FunctionType) or isinstance(value, MethodType):
if value.__module__ not in (
"trigger",
"prereq",
"action",
"function",
"method",
):
raise ValueError(
"Callable is not stored in the Lisien engine", value
)
return Element(value.__module__, name=value.__name__)
elif isinstance(value, Exception):
# weird but ok
el = Element("exception", pyclass=value.__class__.__name__)
if hasattr(value, "__traceback__"):
el.set("traceback", str(Traceback(value.__traceback__)))
for arg in value.args:
el.append(cls._value_to_xml_el(arg))
return el
elif isinstance(value, list):
el = Element("list")
for v in value:
el.append(cls._value_to_xml_el(v))
return el
elif isinstance(value, tuple):
el = Element("tuple")
for v in value:
el.append(cls._value_to_xml_el(v))
return el
elif isinstance(value, Set):
if isinstance(value, (set, MutableSet)):
el = Element("set")
for v in value:
el.append(cls._value_to_xml_el(v))
return el
else:
el = Element("frozenset")
for v in value:
el.append(cls._value_to_xml_el(v))
return el
elif isinstance(value, Mapping):
el = Element("dict")
for k, v in value.items():
dict_item = Element("dict-item", key=repr(k))
dict_item.append(cls._value_to_xml_el(v))
el.append(dict_item)
return el
else:
raise TypeError("Can't convert to XML", value)
@classmethod
def _add_keyframe_to_turn_el(
cls,
turn_el: Element,
tick: Tick,
keyframe: Keyframe,
) -> None:
kfel = Element("keyframe", tick=str(tick))
turn_el.append(kfel)
universal_d: dict[Key, Value] = keyframe.get("universal", {})
univel = cls._value_to_xml_el(universal_d)
univel.tag = "universal"
kfel.append(univel)
triggers_kf: dict[RuleName, list[TriggerFuncName]] = keyframe.get(
"triggers", {}
)
prereqs_kf: dict[RuleName, list[PrereqFuncName]] = keyframe.get(
"prereqs", {}
)
actions_kf: dict[RuleName, list[ActionFuncName]] = keyframe.get(
"actions", {}
)
neighborhoods_kf: dict[RuleName, RuleNeighborhood] = keyframe.get(
"neighborhood", {}
)
bigs_kf: dict[RuleName, RuleBig] = keyframe.get("big", {})
for rule_name in sorted(
triggers_kf.keys() | prereqs_kf.keys() | actions_kf.keys()
):
rule_name: RuleName
rule_el = Element(
"rule",
name=rule_name,
)
kfel.append(rule_el)
if rule_name in bigs_kf and bigs_kf[rule_name]:
rule_el.set("big", "true")
if (
rule_name in neighborhoods_kf
and neighborhoods_kf[rule_name] is not None
):
rule_el.set(
"neighborhood",
str(neighborhoods_kf[rule_name]),
)
if trigs := triggers_kf.get(rule_name):
for trig in trigs:
rule_el.append(Element("trigger", name=trig))
if preqs := prereqs_kf.get(rule_name):
for preq in preqs:
rule_el.append(Element("prereq", name=preq))
if acts := actions_kf.get(rule_name):
for act in acts:
rule_el.append(Element("action", name=act))
rulebook_kf: dict[
RulebookName, tuple[list[RuleName], RulebookPriority]
] = keyframe.get("rulebook", {})
for rulebook_name, (rule_list, priority) in rulebook_kf.items():
rulebook_el = Element(
"rulebook", name=repr(rulebook_name), priority=repr(priority)
)
kfel.append(rulebook_el)
for rule_name in rule_list:
rulebook_el.append(Element("rule", name=rule_name))
char_els: dict[CharName, Element] = {}
graph_val_kf: GraphValKeyframe = keyframe.get("graph_val", {})
for char_name, vals in sorted(graph_val_kf.items()):
graph_el = char_els[char_name] = Element(
"character", name=repr(char_name)
)
kfel.append(graph_el)
if units_kf := vals.pop("units", {}):
units_el = Element("units")
any_unit_graphs = False
for graph, nodes in units_kf.items():
unit_graphs_el = Element("graph", character=repr(graph))
any_unit_nodes = False
for node, is_unit in nodes.items():
if is_unit:
any_unit_nodes = True
unit_graphs_el.append(
Element("unit", node=repr(node))
)
if any_unit_nodes:
units_el.append(unit_graphs_el)
any_unit_graphs = True
if any_unit_graphs:
graph_el.append(units_el)
if "character_rulebook" in vals:
graph_el.set(
"character-rulebook",
repr(
vals.pop(
"character_rulebook",
("character_rulebook", char_name),
)
),
)
if "unit_rulebook" in vals:
graph_el.set(
"unit-rulebook",
repr(
vals.pop("unit_rulebook", ("unit_rulebook", char_name))
),
)
if "character_thing_rulebook" in vals:
graph_el.set(
"character-thing-rulebook",
repr(
vals.pop(
"character_thing_rulebook",
("character_thing_rulebook", char_name),
)
),
)
if "character_place_rulebook" in vals:
graph_el.set(
"character-place-rulebook",
repr(
vals.pop(
"character_place_rulebook",
("character_place_rulebook", char_name),
)
),
)
if "character_portal_rulebook" in vals:
graph_el.set(
"character-portal-rulebook",
repr(
vals.pop(
"character_portal_rulebook",
("character_portal_rulebook", char_name),
)
),
)
for k, v in vals.items():
item_el = Element("dict-item", key=repr(k))
graph_el.append(item_el)
item_el.append(cls._value_to_xml_el(v))
node_val_kf: GraphNodeValKeyframe = keyframe.get("node_val", {})
for char_name, node_vals in node_val_kf.items():
if char_name in char_els:
char_el = char_els[char_name]
else:
char_el = char_els[char_name] = Element(
"character", name=repr(char_name)
)
kfel.append(char_el)
for node, val in node_vals.items():
node_el = Element(
"node",
name=repr(node),
)
if "rulebook" in val:
node_el.set("rulebook", repr(val.pop("rulebook")))
char_el.append(node_el)
for k, v in val.items():
item_el = Element("dict-item", key=repr(k))
node_el.append(item_el)
item_el.append(cls._value_to_xml_el(v))
edge_val_kf: GraphEdgeValKeyframe = keyframe.get("edge_val", {})
for char_name, edge_vals in edge_val_kf.items():
if char_name in char_els:
char_el = char_els[char_name]
else:
char_el = char_els[char_name] = Element(
"character", name=repr(char_name)
)
kfel.append(char_el)
for orig, dests in edge_vals.items():
for dest, val in dests.items():
edge_el = Element(
"edge",
orig=repr(orig),
dest=repr(dest),
)
if "rulebook" in val:
edge_el.set("rulebook", repr(val.pop("rulebook")))
char_el.append(edge_el)
for k, v in val.items():
item_el = Element("dict-item", key=repr(k))
edge_el.append(item_el)
item_el.append(cls._value_to_xml_el(v))
[docs]
def to_xml(self, name: str, indent: bool = True) -> str:
"""Return a string XML representation of the whole database
Use ``load_xml`` to load it later.
:param name: What to call the game in the XML.
:param indent: Whether to format the XML for human readers. Default
``True``.
"""
file = StringIO()
self.write_xml(file, name, indent)
return file.getvalue()
[docs]
def write_xml(
self,
file: str | os.PathLike | IO[str | bytes],
name: str | None = None,
indent: bool = True,
) -> None:
"""Serialize the whole database to an XML file
:param file: A file name, or a file object.
:param name: Optional name to give to the game in the XML. Defaults
to the name of the file, minus .xml extension.
:param indent: Whether to format the XML for a human reader. Default
``True``.
"""
if not isinstance(file, os.PathLike) and not isinstance(file, IOBase):
if file is None:
if name is None:
raise ValueError("Need a name or a path")
file = os.path.join(os.getcwd(), name + ".xml")
file = Path(file)
name = name.removesuffix(".xml")
tree = self.to_etree(name)
if indent:
indent_tree(tree)
tree.write(file, encoding="utf-8")
def _set_plans(
self, el: Element, branch: Branch, turn: Turn, tick: Tick
) -> None:
plans = []
for plan, times in self._plan_times.items():
if (branch, turn, tick) in times:
plans.append(plan)
if plans:
el.set("plans", ",".join(map(str, plans)))
def _universals_el(self, universal_rec: UniversalRowType) -> Element:
b, r, t, key, val = universal_rec
univ_el = Element(
"universal",
key=repr(key),
tick=str(t),
)
univ_el.append(self._value_to_xml_el(val))
self._set_plans(univ_el, b, r, t)
return univ_el
def _rulebooks_el(self, rulebook_rec: RulebookRowType) -> Element:
b, r, t, rb, rules, prio = rulebook_rec
rb_el = Element(
"rulebook",
name=repr(rb),
priority=repr(prio),
tick=str(t),
)
for rule in rules:
rb_el.append(Element("rule", name=rule))
self._set_plans(rb_el, b, r, t)
return rb_el
def _rule_flist_el(
self,
typ: str,
rec: TriggerRowType | PrereqRowType | ActionRowType,
) -> Element:
branch, turn, tick, rule, funcs = rec
func_el = Element(f"{typ}s", rule=rule, tick=str(tick))
for func in funcs:
func_el.append(Element(typ[5:], name=func))
self._set_plans(func_el, branch, turn, tick)
return func_el
_rule_triggers_el = partialmethod(_rule_flist_el, "rule-trigger")
_rule_prereqs_el = partialmethod(_rule_flist_el, "rule-prereq")
_rule_actions_el = partialmethod(_rule_flist_el, "rule-action")
def _rule_neighborhood_el(
self, nbr_rec: RuleNeighborhoodRowType
) -> Element:
branch, turn, tick, rule, nbr = nbr_rec
if nbr is not None:
nbr_el = Element(
"rule-neighborhood",
rule=rule,
tick=str(tick),
neighbors=str(nbr),
)
else:
nbr_el = Element("rule-neighborhood", rule=rule, tick=str(tick))
self._set_plans(nbr_el, branch, turn, tick)
return nbr_el
def _rule_big_el(self, big_rec: RuleBigRowType) -> Element:
branch, turn, tick, rule, big = big_rec
el = Element(
"rule-big",
rule=rule,
tick=str(tick),
big="true" if big else "false",
)
self._set_plans(el, branch, turn, tick)
return el
def _graphs_el(self, graph: GraphRowType) -> Element:
b, r, t, char, typ_str = graph
graph_el = Element(
"graph",
character=repr(char),
tick=str(t),
type=typ_str,
)
self._set_plans(graph_el, b, r, t)
return graph_el
def _graph_val_el(self, graph_val: GraphValRowType) -> Element:
b, r, t, char, stat, val = graph_val
graph_val_el = Element(
"graph-val",
character=repr(char),
key=repr(stat),
tick=str(t),
)
graph_val_el.append(self._value_to_xml_el(val))
self._set_plans(graph_val_el, b, r, t)
return graph_val_el
def _nodes_el(self, nodes: NodeRowType) -> Element:
b, r, t, char, node, ex = nodes
node_el = Element(
"node",
character=repr(char),
name=repr(node),
tick=str(t),
exists="true" if ex else "false",
)
self._set_plans(node_el, b, r, t)
return node_el
def _node_val_el(self, node_val: NodeValRowType) -> Element:
b, r, t, char, node, stat, val = node_val
node_val_el = Element(
"node-val",
character=repr(char),
node=repr(node),
key=repr(stat),
tick=str(t),
)
node_val_el.append(self._value_to_xml_el(val))
self._set_plans(node_val_el, b, r, t)
return node_val_el
def _edges_el(self, edges: EdgeRowType) -> Element:
b, r, t, char, orig, dest, ex = edges
edge_el = Element(
"edge",
character=repr(char),
orig=repr(orig),
dest=repr(dest),
tick=str(t),
exists="true" if ex else "false",
)
self._set_plans(edge_el, b, r, t)
return edge_el
def _edge_val_el(self, edge_val: EdgeValRowType) -> Element:
b, r, t, char, orig, dest, stat, val = edge_val
edge_val_el = Element(
"edge-val",
character=repr(char),
orig=repr(orig),
dest=repr(dest),
key=repr(stat),
tick=str(t),
)
edge_val_el.append(self._value_to_xml_el(val))
self._set_plans(edge_val_el, b, r, t)
return edge_val_el
def _things_el(self, thing: ThingRowType) -> Element:
b, r, t, char, thing, loc = thing
loc_el = Element(
"location",
character=repr(char),
thing=repr(thing),
tick=str(t),
location=repr(loc),
)
self._set_plans(loc_el, b, r, t)
return loc_el
def _units_el(self, unit: UnitRowType) -> Element:
b, r, t, char, graph, node, is_unit = unit
unit_el = Element(
"unit",
{
"character-graph": repr(char),
"unit-graph": repr(graph),
"unit-node": repr(node),
"tick": str(t),
},
)
unit_el.set("is-unit", "true" if is_unit else "false")
self._set_plans(unit_el, b, r, t)
return unit_el
def _char_rb_el(self, rbtyp: str, rbrow: CharRulebookRowType) -> Element:
b, r, t, char, rb = rbrow
chrbel = Element(
rbtyp,
character=repr(char),
tick=str(t),
rulebook=repr(rb),
)
self._set_plans(chrbel, b, r, t)
return chrbel
def _node_rulebook_el(self, nrb_row: NodeRulebookRowType) -> Element:
b, r, t, char, node, rb = nrb_row
nrb_el = Element(
"node-rulebook",
character=repr(char),
node=repr(node),
tick=str(t),
rulebook=repr(rb),
)
self._set_plans(nrb_el, b, r, t)
return nrb_el
def _portal_rulebook_el(
self, port_rb_row: PortalRulebookRowType
) -> Element:
b, r, t, char, orig, dest, rb = port_rb_row
porb_el = Element(
"portal-rulebook",
character=repr(char),
orig=repr(orig),
dest=repr(dest),
tick=str(t),
rulebook=repr(rb),
)
self._set_plans(porb_el, b, r, t)
return porb_el
@staticmethod
def _get_current_rule_el(data: dict) -> Element | None:
# Rules record the time they finished running, not the time they
# started. So we need to peek ahead to find out what the current
# rule is.
earliest = (float("inf"), float("inf"))
earliest_key = None
if data["character_rules_handled"]:
rec = data["character_rules_handled"][0]
turn: Turn = rec[1]
tick: Tick = rec[-1]
handled_at = (turn, tick)
if handled_at < earliest:
earliest = handled_at
earliest_key = "character_rules_handled"
if data["unit_rules_handled"]:
rec = data["unit_rules_handled"][0]
turn: Turn = rec[1]
tick: Tick = rec[-1]
handled_at = (turn, tick)
if handled_at < earliest:
earliest = handled_at
earliest_key = "unit_rules_handled"
if data["character_thing_rules_handled"]:
rec = data["character_thing_rules_handled"][0]
turn: Turn = rec[1]
tick: Tick = rec[-1]
handled_at = (turn, tick)
if handled_at < earliest:
earliest = handled_at
earliest_key = "character_thing_rules_handled"
if data["character_place_rules_handled"]:
rec = data["character_place_rules_handled"][0]
turn: Turn = rec[1]
tick: Tick = rec[-1]
handled_at = (turn, tick)
if handled_at < earliest:
earliest = handled_at
earliest_key = "character_place_rules_handled"
if data["character_portal_rules_handled"]:
rec = data["character_portal_rules_handled"][0]
turn: Turn = rec[1]
tick: Tick = rec[-1]
handled_at = (turn, tick)
if handled_at < earliest:
earliest = handled_at
earliest_key = "character_portal_rules_handled"
if data["node_rules_handled"]:
rec = data["node_rules_handled"][0]
turn: Turn = rec[1]
tick: Tick = rec[-1]
handled_at = (turn, tick)
if handled_at < earliest:
earliest = handled_at
earliest_key = "node_rules_handled"
if data["portal_rules_handled"]:
rec = data["portal_rules_handled"][0]
turn: Turn = rec[1]
tick: Tick = rec[-1]
handled_at = (turn, tick)
if handled_at < earliest:
earliest = handled_at
earliest_key = "portal_rules_handled"
if earliest_key is None:
return None
ret = Element(
"rule",
{"end-tick": str(earliest[1])},
)
match earliest_key:
case "character_rules_handled":
b, r, char, rb, rule, t = data["character_rules_handled"].pop(
0
)
ret.set("type", "character")
ret.set("character", repr(char))
ret.set("rulebook", repr(rb))
ret.set("name", rule)
case "unit_rules_handled":
b, r, char, graph, unit, rb, rule, t = data[
"unit_rules_handled"
].pop(0)
ret.set("type", "unit")
ret.set("character", repr(char))
ret.set("graph", repr(graph))
ret.set("unit", repr(unit))
ret.set("rulebook", repr(rb))
ret.set("name", rule)
case "character_thing_rules_handled":
b, r, char, thing, rb, rule, t = data[
"character_thing_rules_handled"
].pop(0)
ret.set("type", "character-thing")
ret.set("character", repr(char))
ret.set("thing", repr(thing))
ret.set("rulebook", repr(rb))
ret.set("name", rule)
case "character_place_rules_handled":
b, r, char, place, rb, rule, t = data[
"character_place_rules_handled"
].pop(0)
ret.set("type", "character-place")
ret.set("character", repr(char))
ret.set("place", repr(place))
ret.set("rulebook", repr(rb))
ret.set("name", rule)
case "character_portal_rules_handled":
b, r, char, orig, dest, rb, rule, t = data[
"character_portal_rules_handled"
].pop(0)
ret.set("type", "character-portal")
ret.set("character", repr(char))
ret.set("origin", repr(orig))
ret.set("destination", repr(dest))
ret.set("rulebook", repr(rb))
ret.set("name", rule)
case "node_rules_handled":
b, r, char, node, rb, rule, t = data["node_rules_handled"].pop(
0
)
ret.set("type", "node")
ret.set("character", repr(char))
ret.set("node", repr(node))
ret.set("rulebook", repr(rb))
ret.set("name", rule)
return ret
def _fill_branch_element(
self,
branch_el: Element,
turn_ends: dict[Turn, tuple[Tick, Tick]],
keyframe_times: set[Time],
data: LoadedDict,
):
branch_ = branch_el.get("name")
if branch_ is None:
raise TypeError("branch missing")
branch_now = Branch(branch_)
uncharacterized = ILLEGAL_CHARACTER_NAMES
for turn_now, (ending_tick, plan_ending_tick) in sorted(
turn_ends.items()
):
turn_el = Element(
"turn",
{
"number": str(turn_now),
"end-tick": str(ending_tick),
"plan-end-tick": str(plan_ending_tick),
},
)
branch_el.append(turn_el)
current_rule_el: Element | None = self._get_current_rule_el(data)
if current_rule_el is not None:
turn_el.append(current_rule_el)
def get_current_el() -> Element:
if current_rule_el is None:
return turn_el
return current_rule_el
tick_now: Tick
for tick_now in range(plan_ending_tick + 1):
now = (branch_now, turn_now, tick_now)
if current_rule_el is None:
current_rule_el = self._get_current_rule_el(data)
if current_rule_el is not None:
turn_el.append(current_rule_el)
# Loop over rules that didn't result in any changes, if needed
while current_rule_el is not None and tick_now >= (
int(current_rule_el.get("end-tick"))
):
current_rule_el = self._get_current_rule_el(data)
if current_rule_el is not None:
assert "name" in current_rule_el.keys()
turn_el.append(current_rule_el)
if now in keyframe_times:
kf = self.get_keyframe(branch_now, turn_now, tick_now)
self._add_keyframe_to_turn_el(turn_el, tick_now, kf)
keyframe_times.remove((branch_now, turn_now, tick_now))
for uncharacter in sorted(uncharacterized):
if recs := data.get(uncharacter):
rec = recs[0]
while rec[:3] == now:
el = getattr(self, f"_{uncharacter}_el")(rec)
get_current_el().append(el)
del recs[0]
if not recs:
break
rec = recs[0]
for char_name in sort_set(data.keys() - uncharacterized):
char_data: LoadedCharWindow = data[char_name]
charkey_literal = Literal[
"graph_val",
"nodes",
"node_val",
"edges",
"edge_val",
"units",
"node_rulebook",
"portal_rulebook",
"things",
]
charkey: charkey_literal
for charkey in get_args(charkey_literal):
if recs := char_data.get(charkey):
rec = recs[0]
while rec[:3] == now:
el = getattr(self, f"_{charkey}_el")(rec)
get_current_el().append(el)
del recs[0]
if not recs:
break
rec = recs[0]
char_rb_typ_literal = Literal[
"character_rulebook",
"unit_rulebook",
"character_thing_rulebook",
"character_place_rulebook",
"character_portal_rulebook",
]
char_rb_typ: char_rb_typ_literal
for char_rb_typ in get_args(char_rb_typ_literal):
if char_rb_rows := char_data.get(char_rb_typ):
char_rb_rows: list[CharRulebookRowType]
char_rb_row = char_rb_rows[0]
while now == char_rb_row[:3]:
crbel = self._char_rb_el(
char_rb_typ.replace("_", "-"),
char_rb_row,
)
get_current_el().append(crbel)
del char_rb_rows[0]
if not char_rb_rows:
break
char_rb_row = char_rb_rows[0]
for k in uncharacterized:
if k in data:
assert not data[k], f"Leftover data in {k}: {data[k]}"
for char_name in data.keys() - uncharacterized:
for k, v in data[char_name].items():
assert not v, f"Leftover data in {char_name}'s {k}: {v}"
assert not keyframe_times, keyframe_times
@cached_property
def _known_triggers(
self,
) -> dict[
RuleName,
dict[
Branch,
AssignmentTimeDict[list[TriggerFuncName]],
],
]:
return defaultdict(partial(defaultdict, AssignmentTimeDict))
@cached_property
def _known_prereqs(
self,
) -> dict[
RuleName,
dict[
Branch,
AssignmentTimeDict[list[PrereqFuncName]],
],
]:
return defaultdict(partial(defaultdict, AssignmentTimeDict))
@cached_property
def _known_actions(
self,
) -> dict[
RuleName,
dict[
Branch,
AssignmentTimeDict[list[ActionFuncName]],
],
]:
return defaultdict(partial(defaultdict, AssignmentTimeDict))
@cached_property
def _known_neighborhoods(
self,
) -> dict[
RuleName,
dict[
Branch,
AssignmentTimeDict[RuleNeighborhood],
],
]:
return defaultdict(partial(defaultdict, AssignmentTimeDict))
@cached_property
def _known_big(
self,
) -> dict[
RuleName,
dict[Branch, AssignmentTimeDict[RuleBig]],
]:
return defaultdict(partial(defaultdict, AssignmentTimeDict))
@cached_property
def _plan_times(self) -> dict[Plan, set[Time]]:
return {}
def _element_to_value(self, el: Element) -> Value | EllipsisType:
eng = self.engine
match el.tag:
case "Ellipsis":
return ...
case "None":
return Value(None)
case "int":
return Value(int(el.get("value")))
case "float":
return Value(float(el.get("value")))
case "str":
return Value(el.get("value"))
case "bool":
return Value(el.get("value") in {"T", "true"})
case "character":
name = CharName(literal_eval(el.get("name")))
return Value(eng.character[name])
case "node":
char_name = CharName(literal_eval(el.get("character")))
place_name = NodeName(literal_eval(el.get("name")))
return Value(eng.character[char_name].node[place_name])
case "portal":
char_name = CharName(literal_eval(el.get("character")))
orig = NodeName(literal_eval(el.get("origin")))
dest = NodeName(literal_eval(el.get("destination")))
return Value(eng.character[char_name].portal[orig][dest])
case "list":
return Value([self._element_to_value(listel) for listel in el])
case "tuple":
return Value(
tuple(self._element_to_value(tupel) for tupel in el)
)
case "set":
return Value({self._element_to_value(setel) for setel in el})
case "frozenset":
return Value(
frozenset(self._element_to_value(setel) for setel in el)
)
case "dict":
ret = {}
for dict_item_el in el:
ret[literal_eval(dict_item_el.get("key"))] = (
self._element_to_value(dict_item_el[0])
)
return Value(ret)
case "exception":
raise NotImplementedError(
"Deserializing exceptions from XML not implemented"
)
case s if s in {
"trigger",
"prereq",
"action",
"function",
"method",
}:
return getattr(getattr(eng, s), el.get("name"))
case default:
raise ValueError("Can't deserialize the element", default)
@staticmethod
def _get_time(branch_el: Element, turn_el: Element, el: Element) -> Time:
ret = (
Branch(branch_el.get("name")),
Turn(int(turn_el.get("number"))),
Tick(int(el.get("tick"))),
)
if not isinstance(ret[0], str):
raise TypeError("nonstring branch", ret[0])
return ret
def _keyframe_rec(
self, branch_el: Element, turn_el: Element, kf_el: Element
):
branch, turn, tick = self._get_time(branch_el, turn_el, kf_el)
self.keyframe_insert(branch, turn, tick)
universal_kf: UniversalKeyframe = {}
triggers_kf: dict[RuleName, list[TriggerFuncName]] = {}
prereqs_kf: dict[RuleName, list[PrereqFuncName]] = {}
actions_kf: dict[RuleName, list[ActionFuncName]] = {}
neighborhoods_kf: dict[RuleName, RuleNeighborhood] = {}
bigs_kf: dict[RuleName, RuleBig] = {}
rule_kf: RuleKeyframe = {
"triggers": triggers_kf,
"prereqs": prereqs_kf,
"actions": actions_kf,
"neighborhood": neighborhoods_kf,
"big": bigs_kf,
}
rulebook_kf: dict[
RulebookName, tuple[list[RuleName], RulebookPriority]
] = {}
graph_val_kf: GraphValKeyframe = {}
node_val_kf: GraphNodeValKeyframe = {}
edge_val_kf: GraphEdgeValKeyframe = {}
for subel in kf_el:
if subel.tag == "universal":
for univel in subel:
k = literal_eval(univel.get("key"))
v = self._element_to_value(univel[0])
universal_kf[k] = v
elif subel.tag == "rule":
rule = RuleName(subel.get("name"))
if rule is None:
raise TypeError("Rules need names")
if "big" in subel.keys():
bigs_kf[rule] = RuleBig(subel.get("big") in {"T", "true"})
if "neighborhood" in subel.keys():
neighborhoods_kf[rule] = int(subel.get("neighborhood"))
else:
neighborhoods_kf[rule] = None
for funcl_el in subel:
name = FuncName(funcl_el.get("name"))
if not isinstance(name, str):
raise TypeError("Function name must be str", name)
if funcl_el.tag == "trigger":
if rule in triggers_kf:
triggers_kf[rule].append(TriggerFuncName(name))
else:
triggers_kf[rule] = [TriggerFuncName(name)]
elif funcl_el.tag == "prereq":
if rule in prereqs_kf:
prereqs_kf[rule].append(PrereqFuncName(name))
else:
prereqs_kf[rule] = [PrereqFuncName(name)]
elif funcl_el.tag == "action":
if rule in actions_kf:
actions_kf[rule].append(ActionFuncName(name))
else:
actions_kf[rule] = [ActionFuncName(name)]
else:
raise ValueError("Unknown rule tag", funcl_el.tag)
elif subel.tag == "rulebook":
name = subel.get("name")
if name is None:
raise TypeError("rulebook tag missing name")
name = literal_eval(name)
if not isinstance(name, Key):
raise TypeError("Rulebook name must be Key", name)
name = RulebookName(name)
prio = subel.get("priority")
if prio is None:
raise TypeError("rulebook tag missing priority")
prio = RulebookPriority(float(prio))
rules: list[RuleName] = []
for rule_el in subel:
if rule_el.tag != "rule":
raise ValueError("Expected a rule tag", rule_el.tag)
rules.append(RuleName(rule_el.get("name")))
rulebook_kf[name] = (rules, prio)
elif subel.tag == "character":
name = subel.get("name")
if name is None:
raise TypeError("character tag missing name")
name = literal_eval(name)
if not isinstance(name, Key):
raise TypeError("character names must be Key", name)
char_name = CharName(name)
if isinstance(self.engine, EngineFacade):
# Only needed for deserializing later entities.
# Real Engines don't require this, because they have the
# keyframe to work with.
self.engine.add_character(char_name)
graph_vals = graph_val_kf[char_name] = {}
for k in (
"character-rulebook",
"unit-rulebook",
"character-thing-rulebook",
"character-place-rulebook",
"character-portal-rulebook",
):
if k in subel.keys():
graph_vals[k.replace("-", "_")] = literal_eval(
subel.get(k)
)
node_vals = node_val_kf[char_name] = {}
edge_vals = edge_val_kf[char_name] = {}
for key_el in subel:
if key_el.tag == "dict-item":
key = literal_eval(key_el.get("key"))
graph_vals[key] = self._element_to_value(key_el[0])
elif key_el.tag == "node":
name = literal_eval(key_el.get("name"))
if isinstance(self.engine, EngineFacade):
self.engine.character[char_name].add_node(name)
if name in node_vals:
val = node_vals[name]
else:
val = node_vals[name] = {}
if "rulebook" in key_el.keys():
val["rulebook"] = literal_eval(
key_el.get("rulebook")
)
for item_el in key_el:
val[literal_eval(item_el.get("key"))] = (
self._element_to_value(item_el[0])
)
elif key_el.tag == "edge":
orig = literal_eval(key_el.get("orig"))
dest = literal_eval(key_el.get("dest"))
if isinstance(self.engine, EngineFacade):
self.engine.character[char_name].add_edge(
orig, dest
)
if orig not in edge_vals:
edge_vals[orig] = {dest: {}}
if dest not in edge_vals[orig]:
edge_vals[orig][dest] = {}
val = edge_vals[orig][dest]
if "rulebook" in key_el.keys():
val["rulebook"] = literal_eval(
key_el.get("rulebook")
)
for item_el in key_el:
val[literal_eval(item_el.get("key"))] = (
self._element_to_value(item_el[0])
)
elif key_el.tag == "units":
graph_vals["units"] = {}
for unit_graph_el in key_el:
unit_graph_name = literal_eval(
unit_graph_el.get("character")
)
unit_graph_nodes_d = graph_vals["units"][
unit_graph_name
] = {}
for unit_node_el in unit_graph_el:
unit_graph_nodes_d[
literal_eval(unit_node_el.get("node"))
] = True
else:
raise ValueError(
"Don't know how to deal with tag", key_el.tag
)
else:
raise ValueError("Don't know how to deal with tag", subel.tag)
self.keyframe_insert(branch, turn, tick)
self.keyframe_extension_insert(
branch, turn, tick, universal_kf, rule_kf, rulebook_kf
)
for graph in (
graph_val_kf.keys() | node_val_kf.keys() | edge_val_kf.keys()
):
self.keyframe_graph_insert(
graph,
branch,
turn,
tick,
node_val_kf.get(graph, {}),
edge_val_kf.get(graph, {}),
graph_val_kf.get(graph, {}),
)
def _universal_rec(
self, branch_el: Element, turn_el: Element, el: Element
):
branch, turn, tick = self._get_time(branch_el, turn_el, el)
self._get_plans(el, branch, turn, tick)
key = UniversalKey(literal_eval(el.get("key")))
value = self._element_to_value(el[0])
self.universal_set(key, branch, turn, tick, value)
def _get_plans(
self, el: Element, branch: Branch, turn: Turn, tick: Tick
) -> None:
if "plans" in el.keys():
plan_id: Plan
for plan_id in map(int, el.get("plans").split(",")):
if plan_id in self._plan_times:
self._plan_times[plan_id].add((branch, turn, tick))
else:
self._plan_times[plan_id] = {(branch, turn, tick)}
def _rule_func_list(
self,
what: Literal["triggers", "prereqs", "actions"],
branch_el: Element,
turn_el: Element,
el: Element,
):
branch, turn, tick = self._get_time(branch_el, turn_el, el)
self._get_plans(el, branch, turn, tick)
rule = RuleName(el.get("rule"))
funcs: list[RuleFuncName] = [func_el.get("name") for func_el in el]
self._memorize_rule(what, rule, branch, turn, tick, funcs)
def _memorize_rule(
self,
what: Literal["triggers", "prereqs", "actions", "neighborhood", "big"],
rule: RuleName,
branch: Branch,
turn: Turn,
tick: Tick,
datum: list[TriggerFuncName]
| list[PrereqFuncName]
| list[ActionFuncName]
| RuleNeighborhood
| RuleBig,
):
if what == "triggers":
d = self._known_triggers
elif what == "prereqs":
d = self._known_prereqs
elif what == "actions":
d = self._known_actions
elif what == "neighborhood":
d = self._known_neighborhoods
elif what == "big":
d = self._known_big
else:
raise ValueError(what)
d[rule][branch].store_at(turn, tick, datum)
_rule_triggers_rec = partialmethod(_rule_func_list, "triggers")
_rule_prereqs_rec = partialmethod(_rule_func_list, "prereqs")
_rule_actions_rec = partialmethod(_rule_func_list, "actions")
def _rule_neighborhood_rec(
self, branch_el: Element, turn_el: Element, el: Element
):
branch, turn, tick = self._get_time(branch_el, turn_el, el)
self._get_plans(el, branch, turn, tick)
rule = RuleName(el.get("rule"))
neighborhood = el.get("neighbors")
if neighborhood is not None:
neighborhood = int(neighborhood)
self._memorize_rule(
"neighborhood",
rule,
branch,
turn,
tick,
neighborhood,
)
def _rule_big_rec(self, branch_el: Element, turn_el: Element, el: Element):
branch, turn, tick = self._get_time(branch_el, turn_el, el)
self._get_plans(el, branch, turn, tick)
big = RuleBig(el.get("big") in {"T", "true"})
rule = RuleName(el.get("rule"))
self._memorize_rule("big", rule, branch, turn, tick, big)
def _rulebook_rec(
self, branch_el: Element, turn_el: Element, el: Element
) -> None:
branch, turn, tick = self._get_time(branch_el, turn_el, el)
self._get_plans(el, branch, turn, tick)
rbn = el.get("name")
try:
rulebook = RulebookName(eval(rbn))
except TypeError as ex:
raise TypeError("Invalid rulebook name", rbn, *ex.args) from ex
pri = el.get("priority")
try:
priority = RulebookPriority(float(pri))
except TypeError as ex:
raise TypeError("Invalid rulebook priority", pri, *ex.args) from ex
child_el: Element
rules: list[RuleName] = []
for child_el in el:
rule = RuleName(child_el.get("name"))
if not isinstance(rule, str):
raise TypeError("Invalid rule name", rule)
rules.append(rule)
self._rulebooks2set.append(
(branch, turn, tick, rulebook, rules, priority)
)
def _graph_rec(self, branch_el: Element, turn_el: Element, el: Element):
branch, turn, tick = self._get_time(branch_el, turn_el, el)
self._get_plans(el, branch, turn, tick)
graph = CharName(literal_eval(el.get("character")))
typ_str_ = el.get("type")
if typ_str_ is None:
raise TypeError("Missing graph type", el)
if hasattr(GraphTypeStr, "evaluate_value"):
literal = GraphTypeStr.evaluate_value()
elif hasattr(GraphTypeStr, "__value__"):
literal = GraphTypeStr.__value__
else:
literal = GraphTypeStr
if typ_str_ not in get_args(literal):
raise TypeError("Unknown graph type", typ_str_)
typ_str: GraphTypeStr = typ_str_
self.graphs_insert(graph, branch, turn, tick, typ_str)
def _graph_val_rec(
self, branch_el: Element, turn_el: Element, el: Element
):
branch, turn, tick = self._get_time(branch_el, turn_el, el)
self._get_plans(el, branch, turn, tick)
graph = CharName(literal_eval(el.get("character")))
key = Stat(literal_eval(el.get("key")))
value = self._element_to_value(el[0])
self.graph_val_set(graph, key, branch, turn, tick, value)
def _node_rec(self, branch_el: Element, turn_el: Element, el: Element):
branch, turn, tick = self._get_time(branch_el, turn_el, el)
self._get_plans(el, branch, turn, tick)
char = CharName(literal_eval(el.get("character")))
node = NodeName(literal_eval(el.get("name")))
ex = el.get("exists") in {"T", "true"}
self.exist_node(char, node, branch, turn, tick, ex)
def _node_val_rec(self, branch_el: Element, turn_el: Element, el: Element):
branch, turn, tick = self._get_time(branch_el, turn_el, el)
self._get_plans(el, branch, turn, tick)
char = CharName(literal_eval(el.get("character")))
node = NodeName(literal_eval(el.get("node")))
key = Stat(literal_eval(el.get("key")))
val = self._element_to_value(el[0])
self.node_val_set(char, node, key, branch, turn, tick, val)
def _edge_rec(self, branch_el: Element, turn_el: Element, el: Element):
branch, turn, tick = self._get_time(branch_el, turn_el, el)
self._get_plans(el, branch, turn, tick)
char = CharName(literal_eval(el.get("character")))
orig = NodeName(literal_eval(el.get("orig")))
dest = NodeName(literal_eval(el.get("dest")))
ex = el.get("exists") in {"T", "true"}
self.exist_edge(char, orig, dest, branch, turn, tick, ex)
def _edge_val_rec(self, branch_el: Element, turn_el: Element, el: Element):
branch, turn, tick = self._get_time(branch_el, turn_el, el)
self._get_plans(el, branch, turn, tick)
char = CharName(literal_eval(el.get("character")))
orig = NodeName(literal_eval(el.get("orig")))
dest = NodeName(literal_eval(el.get("dest")))
key = Stat(literal_eval(el.get("key")))
val = self._element_to_value(el[0])
self.edge_val_set(char, orig, dest, key, branch, turn, tick, val)
def _location_rec(self, branch_el: Element, turn_el: Element, el: Element):
branch, turn, tick = self._get_time(branch_el, turn_el, el)
self._get_plans(el, branch, turn, tick)
char = CharName(literal_eval(el.get("character")))
thing = NodeName(literal_eval(el.get("thing")))
location = NodeName(literal_eval(el.get("location")))
self.set_thing_loc(char, thing, branch, turn, tick, location)
def _unit_rec(self, branch_el: Element, turn_el: Element, el: Element):
branch, turn, tick = self._get_time(branch_el, turn_el, el)
self._get_plans(el, branch, turn, tick)
char = CharName(literal_eval(el.get("character-graph")))
graph = CharName(literal_eval(el.get("unit-graph")))
node = NodeName(literal_eval(el.get("unit-node")))
self.unit_set(
char,
graph,
node,
branch,
turn,
tick,
el.get("is-unit", "false") in {"T", "true"},
)
def _some_character_rulebook(
self, branch_el: Element, turn_el: Element, rbtyp: str, el: Element
):
meth = getattr(self, f"set_{rbtyp}")
branch, turn, tick = self._get_time(branch_el, turn_el, el)
self._get_plans(el, branch, turn, tick)
char = CharName(literal_eval(el.get("character")))
rb = RulebookName(literal_eval(el.get("rulebook")))
meth(char, branch, turn, tick, rb)
def _character_rulebook_rec(
self, branch_el: Element, turn_el: Element, el: Element
):
self._some_character_rulebook(
branch_el, turn_el, "character_rulebook", el
)
def _unit_rulebook_rec(
self, branch_el: Element, turn_el: Element, el: Element
):
self._some_character_rulebook(branch_el, turn_el, "unit_rulebook", el)
def _character_thing_rulebook_rec(
self, branch_el: Element, turn_el: Element, el: Element
):
self._some_character_rulebook(
branch_el, turn_el, "character_thing_rulebook", el
)
def _character_place_rulebook_rec(
self, branch_el: Element, turn_el: Element, el: Element
):
self._some_character_rulebook(
branch_el, turn_el, "character_place_rulebook", el
)
def _character_portal_rulebook_rec(
self, branch_el: Element, turn_el: Element, el: Element
):
self._some_character_rulebook(
branch_el, turn_el, "character_portal_rulebook", el
)
def _node_rulebook_rec(
self, branch_el: Element, turn_el: Element, el: Element
):
branch, turn, tick = self._get_time(branch_el, turn_el, el)
self._get_plans(el, branch, turn, tick)
char = CharName(literal_eval(el.get("character")))
node = NodeName(literal_eval(el.get("node")))
rb = RulebookName(literal_eval(el.get("rulebook")))
self.set_node_rulebook(char, node, branch, turn, tick, rb)
def _portal_rulebook_rec(
self, branch_el: Element, turn_el: Element, el: Element
):
branch, turn, tick = self._get_time(branch_el, turn_el, el)
self._get_plans(el, branch, turn, tick)
char = CharName(literal_eval(el.get("character")))
orig = NodeName(literal_eval(el.get("orig")))
dest = NodeName(literal_eval(el.get("dest")))
rb = RulebookName(literal_eval(el.get("rulebook")))
self.set_portal_rulebook(char, orig, dest, branch, turn, tick, rb)
@classmethod
def _iter_descendants(
cls,
branch_descendants: dict[Branch, list[Branch]],
branch: Branch = "trunk",
stop=lambda obj: False,
key=None,
):
branch_descendants[branch].sort(key=key)
for desc in branch_descendants[branch]:
yield desc
if stop(desc):
continue
if desc in branch_descendants:
yield from cls._iter_descendants(branch_descendants, desc)
def _create_rule_from_etree(
self, rule: RuleName, branch: Branch, turn: Turn, tick: Tick
):
kwargs = {}
for mapping, kwarg in [
(self._known_triggers, "triggers"),
(self._known_prereqs, "prereqs"),
(self._known_actions, "actions"),
(self._known_neighborhoods, "neighborhood"),
(self._known_big, "big"),
]:
if (
rule in mapping
and branch in mapping[rule]
and turn in mapping[rule][branch]
and tick in mapping[rule][branch][turn]
):
kwargs[kwarg] = mapping[rule][branch][turn].pop(tick)
self.create_rule(rule, branch, turn, tick, **kwargs)
def _rule(
self, branch_el: Element, turn_el: Element, rule_el: Element
) -> None:
branch = Branch(branch_el.get("name"))
turn = Turn(int(turn_el.get("number")))
tick = Tick(int(rule_el.get("end-tick")))
character = CharName(literal_eval(rule_el.get("character")))
rulebook = RulebookName(literal_eval(rule_el.get("rulebook")))
rule = RuleName(rule_el.get("name"))
if not isinstance(rule, str):
raise TypeError("Invalid rule name", rule)
match rule_el.get("type"):
case "character":
self.handled_character_rule(
character,
rulebook,
rule,
branch,
turn,
tick,
)
case "unit":
graph = CharName(literal_eval(rule_el.get("graph")))
unit = NodeName(literal_eval(rule_el.get("unit")))
self.handled_unit_rule(
character,
rulebook,
rule,
graph,
unit,
branch,
turn,
tick,
)
case "character-thing":
thing = NodeName(literal_eval(rule_el.get("thing")))
self.handled_character_thing_rule(
character,
rulebook,
rule,
thing,
branch,
turn,
tick,
)
case "character-place":
place = NodeName(literal_eval(rule_el.get("place")))
self.handled_character_place_rule(
character,
rulebook,
rule,
place,
branch,
turn,
tick,
)
case "character-portal":
orig = NodeName(literal_eval(rule_el.get("origin")))
dest = NodeName(literal_eval(rule_el.get("destination")))
self.handled_character_portal_rule(
character,
rulebook,
rule,
orig,
dest,
branch,
turn,
tick,
)
case "node":
node = NodeName(literal_eval(rule_el.get("node")))
self.handled_node_rule(
character,
node,
rulebook,
rule,
branch,
turn,
tick,
)
case "portal":
orig = NodeName(literal_eval(rule_el.get("origin")))
dest = NodeName(literal_eval(rule_el.get("destination")))
self.handled_portal_rule(
character,
orig,
dest,
rulebook,
rule,
branch,
turn,
tick,
)
@cached_property
def _element_dispatch_table(self):
return {
"keyframe": self._keyframe_rec,
"universal": self._universal_rec,
"rule-triggers": self._rule_triggers_rec,
"rule-prereqs": self._rule_prereqs_rec,
"rule-actions": self._rule_actions_rec,
"rule-neighborhood": self._rule_neighborhood_rec,
"rule-big": self._rule_big_rec,
"rulebook": self._rulebook_rec,
"graph": self._graph_rec,
"graph-val": self._graph_val_rec,
"node": self._node_rec,
"node-val": self._node_val_rec,
"edge": self._edge_rec,
"edge-val": self._edge_val_rec,
"location": self._location_rec,
"unit": self._unit_rec,
"character-rulebook": self._character_rulebook_rec,
"unit-rulebook": self._unit_rulebook_rec,
"character-thing-rulebook": self._character_thing_rulebook_rec,
"character-place-rulebook": self._character_place_rulebook_rec,
"character-portal-rulebook": self._character_portal_rulebook_rec,
"node-rulebook": self._node_rulebook_rec,
"portal-rulebook": self._portal_rulebook_rec,
}
def _dispatch_element(
self, branch_el: Element, turn_el: Element, elem: Element
) -> None:
self._element_dispatch_table[elem.tag](branch_el, turn_el, elem)
def load_etree(
self,
tree: ElementTree,
) -> None:
root = tree.getroot()
branch_descendants: dict[Branch, list[Branch]] = {Branch("trunk"): []}
branch_starts: dict[Branch, tuple[Turn, Tick]] = {}
if "_lisien_schema_version" in self.eternal:
if self.eternal["_lisien_schema_version"] != int(
root.get("db-schema-version")
):
raise RuntimeError("Incompatible database versions")
else:
self.eternal["_lisien_schema_version"] = int(
root.get("db-schema-version")
)
if "xml-schema-version" in root.keys():
ver = int(root.get("xml-schema-version"))
if ver > XML_SCHEMA_VERSION:
raise RuntimeError("Incompatible XML schema version", ver)
self.eternal["trunk"] = root.get("trunk")
self.eternal["branch"] = root.get("branch")
self.eternal["turn"] = int(root.get("turn"))
self.eternal["tick"] = int(root.get("tick"))
for el in root:
if el.tag == "language":
continue
if el.tag == "playtree":
for branch_el in el:
parent: Branch | None = branch_el.get("parent")
branch = Branch(branch_el.get("name"))
if parent is not None:
if parent in branch_descendants:
branch_descendants[parent].append(branch)
else:
branch_descendants[parent] = [branch]
start_turn = Turn(int(branch_el.get("start-turn")))
start_tick = Tick(int(branch_el.get("start-tick")))
branch_starts[branch] = (start_turn, start_tick)
end_turn = Turn(int(branch_el.get("end-turn")))
end_tick = Tick(int(branch_el.get("end-tick")))
self.set_branch(
branch,
parent,
start_turn,
start_tick,
end_turn,
end_tick,
)
if "last-turn-completed" in branch_el.keys():
last_completed_turn = Turn(
int(branch_el.get("last-turn-completed"))
)
self.complete_turn(branch, last_completed_turn, False)
for turn_el in branch_el:
turn = Turn(int(turn_el.get("number")))
end_tick = Tick(int(turn_el.get("end-tick")))
plan_end_tick = Tick(int(turn_el.get("plan-end-tick")))
self.set_turn(branch, turn, end_tick, plan_end_tick)
for elem in turn_el:
if elem.tag == "rule":
self._rule(branch_el, turn_el, elem)
for element in elem:
self._dispatch_element(
branch_el, turn_el, element
)
else:
self._dispatch_element(
branch_el, turn_el, elem
)
known_rules = (
self._known_triggers.keys()
| self._known_prereqs.keys()
| self._known_actions.keys()
| self._known_neighborhoods.keys()
| self._known_big.keys()
)
trunk = Branch(el.get("trunk"))
rules_created = set(self.rules_dump())
for rule in known_rules:
for mapp in [
self._known_triggers,
self._known_prereqs,
self._known_actions,
self._known_neighborhoods,
self._known_big,
]:
if rule not in mapp:
continue
if rule not in rules_created:
# Iterate depth first down the timestream, but no
# deeper than when the rule is first set.
# The game may have a rule by the same name
# created in many branches independently.
for branch in (
trunk,
*self._iter_descendants(
branch_descendants,
trunk,
mapp[rule].__contains__,
branch_starts.get,
),
):
turn, tick = mapp[rule][branch].start_time()
self._create_rule_from_etree(
rule, branch, turn, tick
)
rules_created.add(rule)
for mapp, setter in [
(
self._known_triggers,
self.set_rule_triggers,
),
(self._known_prereqs, self.set_rule_prereqs),
(self._known_actions, self.set_rule_actions),
(
self._known_neighborhoods,
self.set_rule_neighborhood,
),
(self._known_big, self.set_rule_big),
]:
for rule, branches in mapp.items():
for branch, assignments in branches.items():
for turn, tick in assignments.iter_times():
# Turn and tick are guaranteed to be in
# chronological order here, because that's what
# an AssignmentTimeDict does.
datum = assignments.retrieve_exact(turn, tick)
setter(rule, branch, turn, tick, datum)
for plan, times in self._plan_times.items():
for branch, turn, tick in times:
self.plans_insert(plan, branch, turn, tick)
else:
k = literal_eval(el.get("key"))
v = self._element_to_value(el[0])
self.eternal[k] = v
self.commit()
[docs]
def load_xml(self, xml_or_file_path: str | os.PathLike | IO[str | bytes]):
"""Restore data from an XML export
Supports a string with the XML in it, a path to an XML file, or
a file object.
"""
self.load_etree(parse(xml_or_file_path))
_T = TypeVar("_T")
[docs]
@define(getstate_setstate=False, eq=False)
class PythonDatabaseConnector(AbstractDatabaseConnector):
"""Database connector that holds all data in memory
You'll have to write it to disk yourself. Use the ``write_xml`` method
for that.
This does not start any threads, unlike the connectors that really
connect to databases, making it an appropriate choice if running in
an environment that lacks threading, such as WASI.
"""
engine: AbstractEngine
_old_data: dict | None = field(default=None)
is_python: ClassVar = True
db_type: ClassVar = "python"
@_old_data.validator
def _load_old_data(self, _, old_data):
if old_data is None:
return
self.load_everything(old_data)
def is_empty(self) -> bool:
for att in dir(self):
if att.startswith("__"):
continue
try:
val = getattr(self, att)
except AttributeError:
continue
if isinstance(val, dict) or isinstance(val, set):
if val:
print(f"{att} is nonempty")
return False
return True
@cached_property
def _bookmarks(self) -> dict[Key, Time]:
return {}
def _plan_ticks_insert_rec(
self, plan: Plan, branch: Branch, turn: Turn, tick: Tick
):
b = self._plan_ticks[plan][branch]
if turn not in b or tick not in b[turn]:
b.insert_time(turn, tick)
@cached_property
def _keyframe_extensions(
self,
) -> dict[
Branch,
AssignmentTimeDict[
tuple[UniversalKeyframe, RuleKeyframe, RulebooksKeyframe]
],
]:
return defaultdict(AssignmentTimeDict)
@cached_property
def _keyframes(self) -> set[Time]:
return set()
@property
def _all_keyframe_times(self) -> set[Time]:
return self._keyframes.copy()
@cached_property
def _keyframes_graphs(
self,
) -> dict[
Branch,
AssignmentTimeDict[
dict[CharName, tuple[NodeKeyframe, EdgeKeyframe, StatDict]]
],
]:
return defaultdict(AssignmentTimeDict)
def _keyframes_graphs_insert_rec(
self,
branch: Branch,
turn: Turn,
tick: Tick,
graph: CharName,
nodes: NodeKeyframe,
edges: EdgeKeyframe,
graph_val: StatDict,
):
try:
self._keyframes_graphs[branch].retrieve_exact(turn, tick)[
graph
] = (
nodes,
edges,
graph_val,
)
except KeyError:
self._keyframes_graphs[branch].store_at(
turn, tick, {graph: (nodes, edges, graph_val)}
)
@cached_property
def _branches(self) -> dict[Branch, tuple[Branch, Turn, Tick, Turn, Tick]]:
return {}
@cached_property
def _global(self) -> list[tuple[EternalKey, Value]]:
return []
@cached_property
def eternal(self) -> GlobalKeyValueStore:
initial = {
EternalKey(Key(k)): Value(v)
for (k, v) in {
"branch": "trunk",
"turn": 0,
"tick": 0,
"language": "eng",
"trunk": "trunk",
"_lisien_schema_version": SCHEMA_VERSION,
}.items()
}
initial.update(self._global)
return GlobalKeyValueStore(self, initial)
@cached_property
def _turns(self) -> dict[tuple[Branch, Turn], tuple[Tick, Tick]]:
return {}
@cached_property
def _graphs(
self,
) -> dict[Branch, AssignmentTimeDict[dict[CharName, GraphTypeStr]]]:
return defaultdict(AssignmentTimeDict)
def _graphs_insert_rec(
self,
branch: Branch,
turn: Turn,
tick: Tick,
graph: CharName,
type: GraphTypeStr,
) -> None:
try:
d = self._graphs[branch].retrieve_exact(turn, tick)
except KeyError:
d = {}
d[graph] = type
self._graphs[branch].store_at(turn, tick, d)
@cached_property
def _graph_val(
self,
) -> dict[Branch, AssignmentTimeDict[tuple[CharName, Stat, Value]]]:
return defaultdict(AssignmentTimeDict)
@cached_property
def _nodes(
self,
) -> dict[Branch, AssignmentTimeDict[tuple[CharName, NodeName, bool]]]:
return defaultdict(AssignmentTimeDict)
@cached_property
def _node_val(
self,
) -> dict[
Branch, AssignmentTimeDict[tuple[CharName, NodeName, Stat, Value]]
]:
return defaultdict(AssignmentTimeDict)
@cached_property
def _edges(
self,
) -> dict[
Branch, AssignmentTimeDict[tuple[CharName, NodeName, NodeName, bool]]
]:
return defaultdict(AssignmentTimeDict)
@cached_property
def _edge_val(
self,
) -> dict[
Branch,
AssignmentTimeDict[tuple[CharName, NodeName, NodeName, Stat, Value]],
]:
return defaultdict(AssignmentTimeDict)
@cached_property
def _universals(
self,
) -> dict[Branch, AssignmentTimeDict[tuple[UniversalKey, Value]]]:
return defaultdict(AssignmentTimeDict)
def _universals_insert_rec(
self,
branch: Branch,
turn: Turn,
tick: Tick,
key: UniversalKey,
value: Value,
) -> None:
self._universals[branch].store_at(turn, tick, (key, value))
def universal_get(
self, key: UniversalKey, branch: Branch, turn: Turn, tick: Tick
) -> Value:
if branch not in self._universals:
raise KeyError("Universal key not set in this branch", branch)
td = self._universals[branch]
if turn in td:
for tck, (k, v) in (
td[turn].past(tick, include_same_rev=True).items()
):
if k == key:
return v
for trn, tck in reversed(td.iter_times(time_to=(turn, tick))):
(k, v) = td.retrieve_exact(trn, tck)
if k == key:
return v
raise KeyError(
"Universal key not set at this time", key, branch, turn, tick
)
@cached_property
def _rules(self) -> set[RuleName]:
return set()
@cached_property
def _rulebooks(
self,
) -> dict[
Branch,
AssignmentTimeDict[
tuple[RulebookName, list[RuleName], RulebookPriority]
],
]:
return PickierDefaultDict(str, AssignmentTimeDict)
@cached_property
def _rule_triggers(
self,
) -> dict[
Branch, AssignmentTimeDict[tuple[RuleName, list[TriggerFuncName]]]
]:
return PickierDefaultDict(str, AssignmentTimeDict)
@cached_property
def _rule_neighborhood(
self,
) -> dict[Branch, AssignmentTimeDict[tuple[RuleName, RuleNeighborhood]]]:
return PickierDefaultDict(str, AssignmentTimeDict)
@cached_property
def _rule_prereqs(
self,
) -> dict[
Branch, AssignmentTimeDict[tuple[RuleName, list[PrereqFuncName]]]
]:
return PickierDefaultDict(str, AssignmentTimeDict)
@cached_property
def _rule_actions(
self,
) -> dict[
Branch, AssignmentTimeDict[tuple[RuleName, list[ActionFuncName]]]
]:
return PickierDefaultDict(str, AssignmentTimeDict)
@cached_property
def _rule_big(
self,
) -> dict[Branch, AssignmentTimeDict[tuple[RuleName, RuleBig]]]:
return PickierDefaultDict(str, AssignmentTimeDict)
@cached_property
def _character_rulebook(
self,
) -> dict[Branch, AssignmentTimeDict[tuple[CharName, RulebookName]]]:
return PickierDefaultDict(str, AssignmentTimeDict)
@cached_property
def _unit_rulebook(
self,
) -> dict[Branch, AssignmentTimeDict[tuple[CharName, RulebookName]]]:
return PickierDefaultDict(str, AssignmentTimeDict)
@cached_property
def _character_thing_rulebook(
self,
) -> dict[Branch, AssignmentTimeDict[tuple[CharName, RulebookName]]]:
return PickierDefaultDict(str, AssignmentTimeDict)
@cached_property
def _character_place_rulebook(
self,
) -> dict[Branch, AssignmentTimeDict[tuple[CharName, RulebookName]]]:
return PickierDefaultDict(str, AssignmentTimeDict)
@cached_property
def _character_portal_rulebook(
self,
) -> dict[Branch, AssignmentTimeDict[tuple[CharName, RulebookName]]]:
return PickierDefaultDict(str, AssignmentTimeDict)
@cached_property
def _node_rules_handled(
self,
) -> dict[
tuple[Branch, Turn, CharName, NodeName, RulebookName, RuleName], Tick
]:
return {}
@cached_property
def _portal_rules_handled(
self,
) -> dict[
tuple[
Branch, Turn, CharName, NodeName, NodeName, RulebookName, RuleName
],
Tick,
]:
return {}
@cached_property
def _things(
self,
) -> dict[Branch, AssignmentTimeDict[tuple[CharName, NodeName, NodeName]]]:
return defaultdict(AssignmentTimeDict)
@cached_property
def _node_rulebook(
self,
) -> dict[
Branch, AssignmentTimeDict[tuple[CharName, NodeName, RulebookName]]
]:
return defaultdict(AssignmentTimeDict)
@cached_property
def _portal_rulebook(
self,
) -> dict[
Branch,
AssignmentTimeDict[tuple[CharName, NodeName, NodeName, RulebookName]],
]:
return defaultdict(AssignmentTimeDict)
@cached_property
def _units(
self,
) -> dict[
Branch, AssignmentTimeDict[tuple[CharName, CharName, NodeName, bool]]
]:
return defaultdict(AssignmentTimeDict)
@cached_property
def _character_rules_handled(
self,
) -> dict[tuple[Branch, Turn, CharName, RulebookName, RuleName], Tick]:
return {}
@cached_property
def _unit_rules_handled(
self,
) -> dict[
tuple[
Branch, Turn, CharName, CharName, NodeName, RulebookName, RuleName
],
Tick,
]:
return {}
@cached_property
def _character_thing_rules_handled(
self,
) -> dict[
tuple[Branch, Turn, CharName, RulebookName, RuleName, NodeName], Tick
]:
return {}
@cached_property
def _character_place_rules_handled(
self,
) -> dict[
tuple[Branch, Turn, CharName, NodeName, RulebookName, RuleName], Tick
]:
return {}
@cached_property
def _character_portal_rules_handled(
self,
) -> dict[
tuple[
Branch, Turn, CharName, NodeName, NodeName, RulebookName, RuleName
],
Tick,
]:
return {}
@cached_property
def _turns_completed(self) -> dict[Branch, Turn]:
return {}
_table_names = [
"_bookmarks",
"_global",
"_branches",
"_turns",
"_graphs",
"_keyframes",
"_keyframes_graphs",
"_keyframe_extensions",
"_graph_val",
"_nodes",
"_node_val",
"_edges",
"_edge_val",
"_plans",
"_plan_ticks",
"_universals",
"_rules",
"_rulebooks",
"_rule_triggers",
"_rule_neighborhood",
"_rule_prereqs",
"_rule_actions",
"_rule_big",
"_character_rulebook",
"_unit_rulebook",
"_character_thing_rulebook",
"_character_place_rulebook",
"_character_portal_rulebook",
"_node_rules_handled",
"_portal_rules_handled",
"_things",
"_node_rulebook",
"_portal_rulebook",
"_units",
"_character_rules_handled",
"_unit_rules_handled",
"_character_thing_rules_handled",
"_character_place_rules_handled",
"_character_portal_rules_handled",
"_turns_completed",
]
@cached_property
def _lock(self) -> Lock:
return Lock()
def pack(
self,
obj: Key
| KeyHint
| EternalKey
| UniversalKey
| Stat
| ValueHint
| Value,
) -> Value:
return Value(obj)
def unpack(self, b: Value) -> Value:
return b
def _load_window(
self,
ret: LoadedDict,
branch: Branch,
turn_from: Turn,
tick_from: Tick,
turn_to: Turn | None = None,
tick_to: Tick | None = None,
) -> None:
if turn_to is None:
turn_to = float("inf")
if tick_to is None:
tick_to = float("inf")
universals: list[UniversalRowType] = ret.setdefault("universals", [])
rulebooks: list[RulebookRowType] = ret.setdefault("rulebooks", [])
rule_triggers: list[TriggerRowType] = ret.setdefault(
"rule_triggers", []
)
rule_prereqs: list[PrereqRowType] = ret.setdefault("rule_prereqs", [])
rule_actions: list[ActionRowType] = ret.setdefault("rule_actions", [])
rule_neighborhood: list[RuleNeighborhoodRowType] = ret.setdefault(
"rule_neighborhood", []
)
rule_big: list[RuleBigRowType] = ret.setdefault("rule_big", [])
character_rules_handled: list[CharacterRulesHandledRowType] = (
ret.setdefault("character_rules_handled", [])
)
unit_rules_handled: list[UnitRulesHandledRowType] = ret.setdefault(
"unit_rules_handled", []
)
character_thing_rules_handled: list[NodeRulesHandledRowType] = (
ret.setdefault("character_thing_rules_handled", [])
)
character_place_rules_handled: list[NodeRulesHandledRowType] = (
ret.setdefault("character_place_rules_handled", [])
)
character_portal_rules_handled: list[PortalRulesHandledRowType] = (
ret.setdefault("character_portal_rules_handled", [])
)
node_rules_handled: list[NodeRulesHandledRowType] = ret.setdefault(
"node_rules_handled", []
)
portal_rules_handled: list[PortalRulesHandledRowType] = ret.setdefault(
"portal_rules_handled", []
)
graphs: list[GraphRowType] = ret.setdefault("graphs", [])
rbs = self._rulebooks
if branch in rbs:
rbb = rbs[branch]
for turn, tick in list(
rbb.iter_times((turn_from, tick_from), (turn_to, tick_to))
):
rulebook, rules, prio = rbb.retrieve_exact(turn, tick)
rulebooks.append(
(branch, turn, tick, rulebook, rules.copy(), prio)
)
if branch in self._graphs:
turns = self._graphs[branch]
for turn, tick in list(
turns.iter_times((turn_from, tick_from), (turn_to, tick_to))
):
d = turns.retrieve_exact(turn, tick)
graphs.extend(
(branch, turn, tick, graph, typ)
for (graph, typ) in d.items()
)
if branch in self._universals:
turns = self._universals[branch]
for turn, tick in list(
turns.iter_times((turn_from, tick_from), (turn_to, tick_to))
):
k, v = turns.retrieve_exact(turn, tick)
universals.append((branch, turn, tick, k, v))
for uncharacter_l, my_table in [
(rule_triggers, self._rule_triggers),
(rule_prereqs, self._rule_prereqs),
(rule_actions, self._rule_actions),
(rule_neighborhood, self._rule_neighborhood),
(rule_big, self._rule_big),
]:
if branch in my_table:
for turn, tick in list(
my_table[branch].iter_times(
(turn_from, tick_from), (turn_to, tick_to)
)
):
rule, the_datum = my_table[branch].retrieve_exact(
turn, tick
)
if isinstance(the_datum, list):
the_datum = the_datum.copy()
uncharacter_l.append((branch, turn, tick, rule, the_datum))
my_table: dict[Branch, AssignmentTimeDict]
for char_d_key, my_table in [
("nodes", self._nodes),
("node_val", self._node_val),
("edges", self._edges),
("edge_val", self._edge_val),
("graph_val", self._graph_val),
("things", self._things),
("units", self._units),
("character_rulebook", self._character_rulebook),
("unit_rulebook", self._unit_rulebook),
("character_thing_rulebook", self._character_thing_rulebook),
("character_place_rulebook", self._character_place_rulebook),
("character_portal_rulebook", self._character_portal_rulebook),
("node_rulebook", self._node_rulebook),
("portal_rulebook", self._portal_rulebook),
]:
if branch not in my_table:
continue
assignments = my_table[branch]
for r, t in list(
assignments.iter_times(
(turn_from, tick_from), (turn_to, tick_to)
)
):
row: AssignmentRowType = assignments.retrieve_exact(r, t)
g: CharName = row[0]
the_list: AssignmentRowListType = ret[g][char_d_key]
the_list.append((branch, r, t, *row))
for handled_l, my_table in [
(node_rules_handled, self._node_rules_handled),
(portal_rules_handled, self._portal_rules_handled),
(character_rules_handled, self._character_rules_handled),
(unit_rules_handled, self._unit_rules_handled),
(
character_thing_rules_handled,
self._character_thing_rules_handled,
),
(
character_place_rules_handled,
self._character_place_rules_handled,
),
(
character_portal_rules_handled,
self._character_portal_rules_handled,
),
]:
for rec in sort_set(
{(k[0], k[1], v, *k[2:]) for (k, v) in my_table.items()}
):
b, turn, tick = rec[:3]
if b != branch or not (
(turn_from, tick_from)
<= (turn, tick)
<= (turn_to, tick_to)
):
continue
datum = (b, turn, *rec[3:], tick)
handled_l.append(datum)
def _load_windows_into(
self, ret: LoadedDict, windows: list[TimeWindow]
) -> None:
for branch, turn_from, tick_from, turn_to, tick_to in windows:
self._load_window(
ret, branch, turn_from, tick_from, turn_to, tick_to
)
def del_bookmark(self, key: Key) -> None:
self._bookmarks2set.cull(lambda k, _: k == key)
if key in self._bookmarks:
del self._bookmarks[key]
def echo(self, *args):
if len(args) == 0:
return
elif len(args) == 1:
return args[0]
return args
def call(self, query_name: str, *args, **kwargs):
raise TypeError("Not a real database, so can't call it")
def call_silent(self, query_name: str, *args, **kwargs):
raise TypeError("Not a real database, so can't call it")
def call_many(self, query_name: str, args: list) -> None:
raise TypeError("Not a real database, so can't call it")
def call_many_silent(self, query_name: str, args: list) -> None:
raise TypeError("Not a real database, so can't call it")
def insert_many(self, table_name: str, args: list[dict]) -> None:
tab_serializer = Batch.serializers[table_name]
key_len = getattr(
self, Batch.cached_properties[table_name].attrname
).key_len
if key_len < 1:
key_len = ...
tab_spec = inspect.getfullargspec(tab_serializer)
tab = getattr(self, "_" + table_name)
if isinstance(tab, list):
tab.extend(
tuple(d[arg] for arg in tab_spec.args[1:]) for d in args
)
elif isinstance(tab, set):
if len(tab_spec.args) == 2: # self, and one actual column name
the_arg = tab_spec.args[-1]
tab.update(d[the_arg] for d in args)
else:
tab.update(
tuple(d[arg] for arg in tab_spec.args[1:]) for d in args
)
elif isinstance(tab, dict):
if mth := getattr(self, "_" + table_name + "_insert_rec", None):
for rec in args:
mth(**rec)
elif key_len is ...:
raise TypeError("dict table without key_len")
elif key_len == 1:
key_name = tab_spec.args[1]
if len(tab_spec.args) == 3:
val_name = tab_spec.args[-1]
for d in args:
key = d[key_name]
tab[key] = d[val_name]
else:
for d in args:
key = d[key_name]
tab[key] = tuple(d[k] for k in tab_spec.args[2:])
elif tab_spec.args[1:4] == ["branch", "turn", "tick"]:
for d in args:
record = tuple(d[k] for k in tab_spec.args[4:])
tab[d["branch"]].store_at(d["turn"], d["tick"], record)
elif (
key_len == len(tab_spec.args) - 2
): # the self argument, and the value
for d in args:
key = tuple(d[k] for k in tab_spec.args[1:-1])
tab[key] = d[tab_spec.args[-1]]
else:
for d in args:
key = tuple(d[k] for k in tab_spec.args[1 : key_len + 1])
tab[key] = tuple(
d[k] for k in tab_spec.args[key_len + 1 :]
)
else:
raise TypeError("Don't know how to insert here", tab)
insert_many_silent = insert_many
def delete_many_silent(self, table_name: str, args: list[dict]) -> None:
cached: cached_property = Batch.cached_properties[table_name]
the_batch: Batch = getattr(self, cached.attrname)
tab_serializer = Batch.serializers[table_name]
tab_spec = inspect.getfullargspec(tab_serializer)
tab = getattr(self, "_" + table_name)
if the_batch.key_len >= 1:
key_len = the_batch.key_len
key_args = tab_spec.args[1 : the_batch.key_len]
else:
key_args = tab_spec.args[1:]
key_len = len(key_args)
keys2del = set(tuple(d[arg] for arg in key_args) for d in args)
if isinstance(tab, list):
setattr(
self,
"_" + table_name,
list(
filterfalse(
lambda t: t[:key_len] in keys2del,
tab,
)
),
)
elif isinstance(tab, dict):
for key in keys2del & tab.keys():
del tab[key]
elif isinstance(tab, set):
tab.difference_update(keys2del)
else:
raise TypeError("Don't know how to delete from this table", tab)
def get_keyframe_extensions(
self, branch: Branch, turn: Turn, tick: Tick
) -> tuple[UniversalKeyframe, RuleKeyframe, RulebooksKeyframe]:
try:
return self._keyframe_extensions[branch].retrieve_exact(turn, tick)
except KeyError:
raise KeyframeError("No keyframe at that time", branch, turn, tick)
def keyframes_dump(self) -> Iterator[Time]:
with self._lock:
yield from sorted(self._keyframes)
def delete_keyframe(self, branch: Branch, turn: Turn, tick: Tick) -> None:
with self._lock:
self._keyframes.remove((branch, turn, tick))
del self._keyframe_extensions[branch][turn][tick]
def keyframes_graphs(
self,
) -> Iterator[tuple[CharName, Branch, Turn, Tick]]:
with self._lock:
for b, r, t, g in sort_set(self._keyframes):
yield g, b, r, t
def have_branch(self, branch: Branch) -> bool:
return branch in self._branches
def branches_dump(
self,
) -> Iterator[BranchRowType]:
with self._lock:
for branch in sort_set(self._branches.keys()):
parent, r0, t0, r1, t1 = self._branches[branch]
yield branch, parent, r0, t0, r1, t1
def global_get(self, key: EternalKey) -> Value:
return self.eternal[key]
def global_dump(self) -> Iterator[tuple[Key, Value]]:
with self._lock:
yield from self.eternal.items()
def get_branch(self) -> Branch:
b = self.eternal[EternalKey(Key("branch"))]
assert isinstance(b, str)
return Branch(b)
def get_turn(self) -> Turn:
r = self.eternal[EternalKey(Key("turn"))]
assert isinstance(r, int)
return Turn(r)
def get_tick(self) -> Tick:
t = self.eternal[EternalKey(Key("tick"))]
assert isinstance(t, int)
return Tick(t)
def turns_dump(self) -> Iterator[TurnRowType]:
with self._lock:
for (branch, turn), (
end_tick,
plan_end_tick,
) in sorted(self._turns.items()):
yield branch, turn, end_tick, plan_end_tick
def graph_val_dump(self) -> Iterator[GraphValRowType]:
with self._lock:
gv = self._graph_val
for branch in sort_set(gv.keys()):
for turn, tick in gv[branch].iter_times():
graph, key, value = gv[branch].retrieve_exact(turn, tick)
yield branch, turn, tick, graph, key, value
def graph_val_del_time(
self, branch: Branch, turn: Turn, tick: Tick
) -> None:
super().graph_val_del_time(branch, turn, tick)
try:
del self._graph_val[branch][turn][tick]
except KeyError:
pass
def edges_del_time(self, branch: Branch, turn: Turn, tick: Tick) -> None:
super().edges_del_time(branch, turn, tick)
try:
del self._edges[branch][turn][tick]
except KeyError:
pass
def graphs_types(
self,
branch: Branch,
turn_from: Turn,
tick_from: Tick,
turn_to: Optional[Turn] = None,
tick_to: Optional[Tick] = None,
) -> Iterator[tuple[CharName, Branch, Turn, Tick, GraphTypeStr]]:
if (turn_to is None) ^ (tick_to is None):
raise TypeError(
"Need both or neither of 'turn_to' and 'tick_to'",
turn_to,
tick_to,
)
if branch not in self._graphs:
return
with self._lock:
time_from = (turn_from, tick_from)
time_to = (
None if None in (turn_to, tick_to) else (turn_to, tick_to)
)
for turn, tick in self._graphs[branch].iter_times(
time_from, time_to
):
for g, v in self._graphs[branch][turn][tick].items():
yield g, branch, turn, tick, v
def _chron_dump(
self,
table: dict[Branch, AssignmentTimeDict[_T]],
) -> Iterator[tuple[Time, _T]]:
with self._lock:
for branch in sort_set(table.keys()):
turns = table[branch]
for turn, tick in turns.iter_times():
row = turns.retrieve_exact(turn, tick)
yield (branch, turn, tick), row
def graphs_dump(
self,
) -> Iterator[GraphRowType]:
for (branch, turn, tick), d in self._chron_dump(self._graphs):
for graph in sort_set(d.keys()):
yield branch, turn, tick, graph, d[graph]
def nodes_del_time(self, branch: Branch, turn: Turn, tick: Tick) -> None:
super().nodes_del_time(branch, turn, tick)
with self._lock:
try:
del self._nodes[branch][turn][tick]
except KeyError:
pass
def nodes_dump(self) -> Iterator[NodeRowType]:
for (branch, turn, tick), (char, node, ex) in self._chron_dump(
self._nodes
):
yield branch, turn, tick, char, node, ex
def node_val_dump(self) -> Iterator[NodeValRowType]:
for (branch, turn, tick), (char, node, key, val) in self._chron_dump(
self._node_val
):
yield branch, turn, tick, char, node, key, val
def node_val_del_time(
self, branch: Branch, turn: Turn, tick: Tick
) -> None:
super().node_val_del_time(branch, turn, tick)
with self._lock:
try:
del self._node_val[branch][turn][tick]
except KeyError:
pass
def edges_dump(self) -> Iterator[EdgeRowType]:
for (branch, turn, tick), (char, orig, dest, ex) in self._chron_dump(
self._edges
):
yield branch, turn, tick, char, orig, dest, ex
def edge_val_dump(self) -> Iterator[EdgeValRowType]:
for (branch, turn, tick), (
char,
orig,
dest,
key,
val,
) in self._chron_dump(self._edge_val):
yield branch, turn, tick, char, orig, dest, key, val
def edge_val_del_time(
self, branch: Branch, turn: Turn, tick: Tick
) -> None:
super().edge_val_del_time(branch, turn, tick)
with self._lock:
try:
del self._edge_val[branch][turn][tick]
except KeyError:
pass
def plan_ticks_dump(self) -> Iterator[tuple[Plan, Branch, Turn, Tick]]:
with self._lock:
plan_ticks = self._plan_ticks
for plan in sorted(self._plan_ticks.keys()):
branches = plan_ticks[plan]
for branch in sorted(branches.keys()):
turns = branches[branch]
for turn in sorted(turns):
for tick in turns[turn]:
yield plan, branch, turn, tick
def commit(self):
self.flush()
def close(self):
self.flush()
def truncate_all(self) -> None:
for table in Batch.cached_properties:
getattr(self, "_" + table).clear()
def get_all_keyframe_graphs(
self, branch: Branch, turn: Turn, tick: Tick
) -> Iterator[tuple[CharName, NodeKeyframe, EdgeKeyframe, StatDict]]:
with self._lock:
try:
kf = self._keyframes_graphs[branch].retrieve_exact(turn, tick)
except KeyError:
return
for g, (nkf, ekf, gvkf) in kf.items():
yield (
g,
{node: stats.copy() for (node, stats) in nkf.items()},
{
orig: {
dest: stats.copy()
for (dest, stats) in dests.items()
}
for (orig, dests) in ekf.items()
},
gvkf.copy(),
)
def keyframes_graphs_dump(
self,
) -> Iterator[KeyframeGraphRowType]:
kfg = self._keyframes_graphs
with self._lock:
for branch in sort_set(kfg.keys()):
kfgb = kfg[branch]
for turn, tick in kfgb.iter_times():
d = kfgb.retrieve_exact(turn, tick)
for g in sort_set(d.keys()):
nkf, ekf, gkf = d[g]
yield branch, turn, tick, g, nkf, ekf, gkf
def keyframe_extensions_dump(
self,
) -> Iterator[KeyframeExtensionRowType]:
for (branch, turn, tick), (ukf, rkf, rbkf) in self._chron_dump(
self._keyframe_extensions
):
yield branch, turn, tick, ukf, rkf, rbkf
def universals_dump(
self,
) -> Iterator[UniversalRowType]:
univ = self._universals
with self._lock:
for branch in sort_set(univ.keys()):
univb = univ[branch]
for turn, tick in univb.iter_times():
k, v = univb.retrieve_exact(turn, tick)
yield branch, turn, tick, k, v
def rulebooks_dump(
self,
) -> Iterator[RulebookRowType]:
with self._lock:
for branch in sorted(self._rulebooks.keys()):
for turn, tick in self._rulebooks[branch].iter_times():
rb, rs, prio = self._rulebooks[branch].retrieve_exact(
turn, tick
)
yield branch, turn, tick, rb, rs.copy(), prio
def rules_dump(self) -> Iterator[RuleName]:
with self._lock:
yield from sort_set(self._rules)
def _rule_something_dump(
self,
tab: dict[
Branch,
AssignmentTimeDict[
tuple[
RuleName, list[RuleFuncName] | RuleNeighborhood | RuleBig
]
],
],
):
with self._lock:
for branch in sort_set(tab.keys()):
turns = tab[branch]
for turn, tick in turns.iter_times():
rule, funcs = turns.retrieve_exact(turn, tick)
if isinstance(funcs, list):
funcs = funcs.copy()
yield branch, turn, tick, rule, funcs
def rule_triggers_dump(
self,
) -> Iterator[TriggerRowType]:
return self._rule_something_dump(self._rule_triggers)
def rule_prereqs_dump(
self,
) -> Iterator[PrereqRowType]:
return self._rule_something_dump(self._rule_prereqs)
def rule_actions_dump(
self,
) -> Iterator[ActionRowType]:
return self._rule_something_dump(self._rule_actions)
def rule_neighborhood_dump(
self,
) -> Iterator[RuleNeighborhoodRowType]:
return self._rule_something_dump(self._rule_neighborhood)
def rule_big_dump(
self,
) -> Iterator[RuleBigRowType]:
return self._rule_something_dump(self._rule_big)
def node_rulebook_dump(
self,
) -> Iterator[NodeRulebookRowType]:
with self._lock:
for branch in sort_set(self._node_rulebook.keys()):
nrb = self._node_rulebook[branch]
for turn, tick in nrb.iter_times():
char, node, rb = nrb.retrieve_exact(turn, tick)
yield branch, turn, tick, char, node, rb
def portal_rulebook_dump(
self,
) -> Iterator[PortalRulebookRowType]:
with self._lock:
for branch in sort_set(self._portal_rulebook.keys()):
porb = self._portal_rulebook[branch]
for turn, tick in porb.iter_times():
char, orig, dest, rb = porb.retrieve_exact(turn, tick)
yield branch, turn, tick, char, orig, dest, rb
def _character_something_rulebook_dump(
self,
what: dict[Branch, AssignmentTimeDict[tuple[CharName, RulebookName]]],
) -> Iterator[CharRulebookRowType]:
with self._lock:
for branch in sort_set(what.keys()):
that = what[branch]
for turn, tick in that.iter_times():
graph, rb = that.retrieve_exact(turn, tick)
yield branch, turn, tick, graph, rb
def character_rulebook_dump(
self,
) -> Iterator[CharRulebookRowType]:
return self._character_something_rulebook_dump(
self._character_rulebook
)
def unit_rulebook_dump(
self,
) -> Iterator[CharRulebookRowType]:
return self._character_something_rulebook_dump(self._unit_rulebook)
def character_thing_rulebook_dump(
self,
) -> Iterator[CharRulebookRowType]:
return self._character_something_rulebook_dump(
self._character_thing_rulebook
)
def character_place_rulebook_dump(
self,
) -> Iterator[CharRulebookRowType]:
return self._character_something_rulebook_dump(
self._character_place_rulebook
)
def character_portal_rulebook_dump(
self,
) -> Iterator[CharRulebookRowType]:
return self._character_something_rulebook_dump(
self._character_portal_rulebook
)
def character_rules_handled_dump(
self,
) -> Iterator[CharacterRulesHandledRowType]:
with self._lock:
crh = self._character_rules_handled
for b, r, g, rb, rn in sort_set(crh.keys()):
t = crh[b, r, g, rb, rn]
yield b, r, g, rb, rn, t
def unit_rules_handled_dump(
self,
) -> Iterator[UnitRulesHandledRowType]:
with self._lock:
urh = self._unit_rules_handled
for b, r, char, graph, node, rb, rn in sort_set(urh.keys()):
t = urh[b, r, char, graph, node, rb, rn]
yield b, r, char, graph, node, rb, rn, t
def character_thing_rules_handled_dump(
self,
) -> Iterator[NodeRulesHandledRowType]:
with self._lock:
ctrh = self._character_thing_rules_handled
for b, r, g, rb, rn, n in sort_set(ctrh.keys()):
t = ctrh[b, r, g, rb, rn, n]
yield b, r, g, n, rb, rn, t
def character_place_rules_handled_dump(
self,
) -> Iterator[NodeRulesHandledRowType]:
with self._lock:
cprh = self._character_place_rules_handled
for b, r, g, n, rb, rn in sort_set(cprh.keys()):
t = cprh[b, r, g, n, rb, rn]
yield b, r, g, n, rb, rn, t
def character_portal_rules_handled_dump(
self,
) -> Iterator[PortalRulesHandledRowType]:
with self._lock:
cporh = self._character_portal_rules_handled
for b, r, g, o, d, rb, rn in sort_set(cporh.keys()):
t = cporh[b, r, g, o, d, rb, rn]
yield b, r, g, o, d, rb, rn, t
def node_rules_handled_dump(
self,
) -> Iterator[NodeRulesHandledRowType]:
with self._lock:
nrh = self._node_rules_handled
for b, r, g, n, rb, rn in sort_set(nrh.keys()):
t = nrh[b, r, g, n, rb, rn]
yield b, r, g, n, rb, rn, t
def portal_rules_handled_dump(
self,
) -> Iterator[PortalRulesHandledRowType]:
with self._lock:
porh = self._portal_rules_handled
for b, r, g, o, d, rb, rn in sort_set(porh.keys()):
t = porh[b, r, g, o, d, rb, rn]
yield b, r, g, o, d, rb, rn, t
def things_dump(
self,
) -> Iterator[ThingRowType]:
things = self._things
with self._lock:
for branch in sort_set(things.keys()):
thb = things[branch]
for turn, tick in thb.iter_times():
char, thing, loc = thb.retrieve_exact(turn, tick)
yield branch, turn, tick, char, thing, loc
def units_dump(
self,
) -> Iterator[UnitRowType]:
units = self._units
with self._lock:
for branch in sort_set(units.keys()):
ub = units[branch]
for turn, tick in ub.iter_times():
char, graph, node, is_unit = ub.retrieve_exact(turn, tick)
yield branch, turn, tick, char, graph, node, is_unit
def count_all_table(self, tbl: str) -> int:
return len(getattr(self, "_" + tbl))
def rules_insert(self, rule: RuleName):
with self._lock:
self._rules.add(rule)
def things_del_time(self, branch: Branch, turn: Turn, tick: Tick) -> None:
super().things_del_time(branch, turn, tick)
with self._lock:
try:
del self._things[branch][turn][tick]
except KeyError:
pass
def turns_completed_dump(self) -> Iterator[tuple[Branch, Turn]]:
with self._lock:
yield from sorted(self._turns_completed.items())
def bookmarks_dump(self) -> Iterator[tuple[Key, Time]]:
with self._lock:
yield from sort_set(self._bookmarks.items())
[docs]
@define
class NullDatabaseConnector(AbstractDatabaseConnector):
"""Database connector that does nothing, connects to no database
This will never return any data, either. If you want it to hold data
you put into it, instead use :class:`PythonDatabaseConnector`.
"""
engine: AbstractEngine
def pack(
self,
obj: Key
| KeyHint
| EternalKey
| UniversalKey
| Stat
| ValueHint
| Value,
) -> Value:
return obj
def unpack(self, b: Value) -> Value:
return b
def echo(self, *args):
if len(args) == 1:
return args[0]
return args
def call(self, query_name: str, *args, **kwargs):
pass
def call_silent(self, query_name: str, *args, **kwargs):
pass
def call_many(self, query_name: str, args: list) -> None:
pass
def call_many_silent(self, query_name: str, args: list) -> None:
pass
def delete_many_silent(self, table_name: str, args: list[dict]) -> None:
pass
def insert_many(self, table_name: str, args: list[dict]) -> None:
pass
def insert_many_silent(self, table_name: str, args: list[dict]) -> None:
pass
def rules_insert(self, rule: RuleName):
pass
def get_keyframe_extensions(
self, branch: Branch, turn: Turn, tick: Tick
) -> tuple[UniversalKeyframe, RuleKeyframe, RulebooksKeyframe]:
return {}, {}, {}
def keyframes_dump(self) -> Iterator[tuple[Branch, Turn, Tick]]:
return iter(())
def new_graph(
self, graph: CharName, branch: Branch, turn: Turn, tick: Tick, typ: str
) -> None:
pass
def get_all_keyframe_graphs(
self, branch: Branch, turn: Turn, tick: Tick
) -> Iterator[
tuple[CharName, NodeKeyframe, EdgeKeyframe, GraphValKeyframe]
]:
return iter(())
def keyframes_graphs_dump(
self,
) -> Iterator[
tuple[
CharName,
Branch,
Turn,
Tick,
NodeKeyframe,
EdgeKeyframe,
GraphValKeyframe,
]
]:
return iter(())
def keyframe_extensions_dump(
self,
) -> Iterator[
tuple[
Branch,
Turn,
Tick,
UniversalKeyframe,
RuleKeyframe,
RulebooksKeyframe,
]
]:
return iter(())
def graphs_insert(
self, graph: CharName, branch: Branch, turn: Turn, tick: Tick, typ: str
) -> None:
pass
def keyframes_graphs(
self,
) -> Iterator[tuple[CharName, Branch, Turn, Tick]]:
return iter(())
def delete_keyframe(self, branch: Branch, turn: Turn, tick: Tick) -> None:
pass
def have_branch(self, branch: Branch) -> bool:
pass
def branches_dump(
self,
) -> Iterator[BranchRowType]:
return iter(())
def global_get(self, key: Key) -> Any:
return self.eternal[key]
def global_dump(self) -> Iterator[tuple[Key, Any]]:
return iter(self.eternal.items())
def get_branch(self) -> Branch:
return self.eternal["branch"]
def get_turn(self) -> Turn:
return self.eternal["turn"]
def get_tick(self) -> Tick:
return self.eternal["tick"]
def global_set(self, key: Key, value: Any):
self.eternal[key] = value
def global_del(self, key: Key):
del self.eternal[key]
def set_branch(
self,
branch: Branch,
parent: Branch,
parent_turn: Turn,
parent_tick: Tick,
end_turn: Turn,
end_tick: Tick,
):
pass
def set_turn(
self, branch: Branch, turn: Turn, end_tick: Tick, plan_end_tick: Tick
):
pass
def turns_dump(self):
return iter(())
def graph_val_dump(self) -> Iterator[GraphValRowType]:
return iter(())
def graph_val_set(
self,
graph: CharName,
key: Key,
branch: Branch,
turn: Turn,
tick: Tick,
val: Any,
):
pass
def graph_val_del_time(self, branch: Branch, turn: Turn, tick: Tick):
pass
def graphs_types(
self,
branch: Branch,
turn_from: Turn,
tick_from: Tick,
turn_to: Optional[Turn] = None,
tick_to: Optional[Tick] = None,
) -> Iterator[GraphRowType]:
return iter(())
def graphs_dump(
self,
) -> Iterator[GraphRowType]:
return iter(())
def exist_node(
self,
graph: CharName,
node: NodeName,
branch: Branch,
turn: Turn,
tick: Tick,
extant: bool,
):
pass
def nodes_del_time(self, branch: Branch, turn: Turn, tick: Tick):
pass
def nodes_dump(self) -> Iterator[NodeRowType]:
return iter(())
def node_val_dump(self) -> Iterator[NodeValRowType]:
return iter(())
def node_val_del_time(self, branch: Branch, turn: Turn, tick: Tick):
pass
def edges_dump(self) -> Iterator[EdgeRowType]:
return iter(())
def edges_del_time(self, branch: Branch, turn: Turn, tick: Tick):
pass
def edge_val_dump(self) -> Iterator[EdgeValRowType]:
return iter(())
def edge_val_del_time(self, branch: Branch, turn: Turn, tick: Tick):
pass
def plan_ticks_dump(self) -> Iterator:
return iter(())
def flush(self):
pass
def commit(self):
pass
def close(self):
pass
def _init_db(self):
pass
def truncate_all(self):
pass
def universals_dump(self) -> Iterator[tuple[Key, Branch, Turn, Tick, Any]]:
return iter(())
def rulebooks_dump(
self,
) -> Iterator[
tuple[RulebookName, Branch, Turn, Tick, tuple[list[RuleName], float]]
]:
return iter(())
def rules_dump(self) -> Iterator[str]:
return iter(())
def rule_triggers_dump(
self,
) -> Iterator[tuple[RuleName, Branch, Turn, Tick, list[TriggerFuncName]]]:
return iter(())
def rule_prereqs_dump(
self,
) -> Iterator[tuple[RuleName, Branch, Turn, Tick, list[PrereqFuncName]]]:
return iter(())
def rule_actions_dump(
self,
) -> Iterator[tuple[RuleName, Branch, Turn, Tick, list[ActionFuncName]]]:
return iter(())
def rule_neighborhood_dump(
self,
) -> Iterator[tuple[RuleName, Branch, Turn, Tick, RuleNeighborhood]]:
return iter(())
def rule_big_dump(
self,
) -> Iterator[tuple[RuleName, Branch, Turn, Tick, RuleBig]]:
return iter(())
def node_rulebook_dump(
self,
) -> Iterator[tuple[CharName, NodeName, Branch, Turn, Tick, RulebookName]]:
return iter(())
def portal_rulebook_dump(
self,
) -> Iterator[
tuple[CharName, NodeName, NodeName, Branch, Turn, Tick, RulebookName]
]:
return iter(())
def character_rulebook_dump(
self,
) -> Iterator[tuple[CharName, Branch, Turn, Tick, RulebookName]]:
return iter(())
def unit_rulebook_dump(
self,
) -> Iterator[tuple[CharName, Branch, Turn, Tick, RulebookName]]:
return iter(())
def character_thing_rulebook_dump(
self,
) -> Iterator[tuple[CharName, Branch, Turn, Tick, RulebookName]]:
return iter(())
def character_place_rulebook_dump(
self,
) -> Iterator[tuple[CharName, Branch, Turn, Tick, RulebookName]]:
return iter(())
def character_portal_rulebook_dump(
self,
) -> Iterator[tuple[CharName, Branch, Turn, Tick, RulebookName]]:
return iter(())
def character_rules_handled_dump(
self,
) -> Iterator[tuple[CharName, RulebookName, RuleName, Branch, Turn, Tick]]:
return iter(())
def unit_rules_handled_dump(
self,
) -> Iterator[
tuple[
CharName,
CharName,
NodeName,
RulebookName,
RuleName,
Branch,
Turn,
Tick,
]
]:
return iter(())
def character_thing_rules_handled_dump(
self,
) -> Iterator[
tuple[CharName, NodeName, RulebookName, RuleName, Branch, Turn, Tick]
]:
return iter(())
def character_place_rules_handled_dump(
self,
) -> Iterator[
tuple[CharName, NodeName, RulebookName, RuleName, Branch, Turn, Tick]
]:
return iter(())
def character_portal_rules_handled_dump(
self,
) -> Iterator[
tuple[
CharName,
NodeName,
NodeName,
RulebookName,
RuleName,
Branch,
Turn,
Tick,
]
]:
return iter(())
def node_rules_handled_dump(
self,
) -> Iterator[
tuple[CharName, NodeName, RulebookName, RuleName, Branch, Turn, Tick]
]:
return iter(())
def portal_rules_handled_dump(
self,
) -> Iterator[
tuple[
CharName,
NodeName,
NodeName,
RulebookName,
RuleName,
Branch,
Turn,
Tick,
]
]:
return iter(())
def things_dump(
self,
) -> Iterator[tuple[CharName, NodeName, Branch, Turn, Tick, NodeName]]:
return iter(())
def units_dump(
self,
) -> Iterator[
tuple[CharName, CharName, NodeName, Branch, Turn, Tick, bool]
]:
return iter(())
def universal_get(
self, key: UniversalKey, branch: Branch, turn: Turn, tick: Tick
) -> Value:
raise KeyError("NOTHING")
def universal_set(
self,
key: UniversalKey,
branch: Branch,
turn: Turn,
tick: Tick,
val: Any,
):
pass
def count_all_table(self, tbl: str) -> int:
return 0
def create_rule(
self,
rule: RuleName,
branch: Branch,
turn: Turn,
tick: Tick,
triggers: Iterable[TriggerFuncName] = (),
prereqs: Iterable[PrereqFuncName] = (),
actions: Iterable[ActionFuncName] = (),
neighborhood: RuleNeighborhood = None,
big: RuleBig = False,
) -> bool:
return False
def set_rule_triggers(
self,
rule: RuleName,
branch: Branch,
turn: Turn,
tick: Tick,
flist: list[TriggerFuncName],
):
pass
def set_rule_prereqs(
self,
rule: RuleName,
branch: Branch,
turn: Turn,
tick: Tick,
flist: list[PrereqFuncName],
):
pass
def set_rule_actions(
self,
rule: RuleName,
branch: Branch,
turn: Turn,
tick: Tick,
flist: list[ActionFuncName],
):
pass
def set_rule_neighborhood(
self,
rule: RuleName,
branch: Branch,
turn: Turn,
tick: Tick,
neighborhood: RuleNeighborhood,
):
pass
def set_rule_big(
self,
rule: RuleName,
branch: Branch,
turn: Turn,
tick: Tick,
big: RuleBig,
) -> None:
pass
def set_rulebook(
self,
name: RulebookName,
branch: Branch,
turn: Turn,
tick: Tick,
rules: Optional[list[RuleName]] = None,
prio: RulebookPriority = 0.0,
):
pass
def set_character_rulebook(
self,
char: CharName,
branch: Branch,
turn: Turn,
tick: Tick,
rb: RulebookName,
):
pass
def set_unit_rulebook(
self,
char: CharName,
branch: Branch,
turn: Turn,
tick: Tick,
rb: RulebookName,
):
pass
def set_character_thing_rulebook(
self,
char: CharName,
branch: Branch,
turn: Turn,
tick: Tick,
rb: RulebookName,
):
pass
def set_character_place_rulebook(
self,
char: CharName,
branch: Branch,
turn: Turn,
tick: Tick,
rb: RulebookName,
):
pass
def set_character_portal_rulebook(
self,
char: NodeName,
branch: Branch,
turn: Turn,
tick: Tick,
rb: RulebookName,
):
pass
def set_node_rulebook(
self,
character: CharName,
node: NodeName,
branch: Branch,
turn: Turn,
tick: Tick,
rulebook: RulebookName,
):
pass
def set_portal_rulebook(
self,
character: CharName,
orig: NodeName,
dest: NodeName,
branch: Branch,
turn: Turn,
tick: Tick,
rulebook: RulebookName,
):
pass
def handled_character_rule(
self,
character: CharName,
rulebook: RulebookName,
rule: RuleName,
branch: Branch,
turn: Turn,
tick: Tick,
):
pass
def handled_unit_rule(
self,
character: CharName,
rulebook: RulebookName,
rule: RuleName,
graph: CharName,
unit: NodeName,
branch: Branch,
turn: Turn,
tick: Tick,
):
pass
def handled_character_thing_rule(
self,
character: CharName,
rulebook: RulebookName,
rule: RuleName,
thing: NodeName,
branch: Branch,
turn: Turn,
tick: Tick,
):
pass
def handled_character_place_rule(
self,
character: CharName,
rulebook: RulebookName,
rule: RuleName,
place: NodeName,
branch: Branch,
turn: Turn,
tick: Tick,
):
pass
def handled_character_portal_rule(
self,
character: CharName,
rulebook: RulebookName,
rule: RuleName,
orig: NodeName,
dest: NodeName,
branch: Branch,
turn: Turn,
tick: Tick,
):
pass
def handled_node_rule(
self,
character: CharName,
node: NodeName,
rulebook: RulebookName,
rule: RuleName,
branch: Branch,
turn: Turn,
tick: Tick,
):
pass
def handled_portal_rule(
self,
character: CharName,
orig: NodeName,
dest: NodeName,
rulebook: RulebookName,
rule: RuleName,
branch: Branch,
turn: Turn,
tick: Tick,
):
pass
def set_thing_loc(
self,
character: CharName,
thing: NodeName,
branch: Branch,
turn: Turn,
tick: Tick,
loc: NodeName,
):
pass
def things_del_time(self, branch: Branch, turn: Turn, tick: Tick):
pass
def unit_set(
self,
character: CharName,
graph: CharName,
node: NodeName,
branch: Branch,
turn: Turn,
tick: Tick,
is_unit: bool,
):
pass
def rulebook_set(
self,
rulebook: RulebookName,
branch: Branch,
turn: Turn,
tick: Tick,
rules: list[RuleName],
):
pass
def turns_completed_dump(self) -> Iterator[tuple[Branch, Turn]]:
return iter(())
def complete_turn(
self, branch: Branch, turn: Turn, discard_rules: bool = False
):
pass
def _put_window_tick_to_end(
self, branch: Branch, turn_from: Turn, tick_from: Tick
):
pass
def _put_window_tick_to_tick(
self,
branch: Branch,
turn_from: Turn,
tick_from: Tick,
turn_to: Turn,
tick_to: Tick,
):
pass
def _load_windows_into(self, ret: dict, windows: list[TimeWindow]) -> None:
pass
def _increc(self):
pass
def _get_one_window(
self,
ret,
branch: Branch,
turn_from: Turn,
tick_from: Tick,
turn_to: Turn,
tick_to: Tick,
):
pass
def bookmarks_dump(self) -> Iterator[tuple[Key, Time]]:
return iter(())
def set_bookmark(
self, key: Key, branch: Branch, turn: Turn, tick: Tick
) -> None:
pass
def del_bookmark(self, key: Key) -> None:
pass
def load_windows(self, windows: list[TimeWindow]) -> dict:
return {}
def window_getter(
table: str,
f: Callable[[Branch, ...], None] | partial | None = None,
per_character: bool = False,
):
"""Decorator for functions that get a window of time from the output queue"""
if f is None:
return partial(window_getter, table, per_character=per_character)
if isinstance(f, partial):
argspec = inspect.getfullargspec(f.func)
else:
argspec = inspect.getfullargspec(f)
if per_character:
if "return" not in argspec.annotations:
raise TypeError("No character in return annotation", f)
ret_sig = argspec.annotations["return"]
if isinstance(ret_sig, str):
ret_sig = eval(ret_sig)
char_index = get_args(ret_sig).index(CharName)
if char_index is None:
raise TypeError(
"per_character window getter needs CharName in its return signature"
)
def get_a_window(
self,
ret: dict,
branch: Branch,
turn_from: Turn,
tick_from: Tick,
turn_to: Turn | None,
tick_to: Tick | None,
) -> None:
if (got := self._outq.get()) != (
"begin",
table,
branch,
turn_from,
tick_from,
turn_to,
tick_to,
):
raise RuntimeError("Expected beginning of " + table, got)
self._outq.task_done()
while isinstance(got := self._outq.get(), list):
for rec in got:
if isinstance(rec, dict):
rec = tuple(rec[arg] for arg in argspec.args[1:])
else:
rec = (branch, *rec)
charn = rec[char_index]
try:
ret[charn][table].append(f(self, *rec))
except TypeError as ex:
raise TypeError(*ex.args, table, rec) from ex
self._outq.task_done()
if got != (
"end",
table,
branch,
turn_from,
tick_from,
turn_to,
tick_to,
):
raise RuntimeError("Expected end of " + table, got)
self._outq.task_done()
window_getter.tables[table] = get_a_window
return get_a_window
def get_a_window(
self,
ret: dict,
branch: Branch,
turn_from: Turn,
tick_from: Tick,
turn_to: Turn | None,
tick_to: Tick | None,
) -> None:
if (got := self._outq.get()) != (
"begin",
table,
branch,
turn_from,
tick_from,
turn_to,
tick_to,
):
raise RuntimeError("Expected beginning of " + table, got)
self._outq.task_done()
while isinstance(got := self._outq.get(), list):
for rec in got:
if isinstance(rec, dict):
rec = tuple(rec[arg] for arg in argspec.args[1:])
else:
rec = (branch, *rec)
try:
ret[table].append(f(self, *rec))
except TypeError as ex:
raise TypeError(*ex.args, table, rec) from ex
self._outq.task_done()
if got != (
"end",
table,
branch,
turn_from,
tick_from,
turn_to,
tick_to,
):
raise RuntimeError("Expected end of " + table, got)
self._outq.task_done()
window_getter.tables[table] = get_a_window
return get_a_window
window_getter.tables = {}
[docs]
@define
class ThreadedDatabaseConnector(AbstractDatabaseConnector):
Looper: ClassVar[type[ConnectionLooper]]
def _make_thread(self):
return Thread(target=self._looper.run, name="rundb")
_t: Thread = field(
init=False, default=Factory(_make_thread, takes_self=True)
)
_initialized: bool = field(init=False, default=False)
def close(self) -> None:
self._inq.put("close")
try:
self._looper.existence_lock.acquire(timeout=10)
except TimeoutError as ex:
raise TimeoutError("Couldn't close looper", *ex.args) from ex
self._looper.existence_lock.release()
self._t.join()
def commit(self) -> None:
self.flush()
self._inq.put("commit")
got = self._outq.get()
self._outq.task_done()
if got != "committed":
raise RuntimeError("Failed commit", got)
@contextmanager
def mutex(self):
def consume_errors():
excs = []
unfinished_tasks = self._outq.unfinished_tasks
while not self._outq.empty():
got = self._outq.get()
if isinstance(got, Exception):
excs.append(got)
else:
excs.append(ValueError("Unconsumed output", got))
if excs:
if len(excs) == 1:
raise excs[-1]
raise ExceptionGroup(
f"{unfinished_tasks} unfinished tasks in output queue "
"before call_one",
excs,
)
else:
raise RuntimeError(
f"{self._outq.unfinished_tasks} unfinished tasks in output queue "
"before call_one"
)
if self._outq.unfinished_tasks != 0:
consume_errors()
with self._lock:
yield
if self._outq.unfinished_tasks != 0:
consume_errors()
@cached_property
def _looper(self) -> ConnectionLooper:
return self.Looper(self)
@cached_property
def _lock(self):
return self._looper.lock
@cached_property
def _inq(self) -> Queue:
return Queue()
@cached_property
def _outq(self) -> Queue:
return Queue()
@mutexed
def call(self, query_name: str, *args, **kwargs):
self._inq.put(("one", query_name, args, kwargs))
ret = self._outq.get()
self._outq.task_done()
if isinstance(ret, BaseException):
raise ret
return ret
def call_silent(self, query_name: str, *args, **kwargs):
self._inq.put(("silent", "one", query_name, args, kwargs))
@mutexed
def call_many(self, query_name: str, args: list) -> None:
self._inq.put(("many", query_name, args))
ret = self._outq.get()
self._outq.task_done()
if isinstance(ret, BaseException):
raise ret
return ret
def call_many_silent(self, query_name: str, args: list) -> None:
self._inq.put(("silent", "many", query_name, args))
def echo(self, string: str) -> str:
with self.mutex():
self._inq.put(("echo", string))
ret = self._outq.get()
self._outq.task_done()
return ret
def _put_window_tick_to_end(
self, branch: Branch, turn_from: Turn, tick_from: Tick
):
putkwargs = {
"branch": branch,
"turn_from": turn_from,
"tick_from": tick_from,
}
for infix in self._infixes2load:
self._inq.put(
(
"echo",
(
"begin",
infix,
branch,
turn_from,
tick_from,
None,
None,
),
{},
)
)
self._inq.put(("one", f"load_{infix}_tick_to_end", (), putkwargs))
self._inq.put(
(
"echo",
("end", infix, branch, turn_from, tick_from, None, None),
{},
)
)
def _put_window_tick_to_tick(
self,
branch: Branch,
turn_from: Turn,
tick_from: Tick,
turn_to: Turn,
tick_to: Tick,
):
putkwargs = {
"branch": branch,
"turn_from": turn_from,
"tick_from": tick_from,
"turn_to": turn_to,
"tick_to": tick_to,
}
for infix in self._infixes2load:
self._inq.put(
(
"echo",
(
"begin",
infix,
branch,
turn_from,
tick_from,
turn_to,
tick_to,
),
{},
)
)
self._inq.put(("one", f"load_{infix}_tick_to_tick", (), putkwargs))
self._inq.put(
(
"echo",
(
"end",
infix,
branch,
turn_from,
tick_from,
turn_to,
tick_to,
),
{},
)
)
def _unpack_node_keyframe(self, node_kf_packed: bytes) -> NodeKeyframe:
node_kf = self.unpack(node_kf_packed)
if not isinstance(node_kf, dict):
raise TypeError("Invalid node keyframe", node_kf)
return {
NodeName(Key(k)): {Stat(kk): Value(vv) for (kk, vv) in v.items()}
for (k, v) in node_kf.items()
}
def _unpack_edge_keyframe(self, edge_kf_packed: bytes) -> EdgeKeyframe:
unpacked = self.unpack(edge_kf_packed)
if not isinstance(unpacked, dict):
raise TypeError("Invalid edge keyframe", unpacked)
try:
return {
NodeName(Key(orig)): {
NodeName(Key(dest)): {
Stat(Key(key)): Value(val)
for (key, val) in stats.items()
}
for (dest, stats) in dests.items()
}
for (orig, dests) in unpacked.items()
}
except TypeError as ex:
raise TypeError(*ex.args, unpacked) from ex
def _unpack_graph_val_keyframe(self, graph_val_packed: bytes) -> CharDict:
unpacked = self.unpack(graph_val_packed)
if not isinstance(unpacked, dict):
raise TypeError("Invalid graph stat keyframe", unpacked)
rulebooks = {
"character_rulebook",
"unit_rulebook",
"character_thing_rulebook",
"character_place_rulebook",
"character_portal_rulebook",
}
return {
k if k in rulebooks else Stat(Key(k)): RulebookName(v)
if k in rulebooks
else Value(v)
for (k, v) in unpacked.items()
}
def _unpack_universal_keyframe(
self, universal_packed: bytes
) -> UniversalKeyframe:
unpacked = self.unpack(universal_packed)
if not isinstance(unpacked, dict):
raise TypeError("Invalid universal keyframe", unpacked)
return {UniversalKey(Key(k)): Value(v) for (k, v) in unpacked.items()}
def _unpack_rules_keyframe(self, rule_packed: bytes) -> RuleKeyframe:
unpacked = self.unpack(rule_packed)
if not isinstance(unpacked, dict):
raise TypeError("Invalid rule keyframe", unpacked)
neighborhood_d: dict[RuleName, RuleNeighborhood] = unpacked[
"neighborhood"
]
if not isinstance(neighborhood_d, dict):
raise TypeError("Invalid neighborhood dict")
for k, v in neighborhood_d.items():
if not isinstance(k, str):
raise TypeError("Invalid rule name", k)
if not isinstance(v, (int, type(None))):
raise TypeError("Invalid neighborhood", v)
return {
"triggers": {
RuleName(rule): [TriggerFuncName(trig) for trig in trigs]
for (rule, trigs) in unpacked["triggers"].items()
},
"prereqs": {
RuleName(rule): [PrereqFuncName(preq) for preq in preqs]
for (rule, preqs) in unpacked["prereqs"].items()
},
"actions": {
RuleName(rule): [ActionFuncName(act) for act in acts]
for (rule, acts) in unpacked["actions"].items()
},
"neighborhood": neighborhood_d,
"big": {
RuleName(rule): RuleBig(big)
for (rule, big) in unpacked["big"].items()
},
}
def _unpack_rulebooks_keyframe(
self, rulebook_packed: bytes
) -> RulebooksKeyframe:
unpacked = self.unpack(rulebook_packed)
if not isinstance(unpacked, dict):
raise TypeError("Invalid rulebook keyframe")
return {
RulebookName(rb): (
[RuleName(ru) for ru in rules],
RulebookPriority(prio),
)
for rb, (rules, prio) in unpacked.items()
}
@mutexed
def _load_windows_into(self, ret: dict, windows: list[TimeWindow]) -> None:
assert "graphs" in ret
for branch, turn_from, tick_from, turn_to, tick_to in windows:
if turn_to is None:
self._put_window_tick_to_end(branch, turn_from, tick_from)
else:
self._put_window_tick_to_tick(
branch, turn_from, tick_from, turn_to, tick_to
)
self._inq.join()
for window in windows:
self._get_one_window(ret, *window)
def unpack_key(self, k: bytes) -> Key:
unpacked = self.unpack(k)
if not isinstance(unpacked, Key):
raise TypeError("Invalid key", unpacked)
return unpacked
def _get_one_window(
self,
ret,
branch: Branch,
turn_from: Turn,
tick_from: Tick,
turn_to: Turn,
tick_to: Tick,
):
self.debug(
f"_get_one_window({branch}, {turn_from}, {tick_from}, {turn_to}, {tick_to})"
)
for table in self._infixes2load:
prop = Batch.cached_properties[table]
batch = getattr(self, prop.attrname)
batch.window_getter(
self, ret, branch, turn_from, tick_from, turn_to, tick_to
)