Source code for lisien.collections

# This file is part of Lisien, a framework for life simulation games.
# Copyright (c) Zachary Spector, public@zacharyspector.com
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
"""Common classes for collections in lisien

Notably includes wrappers for mutable objects, allowing them to be stored in
the database. These simply store the new value.

Most of these are subclasses of :class:`blinker.Signal`, so you can listen
for changes using the ``connect(..)`` method.

"""

from __future__ import annotations

import importlib.util
import json
import os
import sys
from abc import ABC, abstractmethod
from ast import Expr, Module, parse
from collections import UserDict
from collections.abc import MutableMapping
from copy import deepcopy
from hashlib import blake2b
from inspect import getsource
from typing import TYPE_CHECKING

import astor
import networkx as nx
from blinker import Signal

from .types import CharName, Key
from .util import AbstractEngine, dedent_source, getatt, sort_set
from .wrap import wrapval

if TYPE_CHECKING:
	from .character import Character


# 0x241d is the group separator
# 0x241e is the record separator
# per Unicode 1.1
GROUP_SEP = chr(0x241D).encode()
REC_SEP = chr(0x241E).encode()


class AbstractLanguageDescriptor(Signal, ABC):
	@abstractmethod
	def _get_language(self, inst: StringStore) -> str:
		pass

	@abstractmethod
	def _set_language(self, inst: StringStore, val: str) -> None:
		pass

	def __get__(self, instance: StringStore, owner=None):
		return self._get_language(instance)

	def __set__(self, inst: StringStore, val: str):
		self._set_language(inst, val)
		self.send(inst, language=val)


class LanguageDescriptor(AbstractLanguageDescriptor):
	def _get_language(self, inst: StringStore) -> str:
		return inst._current_language

	def _set_language(self, inst, lang):
		if lang != inst._current_language:
			inst._switch_language(lang)
			if (
				not getattr(inst.engine, "_worker", False)
				and inst.engine.eternal["language"] != lang
			):
				inst.engine.eternal["language"] = lang
			inst._current_language = lang


class TamperEvidentDict(dict):
	tampered: bool

	def __init__(self, data=()):
		self.tampered = False
		super().__init__(data)

	def __setitem__(self, key, value):
		self.tampered = True
		super().__setitem__(key, value)

	def __delitem__(self, key):
		self.tampered = True
		super().__delitem__(key)


class ChangeTrackingDict(UserDict):
	def __init__(self, data=()):
		self.changed = {}
		super().__init__(data)

	def apply_changes(self):
		self.data.update(self.changed)
		self.changed.clear()

	def copy(self):
		ret = {}
		ret.update(self.data)
		ret.update(self.changed)
		return ret

	def clear(self):
		self.data.clear()
		self.changed.clear()

	def __contains__(self, item):
		return item in self.changed or item in self.data

	def __iter__(self):
		yield from self.changed
		yield from self.data

	def __len__(self):
		return len(self.changed) + len(self.data)

	def __getitem__(self, item):
		if item in self.changed:
			return self.changed[item]
		return self.data[item]

	def __setitem__(self, key, value):
		self.changed[key] = value

	def __delitem__(self, key):
		if key in self.changed:
			del self.changed[key]
			if key in self.data:
				del self.data[key]
		else:
			del self.data[key]


