1#!/usr/bin/env python3
2"""
3
4"""
5# mypy: disable-error-code="attr-defined"
6
7# Imports:
8from __future__ import annotations
9
10# ##-- stdlib imports
11import datetime
12import enum
13import functools as ftz
14import itertools as itz
15import logging as logmod
16import pathlib as pl
17import re
18import time
19import types
20import weakref
21from copy import deepcopy
22from uuid import UUID, uuid1
23
24# ##-- end stdlib imports
25
26# ##-- 3rd party imports
27import jgdv
28from jgdv import Proto, Mixin
29from jgdv._abstract.protocols.general import Visitor_p
30from jgdv.debugging.timing import TimeCtx
31from bibtexparser import model
32from bibtexparser.model import MiddlewareErrorBlock
33
34# ##-- end 3rd party imports
35
36# ##-- 1st party imports
37from bibble import _interface as API
38from . import _interface as API_W
39from bibble.util.mixins import MiddlewareValidator_m
40from bibble.model import MetaBlock, FailedBlock
41from bibble.util import PairStack
42
43from ._util import Runner_m
44# ##-- end 1st party imports
45
46# ##-- types
47# isort: off
48import abc
49import collections.abc
50from typing import TYPE_CHECKING, cast, assert_type, assert_never
51from typing import Generic, NewType
52# Protocols:
53from typing import Protocol, runtime_checkable
54# Typing Decorators:
55from typing import no_type_check, final, override, overload
56
57if TYPE_CHECKING:
58 from jgdv import Maybe
59 from typing import Final
60 from typing import ClassVar, Any, LiteralString
61 from typing import Never, Self, Literal
62 from typing import TypeGuard
63 from collections.abc import Iterable, Iterator, Callable, Generator
64 from collections.abc import Sequence, Mapping, MutableMapping, Hashable
65 from logmod import Logger
66
67 from bibtexparser.library import Library
68 from bibtexparser.writer import BibtexFormat
69
70 type Middleware = API.Middleware_p | API.BidirectionalMiddleware_p
71##--|
72
73# isort: on
74# ##-- end types
75
76##-- logging
logging = logmod.getLogger(__name__)
##-- end logging

