Source code for bibble.io.writer

  1#!/usr/bin/env python3
  2"""
  3
  4"""
  5# mypy: disable-error-code="attr-defined"
  6
  7# Imports:
  8from __future__ import annotations
  9
 10# ##-- stdlib imports
 11import datetime
 12import enum
 13import functools as ftz
 14import itertools as itz
 15import logging as logmod
 16import pathlib as pl
 17import re
 18import time
 19import types
 20import weakref
 21from copy import deepcopy
 22from uuid import UUID, uuid1
 23
 24# ##-- end stdlib imports
 25
 26# ##-- 3rd party imports
 27import jgdv
 28from jgdv import Proto, Mixin
 29from jgdv._abstract.protocols.general import Visitor_p
 30from jgdv.debugging.timing import TimeCtx
 31from bibtexparser import model
 32from bibtexparser.model import MiddlewareErrorBlock
 33
 34# ##-- end 3rd party imports
 35
 36# ##-- 1st party imports
 37from bibble import _interface as API
 38from . import _interface as API_W
 39from bibble.util.mixins import MiddlewareValidator_m
 40from bibble.model import MetaBlock, FailedBlock
 41from bibble.util import PairStack
 42
 43from ._util import Runner_m
 44# ##-- end 1st party imports
 45
 46# ##-- types
 47# isort: off
 48import abc
 49import collections.abc
 50from typing import TYPE_CHECKING, cast, assert_type, assert_never
 51from typing import Generic, NewType
 52# Protocols:
 53from typing import Protocol, runtime_checkable
 54# Typing Decorators:
 55from typing import no_type_check, final, override, overload
 56
 57if TYPE_CHECKING:
 58    from jgdv import Maybe
 59    from typing import Final
 60    from typing import ClassVar, Any, LiteralString
 61    from typing import Never, Self, Literal
 62    from typing import TypeGuard
 63    from collections.abc import Iterable, Iterator, Callable, Generator
 64    from collections.abc import Sequence, Mapping, MutableMapping, Hashable
 65    from logmod import Logger
 66
 67    from bibtexparser.library import Library
 68    from bibtexparser.writer import BibtexFormat
 69
 70    type Middleware = API.Middleware_p | API.BidirectionalMiddleware_p
 71##--|
 72
 73# isort: on
 74# ##-- end types
 75
 76##-- logging
 77logging = logmod.getLogger(__name__)
 78##-- end logging
 79
 80EMPTY_JOIN : Final[str] = ""
 81DEFAULT_ACTIVE : Final[set] = set([
 82    MetaBlock,
 83    model.Entry,
 84    model.String,
 85    model.Preamble,
 86    model.ExplicitComment,
 87    model.ImplicitComment,
 88])
 89##--|
 90
