|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import time |
|
import os |
|
|
|
from PyQt4.QtCore import * |
|
from PyQt4.QtGui import * |
|
from PyQt4.QtWebKit import * |
|
from PyQt4.QtNetwork import * |
|
|
|
|
|
|
|
class WebkitRenderer(QObject): |
|
""" |
|
A class that helps to create 'screenshots' of webpages using |
|
Qt's QWebkit. Requires PyQt4 library. |
|
|
|
Use "render()" to get a 'QImage' object, render_to_bytes() to get the |
|
resulting image as 'str' object or render_to_file() to write the image |
|
directly into a 'file' resource. |
|
""" |
|
def __init__(self,**kwargs): |
|
""" |
|
Sets default values for the properties. |
|
""" |
|
|
|
if not QApplication.instance(): |
|
raise RuntimeError(self.__class__.__name__ + " requires a running QApplication instance") |
|
QObject.__init__(self) |
|
|
|
|
|
self.width = kwargs.get('width', 0) |
|
self.height = kwargs.get('height', 0) |
|
self.timeout = kwargs.get('timeout', 0) |
|
self.wait = kwargs.get('wait', 0) |
|
self.scaleToWidth = kwargs.get('scaleToWidth', 0) |
|
self.scaleToHeight = kwargs.get('scaleToHeight', 0) |
|
self.scaleRatio = kwargs.get('scaleRatio', 'keep') |
|
self.format = kwargs.get('format', 'png') |
|
self.logger = kwargs.get('logger', None) |
|
|
|
|
|
|
|
|
|
self.grabWholeWindow = kwargs.get('grabWholeWindow', False) |
|
self.renderTransparentBackground = kwargs.get('renderTransparentBackground', False) |
|
self.ignoreAlert = kwargs.get('ignoreAlert', True) |
|
self.ignoreConfirm = kwargs.get('ignoreConfirm', True) |
|
self.ignorePrompt = kwargs.get('ignorePrompt', True) |
|
self.interruptJavaScript = kwargs.get('interruptJavaScript', True) |
|
self.encodedUrl = kwargs.get('encodedUrl', False) |
|
self.cookies = kwargs.get('cookies', []) |
|
|
|
|
|
self.qWebSettings = { |
|
QWebSettings.JavascriptEnabled : False, |
|
QWebSettings.PluginsEnabled : False, |
|
QWebSettings.PrivateBrowsingEnabled : True, |
|
QWebSettings.JavascriptCanOpenWindows : False |
|
} |
|
|
|
|
|
def render(self, res): |
|
""" |
|
Renders the given URL into a QImage object |
|
""" |
|
|
|
|
|
|
|
helper = _WebkitRendererHelper(self) |
|
helper._window.resize( self.width, self.height ) |
|
image = helper.render(res) |
|
|
|
|
|
|
|
|
|
image.helper = helper |
|
|
|
return image |
|
|
|
def render_to_file(self, res, file_object): |
|
""" |
|
Renders the image into a File resource. |
|
Returns the size of the data that has been written. |
|
""" |
|
format = self.format |
|
image = self.render(res) |
|
qBuffer = QBuffer() |
|
image.save(qBuffer, format) |
|
file_object.write(qBuffer.buffer().data()) |
|
return qBuffer.size() |
|
|
|
def render_to_bytes(self, res): |
|
"""Renders the image into an object of type 'str'""" |
|
format = self.format |
|
image = self.render(res) |
|
qBuffer = QBuffer() |
|
image.save(qBuffer, format) |
|
return qBuffer.buffer().data() |
|
|
|
|
|
class CookieJar(QNetworkCookieJar): |
|
def __init__(self, cookies, qtUrl, parent=None): |
|
QNetworkCookieJar.__init__(self, parent) |
|
for cookie in cookies: |
|
QNetworkCookieJar.setCookiesFromUrl(self, QNetworkCookie.parseCookies(QByteArray(cookie)), qtUrl) |
|
|
|
def allCookies(self): |
|
return QNetworkCookieJar.allCookies(self) |
|
|
|
def setAllCookies(self, cookieList): |
|
QNetworkCookieJar.setAllCookies(self, cookieList) |
|
|
|
class _WebkitRendererHelper(QObject): |
|
""" |
|
This helper class is doing the real work. It is required to |
|
allow WebkitRenderer.render() to be called "asynchronously" |
|
(but always from Qt's GUI thread). |
|
""" |
|
|
|
def __init__(self, parent): |
|
""" |
|
Copies the properties from the parent (WebkitRenderer) object, |
|
creates the required instances of QWebPage, QWebView and QMainWindow |
|
and registers some Slots. |
|
""" |
|
QObject.__init__(self) |
|
|
|
|
|
for key,value in parent.__dict__.items(): |
|
setattr(self,key,value) |
|
|
|
|
|
proxy = QNetworkProxy(QNetworkProxy.NoProxy) |
|
if 'http_proxy' in os.environ: |
|
proxy_url = QUrl(os.environ['http_proxy']) |
|
if unicode(proxy_url.scheme()).startswith('http'): |
|
protocol = QNetworkProxy.HttpProxy |
|
else: |
|
protocol = QNetworkProxy.Socks5Proxy |
|
|
|
proxy = QNetworkProxy( |
|
protocol, |
|
proxy_url.host(), |
|
proxy_url.port(), |
|
proxy_url.userName(), |
|
proxy_url.password() |
|
) |
|
|
|
|
|
self._page = CustomWebPage(logger=self.logger, ignore_alert=self.ignoreAlert, |
|
ignore_confirm=self.ignoreConfirm, ignore_prompt=self.ignorePrompt, |
|
interrupt_js=self.interruptJavaScript) |
|
self._page.networkAccessManager().setProxy(proxy) |
|
self._view = QWebView() |
|
self._view.setPage(self._page) |
|
self._window = QMainWindow() |
|
self._window.setCentralWidget(self._view) |
|
|
|
|
|
for key, value in self.qWebSettings.iteritems(): |
|
self._page.settings().setAttribute(key, value) |
|
|
|
|
|
self.connect(self._page, SIGNAL("loadFinished(bool)"), self._on_load_finished) |
|
self.connect(self._page, SIGNAL("loadStarted()"), self._on_load_started) |
|
self.connect(self._page.networkAccessManager(), SIGNAL("sslErrors(QNetworkReply *,const QList<QSslError>&)"), self._on_ssl_errors) |
|
self.connect(self._page.networkAccessManager(), SIGNAL("finished(QNetworkReply *)"), self._on_each_reply) |
|
|
|
|
|
self._page.mainFrame().setScrollBarPolicy(Qt.Horizontal, Qt.ScrollBarAlwaysOff) |
|
self._page.mainFrame().setScrollBarPolicy(Qt.Vertical, Qt.ScrollBarAlwaysOff) |
|
self._page.settings().setUserStyleSheetUrl(QUrl("data:text/css,html,body{overflow-y:hidden !important;}")) |
|
|
|
|
|
self._window.show() |
|
|
|
def __del__(self): |
|
""" |
|
Clean up Qt4 objects. |
|
""" |
|
self._window.close() |
|
del self._window |
|
del self._view |
|
del self._page |
|
|
|
def render(self, res): |
|
""" |
|
The real worker. Loads the page (_load_page) and awaits |
|
the end of the given 'delay'. While it is waiting outstanding |
|
QApplication events are processed. |
|
After the given delay, the Window or Widget (depends |
|
on the value of 'grabWholeWindow' is drawn into a QPixmap |
|
and postprocessed (_post_process_image). |
|
""" |
|
self._load_page(res, self.width, self.height, self.timeout) |
|
|
|
|
|
if self.wait > 0: |
|
if self.logger: self.logger.debug("Waiting %d seconds " % self.wait) |
|
waitToTime = time.time() + self.wait |
|
while time.time() < waitToTime: |
|
if QApplication.hasPendingEvents(): |
|
QApplication.processEvents() |
|
|
|
if self.renderTransparentBackground: |
|
|
|
image = QImage(self._page.viewportSize(), QImage.Format_ARGB32) |
|
image.fill(QColor(255,0,0,0).rgba()) |
|
|
|
|
|
palette = self._view.palette() |
|
palette.setBrush(QPalette.Base, Qt.transparent) |
|
self._page.setPalette(palette) |
|
self._view.setAttribute(Qt.WA_OpaquePaintEvent, False) |
|
|
|
painter = QPainter(image) |
|
painter.setBackgroundMode(Qt.TransparentMode) |
|
self._page.mainFrame().render(painter) |
|
painter.end() |
|
else: |
|
if self.grabWholeWindow: |
|
|
|
|
|
|
|
self._view.activateWindow() |
|
image = QPixmap.grabWindow(self._window.winId()) |
|
else: |
|
image = QPixmap.grabWidget(self._window) |
|
|
|
return self._post_process_image(image) |
|
|
|
def _load_page(self, res, width, height, timeout): |
|
""" |
|
This method implements the logic for retrieving and displaying |
|
the requested page. |
|
""" |
|
|
|
|
|
|
|
cancelAt = time.time() + timeout |
|
self.__loading = True |
|
self.__loadingResult = False |
|
|
|
|
|
|
|
|
|
|
|
|
|
if type(res) == tuple: |
|
url = res[1] |
|
else: |
|
url = res |
|
|
|
if self.encodedUrl: |
|
qtUrl = QUrl.fromEncoded(url) |
|
else: |
|
qtUrl = QUrl(url) |
|
|
|
|
|
self.cookieJar = CookieJar(self.cookies, qtUrl) |
|
self._page.networkAccessManager().setCookieJar(self.cookieJar) |
|
|
|
|
|
if type(res) == tuple: |
|
self._page.mainFrame().setHtml(res[0], qtUrl) |
|
else: |
|
self._page.mainFrame().load(qtUrl) |
|
|
|
while self.__loading: |
|
if timeout > 0 and time.time() >= cancelAt: |
|
raise RuntimeError("Request timed out on %s" % res) |
|
while QApplication.hasPendingEvents() and self.__loading: |
|
QCoreApplication.processEvents() |
|
|
|
if self.logger: self.logger.debug("Processing result") |
|
|
|
if self.__loading_result == False: |
|
if self.logger: self.logger.warning("Failed to load %s" % res) |
|
|
|
|
|
size = self._page.mainFrame().contentsSize() |
|
if self.logger: self.logger.debug("contentsSize: %s", size) |
|
if width > 0: |
|
size.setWidth(width) |
|
if height > 0: |
|
size.setHeight(height) |
|
|
|
self._window.resize(size) |
|
|
|
def _post_process_image(self, qImage): |
|
""" |
|
If 'scaleToWidth' or 'scaleToHeight' are set to a value |
|
greater than zero this method will scale the image |
|
using the method defined in 'scaleRatio'. |
|
""" |
|
if self.scaleToWidth > 0 or self.scaleToHeight > 0: |
|
|
|
if self.scaleRatio == 'keep': |
|
ratio = Qt.KeepAspectRatio |
|
elif self.scaleRatio in ['expand', 'crop']: |
|
ratio = Qt.KeepAspectRatioByExpanding |
|
else: |
|
ratio = Qt.IgnoreAspectRatio |
|
qImage = qImage.scaled(self.scaleToWidth, self.scaleToHeight, ratio, Qt.SmoothTransformation) |
|
if self.scaleRatio == 'crop': |
|
qImage = qImage.copy(0, 0, self.scaleToWidth, self.scaleToHeight) |
|
return qImage |
|
|
|
def _on_each_reply(self,reply): |
|
""" |
|
Logs each requested uri |
|
""" |
|
|
|
|
|
|
|
|
|
def _on_load_started(self): |
|
""" |
|
Slot that sets the '__loading' property to true |
|
""" |
|
if self.logger: self.logger.debug("loading started") |
|
self.__loading = True |
|
|
|
|
|
def _on_load_finished(self, result): |
|
"""Slot that sets the '__loading' property to false and stores |
|
the result code in '__loading_result'. |
|
""" |
|
if self.logger: self.logger.debug("loading finished with result %s", result) |
|
self.__loading = False |
|
self.__loading_result = result |
|
|
|
|
|
def _on_ssl_errors(self, reply, errors): |
|
""" |
|
Slot that writes SSL warnings into the log but ignores them. |
|
""" |
|
for e in errors: |
|
if self.logger: self.logger.warn("SSL: " + e.errorString()) |
|
reply.ignoreSslErrors() |
|
|
|
|
|
class CustomWebPage(QWebPage): |
|
def __init__(self, **kwargs): |
|
""" |
|
Class Initializer |
|
""" |
|
super(CustomWebPage, self).__init__() |
|
self.logger = kwargs.get('logger', None) |
|
self.ignore_alert = kwargs.get('ignore_alert', True) |
|
self.ignore_confirm = kwargs.get('ignore_confirm', True) |
|
self.ignore_prompt = kwargs.get('ignore_prompt', True) |
|
self.interrupt_js = kwargs.get('interrupt_js', True) |
|
|
|
def javaScriptAlert(self, frame, message): |
|
if self.logger: self.logger.debug('Alert: %s', message) |
|
if not self.ignore_alert: |
|
return super(CustomWebPage, self).javaScriptAlert(frame, message) |
|
|
|
def javaScriptConfirm(self, frame, message): |
|
if self.logger: self.logger.debug('Confirm: %s', message) |
|
if not self.ignore_confirm: |
|
return super(CustomWebPage, self).javaScriptConfirm(frame, message) |
|
else: |
|
return False |
|
|
|
def javaScriptPrompt(self, frame, message, default, result): |
|
""" |
|
This function is called whenever a JavaScript program running inside frame tries to prompt |
|
the user for input. The program may provide an optional message, msg, as well as a default value |
|
for the input in defaultValue. |
|
|
|
If the prompt was cancelled by the user the implementation should return false; |
|
otherwise the result should be written to result and true should be returned. |
|
If the prompt was not cancelled by the user, the implementation should return true and |
|
the result string must not be null. |
|
""" |
|
if self.logger: self.logger.debug('Prompt: %s (%s)' % (message, default)) |
|
if not self.ignore_prompt: |
|
return super(CustomWebPage, self).javaScriptPrompt(frame, message, default, result) |
|
else: |
|
return False |
|
|
|
def shouldInterruptJavaScript(self): |
|
""" |
|
This function is called when a JavaScript program is running for a long period of time. |
|
If the user wanted to stop the JavaScript the implementation should return true; otherwise false. |
|
""" |
|
if self.logger: self.logger.debug("WebKit ask to interrupt JavaScript") |
|
return self.interrupt_js |
|
|