| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| import logging |
| import os |
|
|
|
|
| |
| logger = logging.getLogger('hwp5') |
|
|
| loglevel = os.environ.get('PYHWP_LOGLEVEL') |
| if loglevel: |
| loglevel = dict(DEBUG=logging.DEBUG, |
| INFO=logging.INFO, |
| WARNING=logging.WARNING, |
| ERROR=logging.ERROR, |
| CRITICAL=logging.CRITICAL).get(loglevel.upper(), |
| logging.WARNING) |
| logger.setLevel(loglevel) |
| del loglevel |
|
|
| filename = os.environ.get('PYHWP_LOGFILE') |
| if filename: |
| logger.addHandler(logging.FileHandler(filename)) |
| del filename |
|
|
|
|
| import sys |
| logger.info('sys.executable = %s', sys.executable) |
| logger.info('sys.version = %s', sys.version) |
| logger.info('sys.path:') |
| for path in sys.path: |
| logger.info('- %s', path) |
|
|
|
|
| try: |
| import uno |
| import unohelper |
| import unokit |
| from unokit.util import propseq_to_dict |
| from unokit.util import dict_to_propseq |
| from unokit.util import xenumeration_list |
| from unokit.adapters import InputStreamFromFileLike |
|
|
| from com.sun.star.lang import XInitialization |
| from com.sun.star.document import XFilter, XImporter, XExtendedFilterDetection |
| from com.sun.star.task import XJobExecutor |
|
|
| def log_exception(f): |
| def wrapper(*args, **kwargs): |
| try: |
| return f(*args, **kwargs) |
| except Exception as e: |
| logger.exception(e) |
| raise |
| return wrapper |
|
|
| g_ImplementationHelper = unohelper.ImplementationHelper() |
|
|
| def implementation(component_name, *services): |
| def decorator(cls): |
| g_ImplementationHelper.addImplementation(cls, component_name, services) |
| return cls |
| return decorator |
|
|
|
|
| @implementation('hwp5.Detector', 'com.sun.star.document.ExtendedTypeDetection') |
| class Detector(unokit.Base, XExtendedFilterDetection): |
|
|
| @log_exception |
| @unokit.component_context |
| def detect(self, mediadesc): |
| from hwp5_uno import typedetect |
|
|
| logger.info('hwp5.Detector detect()') |
|
|
| desc = propseq_to_dict(mediadesc) |
| for k, v in desc.items(): |
| logger.debug('\t%s: %s', k, v) |
|
|
| inputstream = desc['InputStream'] |
|
|
| typename = typedetect(inputstream) |
|
|
| logger.info('hwp5.Detector: %s detected.', typename) |
| return typename, mediadesc |
|
|
|
|
| @implementation('hwp5.Importer', 'com.sun.star.document.ImportFilter') |
| class Importer(unokit.Base, XInitialization, XFilter, XImporter): |
|
|
| @log_exception |
| @unokit.component_context |
| def initialize(self, args): |
| logger.debug('Importer initialize: %s', args) |
|
|
| @log_exception |
| @unokit.component_context |
| def setTargetDocument(self, target): |
| logger.debug('Importer setTargetDocument: %s', target) |
| self.target = target |
|
|
| @log_exception |
| @unokit.component_context |
| def filter(self, mediadesc): |
| from hwp5.dataio import ParseError |
| from hwp5_uno import HwpFileFromInputStream |
| from hwp5_uno import load_hwp5file_into_doc |
|
|
| logger.debug('Importer filter') |
| desc = propseq_to_dict(mediadesc) |
|
|
| logger.debug('mediadesc: %s', str(desc.keys())) |
| for k, v in desc.items(): |
| logger.debug('%s: %s', k, str(v)) |
|
|
| statusindicator = desc.get('StatusIndicator') |
|
|
| inputstream = desc['InputStream'] |
| hwpfile = HwpFileFromInputStream(inputstream) |
| try: |
| load_hwp5file_into_doc(hwpfile, self.target, statusindicator) |
| except ParseError as e: |
| e.print_to_logger(logger) |
| return False |
| except Exception as e: |
| logger.exception(e) |
| return False |
| else: |
| return True |
|
|
| @unokit.component_context |
| def cancel(self): |
| logger.debug('Importer cancel') |
|
|
|
|
| @implementation('hwp5.TestJob') |
| class TestJob(unokit.Base, XJobExecutor): |
|
|
| @unokit.component_context |
| def trigger(self, args): |
| logger.debug('testjob %s', args) |
|
|
| wd = args |
|
|
| import os |
| original_wd = os.getcwd() |
| try: |
| os.chdir(wd) |
|
|
| from unittest import TextTestRunner |
| testrunner = TextTestRunner() |
|
|
| from unittest import TestSuite |
| testrunner.run(TestSuite(self.tests())) |
| finally: |
| os.chdir(original_wd) |
|
|
| def tests(self): |
| from unittest import defaultTestLoader |
| yield defaultTestLoader.loadTestsFromTestCase(DetectorTest) |
| yield defaultTestLoader.loadTestsFromTestCase(ImporterTest) |
| from hwp5_uno.tests import test_hwp5_uno |
| yield defaultTestLoader.loadTestsFromModule(test_hwp5_uno) |
| from hwp5.tests import test_suite |
| yield test_suite() |
|
|
|
|
| from unittest import TestCase |
| class DetectorTest(TestCase): |
|
|
| def test_detect(self): |
| context = uno.getComponentContext() |
|
|
| from hwp5.tests.fixtures import open_fixture |
| f = open_fixture('sample-5017.hwp', 'rb') |
| stream = InputStreamFromFileLike(f) |
| mediadesc = dict_to_propseq(dict(InputStream=stream)) |
|
|
| svm = context.ServiceManager |
| detector = svm.createInstanceWithContext('hwp5.Detector', context) |
| typename, mediadesc2 = detector.detect(mediadesc) |
| self.assertEqual('hwp5', typename) |
|
|
| class ImporterTest(TestCase): |
|
|
| def test_filter(self): |
| context = uno.getComponentContext() |
| from hwp5.tests.fixtures import open_fixture |
| f = open_fixture('sample-5017.hwp', 'rb') |
| stream = InputStreamFromFileLike(f) |
| mediadesc = dict_to_propseq(dict(InputStream=stream)) |
|
|
| svm = context.ServiceManager |
| importer = svm.createInstanceWithContext('hwp5.Importer', context) |
| desktop = svm.createInstanceWithContext('com.sun.star.frame.Desktop', |
| context) |
| doc = desktop.loadComponentFromURL('private:factory/swriter', '_blank', |
| 0, ()) |
|
|
| importer.setTargetDocument(doc) |
| importer.filter(mediadesc) |
|
|
| text = doc.getText() |
|
|
| paragraphs = text.createEnumeration() |
| paragraphs = xenumeration_list(paragraphs) |
| for paragraph_ix, paragraph in enumerate(paragraphs): |
| logger.info('Paragraph %s', paragraph_ix) |
| logger.debug('%s', paragraph) |
|
|
| services = paragraph.SupportedServiceNames |
| if 'com.sun.star.text.Paragraph' in services: |
| portions = xenumeration_list(paragraph.createEnumeration()) |
| for portion_ix, portion in enumerate(portions): |
| logger.info('Portion %s: %s', portion_ix, |
| portion.TextPortionType) |
| if portion.TextPortionType == 'Text': |
| logger.info('- %s', portion.getString()) |
| elif portion.TextPortionType == 'Frame': |
| logger.debug('%s', portion) |
| textcontent_name = 'com.sun.star.text.TextContent' |
| en = portion.createContentEnumeration(textcontent_name) |
| contents = xenumeration_list(en) |
| for content in contents: |
| logger.debug('content: %s', content) |
| content_services = content.SupportedServiceNames |
| if ('com.sun.star.drawing.GraphicObjectShape' in |
| content_services): |
| logger.info('graphic url: %s', |
| content.GraphicURL) |
| logger.info('graphic stream url: %s', |
| content.GraphicStreamURL) |
| if 'com.sun.star.text.TextTable' in services: |
| pass |
| else: |
| pass |
|
|
| paragraph_portions = paragraphs[0].createEnumeration() |
| paragraph_portions = xenumeration_list(paragraph_portions) |
| self.assertEqual(u'한글 ', paragraph_portions[0].getString()) |
|
|
| paragraph_portions = paragraphs[16].createEnumeration() |
| paragraph_portions = xenumeration_list(paragraph_portions) |
| contents = paragraph_portions[1].createContentEnumeration('com.sun.star.text.TextContent') |
| contents = xenumeration_list(contents) |
| self.assertEqual('image/x-vclgraphic', contents[0].Bitmap.MimeType) |
| |
| |
|
|
| graphics = doc.getGraphicObjects() |
| graphics = xenumeration_list(graphics.createEnumeration()) |
| logger.debug('graphic: %s', graphics) |
|
|
| frames = doc.getTextFrames() |
| frames = xenumeration_list(frames.createEnumeration()) |
| logger.debug('frames: %s', frames) |
| except Exception as e: |
| logger.exception(e) |
| raise |
|
|