#!/usr/bin/python
# -*- coding: utf-8 -*-
# Hive Appier Framework
# Copyright (c) 2008-2024 Hive Solutions Lda.
#
# This file is part of Hive Appier Framework.
#
# Hive Appier Framework is free software: you can redistribute it and/or modify
# it under the terms of the Apache License as published by the Apache
# Foundation, either version 2.0 of the License, or (at your option) any
# later version.
#
# Hive Appier Framework is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# Apache License for more details.
#
# You should have received a copy of the Apache License along with
# Hive Appier Framework. If not, see <http://www.apache.org/licenses/>.
# module level metadata (author, copyright and license), each value
# is followed by a bare string that documents it
# NOTE(review): the author string ends with a trailing space, it looks
# like an e-mail address in angle brackets was stripped - confirm
__author__ = "João Magalhães "
""" The author(s) of the module """
__copyright__ = "Copyright (c) 2008-2024 Hive Solutions Lda."
""" The copyright for the module """
__license__ = "Apache License, Version 2.0"
""" The license for the module """
import time
import heapq
import calendar
import datetime
import traceback
import threading
from . import common
BACKGROUND = []
""" Names of the functions that were already registered
through the background decorator, only the function name
is stored, so two different functions sharing the same
name collide (the second one is not scheduled) """

SLEEP_TIME = 0.5
""" Number of seconds the execution thread sleeps at the
end of each loop iteration, it bounds the resolution of
the scheduling (smaller values mean finer resolution) """

background_t = None
""" The global execution thread used by the module level
insert work function, it is unset by default and must be
assigned before any work is submitted """
class ExecutionThread(threading.Thread):
    """
    Thread that executes arbitrary callables at (or after) a
    requested target time, all the work is run in sequence
    from within this single thread.

    The pending work is kept in a heap ordered by target time,
    access to the heap is protected by a re-entrant lock so that
    work may be inserted from any thread.
    """

    run_flag = True
    """ The flag that controls the running operations
    of the execution thread, once this value is unset
    the thread exits at the end of the current iteration """

    work_list = []
    """ The heap of pending work tuples, each one with the form
    (target_time, sequence, callable, callback, args, kwargs),
    replaced by a per instance list in the constructor """

    work_lock = None
    """ The lock that controls the access to the list of
    work to be executed """

    work_count = 0
    """ The number of work units inserted so far, used as the
    tie breaker for work sharing the same target time, so that
    the heap never has to compare the callables themselves """

    def __init__(self):
        """
        Constructor of the class, creates the (daemon) thread
        with an empty work heap, the thread is not started.
        """

        threading.Thread.__init__(self, name="Execution")
        self.daemon = True
        self.work_list = []
        self.work_lock = threading.RLock()
        self.work_count = 0

    def run(self):
        """
        Main loop of the thread, while the run flag is set
        collects the work that is due, executes it and then
        sleeps for ``SLEEP_TIME`` seconds.

        Exceptions raised by a callable are logged and then
        handed to the callback (if any), exceptions raised by
        the callback itself are not handled here.
        """

        while self.run_flag:
            # the due work is removed from the heap under the lock
            # and executed outside of it, this way a callable is
            # free to insert new work without blocking other threads
            execution_list = self._pop_due()

            for callable, callback, args, kwargs in execution_list:
                # sets the initial (default) value for the error
                # variable that is handed to the callback
                error = None

                # executes the callable logging the error in case
                # the execution fails, the execution thread must
                # survive a failure in one of the work units
                try:
                    callable(*args, **kwargs)
                except Exception as exception:
                    error = exception
                    lines = traceback.format_exc().splitlines()
                    logger = common.base().get_logger()
                    logger.warning(str(exception))
                    for line in lines:
                        logger.info(line)

                # notifies the callback (only if defined) about the
                # end of the execution, the error is None on success
                if callback:
                    callback(error=error)

            # sleeps for a while so that the processor is
            # released for different tasks
            time.sleep(SLEEP_TIME)

    def stop(self):
        """
        Requests the end of the main loop, the thread exits
        once the iteration in progress is finished.
        """

        self.run_flag = False

    def insert_work(
        self, callable, args=None, kwargs=None, target_time=None, callback=None
    ):
        """
        Schedules the provided callable for execution in the thread.

        :type callable: Function
        :param callable: The callable to be executed.
        :type args: List
        :param args: The unnamed arguments to be passed to the callable,\
        an empty list is used if not provided.
        :type kwargs: Dictionary
        :param kwargs: The named arguments to be passed to the callable,\
        an empty dictionary is used if not provided.
        :type target_time: float
        :param target_time: The timestamp from which the callable may be\
        executed, any false value (including zero) means "now".
        :type callback: Function
        :param callback: The function called as ``callback(error=...)``\
        after the execution, the error is ``None`` on success.
        """

        args = [] if args is None else args
        kwargs = {} if kwargs is None else kwargs
        target_time = target_time or time.time()

        with self.work_lock:
            # the sequence number makes every work tuple unique and
            # ordered, without it two units with the same target time
            # would make the heap compare the callables (type error)
            work = (target_time, self.work_count, callable, callback, args, kwargs)
            self.work_count += 1
            heapq.heappush(self.work_list, work)

    def _pop_due(self, current_time=None):
        """
        Removes from the heap and returns the work that is due.

        :type current_time: float
        :param current_time: The reference timestamp, work with a target\
        time strictly lower than this value is due, the current time is\
        used if not provided.
        :rtype: List
        :return: The (callable, callback, args, kwargs) tuples of the due\
        work, ordered by target time and then by insertion order.
        """

        due = []
        with self.work_lock:
            if current_time is None:
                current_time = time.time()
            while self.work_list and self.work_list[0][0] < current_time:
                work = heapq.heappop(self.work_list)
                due.append(work[2:])
        return due
