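"""
Handler for Google Gemini chat completions via the `google-generativeai` SDK,
invoked from litellm's main completion router.
"""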
import copy
import time
import traceback
import types
from typing import Callable, Optional

import httpx

import litellm
from litellm.utils import Choices, Message, ModelResponse, Usage

from .prompt_templates.factory import custom_prompt, prompt_factory


class GeminiError(Exception):
    def __init__(self, status_code, message):
        self.status_code = status_code
        self.message = message
        self.request = httpx.Request(
            method="POST",
            url="https://developers.generativeai.google/api/python/google/generativeai/chat",
        )
        self.response = httpx.Response(status_code=status_code, request=self.request)
        super().__init__(
            self.message
        )  # Call the base class constructor with the parameters it needs


class GeminiConfig:
    """
    Reference: https://ai.google.dev/api/python/google/generativeai/GenerationConfig

    The class `GeminiConfig` provides configuration for the Gemini API interface. Here are the parameters:

    - `candidate_count` (int): Number of generated responses to return.

    - `stop_sequences` (List[str]): The set of character sequences (up to 5) that will stop output generation. If specified, the API will stop at the first appearance of a stop sequence. The stop sequence will not be included as part of the response.

    - `max_output_tokens` (int): The maximum number of tokens to include in a candidate. If unset, this will default to output_token_limit specified in the model's specification.

    - `temperature` (float): Controls the randomness of the output. Note: The default value varies by model; see the Model.temperature attribute of the Model returned by the genai.get_model function. Values can range over [0.0, 1.0], inclusive. A value closer to 1.0 will produce responses that are more varied and creative, while a value closer to 0.0 will typically result in more straightforward responses from the model.

    - `top_p` (float): Optional. The maximum cumulative probability of tokens to consider when sampling.

    - `top_k` (int): Optional. The maximum number of tokens to consider when sampling.
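
    Example (a minimal sketch; parameter values are illustrative):

        import litellm

        litellm.GeminiConfig(temperature=0.2, max_output_tokens=256)
        # subsequent gemini calls pick these defaults up via GeminiConfig.get_config(),
        # with per-call completion() kwargs taking precedence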
    """

    candidate_count: Optional[int] = None
    stop_sequences: Optional[list] = None
    max_output_tokens: Optional[int] = None
    temperature: Optional[float] = None
    top_p: Optional[float] = None
    top_k: Optional[int] = None

    def __init__(
        self,
        candidate_count: Optional[int] = None,
        stop_sequences: Optional[list] = None,
        max_output_tokens: Optional[int] = None,
        temperature: Optional[float] = None,
        top_p: Optional[float] = None,
        top_k: Optional[int] = None,
    ) -> None:
        locals_ = locals()
        for key, value in locals_.items():
            if key != "self" and value is not None:
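                # set on the class (not the instance) so get_config() sees the values globally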
                setattr(self.__class__, key, value)

    @classmethod
    def get_config(cls):
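        # collect the non-None, non-callable class attributes (i.e. values set via __init__ above)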
        return {
            k: v
            for k, v in cls.__dict__.items()
            if not k.startswith("__")
            and not isinstance(
                v,
                (
                    types.FunctionType,
                    types.BuiltinFunctionType,
                    classmethod,
                    staticmethod,
                ),
            )
            and v is not None
        }


def completion(
    model: str,
    messages: list,
    model_response: ModelResponse,
    print_verbose: Callable,
    api_key,
    encoding,
    logging_obj,
    custom_prompt_dict: dict,
    acompletion: bool = False,
    optional_params=None,
    litellm_params=None,
    logger_fn=None,
):
    try:
        import google.generativeai as genai
    except ImportError:
        raise Exception(
            "Importing google.generativeai failed, please run 'pip install -q google-generativeai'"
        )
    genai.configure(api_key=api_key)

    if model in custom_prompt_dict:
        # check if the model has a registered custom prompt
        model_prompt_details = custom_prompt_dict[model]
        prompt = custom_prompt(
            role_dict=model_prompt_details["roles"],
            initial_prompt_value=model_prompt_details["initial_prompt_value"],
            final_prompt_value=model_prompt_details["final_prompt_value"],
            messages=messages,
        )
    else:
        prompt = prompt_factory(
            model=model, messages=messages, custom_llm_provider="gemini"
        )

    ## Load Config
    inference_params = copy.deepcopy(optional_params or {})
    inference_params.pop(
        "stream", None
    )  # this code path does not support streaming, so we handle it by fake streaming in main.py
    config = litellm.GeminiConfig.get_config()
    for k, v in config.items():
        if (
            k not in inference_params
        ):  # per-call params win: completion(top_k=3) overrides GeminiConfig(top_k=3)
            inference_params[k] = v

    ## LOGGING
    logging_obj.pre_call(
        input=prompt,
        api_key="",
        additional_args={"complete_input_dict": {"inference_params": inference_params}},
    )
    ## COMPLETION CALL
    try:
        _model = genai.GenerativeModel(f"models/{model}")
        response = _model.generate_content(
            contents=prompt,
            generation_config=genai.types.GenerationConfig(**inference_params),
        )
    except Exception as e:
        raise GeminiError(
            message=str(e),
            status_code=500,
        )

    ## LOGGING
    logging_obj.post_call(
        input=prompt,
        api_key="",
        original_response=response,
        additional_args={"complete_input_dict": {}},
    )
    print_verbose(f"raw model_response: {response}")
    ## RESPONSE OBJECT
    completion_response = response
    try:
        choices_list = []
        for idx, item in enumerate(completion_response.candidates):
            if len(item.content.parts) > 0:
                message_obj = Message(content=item.content.parts[0].text)
            else:
                message_obj = Message(content=None)
            choice_obj = Choices(index=idx, message=message_obj)  # choices are 0-indexed, matching the OpenAI format
            choices_list.append(choice_obj)
        model_response["choices"] = choices_list
    except Exception:
        traceback.print_exc()
        raise GeminiError(
            message=traceback.format_exc(),
            status_code=500,  # genai responses carry no HTTP status code; report a generic server error
        )

    try:
        completion_response = model_response["choices"][0]["message"].get("content")
        if completion_response is None:
            raise Exception
    except Exception:
        original_response = f"response: {response}"
        if hasattr(response, "candidates"):
            original_response = f"response: {response.candidates}"
            if "SAFETY" in original_response:
                original_response += (
                    "\nThe candidate content was flagged for safety reasons."
                )
            elif "RECITATION" in original_response:
                original_response += (
                    "\nThe candidate content was flagged for recitation reasons."
                )
        raise GeminiError(
            status_code=400,
            message=f"No response received. Original response - {original_response}",
        )

    ## CALCULATING USAGE
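    # flatten both plain-string and multi-part (list) message content into one string for token counting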
    prompt_str = ""
    for m in messages:
        if isinstance(m["content"], str):
            prompt_str += m["content"]
        elif isinstance(m["content"], list):
            for content in m["content"]:
                if content["type"] == "text":
                    prompt_str += content["text"]
    prompt_tokens = len(encoding.encode(prompt_str))
    completion_tokens = len(
        encoding.encode(model_response["choices"][0]["message"].get("content") or "")
    )

    model_response["created"] = int(time.time())
    model_response["model"] = "gemini/" + model
    usage = Usage(
        prompt_tokens=prompt_tokens,
        completion_tokens=completion_tokens,
        total_tokens=prompt_tokens + completion_tokens,
    )
    model_response.usage = usage
    return model_response


def embedding():
    # placeholder: parse input -> call the model -> parse output for embedding calls
    pass