|
1 | 1 | import logging
|
2 | 2 | import numbers
|
| 3 | +import shlex |
3 | 4 | from types import TracebackType
|
4 |
| -from typing import Any, Dict, List, Optional, Tuple, Type, cast |
| 5 | +from typing import Any, Dict, List, Optional, Tuple, Type, Union, cast |
5 | 6 |
|
6 | 7 | ExcInfo = Tuple[Type[BaseException], BaseException, TracebackType]
|
7 | 8 |
|
|
42 | 43 | REPLACEMENTS.update({"\n": "\\n", "\t": "\\t", "\r": "\\r"})
|
43 | 44 |
|
44 | 45 |
|
| 46 | +def parse_logfmt( |
| 47 | + line: str, aliases: Optional[Dict[str, str]] = None, convert_numeric: bool = False |
| 48 | +) -> Dict[str, Union[str, int, float]]: |
| 49 | + """ |
| 50 | + Parse a logfmt formatted string. |
| 51 | +
|
| 52 | + Raises ValueError on parsing errors. |
| 53 | + """ |
| 54 | + aliases = aliases or {} |
| 55 | + fields = {} |
| 56 | + |
| 57 | + if "\n" in line: |
| 58 | + # TODO: be strict. Don't parse what we don't produce?! |
| 59 | + raise ValueError() |
| 60 | + |
| 61 | + lexer = shlex.shlex(line, posix=True) |
| 62 | + lexer.quotes = '"' |
| 63 | + lexer.whitespace_split = True |
| 64 | + lexer.commenters = "" |
| 65 | + for token in lexer: |
| 66 | + value: Union[str, int, float] |
| 67 | + key, _, value = token.partition("=") |
| 68 | + key = aliases.get(key, key) |
| 69 | + for replacement, string in REPLACEMENTS.items(): |
| 70 | + value = value.replace(string, replacement) |
| 71 | + if convert_numeric and value.isdigit(): |
| 72 | + value = int(value) |
| 73 | + elif ( |
| 74 | + convert_numeric |
| 75 | + and value.count(".") == 1 |
| 76 | + and value.replace(".", "").isdigit() |
| 77 | + ): |
| 78 | + value = float(value) |
| 79 | + if not key: |
| 80 | + raise ValueError() |
| 81 | + fields[key] = value |
| 82 | + return fields |
| 83 | + |
| 84 | + |
45 | 85 | class _DefaultFormatter(logging.Formatter):
|
46 | 86 | def format(self, record):
|
47 | 87 | exc_info = record.exc_info
|
|
0 commit comments