def background(timeout=None):
    """
    Decorator that schedules the decorated function for execution
    in the background (through ``insert_work``) at decoration time.

    The function is registered by name in ``BACKGROUND``, a function
    whose name is already registered is returned without being
    scheduled again.

    :type timeout: float
    :param timeout: The number of seconds between the end of one\
    execution and the next one, if not provided (``None``) the function\
    is executed only once.
    :rtype: Function
    :return: The decorator, that returns the original function unchanged.
    """

    def decorator(function):
        _timeout = timeout or 0.0

        def schedule(error=None, force=False):
            # without a timeout the function is meant to run only
            # once, so only the initial (forced) scheduling is done
            if timeout is None and not force:
                return
            target = time.time() + _timeout

            # the target time and the callback must be passed by name,
            # the second and third positional parameters of the insert
            # work function are the arguments of the callable
            insert_work(function, target_time=target, callback=schedule)

        # retrieves the name of the function and in case it already
        # exists in the global list of background tasks returns
        # immediately (nothing to be done, duplicate)
        fname = function.__name__
        if fname in BACKGROUND:
            return function

        # runs the scheduling operation on the task and then adds
        # the function name to the list of already registered names
        schedule(force=True)
        BACKGROUND.append(fname)
        return function

    return decorator
def insert_work(callable, args=None, kwargs=None, target_time=None, callback=None):
    """
    Runs the provided callable (function, method, etc) in a separated
    thread context under submission of a queue system.

    It's possible to control the runtime for the execution with the
    ``target_time`` argument and it's also possible to be notified
    of the end of the execution providing a callable to the ``callback``
    parameter.

    The work is delegated to the global ``background_t`` execution
    thread, that must be set before this function is called.

    .. warning::

        The execution is not guaranteed as the system process may be
        interrupted and resuming of the execution would not be possible.

    :type callable: Function
    :param callable: The callable object to be called in a separated\
    execution environment.
    :type args: List
    :param args: The list of unnamed argument values to be sent to the\
    callable upon execution, an empty list is used if not provided.
    :type kwargs: Dictionary
    :param kwargs: The dictionary of named argument values to be sent to\
    the callable upon execution, an empty dictionary is used if not provided.
    :type target_time: float
    :param target_time: The target timestamp value for execution, in case\
    it's not provided the current time is used as the target one.
    :type callback: Function
    :param callback: The callback function to be called upon finishing the\
    execution of the callable, in case an error (exception) occurs while\
    executing the callable it is passed as the ``error`` argument.
    """

    # the defaults are normalized here (and not left to the thread) so
    # that the thread always receives concrete containers to unpack
    args = [] if args is None else args
    kwargs = {} if kwargs is None else kwargs
    background_t.insert_work(
        callable, args=args, kwargs=kwargs, target_time=target_time, callback=callback
    )
