Spaces:
Build error
Build error
Validify-testbot-1
/
botbuilder-python
/libraries
/botframework-streaming
/botframework
/streaming
/receive_response.py
# Copyright (c) Microsoft Corporation. All rights reserved. | |
# Licensed under the MIT License. | |
from typing import List, Union, Type | |
from msrest.serialization import Model | |
from botframework.streaming.payloads import ContentStream | |
from botframework.streaming.payloads.models import Serializable | |
class ReceiveResponse: | |
def __init__(self, status_code: int = 0, streams: List[ContentStream] = None): | |
self.status_code = status_code | |
self.streams = streams or [] | |
def read_body_as_json( | |
self, cls: Union[Type[Model], Type[Serializable]] | |
) -> Union[Model, Serializable]: | |
try: | |
body_str = self.read_body_as_str() | |
body = None | |
if issubclass(cls, Serializable): | |
body = cls().from_json(body_str) | |
elif isinstance(cls, Model): | |
body = cls.deserialize(body_str) | |
return body | |
except Exception as error: | |
raise error | |
def read_body_as_str(self) -> str: | |
try: | |
content_stream = self.read_body() | |
if not content_stream: | |
return "" | |
# TODO: encoding double check | |
return content_stream.decode("utf8") | |
except Exception as error: | |
raise error | |
def read_body(self) -> bytes: | |
try: | |
content_stream = self.streams[0] if self.streams else None | |
if not content_stream: | |
return None | |
return bytes(content_stream.stream) | |
except Exception as error: | |
raise error | |