# Source code for ipyforcegraph.sources.dodo

# Copyright (c) 2023 ipyforcegraph contributors.
# Distributed under the terms of the Modified BSD License.
"""
A :class:`~ipyforcegraph.sources.dataframe.DataFrameSource` which inspects
a ``dodo.py`` and its `tasks <https://pydoit.org/tasks.html>`_.

.. note::

    Using this source requires installing `doit <https://pypi.org/project/doit>`_.
"""
import sys
from copy import deepcopy
from importlib.util import module_from_spec, spec_from_file_location
from pathlib import Path
from typing import Any, Dict, List
from uuid import uuid4

import pandas as P
import traitlets as T
from doit.cmd_base import ModuleTaskLoader, get_loader
from doit.cmd_list import List as ListCmd
from doit.dependency import Dependency, JsonDB, SqliteDB
from doit.task import Task

from .dataframe import DataFrameSource

TAnyDict = Dict[str, Any]
Tasks = List[Task]


class DodoSource(DataFrameSource):
    """A source that displays the files, tasks, and dependencies of a ``dodo.py``.

    The ``dodo.py`` found under ``project_root`` is re-imported on demand, its
    tasks are loaded with ``doit``, and every task, ``file_dep`` and ``targets``
    entry becomes a node or link in the ``nodes`` / ``links`` data frames.
    """

    graph_data: TAnyDict = T.Dict(help="an internal collection of observed Data").tag(
        sync=False
    )
    project_root: Path = T.Union(
        [T.Unicode(), T.Instance(Path)],
        help="a path to a folder that contains a ``dodo.py``",
    ).tag(sync=False)
    backend: str = T.Unicode(
        "sqlite3", help="the backend for ``doit``'s dependency state"
    ).tag(sync=False)
    dep_file: str = T.Unicode(
        ".doit.db",
        help="the path to ``doit``'s ``dep_file``, relative to the ``project_root``",
    ).tag(sync=False)
    # NOTE(review): this is the only trait tagged ``sync=True`` -- confirm that
    # is intended before changing it.
    dodo_file: str = T.Unicode(
        "dodo.py",
        help="the path to a ``dodo.py``, relative to the ``project_root``",
    ).tag(sync=True)
    show_files: bool = T.Bool(
        True, help="create a node for each file, or collapse to dep groups"
    ).tag(sync=False)
    show_directories: bool = T.Bool(
        False, help="create nodes for directories, and links for containment"
    ).tag(sync=False)
    _deps: Dependency = T.Instance(Dependency, help="A doit dependency tracker").tag(
        sync=False
    )

    @T.default("nodes")
    def _default_nodes(self) -> P.DataFrame:
        """Build the initial node table from ``graph_data``."""
        return P.DataFrame(self.graph_data["nodes"].values())

    @T.default("links")
    def _default_links(self) -> P.DataFrame:
        """Build the initial link table from ``graph_data``."""
        return P.DataFrame(self.graph_data["links"].values())

    @T.default("graph_data")
    def _default_graph_data(self) -> TAnyDict:
        """Discover the initial graph data from the ``dodo.py``."""
        return self.find_graph_data()

    @T.default("_deps")
    def _default_deps(self) -> Dependency:
        """Create the ``doit`` dependency tracker for the configured backend."""
        backends = {"sqlite3": SqliteDB, "json": JsonDB}
        return Dependency(
            backends[self.backend], str(self.project_root / self.dep_file)
        )

    @T.validate("project_root")
    def _validate_project_root(self, proposal: Any) -> Path:
        """Resolve the proposed ``project_root`` and ensure it exists.

        :raises AssertionError: if the resolved path does not exist.
        """
        project_root = Path(proposal.value).resolve()
        if not project_root.exists():
            # Raised explicitly (rather than with ``assert``) so the check is
            # not stripped under ``python -O``; the exception type is unchanged.
            raise AssertionError(f"project_root does not exist: {project_root}")
        return project_root

    @T.observe("show_directories", "show_files")
    def _on_features_change(self, *args: Any) -> None:
        """Rebuild the graph when a display option changes."""
        self.refresh()

    def refresh(self) -> None:
        """Refresh the nodes and links."""
        graph_data = self.find_graph_data()
        # different node/link kinds carry different keys: blank out the gaps
        nodes = P.DataFrame(graph_data["nodes"].values()).fillna("")
        links = P.DataFrame(graph_data["links"].values()).fillna("")
        with self.hold_sync():
            self.nodes = nodes
            self.links = links

    def _reload_tasks(self) -> Tasks:
        """Import the ``dodo.py`` under a fresh module name and load its tasks.

        :returns: the loaded tasks, or an empty list if no import spec (or
            loader) could be created for the ``dodo.py``.
        """
        old_sys_path = [*sys.path]
        # a unique name ensures every call re-executes the ``dodo.py``
        mod_name = f"""__dodo__{str(uuid4()).replace("-", "_")}"""

        try:
            # let the ``dodo.py`` import its siblings while it is executed
            sys.path.append(str(self.project_root))
            spec = spec_from_file_location(mod_name, self.project_root / self.dodo_file)
            if spec is None or spec.loader is None:  # pragma: no cover
                return []
            dodo_module = module_from_spec(spec)
            sys.modules[mod_name] = dodo_module
            try:
                spec.loader.exec_module(dodo_module)
            except BaseException:
                # don't leave a half-initialized module behind
                sys.modules.pop(mod_name, None)
                raise
        finally:
            # restore in place, so other references to ``sys.path`` stay valid
            sys.path[:] = old_sys_path

        loader = get_loader({}, task_loader=ModuleTaskLoader(dodo_module.__dict__))
        cmd = ListCmd(loader)
        tasks: Tasks = loader.load_tasks(cmd, [])
        return tasks

    def find_graph_data(self) -> TAnyDict:
        """Find all of the nodes and links.

        :returns: a dictionary with ``nodes`` and ``links`` (each keyed by id)
            and ``tasks`` (keyed by task name).
        """
        tasks = self._reload_tasks()
        graph_data: TAnyDict = {
            "nodes": {},
            "links": {},
            "tasks": {t.name: t for t in tasks},
        }

        for task in tasks:
            self.discover_one_task(task, graph_data)

        if not self.show_files:
            graph_data = self.group_files(graph_data)

        return graph_data

    def discover_one_task(self, task: Task, graph_data: TAnyDict) -> None:
        """Update nodes and links from a single ``Task``.

        Adds the task node, one ``has_task_dep`` link per ``task_dep``, and
        delegates every ``file_dep`` / ``targets`` path to
        :meth:`discover_one_file`.
        """
        task_id = f"task:{task.name}"
        # ``get_status`` hands this to ``uptodate`` helpers, and doit documents
        # it as a name -> Task mapping, so pass the dict rather than a list
        tasks_dict = graph_data["tasks"]

        graph_data["nodes"][task_id] = {
            "id": task_id,
            "type": "task",
            "name": f"{task.name}",
            "doc": task.doc or "",
            "status": self._deps.get_status(task, tasks_dict).status,
            "subtask_of": task.subtask_of,
        }

        for task_dep in task.task_dep:
            dep_task = tasks_dict[task_dep]
            dep_status = self._deps.get_status(dep_task, tasks_dict).status
            task_dep_id = f"task:{task_dep}"
            link_id = f"{task_id}--has_task_dep--{task_dep_id}"
            graph_data["links"][link_id] = {
                "source": task_id,
                "target": task_dep_id,
                "type": "has_task_dep",
                "id": link_id,
                "exists": dep_status,
            }

        for field in ["file_dep", "targets"]:
            for path in getattr(task, field):
                self.discover_one_file(path, field, task_id, graph_data)

    def discover_one_file(
        self, path_name: str, field: str, task_id: str, graph_data: TAnyDict
    ) -> None:
        """Update nodes and links for a single file referenced by a ``Task``.

        A ``file_dep`` link points from the file to the task; a ``targets``
        link points from the task to the file.

        :raises ValueError: if ``field`` is neither ``file_dep`` nor ``targets``.
        """
        if field not in ("file_dep", "targets"):
            # fail before touching ``graph_data``
            raise ValueError(f"unexpected file field: {field!r}")

        path = Path(path_name).resolve()
        path_id = f"file:{path_name}"
        exists = path.exists()

        if path_id not in graph_data["nodes"]:
            try:
                name = path.relative_to(self.project_root).as_posix()
                is_in_project = True
            except ValueError:  # pragma: no cover
                # the file lives outside of the project: keep the name as given
                name = path_name
                is_in_project = False

            graph_data["nodes"][path_id] = {
                "id": path_id,
                "type": "file",
                "name": name,
                "exists": exists,
            }

            if is_in_project:
                self.discover_file_parents(path, path_id, graph_data)

        if field == "file_dep":
            source, target = path_id, task_id
        else:
            source, target = task_id, path_id

        link_id = f"{task_id}--{field}--{path_id}"
        graph_data["links"][link_id] = {
            "source": source,
            "target": target,
            "type": field,
            "id": link_id,
            "exists": exists,
        }

    def discover_file_parents(
        self, path: Path, path_id: str, graph_data: TAnyDict
    ) -> None:
        """Discover parent paths.

        Walks from the parent of ``path`` up to (but excluding) the
        ``project_root``, adding a ``directory`` node per folder and a
        ``contains`` link from each folder to its child. Does nothing unless
        both ``show_directories`` and ``show_files`` are enabled.
        """
        if not (self.show_directories and self.show_files):
            return

        child_id = path_id
        parent = path.parent

        while parent != self.project_root:
            parent_id = f"folder:{parent}"

            if parent_id not in graph_data["nodes"]:
                graph_data["nodes"][parent_id] = {
                    "id": parent_id,
                    "type": "directory",
                    "name": parent.relative_to(self.project_root).as_posix(),
                    # check the directory itself, not the file that led here
                    "exists": parent.exists(),
                }

            link_id = f"{parent_id}--contains--{child_id}"
            graph_data["links"][link_id] = {
                "source": parent_id,
                "target": child_id,
                "type": "contains",
                "id": link_id,
            }

            child_id = parent_id
            parent = parent.parent

    def group_files(self, graph_data: TAnyDict) -> TAnyDict:
        """Collapse all ``file_dep`` and ``targets``.

        Every task's individual file nodes are replaced by (at most) one
        ``file_dep`` and one ``targets`` group node, and a ``file_dep_targets``
        link is added between groups that share at least one path.

        :returns: a modified deep copy; the ``graph_data`` passed in is untouched.
        """
        graph_data = deepcopy(graph_data)
        nodes = graph_data["nodes"]
        links = graph_data["links"]

        new_nodes: Dict[str, TAnyDict] = {"file_dep": {}, "targets": {}}
        new_links: Dict[str, TAnyDict] = {
            "file_dep": {},
            "targets": {},
            "file_dep_targets": {},
        }
        remove_links = set()
        remove_nodes = set()

        # which end of a link is the (file, task), per field
        field_keys = {"file_dep": ["source", "target"], "targets": ["target", "source"]}

        for field, (file_key, task_key) in field_keys.items():
            for link_id, link in links.items():
                if link["type"] != field:
                    continue

                task_id = link[task_key]
                task_node = nodes[task_id]
                new_node_id = f"{field}:{task_id}"
                new_node = new_nodes[field].get(new_node_id)

                if new_node is None:
                    new_node = new_nodes[field][new_node_id] = {
                        "id": new_node_id,
                        "name": f"""{field} of {task_node["name"]}""",
                        "type": field,
                        "paths": [],
                        "exists": True,
                    }
                    new_link_id = f"{task_id}--{field}--"
                    new_links[field][new_link_id] = {
                        "id": new_link_id,
                        file_key: new_node_id,
                        task_key: task_id,
                        "type": field,
                    }

                linked_file = nodes[link[file_key]]
                new_node["paths"].append(linked_file["name"])
                # a group only "exists" if every one of its files does
                new_node["exists"] = new_node["exists"] and linked_file["exists"]
                remove_nodes.add(link[file_key])
                remove_links.add(link_id)

        # make new links between new nodes
        for dep_node_id, dep_node in new_nodes["file_dep"].items():
            dep_paths = set(dep_node["paths"])
            for target_node_id, target_node in new_nodes["targets"].items():
                if dep_paths & set(target_node["paths"]):
                    new_link_id = f"{dep_node_id}--file_dep_targets--{target_node_id}"
                    new_links["file_dep_targets"][new_link_id] = {
                        "id": new_link_id,
                        "source": dep_node_id,
                        "target": target_node_id,
                        "type": "file_dep_targets",
                    }

        # update the graph data
        for link_id in remove_links:
            links.pop(link_id, None)
        for node_id in remove_nodes:
            nodes.pop(node_id, None)

        for field in [*field_keys, "file_dep_targets"]:
            nodes.update(new_nodes.get(field, {}))
            links.update(new_links.get(field, {}))

        return graph_data