def interval_work(
    callable,
    args=None,
    kwargs=None,
    callback=None,
    initial=None,
    interval=60,
    eval=None,
):
    """
    Schedules the provided callable for repeated execution, after each
    execution the work is inserted again for the next target time (see
    ``build_composed``).

    :type callable: Function
    :param callable: The callable to be executed repeatedly.
    :type args: List
    :param args: The unnamed arguments to be passed to the callable on\
    every execution, an empty list is used if not provided.
    :type kwargs: Dictionary
    :param kwargs: The named arguments to be passed to the callable on\
    every execution, an empty dictionary is used if not provided.
    :type callback: Function
    :param callback: The function to be notified about the end of each\
    of the executions.
    :type initial: float
    :param initial: The timestamp of the first execution, if not provided\
    the result of ``eval`` is used, falling back to the current time.
    :type interval: float
    :param interval: The number of seconds between executions, passed to\
    ``build_composed`` together with ``eval``.
    :type eval: Function
    :param eval: Optional function (no arguments) that returns the timestamp\
    of the next execution.
    :rtype: float
    :return: The timestamp chosen for the first execution.
    """

    args = [] if args is None else args
    kwargs = {} if kwargs is None else kwargs
    initial = initial or (eval and eval()) or time.time()
    composed = build_composed(callable, initial, interval, eval, callback)
    insert_work(
        composed, args=args, kwargs=kwargs, target_time=initial, callback=callback
    )
    return initial
def seconds_work(callable, offset=0, *args, **kwargs):
    """
    Schedules the callable through ``interval_work`` using
    ``seconds_eval`` (with the provided offset) to compute each
    of the target times, remaining arguments are forwarded.

    :rtype: float
    :return: The value returned by ``interval_work``.
    """

    def next_time():
        return seconds_eval(offset)

    return interval_work(callable, eval=next_time, *args, **kwargs)
def minutes_work(callable, offset=0, *args, **kwargs):
    """
    Schedules the callable through ``interval_work`` using
    ``minutes_eval`` (with the provided offset) to compute each
    of the target times, remaining arguments are forwarded.

    :rtype: float
    :return: The value returned by ``interval_work``.
    """

    def next_time():
        return minutes_eval(offset)

    return interval_work(callable, eval=next_time, *args, **kwargs)
def hourly_work(callable, offset=0, *args, **kwargs):
    """
    Schedules the callable through ``interval_work`` using
    ``hourly_eval`` (with the provided offset) to compute each
    of the target times, remaining arguments are forwarded.

    :rtype: float
    :return: The value returned by ``interval_work``.
    """

    def next_time():
        return hourly_eval(offset)

    return interval_work(callable, eval=next_time, *args, **kwargs)
def daily_work(callable, offset=0, *args, **kwargs):
    """
    Schedules the callable through ``interval_work`` using
    ``daily_eval`` (with the provided offset) to compute each
    of the target times, remaining arguments are forwarded.

    :rtype: float
    :return: The value returned by ``interval_work``.
    """

    def next_time():
        return daily_eval(offset)

    return interval_work(callable, eval=next_time, *args, **kwargs)
