|
import base64
|
|
import os
|
|
import zlib, hmac, hashlib, time, json
|
|
|
|
|
|
class APICredentials:
    """
    Holds the user ID and API key used to authenticate API requests.

    Both values are exposed as properties so callers read them as plain
    attributes (`credentials.user_id`, `credentials.secret`).
    """

    # Placeholder values; replace them with your own account's credentials.
    _USER_ID = "1234567890abcdefghijklmnopqrstu"
    _API_KEY = "1234567890abcdefghijklmnopqrstuvwxyz1234"

    @property
    def user_id(self):
        """
        The user ID, as defined on the User Settings page.
        """
        return self._USER_ID

    @property
    def secret(self):
        """
        The API key, as defined on the Integration Keys page.
        """
        return self._API_KEY
|
|
|
|
|
|
class APIParams(object):
    """
    Provides API authentication. Learn more at:
    https://docs.edgecast.com/video/#Develop/Versioned-API.htm
    """

    def __init__(self, credentials):
        """
        Stores the credentials used to build and sign request parameters.

        credentials: object exposing `user_id` and `secret` string
        attributes.
        """
        self.credentials = credentials

    def get_params(self, data):
        """
        Encodes and signs <data> into the expected format and returns it.

        data: dict of request parameters keyed by strings (it is expanded
        into keyword arguments). The dict itself is not modified.

        Returns a new dict with exactly two entries: 'msg', the encoded
        message, and 'sig', its hex HMAC-SHA256 signature.
        """
        return self._get_params(**data)

    def _get_msg(self, msg=None):
        """
        Encodes and returns the 'msg' parameter.

        msg: optional dict of request parameters. It is copied before the
        '_owner' and '_timestamp' fields are added, so the caller's dict is
        left untouched. Any '_owner' / '_timestamp' keys already present
        are overwritten.

        Returns the JSON-serialized message, zlib-compressed at level 9 and
        base64-encoded, as bytes.
        """
        # Work on a copy: updating the argument in place would leak the
        # '_owner' and '_timestamp' fields into the caller's dict.
        payload = dict(msg) if msg else {}

        payload.update({
            '_owner': self.credentials.user_id,
            '_timestamp': int(time.time()),
        })

        serialized = json.dumps(payload)
        compressed = zlib.compress(serialized.encode(), 9)
        # b64encode emits no line breaks or padding whitespace, so the
        # result needs no stripping.
        return base64.b64encode(compressed)

    def _get_params(self, **msg):
        """
        Returns the message and its signature.

        **msg: request parameters to encode.

        Returns a dict with 'msg' (the encoded message bytes) and 'sig'
        (the hex digest of HMAC-SHA256 over those bytes, keyed with the
        credentials' secret).
        """
        encoded = self._get_msg(msg)

        signature = hmac.new(
            self.credentials.secret.encode(), encoded, hashlib.sha256
        ).hexdigest()

        return {
            'msg': encoded,
            'sig': signature,
        }