import io
import logging
import os
import pickle
import re
import subprocess

import cv2
from google.auth.transport.requests import Request
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload, MediaIoBaseDownload
from tqdm import tqdm


class TextOcr:
    """Extract text from rectangular regions of an image.

    Two back ends are supported, selected by ``ocrType``:

    * ``"googleocr"`` - uploads the crop to Google Drive as a Google Docs
      document and exports that document as plain text.
    * ``"windowocr"`` - writes the crop to ``lib_/input.jpg`` and runs the
      bundled ``./lib_/winocr/winocr.exe``, then reads ``lib_/output.txt``.

    All file paths are relative to the current working directory.
    """

    # Characters stripped by filterText: backslash, ASCII punctuation/symbols,
    # digits, Latin letters and a set of CJK/typographic marks. Compiled once;
    # the raw string avoids the invalid "\-" escape a plain literal would have.
    _NOISE_PATTERN = re.compile(
        r'[\\+/§◎*)@<>#%(&=$_\-^01234567890ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz:;«¢~「」〃ゝゞヽヾ一●▲・ヽ÷①↓®▽■◆『£〆∴∞▼™↑←]'
    )

    # Number of upload/export attempts before getTextGoogleOcr gives up.
    _GOOGLE_MAX_ATTEMPTS = 5

    # Path of the external Windows OCR executable.
    _WINOCR_EXE = './lib_/winocr/winocr.exe'

    def __init__(self, ocrType):
        """Store the back-end name; the Drive service is created lazily.

        Args:
            ocrType: ``"googleocr"`` or ``"windowocr"``. The value is not
                validated here; getTextFromImg rejects unknown values.
        """
        self.service = None
        self.ocrType = ocrType

    def getGoogleCred(self):
        """Build an authorised Google Drive v3 service object.

        Cached credentials are read from ``token.pickle`` when that file
        exists. Invalid credentials are refreshed if possible; otherwise the
        installed-app OAuth flow is run from ``credentials.json``. Credentials
        obtained either way are written back to ``token.pickle``.

        Returns:
            The object returned by ``build('drive', 'v3', ...)``.
        """
        SCOPES = ['https://www.googleapis.com/auth/drive']
        creds = None

        if os.path.exists('token.pickle'):
            # NOTE(security): unpickling executes arbitrary code if the file
            # has been tampered with; token.pickle must stay a trusted,
            # locally generated file.
            with open('token.pickle', 'rb') as token:
                creds = pickle.load(token)

        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file(
                    'credentials.json', SCOPES)
                creds = flow.run_local_server(port=0)

            with open('token.pickle', 'wb') as token:
                pickle.dump(creds, token)

        return build('drive', 'v3', credentials=creds)

    def filterText(self, inputText):
        """Remove unwanted characters and all whitespace from ``inputText``.

        Args:
            inputText: Raw text returned by an OCR back end.

        Returns:
            ``inputText`` without any character matched by ``_NOISE_PATTERN``
            and without any whitespace.
        """
        stripped = self._NOISE_PATTERN.sub('', inputText)
        # split() with no argument splits on any whitespace run, so joining
        # the pieces drops spaces, tabs and newlines alike.
        return ''.join(stripped.split())

    def getTextGoogleOcr(self, img):
        """Recognise the text in ``img`` through Google Drive.

        The image is written to ``googleocr.jpg``, uploaded as a Google Docs
        document, exported as plain text into ``googleocr.txt`` and the
        uploaded document is deleted again. The upload/export cycle is tried
        up to ``_GOOGLE_MAX_ATTEMPTS`` times.

        Args:
            img: Image accepted by ``cv2.imwrite``.

        Returns:
            The exported text after ``filterText``.

        Raises:
            RuntimeError: If every attempt failed; the last failure is
                attached as ``__cause__``.
        """
        if self.service is None:
            self.service = self.getGoogleCred()

        txtPath = 'googleocr.txt'
        imgPath = "googleocr.jpg"
        mime = 'application/vnd.google-apps.document'

        # The image does not change between attempts, so write it only once.
        cv2.imwrite(imgPath, img)

        lastError = None
        for attempt in range(1, self._GOOGLE_MAX_ATTEMPTS + 1):
            try:
                res = self.service.files().create(
                    body={'name': imgPath,
                          'mimeType': mime},
                    media_body=MediaFileUpload(
                        imgPath, mimetype=mime, resumable=True),
                ).execute()
                try:
                    with io.FileIO(txtPath, 'wb') as fh:
                        downloader = MediaIoBaseDownload(
                            fh,
                            self.service.files().export_media(
                                fileId=res['id'], mimeType="text/plain"))
                        done = False
                        while not done:
                            _, done = downloader.next_chunk()
                finally:
                    # Always remove the uploaded document, even when the
                    # export failed, so retries do not pile up files on Drive.
                    self.service.files().delete(fileId=res['id']).execute()

                with open(txtPath, "r", encoding="utf-8") as f:
                    text_google = f.read()
                return self.filterText(text_google.replace('\ufeff', ''))
            except Exception as err:  # retry on any API / I/O failure
                lastError = err
                logging.getLogger(__name__).warning(
                    "Google OCR attempt %d/%d failed: %s",
                    attempt, self._GOOGLE_MAX_ATTEMPTS, err)

        raise RuntimeError(
            "Google OCR failed after {} attempts".format(
                self._GOOGLE_MAX_ATTEMPTS)) from lastError

    @staticmethod
    def _removeIfExists(path):
        """Delete ``path``; a file that is already gone is not an error."""
        try:
            os.remove(path)
        except FileNotFoundError:
            pass

    def getTextWindowOcr(self, img):
        """Recognise the text in ``img`` with the bundled ``winocr.exe``.

        The image is written to ``lib_/input.jpg``, the executable is run to
        completion and ``lib_/output.txt`` is read. Both files are removed
        afterwards, including when the run or the read fails.

        Args:
            img: Image accepted by ``cv2.imwrite``.

        Returns:
            The content of the output file after ``filterText``.

        Raises:
            FileNotFoundError: If the executable or the output file is
                missing.
        """
        inputFile = "lib_/input.jpg"
        outputFile = 'lib_/output.txt'
        try:
            cv2.imwrite(inputFile, img)
            # The exit status is intentionally not checked, as before.
            subprocess.run([self._WINOCR_EXE], check=False)
            with open(outputFile, "r", encoding="utf-8") as f:
                text = f.read()
        finally:
            self._removeIfExists(inputFile)
            self._removeIfExists(outputFile)
        return self.filterText(text)

    def checkWindowOcr(self):
        """Run ``winocr.exe`` and report whether it signalled success.

        Returns:
            True only when ``./lib_/loadResult.txt`` exists after the run and
            its content is exactly ``"True"``; False otherwise.
        """
        subprocess.run([self._WINOCR_EXE], check=False)
        try:
            with open("./lib_/loadResult.txt", "r", encoding="utf-8") as f:
                return f.read() == "True"
        except FileNotFoundError:
            return False

    def _recognize(self, cropped):
        """Send one cropped region to the back end named by ``ocrType``."""
        if self.ocrType == "googleocr":
            return self.getTextGoogleOcr(cropped)
        if self.ocrType == "windowocr":
            return self.getTextWindowOcr(cropped)
        raise ValueError("unsupported ocrType: {!r}".format(self.ocrType))

    def getTextFromImg(self, imgPath, rectList, textOnlyFolder):
        """Run OCR on every rectangle of one image.

        The image that is read is ``textOnlyFolder + basename(imgPath)``; the
        two strings are concatenated as-is, so ``textOnlyFolder`` has to end
        with a path separator.

        Args:
            imgPath: Path whose base name selects the file in
                ``textOnlyFolder``.
            rectList: Two-element sequence; only the first element, an
                iterable of ``(x1, y1, x2, y2)`` boxes, is used.
            textOnlyFolder: Folder prefix of the image to read.

        Returns:
            A list with one recognised string per rectangle, in order.

        Raises:
            FileNotFoundError: If a rectangle has to be processed but the
                image could not be read.
            ValueError: If a rectangle has to be processed but ``ocrType`` is
                neither ``"googleocr"`` nor ``"windowocr"``.
        """
        fileName = os.path.basename(imgPath)
        sourcePath = textOnlyFolder + fileName
        img = cv2.imread(sourcePath)  # returns None when the file is unreadable
        rectP, _ = rectList

        textList = []
        for x1, y1, x2, y2 in rectP:
            # Checked lazily so that an empty rectangle list still yields [].
            if img is None:
                raise FileNotFoundError(
                    "could not read image: {}".format(sourcePath))
            cropped = img[y1: y2, x1: x2]
            textList.append(self._recognize(cropped))
        return textList