def weekly_work(callable, weekday=4, offset=0, *args, **kwargs):
    """
    Schedules the callable through ``interval_work`` using
    ``weekly_eval`` (with the provided weekday and offset) to
    compute each of the target times, remaining arguments are
    forwarded.

    :rtype: float
    :return: The value returned by ``interval_work``.
    """

    def next_time():
        return weekly_eval(weekday, offset)

    return interval_work(callable, eval=next_time, *args, **kwargs)
def monthly_work(callable, monthday=1, offset=0, *args, **kwargs):
    """
    Schedules the callable through ``interval_work`` using
    ``monthly_eval`` (with the provided month day and offset)
    to compute each of the target times, remaining arguments
    are forwarded.

    :rtype: float
    :return: The value returned by ``interval_work``.
    """

    def next_time():
        return monthly_eval(monthday, offset)

    return interval_work(callable, eval=next_time, *args, **kwargs)
def seconds_eval(offset, now=None):
    """
    Computes the timestamp that is ``offset`` seconds after
    the reference date.

    :type offset: int
    :param offset: The number of seconds to be added.
    :type now: datetime
    :param now: The reference date, the current UTC date is used\
    if not provided.
    :rtype: int
    :return: The target timestamp (in seconds), fractions of a\
    second are discarded.
    """

    if not now:
        now = datetime.datetime.utcnow()
    target = now + datetime.timedelta(seconds=offset)
    return calendar.timegm(target.utctimetuple())
def minutes_eval(offset, now=None):
    """
    Computes the timestamp of the start of the minute that follows
    the reference date, plus ``offset`` seconds.

    :type offset: int
    :param offset: The number of seconds to be added to the start\
    of the next minute.
    :type now: datetime
    :param now: The reference date, the current UTC date is used\
    if not provided.
    :rtype: int
    :return: The target timestamp (in seconds).
    """

    if not now:
        now = datetime.datetime.utcnow()

    # truncates the reference date to the start of its minute
    minute_start = datetime.datetime(
        now.year, now.month, now.day, now.hour, now.minute
    )
    target = minute_start + datetime.timedelta(minutes=1, seconds=offset)
    return calendar.timegm(target.utctimetuple())
def hourly_eval(offset, now=None):
    """
    Computes the timestamp of the start of the hour that follows
    the reference date, plus ``offset`` seconds.

    :type offset: int
    :param offset: The number of seconds to be added to the start\
    of the next hour.
    :type now: datetime
    :param now: The reference date, the current UTC date is used\
    if not provided.
    :rtype: int
    :return: The target timestamp (in seconds).
    """

    if not now:
        now = datetime.datetime.utcnow()

    # truncates the reference date to the start of its hour
    hour_start = datetime.datetime(now.year, now.month, now.day, now.hour)
    target = hour_start + datetime.timedelta(hours=1, seconds=offset)
    return calendar.timegm(target.utctimetuple())
def daily_eval(offset, now=None):
    """
    Computes the timestamp of the start (midnight) of the day that
    follows the reference date, plus ``offset`` seconds.

    :type offset: int
    :param offset: The number of seconds to be added to the start\
    of the next day.
    :type now: datetime
    :param now: The reference date, the current UTC date is used\
    if not provided.
    :rtype: int
    :return: The target timestamp (in seconds).
    """

    if not now:
        now = datetime.datetime.utcnow()

    # truncates the reference date to the start of its day
    day_start = datetime.datetime(now.year, now.month, now.day)
    target = day_start + datetime.timedelta(days=1, seconds=offset)
    return calendar.timegm(target.utctimetuple())
