File size: 4,566 Bytes
0aee47a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import re
from enum import Enum
from inspect import isclass
from inspect import isfunction as isFn
from inspect import iscoroutinefunction as isAsync
from typing import Any, Dict, List, Tuple, Optional

import bilibili_api

FUNC = re.compile(r"[^\.\(]+")
ARGS = re.compile(r"[^,\(\)]*[,\)]")
SENTENCES = re.compile(r"\w+(?:\([^\)]*\))?\.?")
OPS = {
    ":int": int,
    ":float": float,
    ":bool": lambda s: s == "True",
}


class Parser:
    """

    解析器

    """

    def __init__(self, params: Dict[str, str]):
        self.valid = True
        self.params = params

    async def __aenter__(self):
        """

        解析前准备



        把 params 中的参数先解析了

        """
        for key, val in self.params.items():
            obj, err = await self.parse(val)
            if err is None:
                if isinstance(obj, bilibili_api.Credential):
                    self.valid = await bilibili_api.check_valid(obj)
                self.params[key] = obj
        return self

    async def __aexit__(self, type, value, trace):
        ...

    async def transform(self, var: str) -> Any:
        """

        类型装换函数

        

        通过在字符串后加上 `:int` `:float` `:bool` `:parse` 等操作符来实现



        Args:

            var (str): 需要转换的字符串

        

        Returns:

            Any: 装换结果

        """
        for key, fn in OPS.items():
            if var.endswith(key):
                return fn(var.replace(key, ""))
        if var.endswith(":parse"):
            obj, err = await self.parse(var.replace(":parse", ""))
            if err is None:
                return obj

        # 是请求时 params 中定义的变量 获取出来 不是的话返回原字符
        return self.params.get(var, var)

    async def parse(self, path: str) -> Tuple[Any, Optional[str]]:
        """

        分析指令

        

        Args:

            path (str): 需要解析的 token 对应库中的路径

        

        Returns:

            Any: 最终数据 若解析失败为 None



            str: 错误信息 若解析成功为 None

        """
        # 纯数字
        if path.replace(":int", "").replace(":float", "").replace(".", "").replace("-", "").isdigit():
            return await self.transform(path), None

        # 指令列表
        sentences = SENTENCES.findall(path)
        # 起始点
        position: Any = bilibili_api

        async def inner() -> Optional[str]:
            """

            递归取值

            

            Returns:

                str: 错误信息 若解析成功为 None

            """
            nonlocal position
            # 分解执行的函数名、参数、指名参数
            sentence = sentences.pop(0)
            func: str = FUNC.findall(sentence)[0]
            flags: List[str] = ARGS.findall(sentence)
            args, kwargs = [], {}

            for flag in flags:
                # 去除句尾的逗号或小括号
                flag = flag[:-1]
                if flag == "":
                    continue

                # 通过判断是否有等号存入参数列表或指名参数字典
                arg = flag.split("=")
                if len(arg) == 1:
                    args.append(await self.transform(arg[0]))
                else:
                    kwargs[arg[0]] = await self.transform(arg[1])

            # print(position, func, args, kwargs)

            # 开始转移
            if isinstance(position, dict):
                position = position.get(func, None)
            elif isinstance(position, list):
                position = position[int(func)]
            else:
                position = getattr(position, func, None)

            # 赋值参数
            if isAsync(position):
                position = await position(*args, **kwargs)
            elif isFn(position):
                position = position(*args, **kwargs)
            elif isclass(position) and not issubclass(position, Enum):
                position = position(*args, **kwargs)

            # 为空返回出错语句
            # 否则检查是否分析完全部语句
            # 是则返回 None 否继续递归
            if position is None:
                return sentence
            if len(sentences) == 0:
                return None
            return await inner()

        msg = await inner()
        return position, msg