[docs] class StringStore(MutableMapping, Signal): language = LanguageDescriptor() def __init__( self, engine_or_string_dict: AbstractEngine | dict, prefix: str | None, lang="eng", ): super().__init__() if isinstance(engine_or_string_dict, dict): self._prefix = None self._current_language = lang if lang in engine_or_string_dict and isinstance( engine_or_string_dict[lang], dict ): self._languages = engine_or_string_dict else: self._languages = {lang: engine_or_string_dict} else: self.engine = engine_or_string_dict self._languages = {lang: TamperEvidentDict()} self._prefix = prefix self._current_language = lang self._switch_language(lang) def _switch_language(self, lang): """Write the current language to disk, and load the new one if available""" if self._prefix is None: if lang not in self._languages: self._languages[lang] = {} return try: with open(os.path.join(self._prefix, lang + ".json"), "r") as inf: self._languages[lang] = TamperEvidentDict(json.load(inf)) except FileNotFoundError: self._languages[lang] = TamperEvidentDict() assert self._current_language in self._languages if getattr(self._languages[self._current_language], "tampered", False): with open( os.path.join(self._prefix, self._current_language + ".json"), "w", ) as outf: json.dump( self._languages[self._current_language], outf, indent=4, sort_keys=True, ) self._languages[self._current_language].tampered = False def __iter__(self): return iter(self._languages[self._current_language]) def __len__(self): return len(self._languages[self._current_language]) def __getitem__(self, k): return self._languages[self._current_language][k] def __setitem__(self, k, v): """Set the value of a string for the current language.""" self._languages[self._current_language][k] = v self.send(self, key=k, val=v) def __delitem__(self, k): """Delete the string from the current language, and remove it from the cache. """ del self._languages[self._current_language][k] self.send(self, key=k, val=None) def lang_items(self, lang=None): """Yield pairs of (id, string) for the given language.""" if ( self._prefix is not None and lang is not None and self._current_language != lang ): with open(os.path.join(self._prefix, lang + ".json"), "r") as inf: self._languages[lang] = TamperEvidentDict(json.load(inf)) yield from self._languages[lang or self._current_language].items() def save(self, reimport=False): if self._prefix is None: return if not os.path.exists(self._prefix): os.mkdir(self._prefix) for lang, d in self._languages.items(): if not d.tampered: continue with open( os.path.join(self._prefix, lang + ".json"), "w", ) as outf: json.dump( self._languages[lang], outf, indent=4, sort_keys=True, ) d.tampered = False if reimport: with open( os.path.join(self._prefix, self._current_language + ".json"), "r", ) as inf: self._languages[self._current_language] = TamperEvidentDict( json.load(inf) ) def blake2b(self) -> bytes: the_hash = blake2b() for k, v in self.items(): the_hash.update(k.encode()) the_hash.update(GROUP_SEP) the_hash.update(v.encode()) the_hash.update(REC_SEP) return the_hash.digest()
[docs] class FunctionStore(Signal): """A module-like object that lets you alter its code and save your changes. Instantiate it with a path to a file that you want to keep the code in. Assign functions to its attributes, then call its ``save()`` method, and they'll be unparsed and written to the file. This is a ``Signal``, so you can pass a function to its ``connect`` method, and it will be called when a function is added, changed, or deleted. The keyword arguments will be ``attr``, the name of the function, and ``val``, the function itself. """ def __init__( self, filename: str | None, initial: dict = None, module: str = None ): if initial is None: initial = {} super().__init__() if filename is None: self._filename = None self._module = module self._ast = Module(body=[], type_ignores=[]) self._ast_idx = {} self._need_save = False self._locl = initial else: if not filename.endswith(".py"): raise ValueError( "FunctionStore can only work with pure Python source code" ) self._filename = os.path.abspath(os.path.realpath(filename)) try: self.reimport() except (FileNotFoundError, ModuleNotFoundError): self._module = module self._ast = Module(body=[], type_ignores=[]) self._ast_idx = {} self.save() self._need_save = False self._locl = {} for k, v in initial.items(): self[k] = v def __dir__(self): yield from self._locl yield from super().__dir__() def __getattr__(self, k): if k in self._locl: return self._locl[k] elif self._need_save: self.save() return getattr(self._module, k) elif self._module: return getattr(self._module, k) else: raise AttributeError("No attribute ", k) def __setattr__(self, k, v): if not callable(v): super().__setattr__(k, v) return self._set_source(k, getsource(v), func=v) def _set_source(self, k: str, source: str, func: callable = None): if func is None: holder = {} exec(source, holder) if k not in holder: raise NameError( "Function in source has a different name", k, source ) func = holder[k] outdented = dedent_source(source) expr = Expr(parse(outdented)) expr.value.body[0].name = k if k in self._ast_idx: self._ast.body[self._ast_idx[k]] = expr else: self._ast_idx[k] = len(self._ast.body) self._ast.body.append(expr) if self._filename is not None: self._need_save = True if isinstance(self._module, str): func.__module__ = self._module self._locl[k] = func self.send(self, attr=k, val=func) def __call__(self, v): if isinstance(self._module, str): v.__module__ = self._module elif hasattr(self._module, "__name__"): v.__module__ = self._module.__name__ setattr(self, v.__name__, v) return v def __delattr__(self, k): del self._locl[k] del self._ast.body[self._ast_idx[k]] del self._ast_idx[k] for name in list(self._ast_idx): if name > k: self._ast_idx[name] -= 1 if self._filename is not None: self._need_save = True self.send(self, attr=k, val=None) def save(self, reimport=True): if self._filename is None: return with open(self._filename, "w", encoding="utf-8") as outf: outf.write(astor.code_gen.to_source(self._ast, indent_with="\t")) self._need_save = False if reimport: self.reimport() def reimport(self): if self._filename is None: return path, filename = os.path.split(self._filename) modname = filename[:-3] if modname in sys.modules: del sys.modules[modname] modname = filename[:-3] spec = importlib.util.spec_from_file_location(modname, self._filename) self._module = importlib.util.module_from_spec(spec) sys.modules[modname] = self._module spec.loader.exec_module(self._module) self._ast = parse(self._module.__loader__.get_data(self._filename)) self._ast_idx = {} for i, node in enumerate(self._ast.body): if hasattr(node, "name"): self._ast_idx[node.name] = i elif hasattr(node, "__name__"): self._ast_idx[node.__name__] = i self.send(self, attr=None, val=None) def iterplain(self): for name, idx in self._ast_idx.items(): yield name, astor.to_source(self._ast.body[idx], indent_with="\t") def store_source(self, v, name=None): self._need_save = True outdented = dedent_source(v) mod = parse(outdented) expr = Expr(mod) if len(expr.value.body) != 1: raise ValueError("Tried to store more than one function") if name is None: name = expr.value.body[0].name else: expr.value.body[0].name = name if name in self._ast_idx: self._ast.body[self._ast_idx[name]] = expr else: self._ast_idx[name] = len(self._ast.body) self._ast.body.append(expr) locl = {} exec(compile(mod, self._filename or "", "exec"), {}, locl) self._locl.update(locl) self.send(self, attr=name, val=locl[name]) def get_source(self, name): if name == "truth": return "def truth(*args):\n\treturn True" return astor.dump_tree(self._ast.body[self._ast_idx[name]]) def blake2b(self) -> bytes: """Return the blake2b hash digest of the code stored here""" hashed = blake2b() todo = dict(self._ast_idx) stripped_ast = deepcopy(self._ast.body) astor.strip_tree(stripped_ast) for k in sort_set(todo.keys()): hashed.update(k.encode()) hashed.update(GROUP_SEP) hashed.update( astor.code_gen.to_source( stripped_ast[todo[k]], indent_with="\t" ).encode() ) hashed.update(REC_SEP) return hashed.digest() @staticmethod def truth(*args): return True
class UniversalMapping(MutableMapping, Signal): """Mapping for variables that are global but which I keep history for""" __slots__ = ["engine"] def __init__(self, engine): """Store the engine and initialize my private dictionary of listeners. """ super().__init__() self.engine = engine def __iter__(self): return self.engine._universal_cache.iter_keys(*self.engine._btt()) def __len__(self): return self.engine._universal_cache.count_keys(*self.engine._btt()) def __getitem__(self, k): """Get the current value of this key""" return wrapval( self, k, self._get_cache_now(k), ) def _get_cache_now(self, k): return self.engine._universal_cache.retrieve(k, *self.engine._btt()) def __setitem__(self, k, v): """Set k=v at the current branch and tick""" branch, turn, tick = self.engine._nbtt() self.engine._universal_cache.store(k, branch, turn, tick, v) self.engine.query.universal_set(k, branch, turn, tick, v) self.send(self, key=k, val=v) def _set_cache_now(self, k, v): self.engine._universal_cache.store(k, *self.engine._btt(), v) def __delitem__(self, k): """Unset this key for the present (branch, tick)""" branch, turn, tick = self.engine._nbtt() self.engine._universal_cache.store(k, branch, turn, tick, ...) self.engine.query.universal_del(k, branch, turn, tick) self.send(self, key=k, val=...) class CharacterMapping(MutableMapping, Signal): """A mapping by which to access :class:`Character` objects. If a character already exists, you can always get its name here to get the :class:`Character` object. Deleting an item here will delete the character from the world, even if there are still :class:`Character` objects referring to it; those won't do anything useful anymore. """ engine = getatt("orm") def __init__(self, orm): self.orm = orm Signal.__init__(self) def __iter__(self): branch, turn, tick = self.engine._btt() return self.engine._graph_cache.iter_keys(branch, turn, tick) def __len__(self): branch, turn, tick = self.engine._btt() return self.engine._graph_cache.count_keys(branch, turn, tick) def __contains__(self, item): branch, turn, tick = self.engine._btt() try: return ( self.engine._graph_cache.retrieve(item, branch, turn, tick) == "DiGraph" ) except KeyError: return False def __getitem__(self, name: Key | CharName) -> "Character": """Return the named character, if it's been created. Try to use the cache if possible. """ from .character import Character name = CharName(name) if name not in self: raise KeyError("No such character", name) cache = self.engine._graph_objs if name not in cache: cache[name] = Character( self.engine, name, init_rulebooks=name not in self ) ret = cache[name] if not isinstance(ret, Character): raise TypeError( "You put something weird in the Character cache", type(ret) ) return ret def __setitem__(self, name: CharName, value: dict | nx.Graph): """Make a new character by the given name, and initialize its data to the given value. """ self.engine._init_graph(name, "DiGraph", value) self.send(self, key=name, val=self.engine.character[name]) def __delitem__(self, name: CharName): self.engine.del_character(name) self.send(self, key=name, val=None) class CompositeDict(MutableMapping, Signal): """Combine two dictionaries into one""" def __init__(self, d1, d2): """Store dictionaries""" super().__init__() self.d1 = d1 self.d2 = d2 def __iter__(self): """Iterate over both dictionaries' keys""" for k in self.d1: yield k for k in self.d2: yield k def __len__(self): """Sum the lengths of both dictionaries""" return len(self.d1) + len(self.d2) def __contains__(self, item): return item in self.d1 or item in self.d2 def __getitem__(self, k): """Get an item from ``d1`` if possible, then ``d2``""" try: return self.d1[k] except KeyError: return self.d2[k] def __setitem__(self, key, value): self.d1[key] = value self.send(self, key=key, value=value) def __delitem__(self, key): deleted = False if key in self.d2: deleted = True del self.d2[key] if key in self.d1: deleted = True del self.d1[key] if not deleted: raise KeyError("{} is in neither of my wrapped dicts".format(key)) self.send(self, key=key, value=None) def patch(self, d): """Recursive update""" for k, v in d.items(): if k in self: self[k].update(v) else: self[k] = deepcopy(v)