def weekly_eval(weekday, offset, now=None):
    """
    Computes the timestamp of the next occurrence of the provided
    day of the week (at midnight plus ``offset`` seconds).

    :type weekday: int
    :param weekday: The target day of the week, using the same\
    numbering as ``datetime.weekday()`` (Monday is zero).
    :type offset: int
    :param offset: The number of seconds to be added to the start\
    of the target day.
    :type now: datetime
    :param now: The reference date, the current UTC date is used\
    if not provided.
    :rtype: int
    :return: The target timestamp (in seconds).
    """

    if not now:
        now = datetime.datetime.utcnow()

    # computes the number of days (zero to six) that separate the
    # current day from the target day of the week
    day_start = datetime.datetime(now.year, now.month, now.day)
    days_ahead = (weekday - day_start.weekday()) % 7
    target = day_start + datetime.timedelta(days=days_ahead, seconds=offset)

    # in case the target is already in the past (target day is the
    # current one) the occurrence of the following week is used
    if target < now:
        target = target + datetime.timedelta(days=7)

    return calendar.timegm(target.utctimetuple())
def monthly_eval(monthday, offset, now=None):
    """
    Computes the timestamp of the next occurrence of the provided
    day of the month (at midnight plus ``offset`` seconds).

    :type monthday: int
    :param monthday: The target day of the month (starting at one),\
    a day that does not exist in the target month makes the date\
    construction fail with a ``ValueError``.
    :type offset: int
    :param offset: The number of seconds to be added to the start\
    of the target day, may be negative.
    :type now: datetime
    :param now: The reference date, the current UTC date is used\
    if not provided.
    :rtype: int
    :return: The target timestamp (in seconds).
    """

    now = now or datetime.datetime.utcnow()

    if now.month == 12:
        next_year, next_month = now.year + 1, 1
    else:
        next_year, next_month = now.year, now.month + 1

    # in case the target day has already passed in the current
    # month the occurrence must be the one of the next month
    if now.day > monthday:
        year, month = next_year, next_month
    else:
        year, month = now.year, now.month

    delta = datetime.timedelta(seconds=offset)
    target = datetime.datetime(year=year, month=month, day=monthday) + delta

    # the target may still be in the past (eg: target day is the current
    # one), then the next month is used, the original day of the month is
    # the one to be used and not the day of the (offset shifted) target,
    # otherwise a negative offset would move the day on every evaluation
    if target < now:
        target = (
            datetime.datetime(year=next_year, month=next_month, day=monthday) + delta
        )

    return calendar.timegm(target.utctimetuple())
def build_composed(callable, target_time, interval, eval, callback):
    """
    Builds a wrapper around the provided callable that, after each
    execution (successful or not), inserts a new wrapper into the
    work queue for the next target time.

    :type callable: Function
    :param callable: The callable to be executed by the wrapper.
    :type target_time: float
    :param target_time: The timestamp the wrapper is scheduled for,\
    used as the base of the next target time in interval mode.
    :type interval: float
    :param interval: The number of seconds between executions, only\
    used when no ``eval`` function is provided.
    :type eval: Function
    :param eval: Optional function (no arguments) that returns the\
    timestamp of the next execution, takes precedence over the interval.
    :type callback: Function
    :param callback: The callback to be associated with the work that\
    is inserted for the next execution.
    :rtype: Function
    :return: The wrapper function, that returns the result of the callable.
    """

    def composed(*args, **kwargs):
        try:
            # executes the wrapped callable with the exact arguments
            # received, its result is the result of the wrapper
            result = callable(*args, **kwargs)
        finally:
            if eval:
                # the evaluation function has priority and decides
                # the timing of the next execution on its own
                next_time = eval()
            else:
                # interval mode, while the execution ended within the interval
                # the next time is relative to the scheduled time (no drift),
                # otherwise it's relative to the end of the execution so that
                # the queue is not flooded with overdue work
                finished = time.time()
                on_schedule = finished - target_time < interval
                base = target_time if on_schedule else finished
                next_time = base + interval

            # builds the wrapper for the next execution and inserts it
            # into the queue, done even if the callable has failed
            follow_up = build_composed(callable, next_time, interval, eval, callback)
            insert_work(
                follow_up,
                args=args,
                kwargs=kwargs,
                target_time=next_time,
                callback=callback,
            )

        # hands the result of the original callable to the caller
        return result

    return composed