# Separator used when the rendered pieces are joined into the final text.
EMPTY_JOIN     : Final[str] = ""
# Block types rendered by a writer when no explicit selection is supplied.
DEFAULT_ACTIVE : Final[set] = {
    MetaBlock,
    model.Entry,
    model.String,
    model.Preamble,
    model.ExplicitComment,
    model.ImplicitComment,
}
89##--|
90
[docs]
91class _Visitors_m:
92
95
[docs]
96 def visit_entry(self, block:model.Entry) -> list[str]:
97 res = ["@", block.entry_type, "{", block.key, ",\n"]
98 field: model.Field
99 for i, field in enumerate(block.fields):
100 res.append(self._align_key(field.key))
101 res.append(str(field.value))
102 if self.format.trailing_comma or i < len(block.fields) - 1:
103 res.append(",")
104 res.append("\n")
105 else:
106 res.append("}\n")
107 return res
108
[docs]
109 def visit_string(self, block:model.String) -> list[str]:
110 return [
111 "@string{",
112 block.key,
113 self._value_sep,
114 block.value,
115 "}\n",
116 ]
117
[docs]
118 def visit_preamble(self, block:model.Preamble) -> list[str]:
119 return [f"@preamble{{{block.value}}}\n"]
120
124
127
[docs]
128 def visit_failed_block(self, block:FailedBlock) -> list[str]:
129 lines = self.visit_entry(block._ignore_error_block)
130 line_count = len(lines)
131 err = f"<{block.error.__class__.__name__}> : {block.error}"
132 format_line = self.format.parsing_failed_comment
133 parsing_failed_comment = format_line.format(n=line_count, err=err)
134 return [parsing_failed_comment, "\n",
135 block.raw, "\n\n",
136 *lines, "\n",
137 API_W.FAIL_END, "\n"]
138
139
[docs]
140 def visit_middleware_error_block(self, block:model.MiddlewareErrorBlock) -> list[str]:
141 format_line = self.format.parsing_failed_comment
142 line_count = len(block.raw.splitlines())
143 err = f"<{block.error.__class__.__name__}> : {block.error}"
144 parsing_failed_comment = format_line.format(n=line_count, err=err)
145 return [parsing_failed_comment, "\n",
146 block.raw, "\n",
147 API_W.FAIL_END, "\n"]
148
[docs]
149 def visit_parsing_failed_block(self, block:model.ParsingFailedBlock) -> list[str]:
150 format_line = self.format.parsing_failed_comment
151 line_count = len(block.raw.splitlines())
152 err = f"<{block.error.__class__.__name__}> : {block.error}"
153 parsing_failed_comment = format_line.format(n=line_count, err=err)
154 return [parsing_failed_comment, "\n",
155 block.raw, "\n",
156 API_W.FAIL_END, "\n"]
157
158
159##--|
160
[docs]
161@Proto(Visitor_p, API.Writer_p)
162@Mixin(_Visitors_m, Runner_m, MiddlewareValidator_m)
163class BibbleWriter:
164 """ A Refactored bibtexparser writer
165 Uses visitor pattern
166
167 Note: visit method are responsible for new lines
168 """
169 _value_sep : str
170 _value_column : Maybe[int]
171 _middlewares : list[Middleware]
172 format : BibtexFormat
173 _active_blocks : set[type[model.Block]]
174
175 def __init__(self, stack:PairStack|list[Middleware], *, format:Maybe[BibtexFormat]=None, logger:Maybe[Logger]=None, active_blocks:Maybe[Iterable[type[model.Block]]]=None):
176 self._value_sep = API_W.VAL_SEP
177 self._value_column = None
178 self._logger = logger or logging
179 self._join_char = EMPTY_JOIN
180 self._active_blocks = active_blocks or DEFAULT_ACTIVE
181 match stack:
182 case PairStack():
183 self._middlewares = stack.write_stack()
184 case list():
185 self._middlewares = stack
186 case x:
187 raise TypeError(type(x))
188
189 match format:
190 case None:
191 self.format = deepcopy(API_W.default_format())
192 case BibtexFormat():
193 self.format = deepcopy(format)
194 case x:
195 raise TypeError(type(x))
196
197 self.exclude_middlewares(API.ReadTime_p)
198
[docs]
199 def set_active(self, active:Iterable[type[model.Block]]) -> None:
200 self._active_blocks = set(active)
201
[docs]
202 def write(self, library:Library, *, file:None|pl.Path=None, append:Maybe[list[Middleware]]=None, title:Maybe[str]=None) -> str:
203 """ Write the library to a string, and possbly a file
204 # TODO write failure reports to a separate file
205 """
206 self._calculate_auto_value_align(library)
207
208 with TimeCtx(logger=logging, level=logmod.INFO) as ctx:
209 ctx.msg("--> Write Transforms: Start")
210 transformed = self._run_writewares(library, append=append)
211
212 ctx.msg("<-- Write Transforms took: %s", ctx.total_s)
213
214 header = self.make_header(transformed, title)
215 body = self.make_body(transformed)
216 footer = self.make_footer(transformed, file)
217 lib = self.make_lib(header=header, body=body, footer=footer)
218
219 # Reset the value column:
220 self._value_column = None
221 match file:
222 case pl.Path():
223 file.write_text(lib)
224 return lib
225 case _:
226 return lib
227
[docs]
228 def write_as_data(self, library:Library, *, file:None|pl.Path=None, append:Maybe[list[Middleware]]=None, title:Maybe[str]=None) -> Any:
229 """ Instead of writing the library out as a string, write it as data
230
231 eg: creating a docutils structure.
232 """
233 raise NotImplementedError()
234
[docs]
235 def write_failures(self, library:Library, *, file:Maybe[pl.Path]=None, append:bool=False) -> str:
236 """ Write failed blocks to a separate file """
237 curr_blocks = self._active_blocks
238 self._active_blocks = set([FailedBlock, model.ParsingFailedBlock, model.MiddlewareErrorBlock])
239 result = self.write(library, append=append)
240 self._active_blocks = curr_blocks
241 if not file:
242 return result
243
244 with file.open('a') as f:
245 f.write(result)
246
247 return result
248
251
[docs]
252 def make_body(self, library) -> list[str]:
253 total_entries = len(library.blocks) - 2
254 body : list[str] = []
255 for i, block in enumerate(library.blocks):
256 # Get string representation (as list of strings) of block
257 pieces = self.visit(block)
258 body.extend(pieces)
259 # Separate Blocks
260 if i <= total_entries:
261 body.append(self.format.block_separator)
262 else:
263 return body
264
267
[docs]
268 def make_lib(self, *, header:list[str], body:list[str], footer:list[str]) -> str:
269 return self._join_char.join([*header, *body, *footer])
270
[docs]
271 def visit(self, block) -> list[str]:
272 match block:
273 case x if isinstance(x, API.CustomWriteBlock_p):
274 assert(hasattr(x, "visit"))
275 return x.visit(self)
276 ##--| Standard blocks
277 case MetaBlock() if MetaBlock in self._active_blocks:
278 return self.visit_metablock(block)
279 case model.Entry() if model.Entry in self._active_blocks:
280 return self.visit_entry(block)
281 case model.String() if model.String in self._active_blocks:
282 return self.visit_string(block)
283 case model.Preamble() if model.Preamble in self._active_blocks:
284 return self.visit_preamble(block)
285 case model.ExplicitComment() if model.ExplicitComment in self._active_blocks:
286 return self.visit_expl_comment(block)
287 case model.ImplicitComment() if model.ImplicitComment in self._active_blocks:
288 return self.visit_impl_comment(block)
289 ##--| Failures
290 case FailedBlock() if FailedBlock in self._active_blocks:
291 return self.visit_failed_block(block)
292 case model.MiddlewareErrorBlock() if model.MiddlewareErrorBlock in self._active_blocks:
293 return self.visit_middleware_error_block(block)
294 case model.ParsingFailedBlock() if model.ParsingFailedBlock in self._active_blocks:
295 return self.visit_parsing_failed_block(block)
296 case model.ParsingFailedBlock() if block._ignore_error_block is not None:
297 return self.visit(block._ignore_error_block)
298 case model.ParsingFailedBlock():
299 return []
300 case _:
301 logging.info(f"Skipping block type: {type(block)}")
302 return []
303
[docs]
304 def _calculate_auto_value_align(self, library: Library) -> None:
305 """
306 Sets the separation between keys and the value separator.
307 If its already set, does nothing.
308 If the format specifies a value, uses that.
309 Otherwise calulates it from the larges field key
310 """
311 if self._value_column is not None:
312 return
313
314 match self.format.value_column:
315 case int() as x:
316 self._value_column = x
317 case _:
318 max_key_len = 0
319 for entry in library.entries:
320 for key in entry.fields_dict:
321 max_key_len = max(max_key_len, len(key))
322 ##--|
323 else:
324 self._value_column = max_key_len + len(self._value_sep)
325
[docs]
326 def _align_key(self, key: str) -> str:
327 """ take {key} and make {key}{padding}{sep},
328 Padding is from '_calculate_auto_value_align', the largest key length.
329 Sep is typically '='.
330
331 eg: _align_key('blah') -> 'blah = '
332 """
333 match (self.format.value_column - len(key) - len(self._value_sep)):
334 case x if 0 <= x:
335 return f"{self.format.indent}{key}{' '*x}{self._value_sep}"
336 case x:
337 return f"{self.format.indent}{key}{' '*x}{self._value_sep}"