[docs] 91class _Visitors_m: 92
[docs] 93 def visit_metablock(self, block:MetaBlock) -> list[str]: 94 return []
95
[docs] 96 def visit_entry(self, block:model.Entry) -> list[str]: 97 res = ["@", block.entry_type, "{", block.key, ",\n"] 98 field: model.Field 99 for i, field in enumerate(block.fields): 100 res.append(self._align_key(field.key)) 101 res.append(str(field.value)) 102 if self.format.trailing_comma or i < len(block.fields) - 1: 103 res.append(",") 104 res.append("\n") 105 else: 106 res.append("}\n") 107 return res
108
[docs] 109 def visit_string(self, block:model.String) -> list[str]: 110 return [ 111 "@string{", 112 block.key, 113 self._value_sep, 114 block.value, 115 "}\n", 116 ]
117
[docs] 118 def visit_preamble(self, block:model.Preamble) -> list[str]: 119 return [f"@preamble{{{block.value}}}\n"]
120
[docs] 121 def visit_impl_comment(self, block:model.ImplicitComment) -> list[str]: 122 # Note: No explicit escaping is done here - that should be done in middleware 123 return [block.comment, "\n"]
124
[docs] 125 def visit_expl_comment(self, block:model.ExplicitComment) -> list[str]: 126 return ["@comment{", block.comment, "}\n"]
127
[docs] 128 def visit_failed_block(self, block:FailedBlock) -> list[str]: 129 lines = self.visit_entry(block._ignore_error_block) 130 line_count = len(lines) 131 err = f"<{block.error.__class__.__name__}> : {block.error}" 132 format_line = self.format.parsing_failed_comment 133 parsing_failed_comment = format_line.format(n=line_count, err=err) 134 return [parsing_failed_comment, "\n", 135 block.raw, "\n\n", 136 *lines, "\n", 137 API_W.FAIL_END, "\n"]
138 139
[docs] 140 def visit_middleware_error_block(self, block:model.MiddlewareErrorBlock) -> list[str]: 141 format_line = self.format.parsing_failed_comment 142 line_count = len(block.raw.splitlines()) 143 err = f"<{block.error.__class__.__name__}> : {block.error}" 144 parsing_failed_comment = format_line.format(n=line_count, err=err) 145 return [parsing_failed_comment, "\n", 146 block.raw, "\n", 147 API_W.FAIL_END, "\n"]
148
[docs] 149 def visit_parsing_failed_block(self, block:model.ParsingFailedBlock) -> list[str]: 150 format_line = self.format.parsing_failed_comment 151 line_count = len(block.raw.splitlines()) 152 err = f"<{block.error.__class__.__name__}> : {block.error}" 153 parsing_failed_comment = format_line.format(n=line_count, err=err) 154 return [parsing_failed_comment, "\n", 155 block.raw, "\n", 156 API_W.FAIL_END, "\n"]
157 158 159##--| 160
[docs] 161@Proto(Visitor_p, API.Writer_p) 162@Mixin(_Visitors_m, Runner_m, MiddlewareValidator_m) 163class BibbleWriter: 164 """ A Refactored bibtexparser writer 165 Uses visitor pattern 166 167 Note: visit method are responsible for new lines 168 """ 169 _value_sep : str 170 _value_column : Maybe[int] 171 _middlewares : list[Middleware] 172 format : BibtexFormat 173 _active_blocks : set[type[model.Block]] 174 175 def __init__(self, stack:PairStack|list[Middleware], *, format:Maybe[BibtexFormat]=None, logger:Maybe[Logger]=None, active_blocks:Maybe[Iterable[type[model.Block]]]=None): 176 self._value_sep = API_W.VAL_SEP 177 self._value_column = None 178 self._logger = logger or logging 179 self._join_char = EMPTY_JOIN 180 self._active_blocks = active_blocks or DEFAULT_ACTIVE 181 match stack: 182 case PairStack(): 183 self._middlewares = stack.write_stack() 184 case list(): 185 self._middlewares = stack 186 case x: 187 raise TypeError(type(x)) 188 189 match format: 190 case None: 191 self.format = deepcopy(API_W.default_format()) 192 case BibtexFormat(): 193 self.format = deepcopy(format) 194 case x: 195 raise TypeError(type(x)) 196 197 self.exclude_middlewares(API.ReadTime_p) 198
[docs] 199 def set_active(self, active:Iterable[type[model.Block]]) -> None: 200 self._active_blocks = set(active)
201
[docs] 202 def write(self, library:Library, *, file:None|pl.Path=None, append:Maybe[list[Middleware]]=None, title:Maybe[str]=None) -> str: 203 """ Write the library to a string, and possbly a file 204 # TODO write failure reports to a separate file 205 """ 206 self._calculate_auto_value_align(library) 207 208 with TimeCtx(logger=logging, level=logmod.INFO) as ctx: 209 ctx.msg("--> Write Transforms: Start") 210 transformed = self._run_writewares(library, append=append) 211 212 ctx.msg("<-- Write Transforms took: %s", ctx.total_s) 213 214 header = self.make_header(transformed, title) 215 body = self.make_body(transformed) 216 footer = self.make_footer(transformed, file) 217 lib = self.make_lib(header=header, body=body, footer=footer) 218 219 # Reset the value column: 220 self._value_column = None 221 match file: 222 case pl.Path(): 223 file.write_text(lib) 224 return lib 225 case _: 226 return lib
227
[docs] 228 def write_as_data(self, library:Library, *, file:None|pl.Path=None, append:Maybe[list[Middleware]]=None, title:Maybe[str]=None) -> Any: 229 """ Instead of writing the library out as a string, write it as data 230 231 eg: creating a docutils structure. 232 """ 233 raise NotImplementedError()
234
[docs] 235 def write_failures(self, library:Library, *, file:Maybe[pl.Path]=None, append:bool=False) -> str: 236 """ Write failed blocks to a separate file """ 237 curr_blocks = self._active_blocks 238 self._active_blocks = set([FailedBlock, model.ParsingFailedBlock, model.MiddlewareErrorBlock]) 239 result = self.write(library, append=append) 240 self._active_blocks = curr_blocks 241 if not file: 242 return result 243 244 with file.open('a') as f: 245 f.write(result) 246 247 return result
248
[docs] 249 def make_header(self, library, title:Maybe[str]) -> list[str]: 250 return []
251
[docs] 252 def make_body(self, library) -> list[str]: 253 total_entries = len(library.blocks) - 2 254 body : list[str] = [] 255 for i, block in enumerate(library.blocks): 256 # Get string representation (as list of strings) of block 257 pieces = self.visit(block) 258 body.extend(pieces) 259 # Separate Blocks 260 if i <= total_entries: 261 body.append(self.format.block_separator) 262 else: 263 return body
264 267
[docs] 268 def make_lib(self, *, header:list[str], body:list[str], footer:list[str]) -> str: 269 return self._join_char.join([*header, *body, *footer])
270
[docs] 271 def visit(self, block) -> list[str]: 272 match block: 273 case x if isinstance(x, API.CustomWriteBlock_p): 274 assert(hasattr(x, "visit")) 275 return x.visit(self) 276 ##--| Standard blocks 277 case MetaBlock() if MetaBlock in self._active_blocks: 278 return self.visit_metablock(block) 279 case model.Entry() if model.Entry in self._active_blocks: 280 return self.visit_entry(block) 281 case model.String() if model.String in self._active_blocks: 282 return self.visit_string(block) 283 case model.Preamble() if model.Preamble in self._active_blocks: 284 return self.visit_preamble(block) 285 case model.ExplicitComment() if model.ExplicitComment in self._active_blocks: 286 return self.visit_expl_comment(block) 287 case model.ImplicitComment() if model.ImplicitComment in self._active_blocks: 288 return self.visit_impl_comment(block) 289 ##--| Failures 290 case FailedBlock() if FailedBlock in self._active_blocks: 291 return self.visit_failed_block(block) 292 case model.MiddlewareErrorBlock() if model.MiddlewareErrorBlock in self._active_blocks: 293 return self.visit_middleware_error_block(block) 294 case model.ParsingFailedBlock() if model.ParsingFailedBlock in self._active_blocks: 295 return self.visit_parsing_failed_block(block) 296 case model.ParsingFailedBlock() if block._ignore_error_block is not None: 297 return self.visit(block._ignore_error_block) 298 case model.ParsingFailedBlock(): 299 return [] 300 case _: 301 logging.info(f"Skipping block type: {type(block)}") 302 return []
303
[docs] 304 def _calculate_auto_value_align(self, library: Library) -> None: 305 """ 306 Sets the separation between keys and the value separator. 307 If its already set, does nothing. 308 If the format specifies a value, uses that. 309 Otherwise calulates it from the larges field key 310 """ 311 if self._value_column is not None: 312 return 313 314 match self.format.value_column: 315 case int() as x: 316 self._value_column = x 317 case _: 318 max_key_len = 0 319 for entry in library.entries: 320 for key in entry.fields_dict: 321 max_key_len = max(max_key_len, len(key)) 322 ##--| 323 else: 324 self._value_column = max_key_len + len(self._value_sep)
325
[docs] 326 def _align_key(self, key: str) -> str: 327 """ take {key} and make {key}{padding}{sep}, 328 Padding is from '_calculate_auto_value_align', the largest key length. 329 Sep is typically '='. 330 331 eg: _align_key('blah') -> 'blah = ' 332 """ 333 match (self.format.value_column - len(key) - len(self._value_sep)): 334 case x if 0 <= x: 335 return f"{self.format.indent}{key}{' '*x}{self._value_sep}" 336 case x: 337 return f"{self.format.indent}{key}{' '*x}{self._value_sep}"