thonypythony
commited on
Commit
•
19d4ffb
1
Parent(s):
e140c63
my helping scripts
Browse files- scripts/colormapdiffimgs.py +48 -0
- scripts/imgsdiff.py +24 -0
- scripts/movefortest.py +69 -0
- scripts/split_image.py +17 -0
- scripts/test.py +67 -0
- scripts/watermark.py +225 -0
- scripts/watermarkcore.py +332 -0
scripts/colormapdiffimgs.py
ADDED
@@ -0,0 +1,48 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
1 |
+
from PIL import Image
|
2 |
+
|
3 |
+
def diffimage(name1, name2, colour="RGB"):
|
4 |
+
img1 = Image.open(name1).convert(colour)
|
5 |
+
img2 = Image.open(name2).convert(colour)
|
6 |
+
new_image = Image.new(colour, (img1.size[0], img1.size[1]), color=(255,255,255))
|
7 |
+
color_image = Image.new(colour, (img1.size[0], img1.size[1]), color=(255,255,255))
|
8 |
+
|
9 |
+
for i in range(img1.size[0]):
|
10 |
+
for j in range(img1.size[1]):
|
11 |
+
if (img1.getpixel((i,j)) != img2.getpixel((i,j))):
|
12 |
+
i1 = img1.getpixel((i,j))
|
13 |
+
i2 = img2.getpixel((i,j))
|
14 |
+
new_image.putpixel((i, j), tuple([i1[0]-i2[0],i1[1]-i2[1],i1[2]-i2[2]]))
|
15 |
+
|
16 |
+
if 0 <= i1[0]-i2[0] < 0:
|
17 |
+
color_image.putpixel((i, j), (0,255,255))
|
18 |
+
if 1 <= i1[0]-i2[0] < 2:
|
19 |
+
color_image.putpixel((i, j), (255,50,0))
|
20 |
+
if 2 <= i1[0]-i2[0] < 3:
|
21 |
+
color_image.putpixel((i, j), (255,100,0))
|
22 |
+
if i1[0]-i2[0] > 3:
|
23 |
+
color_image.putpixel((i, j), (255,0,0))
|
24 |
+
if 0 <= i1[1]-i2[1] < 1:
|
25 |
+
color_image.putpixel((i, j), (0,255,0))
|
26 |
+
if 1 <= i1[1]-i2[1] < 2:
|
27 |
+
color_image.putpixel((i, j), (0,255,50))
|
28 |
+
if 2 <= i1[1]-i2[1] < 3:
|
29 |
+
color_image.putpixel((i, j), (0,255,100))
|
30 |
+
if i1[1]-i2[1] > 3:
|
31 |
+
color_image.putpixel((i, j), (255,51,255))
|
32 |
+
if 0 <= i1[2]-i2[2] < 1:
|
33 |
+
color_image.putpixel((i, j), (0,0,255))
|
34 |
+
if 1 <= i1[2]-i2[2] < 2:
|
35 |
+
color_image.putpixel((i, j), (50,0,255)) # синий
|
36 |
+
if 2 <= i1[2]-i2[2] < 3:
|
37 |
+
color_image.putpixel((i, j), (100,0,255)) # фиолетовый
|
38 |
+
if i1[2]-i2[2] > 3:
|
39 |
+
color_image.putpixel((i, j), (51,255,51))
|
40 |
+
# color_image.show()
|
41 |
+
# new_image.show()
|
42 |
+
new_image.save(f"diff_for_{name1[7:-4]}_and_{name2[12:-4]}.png")
|
43 |
+
color_image.save(f"colordiff_for_{name1[7:-4]}_and_{name2[12:-4]}.png")
|
44 |
+
|
45 |
+
name1 = "60.jpg"
|
46 |
+
name2 = "60wm.jpg"
|
47 |
+
|
48 |
+
diffimage(name1, name2)
|
scripts/imgsdiff.py
ADDED
@@ -0,0 +1,24 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
1 |
+
from PIL import Image
|
2 |
+
|
3 |
+
img1 = Image.open('60.jpg')#.convert("RGB")
|
4 |
+
img2 = Image.open('60wm.jpg')#.convert("RGB")
|
5 |
+
|
6 |
+
N = 0
|
7 |
+
new_image = Image.new("RGB", (img1.size[0], img1.size[1]), color=(255,255,255))
|
8 |
+
|
9 |
+
for i in range(img1.size[0]):
|
10 |
+
for j in range(img1.size[1]):
|
11 |
+
if (img1.getpixel((i,j)) != img2.getpixel((i,j))):
|
12 |
+
N += 1
|
13 |
+
i1 = img1.getpixel((i,j))
|
14 |
+
i2 = img2.getpixel((i,j))
|
15 |
+
#print(i1)
|
16 |
+
new_image.putpixel((i, j), tuple([i1[0]-i2[0],i1[1]-i2[1],i1[2]-i2[2]])) # for "RGB"
|
17 |
+
# new_image.putpixel((i, j), tuple([i1-i2]))
|
18 |
+
|
19 |
+
print("diff pixls = ", N) # diff pixls = 179958
|
20 |
+
print(f"{N*100/(img1.size[0]*img1.size[1]):.2f}%")
|
21 |
+
|
22 |
+
|
23 |
+
new_image.show()
|
24 |
+
new_image.save(f"diff_encryption={N*100/(img1.size[0]*img1.size[1]):.2f}%.png")
|
scripts/movefortest.py
ADDED
@@ -0,0 +1,69 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
1 |
+
'''
|
2 |
+
DATASET/:
|
3 |
+
test/:
|
4 |
+
0/ # without watermark
|
5 |
+
1/ # with
|
6 |
+
train/:
|
7 |
+
0/
|
8 |
+
1/
|
9 |
+
validation/:
|
10 |
+
0/
|
11 |
+
1/
|
12 |
+
'''
|
13 |
+
|
14 |
+
from PIL import Image
|
15 |
+
import os, PIL, shutil, time
|
16 |
+
from glob import glob
|
17 |
+
|
18 |
+
|
19 |
+
def renameimg(path):
|
20 |
+
os.getcwd()
|
21 |
+
for i, filename in enumerate(os.listdir(path)):
|
22 |
+
try:
|
23 |
+
os.rename(path + "/" + filename, path + "/" + str(i) + ".jpeg")
|
24 |
+
|
25 |
+
except FileExistsError:
|
26 |
+
pass
|
27 |
+
|
28 |
+
def resize(path, color_mode):
|
29 |
+
dirs = os.listdir(path)
|
30 |
+
print('before resize ', len(dirs))
|
31 |
+
for item in dirs:
|
32 |
+
try:
|
33 |
+
# print(item)
|
34 |
+
with Image.open(fr'{path}/{item}') as im:
|
35 |
+
resized = im.convert(f'{color_mode}').resize((Width,Height))
|
36 |
+
resized.save(fr'{path}/{item[:-5]}.jpg')
|
37 |
+
time.sleep(0.0003)
|
38 |
+
# print(fr'for {item} have been done')
|
39 |
+
except PIL.UnidentifiedImageError:
|
40 |
+
print(fr"Confirmed: This image {path}/{item} cannot be opened!")
|
41 |
+
# os.remove(f'{path}{item}')
|
42 |
+
except OSError:
|
43 |
+
im = Image.open(fr'{path}/{item}').convert(f'{color_mode}').resize((Width,Height))
|
44 |
+
im.save(fr'{path}/{item[:-5]}.jpg')
|
45 |
+
print(fr"Chanched by hands for {path}/{item}")
|
46 |
+
dirs = os.listdir(path)
|
47 |
+
print('after resize ', len(dirs))
|
48 |
+
|
49 |
+
def moveimg(fromdir, todir, STOP):
|
50 |
+
for i, filename in enumerate(os.listdir(fromdir)):
|
51 |
+
if i == STOP:
|
52 |
+
break
|
53 |
+
else:
|
54 |
+
shutil.move(fromdir + "/" + filename, todir + "/" + filename)
|
55 |
+
i += 1
|
56 |
+
|
57 |
+
def removejpeg(fromdir):
|
58 |
+
images = glob(f'{fromdir}/*.jpeg')
|
59 |
+
for image in images:
|
60 |
+
os.remove(image)
|
61 |
+
|
62 |
+
Width, Height = 1120, 1120
|
63 |
+
path = "fortestfromcoco"
|
64 |
+
color_mode = "RGB"
|
65 |
+
# renameimg(path)
|
66 |
+
# resize(path, color_mode)
|
67 |
+
# removejpeg(path)
|
68 |
+
|
69 |
+
# moveimg(path, '0', 494)
|
scripts/split_image.py
ADDED
@@ -0,0 +1,17 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
1 |
+
from PIL import Image
|
2 |
+
from glob import glob
|
3 |
+
|
4 |
+
path = '11wm/*.jpg'
|
5 |
+
|
6 |
+
N = 5 # разделить на N**2
|
7 |
+
|
8 |
+
images = glob(path)
|
9 |
+
k = 1
|
10 |
+
for image in images:
|
11 |
+
im = Image.open(image)
|
12 |
+
for i in range(N):
|
13 |
+
for j in range(N):
|
14 |
+
if i!=N and j!=N:
|
15 |
+
im.crop(box=(im.size[0]/N*i, im.size[1]/N*j, im.size[0]/N*(i+1), im.size[1]/N*(j+1))).\
|
16 |
+
save(f'1/{k}.jpg')
|
17 |
+
k += 1
|
scripts/test.py
ADDED
@@ -0,0 +1,67 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
1 |
+
"""Простой скрипт для тестирования работы водяных знаков"""
|
2 |
+
|
3 |
+
from glob import glob
|
4 |
+
from watermark import WaterMark
|
5 |
+
|
6 |
+
|
7 |
+
def embed_watermark(
|
8 |
+
src_img_filename: str, dst_img_filename: str, wm_content: str, password: int
|
9 |
+
) -> int:
|
10 |
+
"""Внедрить водяной знак в изображение"""
|
11 |
+
|
12 |
+
watermark = WaterMark()
|
13 |
+
|
14 |
+
return watermark.embed_and_save(
|
15 |
+
src_img_filename, wm_content, password, dst_img_filename
|
16 |
+
)
|
17 |
+
|
18 |
+
|
19 |
+
def extract_watermark(filename: str, wm_size: int, password: int) -> str:
|
20 |
+
"""Извлечь водяной знак из изображения"""
|
21 |
+
|
22 |
+
watermark = WaterMark()
|
23 |
+
|
24 |
+
return watermark.extract_img_filename(filename, password, wm_size)
|
25 |
+
|
26 |
+
|
27 |
+
SRC_IMG_PATHNAME_PATTERN = "11/*.jpg"
|
28 |
+
SRC_TXT = "101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010"
|
29 |
+
PASSWORD = 123456789
|
30 |
+
|
31 |
+
|
32 |
+
def main():
|
33 |
+
"""Точка входа"""
|
34 |
+
|
35 |
+
# watermark_size = embed_watermark(
|
36 |
+
# "photos/2.jpg", DST_IMG_FILENAME, SRC_TXT, PASSWORD
|
37 |
+
# )
|
38 |
+
|
39 |
+
# dst_txt = extract_watermark(DST_IMG_FILENAME, watermark_size, PASSWORD)
|
40 |
+
|
41 |
+
# if dst_txt != SRC_TXT:
|
42 |
+
# print(False, dst_txt)
|
43 |
+
|
44 |
+
# return
|
45 |
+
|
46 |
+
src_img_filenames = glob(SRC_IMG_PATHNAME_PATTERN)
|
47 |
+
i = 1
|
48 |
+
for src_img_filename in src_img_filenames:
|
49 |
+
DST_IMG_FILENAME = f"11wm/{src_img_filename[2:]}"
|
50 |
+
watermark_size = embed_watermark(
|
51 |
+
src_img_filename, DST_IMG_FILENAME, SRC_TXT, PASSWORD
|
52 |
+
)
|
53 |
+
|
54 |
+
dst_txt = extract_watermark(DST_IMG_FILENAME, watermark_size, PASSWORD)
|
55 |
+
|
56 |
+
if dst_txt != SRC_TXT:
|
57 |
+
print(False, dst_txt)
|
58 |
+
continue
|
59 |
+
|
60 |
+
# break
|
61 |
+
|
62 |
+
print(i, True)
|
63 |
+
i += 1
|
64 |
+
|
65 |
+
|
66 |
+
if __name__ == "__main__":
|
67 |
+
main()
|
scripts/watermark.py
ADDED
@@ -0,0 +1,225 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
1 |
+
"""Файл, хранящий класс WaterMark и функции для шифрования"""
|
2 |
+
|
3 |
+
import numpy as np
|
4 |
+
import cv2
|
5 |
+
|
6 |
+
from cv2.typing import MatLike
|
7 |
+
|
8 |
+
from watermarkcore import WaterMarkCore
|
9 |
+
|
10 |
+
|
11 |
+
class WaterMark:
|
12 |
+
"""Класс-обёртка для работы с водяными знаками"""
|
13 |
+
|
14 |
+
def __init__(self):
|
15 |
+
self.wm_core = WaterMarkCore()
|
16 |
+
|
17 |
+
def embed_and_save(
|
18 |
+
self,
|
19 |
+
src_img_filename: str,
|
20 |
+
wm_content: str,
|
21 |
+
password: str,
|
22 |
+
img_path: str,
|
23 |
+
compression_ratio: int = 100,
|
24 |
+
) -> int:
|
25 |
+
"""Внедрить водяной знак в изображение и сохранить его"""
|
26 |
+
|
27 |
+
img, wm_size = self.embed(src_img_filename, wm_content, password)
|
28 |
+
|
29 |
+
cv2.imwrite(
|
30 |
+
img_path,
|
31 |
+
img,
|
32 |
+
params=[cv2.IMWRITE_JPEG_QUALITY, compression_ratio],
|
33 |
+
)
|
34 |
+
|
35 |
+
return wm_size
|
36 |
+
|
37 |
+
def embed(
|
38 |
+
self, src_img_filename: str, wm_content: str, password: str
|
39 |
+
) -> tuple[MatLike, int]:
|
40 |
+
"""Внедрить водяной знак в изображение"""
|
41 |
+
|
42 |
+
self.prepare_img_filename(src_img_filename)
|
43 |
+
wm_size = self.prepare_wm(wm_content, password)
|
44 |
+
img = self.wm_core.embed(password)
|
45 |
+
|
46 |
+
# print(
|
47 |
+
# "\nFill percentage without duplication",
|
48 |
+
# wm_size
|
49 |
+
# / (
|
50 |
+
# img.shape[0]
|
51 |
+
# * img.shape[1]
|
52 |
+
# / (
|
53 |
+
# self.wm_core.BLOCK_SHAPE[0]
|
54 |
+
# * self.wm_core.BLOCK_SHAPE[1]
|
55 |
+
# * np.dtype(np.float32).itemsize
|
56 |
+
# )
|
57 |
+
# ),
|
58 |
+
# )
|
59 |
+
|
60 |
+
return (img, wm_size)
|
61 |
+
|
62 |
+
def prepare_img_filename(self, filename: str) -> MatLike:
|
63 |
+
"""Подготовить изображение из файла"""
|
64 |
+
|
65 |
+
img = cv2.imread(filename, flags=cv2.IMREAD_UNCHANGED)
|
66 |
+
assert img is not None, f"Image file \"'{filename}'\" is unreadable!"
|
67 |
+
|
68 |
+
self.wm_core.prepare_img_arr(img)
|
69 |
+
|
70 |
+
def prepare_wm(
|
71 |
+
self, wm_content: str, password: int, encoding: str = "utf-8"
|
72 |
+
) -> int:
|
73 |
+
"""Зашифровать и подготовить водяной знак"""
|
74 |
+
|
75 |
+
wm_bit = encrypt_wm_content(wm_content, password, encoding)
|
76 |
+
self.wm_core.prepare_wm(wm_bit)
|
77 |
+
|
78 |
+
return wm_bit.size
|
79 |
+
|
80 |
+
def extract_img_filename(self, filename: str, password: int, wm_size: int) -> str:
|
81 |
+
"""Извлечь водяной знак из файла"""
|
82 |
+
|
83 |
+
img = cv2.imread(filename, flags=cv2.IMREAD_COLOR)
|
84 |
+
assert img is not None, f"Image file \"'{filename}'\" is unreadable!"
|
85 |
+
|
86 |
+
return self.extract_img(img, password, wm_size)
|
87 |
+
|
88 |
+
def extract_img(self, img: MatLike, password: int, wm_size: int) -> str:
|
89 |
+
"""Извлечь водяной знак из изображения"""
|
90 |
+
|
91 |
+
wm_avg = self.wm_core.extract_with_kmeans(img, wm_size, password)
|
92 |
+
|
93 |
+
return decrypt_wm_content(wm_avg, password, wm_size)
|
94 |
+
|
95 |
+
|
96 |
+
def encrypt_wm_content(
|
97 |
+
wm_content: str, password: int, encoding: str = "utf-8"
|
98 |
+
) -> np.ndarray[bool]:
|
99 |
+
"""Зашифровать водяной знак"""
|
100 |
+
|
101 |
+
bits = txt_to_bits(wm_content, encoding)
|
102 |
+
shuffle_array(bits, password)
|
103 |
+
|
104 |
+
return bits
|
105 |
+
|
106 |
+
|
107 |
+
def decrypt_wm_content(
|
108 |
+
wm_bits: np.ndarray[bool], password: int, wm_size: int, encoding: str = "utf-8"
|
109 |
+
) -> str:
|
110 |
+
"""Дешифровать водяной знак"""
|
111 |
+
|
112 |
+
deshuffle_array(wm_bits, password, wm_size)
|
113 |
+
wm_content = bits_to_txt(wm_bits, encoding)
|
114 |
+
|
115 |
+
return wm_content
|
116 |
+
|
117 |
+
|
118 |
+
def shuffle_array(array: np.ndarray, seed: int):
|
119 |
+
"""Перемешать случайные блоки массива битов"""
|
120 |
+
|
121 |
+
random_state = np.random.RandomState(seed)
|
122 |
+
random_state.shuffle(array)
|
123 |
+
|
124 |
+
|
125 |
+
def deshuffle_array(array: np.ndarray, seed: int, wm_size: int):
|
126 |
+
"""Восстановить блоки перемешанного массива"""
|
127 |
+
|
128 |
+
indexes_for_deshuffle = np.arange(wm_size)
|
129 |
+
shuffle_array(indexes_for_deshuffle, seed)
|
130 |
+
array[indexes_for_deshuffle] = array.copy()
|
131 |
+
|
132 |
+
|
133 |
+
def txt_to_bits(txt: str, encoding: str = "utf-8") -> np.ndarray[bool]:
|
134 |
+
"""Преобразовать водяной знак ("abc") в массив битов [True, False].
|
135 |
+
txt -> hex_str -> num -> bin_str -> bin_chars -> bits"""
|
136 |
+
|
137 |
+
hex_str = hex_encode(txt, encoding)
|
138 |
+
num = hex_str_to_num(hex_str)
|
139 |
+
bin_str = num_to_bin_str(num)
|
140 |
+
bin_chars = str_to_chars(bin_str)
|
141 |
+
bits = bin_chars_to_bits(bin_chars)
|
142 |
+
|
143 |
+
return bits
|
144 |
+
|
145 |
+
|
146 |
+
def bits_to_txt(bits: np.ndarray[bool], encoding: str = "utf-8"):
|
147 |
+
"""Преобразовать массив битов [True, False] в водяной знак ("abc").
|
148 |
+
bits -> bin_chars -> bin_str -> num -> hex_str -> txt"""
|
149 |
+
|
150 |
+
bin_chars = bits_to_bin_chars(bits)
|
151 |
+
bin_str = chars_to_str(bin_chars)
|
152 |
+
num = bin_str_to_num(bin_str)
|
153 |
+
hex_str = num_to_hex_str(num)
|
154 |
+
txt = hex_decode(hex_str, encoding)
|
155 |
+
|
156 |
+
return txt
|
157 |
+
|
158 |
+
|
159 |
+
def hex_encode(txt: str, encoding: str = "utf-8") -> str:
|
160 |
+
"""Побайтово кодировать строку ("abc") в заданной кодировке (по умолчанию: "utf-8"),
|
161 |
+
а затем преобразовать в строку с 16-ми представлениями этих байтов ("123F")"""
|
162 |
+
|
163 |
+
encoded_txt = txt.encode(encoding)
|
164 |
+
|
165 |
+
return encoded_txt.hex()
|
166 |
+
|
167 |
+
|
168 |
+
def hex_decode(hex_str: str, encoding: str = "utf-8") -> str:
|
169 |
+
"""Преобразовать строку с 16-ми представлениями символов в байты,
|
170 |
+
а затем побайтово декодировать строку из символов ("abc") в заданной кодировке (по умолчанию: "utf-8").
|
171 |
+
Заменяет нечитаемые символы символом (�)"""
|
172 |
+
|
173 |
+
hex_bytes = bytes.fromhex(hex_str)
|
174 |
+
|
175 |
+
return hex_bytes.decode(encoding, errors="replace")
|
176 |
+
|
177 |
+
|
178 |
+
def hex_str_to_num(hex_str: str) -> int:
|
179 |
+
"""Преобразовать строку с 16-м числом ("67F") в 10-е число (123)"""
|
180 |
+
|
181 |
+
return int(hex_str, base=16)
|
182 |
+
|
183 |
+
|
184 |
+
def num_to_hex_str(num: int) -> str:
|
185 |
+
"""Преобразовать 10-е число (123) в строку с 16-м числом ("67F")"""
|
186 |
+
|
187 |
+
return hex(num)[2:]
|
188 |
+
|
189 |
+
|
190 |
+
def num_to_bin_str(num: int) -> str:
|
191 |
+
"""Преобразовать 10-е число (123)
|
192 |
+
в 2-е число в виде строки без указания системы счисления в начале строки ("10101")
|
193 |
+
"""
|
194 |
+
|
195 |
+
return bin(num)[2:]
|
196 |
+
|
197 |
+
|
198 |
+
def bin_str_to_num(bin_str: str) -> int:
|
199 |
+
"""Преобразовать 2-е число в виде строки без указания системы счисления в начале строки ("10101")
|
200 |
+
в 10-е число (123)"""
|
201 |
+
|
202 |
+
return int(bin_str, base=2)
|
203 |
+
|
204 |
+
|
205 |
+
def str_to_chars(txt: str) -> np.ndarray[str]:
|
206 |
+
"""Преобразовать строку ('abc') в массив символов ['a', 'b', 'c']"""
|
207 |
+
|
208 |
+
return np.array(list(txt))
|
209 |
+
|
210 |
+
|
211 |
+
def chars_to_str(bin_chars: list[str]) -> str:
|
212 |
+
"""Преобразовать массив символов ['a', 'b', 'c'] в строку ('abc')"""
|
213 |
+
|
214 |
+
return "".join(bin_chars)
|
215 |
+
|
216 |
+
|
217 |
+
def bin_chars_to_bits(bin_chars: np.ndarray[str]) -> np.ndarray[bool]:
|
218 |
+
"""Преобразовать бинарные символы ['1', '0'] в массив битов [True, False]"""
|
219 |
+
|
220 |
+
return bin_chars == "1"
|
221 |
+
|
222 |
+
|
223 |
+
def bits_to_bin_chars(bits: np.ndarray[bool]) -> list[str]:
|
224 |
+
"""Преобразовать массив битов [True, False] в бинарные символы ['1', '0']"""
|
225 |
+
return [str(int(bit)) for bit in bits]
|
scripts/watermarkcore.py
ADDED
@@ -0,0 +1,332 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
1 |
+
"""Файл, хранящий класс WaterMarkCore и функцию-кластеризатор (метод k-средних)"""
|
2 |
+
|
3 |
+
from random import Random
|
4 |
+
|
5 |
+
import copy
|
6 |
+
import numpy as np
|
7 |
+
import cv2
|
8 |
+
|
9 |
+
from cv2 import dct, idct
|
10 |
+
from cv2.typing import MatLike
|
11 |
+
from numpy.linalg import svd
|
12 |
+
from pywt import dwt2, idwt2
|
13 |
+
|
14 |
+
|
15 |
+
class WaterMarkCore:
|
16 |
+
"""Класс-ядро для работы с водяными знаками"""
|
17 |
+
|
18 |
+
# Размер блока
|
19 |
+
# Чем больше размер, тем выше стойкость, но тем больше искажается выходное изображение
|
20 |
+
BLOCK_SHAPE: np.ndarray[np.int32] = np.array([3, 3])
|
21 |
+
|
22 |
+
MIN_D = 2 # Нижний порог интервала квантования
|
23 |
+
MAX_D = 9 # Верхний порог интервала квантования
|
24 |
+
DUPLICATE_NUM = 10
|
25 |
+
|
26 |
+
def __init__(self):
|
27 |
+
self.ca_block = [np.array([])] * 3 # Результаты для: dct на канал
|
28 |
+
self.block_num = 0 # Количество фрагментов информации, которые могут быть вставлены в исходное изображение
|
29 |
+
self.wm_size = 0 # Высота и ширина водяного знака
|
30 |
+
self.img_shape = None # Размер изображения
|
31 |
+
self.part_shape = None # Это округленный двумерный размер ca, который используется для игнорирования смещения элементов справа и снизу при встраивании
|
32 |
+
self.block_index = None # Декартово произведение всех индексов блоков
|
33 |
+
self.ca_block_shape = None # Кортеж (кол-во блоков в высоту, кол-во блоков в ширину, высота блока, ширина блока)
|
34 |
+
self.wm_bits = None # Водяной знак (массив битов)
|
35 |
+
|
36 |
+
# Аппроксимация от двумерного дискретного вейвлет-преобразования
|
37 |
+
self.ca = [np.array([])] * 3
|
38 |
+
|
39 |
+
# Коэффициенты от двумерного дискретного вейвлет-преобразования
|
40 |
+
# (horizontal detail, vertical detail and diagonal detail coefficients respectively)
|
41 |
+
self.hvd = [np.array([])] * 3
|
42 |
+
|
43 |
+
def embed(self, password: int) -> MatLike:
|
44 |
+
"""Внедрить подготовленный водяной знак в подготовленное изображение"""
|
45 |
+
|
46 |
+
self.init_block_index()
|
47 |
+
embed_ca = copy.deepcopy(self.ca)
|
48 |
+
embed_yuv = [np.array([])] * 3
|
49 |
+
seed1 = str(password)[::2]
|
50 |
+
seed2 = str(password)[1::2]
|
51 |
+
|
52 |
+
for channel in range(3):
|
53 |
+
random1 = Random(seed1)
|
54 |
+
random2 = Random(seed2)
|
55 |
+
blocks_args = [
|
56 |
+
(
|
57 |
+
self.ca_block[channel][self.block_index[i]],
|
58 |
+
i,
|
59 |
+
random1.randint(WaterMarkCore.MIN_D, WaterMarkCore.MAX_D),
|
60 |
+
random2.randint(WaterMarkCore.MIN_D, WaterMarkCore.MAX_D),
|
61 |
+
)
|
62 |
+
for i in range(self.block_num)
|
63 |
+
]
|
64 |
+
tmp = list(map(self.block_add_wm, blocks_args))
|
65 |
+
|
66 |
+
for i in range(self.block_num):
|
67 |
+
self.ca_block[channel][self.block_index[i]] = tmp[i]
|
68 |
+
|
69 |
+
# Четырехмерное преобразование в двухмерное
|
70 |
+
# Каждый канал хранит четырехмерный массив,
|
71 |
+
# представляющий результат четырехмерной разбивки.
|
72 |
+
# После четырехмерной разбивки иногда часть отсутствует из-за нецелочисленного деления.
|
73 |
+
# ca_part - это часть ca, которая отсутствует в данном канале
|
74 |
+
ca_part = np.concatenate(np.concatenate(self.ca_block[channel], 1), 1)
|
75 |
+
|
76 |
+
# При 4-мерном разбиении правая и нижняя часть длинной полосы,
|
77 |
+
# которая не является делимой, сохраняется,
|
78 |
+
# а остальное - это основная часть данных,
|
79 |
+
# которая после встраивания преобразуется в частотную область.
|
80 |
+
embed_ca[channel][: self.part_shape[0], : self.part_shape[1]] = ca_part
|
81 |
+
|
82 |
+
# инверсия
|
83 |
+
embed_yuv[channel] = idwt2((embed_ca[channel], self.hvd[channel]), "haar")
|
84 |
+
|
85 |
+
# Объединить 3 канала
|
86 |
+
embed_img_yuv = np.stack(embed_yuv, axis=2)
|
87 |
+
|
88 |
+
# Ранее, если оно не было целым числом 2, оно добавляло белую рамку, которая здесь удалена
|
89 |
+
embed_img_yuv = embed_img_yuv[: self.img_shape[0], : self.img_shape[1]]
|
90 |
+
embed_img = cv2.cvtColor(embed_img_yuv, cv2.COLOR_YUV2BGR)
|
91 |
+
embed_img = np.clip(embed_img, a_min=0, a_max=255)
|
92 |
+
|
93 |
+
return embed_img
|
94 |
+
|
95 |
+
def extract_with_kmeans(
|
96 |
+
self, img: MatLike, wm_size: int, password: int
|
97 |
+
) -> np.ndarray[bool]:
|
98 |
+
"""Извлечь кластеризированный водяной знак из изображения"""
|
99 |
+
|
100 |
+
wm_avg = self.extract(img, wm_size, password)
|
101 |
+
|
102 |
+
return one_dim_kmeans(wm_avg)
|
103 |
+
|
104 |
+
def extract(
|
105 |
+
self, img: MatLike, wm_size: int, password: int
|
106 |
+
) -> np.ndarray[np.float64]:
|
107 |
+
"""Извлечь водяной знак из изображения"""
|
108 |
+
|
109 |
+
wm_raw_bits = self.extract_raw(img, password)
|
110 |
+
|
111 |
+
return extract_avg(wm_raw_bits, wm_size)
|
112 |
+
|
113 |
+
def extract_raw(self, img: MatLike, password: int) -> np.ndarray[np.float64]:
|
114 |
+
"""Извлечь необработанные биты, из каждого блока"""
|
115 |
+
|
116 |
+
self.prepare_img_arr(img)
|
117 |
+
self.init_block_index()
|
118 |
+
wm_raw_bits = np.zeros(shape=(3, self.block_num))
|
119 |
+
seed1 = str(password)[::2]
|
120 |
+
seed2 = str(password)[1::2]
|
121 |
+
|
122 |
+
for channel in range(3):
|
123 |
+
random1 = Random(seed1)
|
124 |
+
random2 = Random(seed2)
|
125 |
+
blocks_args = [
|
126 |
+
(
|
127 |
+
self.ca_block[channel][self.block_index[i]],
|
128 |
+
random1.randint(WaterMarkCore.MIN_D, WaterMarkCore.MAX_D),
|
129 |
+
random2.randint(WaterMarkCore.MIN_D, WaterMarkCore.MAX_D),
|
130 |
+
)
|
131 |
+
for i in range(self.block_num)
|
132 |
+
]
|
133 |
+
wm_raw_bits[channel, :] = list(map(self.block_get_wm, blocks_args))
|
134 |
+
|
135 |
+
return wm_raw_bits
|
136 |
+
|
137 |
+
def prepare_img_arr(self, img: MatLike):
|
138 |
+
"""Подготовить изображение.
|
139 |
+
Считывание изображения ->
|
140 |
+
YUVise ->
|
141 |
+
добавление белой границы, чтобы сделать пиксели равномерными ->
|
142 |
+
4D чанкинг"""
|
143 |
+
|
144 |
+
img = img.astype(np.float32)
|
145 |
+
|
146 |
+
# shape (высота, ширина, кол-во цветов), img_shape (высота, ширина)
|
147 |
+
self.img_shape = img.shape[:2]
|
148 |
+
|
149 |
+
# Y (яркость) UV (цвет)
|
150 |
+
img_yuv = cv2.cvtColor(img, cv2.COLOR_BGR2YUV)
|
151 |
+
|
152 |
+
# Дополнить изображение чёрной рамкой в 1 пиксель в нижней и/или левой части,
|
153 |
+
# чтобы ширина и высота были чётными
|
154 |
+
img_yuv_with_border = cv2.copyMakeBorder(
|
155 |
+
img_yuv,
|
156 |
+
0,
|
157 |
+
img.shape[0] % 2,
|
158 |
+
0,
|
159 |
+
img.shape[1] % 2,
|
160 |
+
cv2.BORDER_CONSTANT,
|
161 |
+
value=(0, 0, 0),
|
162 |
+
)
|
163 |
+
|
164 |
+
ca_shape = [(i + 1) // 2 for i in self.img_shape] # центр
|
165 |
+
self.ca_block_shape = (
|
166 |
+
ca_shape[0] // WaterMarkCore.BLOCK_SHAPE[0],
|
167 |
+
ca_shape[1] // WaterMarkCore.BLOCK_SHAPE[1],
|
168 |
+
WaterMarkCore.BLOCK_SHAPE[0],
|
169 |
+
WaterMarkCore.BLOCK_SHAPE[1],
|
170 |
+
)
|
171 |
+
|
172 |
+
# Размер/шаг в байтах для построения свёртки
|
173 |
+
strides = np.dtype(np.float32).itemsize * np.array(
|
174 |
+
[
|
175 |
+
ca_shape[1] * WaterMarkCore.BLOCK_SHAPE[0],
|
176 |
+
WaterMarkCore.BLOCK_SHAPE[1],
|
177 |
+
ca_shape[1],
|
178 |
+
1, # Сдвиг по каналам
|
179 |
+
]
|
180 |
+
)
|
181 |
+
|
182 |
+
for channel in range(3):
|
183 |
+
# haar, db1, bior1/1, rbio1/1
|
184 |
+
self.ca[channel], self.hvd[channel] = dwt2(
|
185 |
+
img_yuv_with_border[:, :, channel], "haar"
|
186 |
+
) # Двумерное дискретное вейвлет-преобразование (Тип вейвлет: "haar")
|
187 |
+
# Переход к 4 измерениям
|
188 |
+
self.ca_block[channel] = np.lib.stride_tricks.as_strided(
|
189 |
+
self.ca[channel].astype(np.float32), self.ca_block_shape, strides
|
190 |
+
) # Построение свёртки исходного изображения (разделение на блоки)
|
191 |
+
|
192 |
+
def prepare_wm(self, wm_bits: np.ndarray[bool]):
|
193 |
+
"""Подготовить водяной знака"""
|
194 |
+
|
195 |
+
self.wm_bits = wm_bits
|
196 |
+
self.wm_size = wm_bits.size
|
197 |
+
|
198 |
+
def init_block_index(self):
|
199 |
+
"""Подготовить информацию о блоках"""
|
200 |
+
|
201 |
+
self.block_num = self.ca_block_shape[0] * self.ca_block_shape[1]
|
202 |
+
|
203 |
+
assert self.wm_size < self.block_num, IndexError(
|
204 |
+
f"До {self.block_num * 3 / 128} кб встроенной информации, \
|
205 |
+
более {self.wm_size * 3 / 128} кб информации с водяными знаками, переполнение"
|
206 |
+
)
|
207 |
+
|
208 |
+
self.part_shape = self.ca_block_shape[:2] * WaterMarkCore.BLOCK_SHAPE
|
209 |
+
|
210 |
+
self.block_index = [
|
211 |
+
(i, j)
|
212 |
+
for i in range(self.ca_block_shape[0])
|
213 |
+
for j in range(self.ca_block_shape[1])
|
214 |
+
]
|
215 |
+
|
216 |
+
def block_add_wm(self, args) -> MatLike:
|
217 |
+
"""Внедрить информацию о водяном знаке в блок.
|
218 |
+
d1, d2 ∊ N; d1, d2 > 1"""
|
219 |
+
|
220 |
+
block: np.ndarray[np.ndarray[np.float64]]
|
221 |
+
i: int
|
222 |
+
d1: int
|
223 |
+
d2: int
|
224 |
+
block, i, d1, d2 = args
|
225 |
+
|
226 |
+
if (
|
227 |
+
WaterMarkCore.DUPLICATE_NUM is not None
|
228 |
+
and i >= self.wm_size * WaterMarkCore.DUPLICATE_NUM
|
229 |
+
):
|
230 |
+
return block
|
231 |
+
|
232 |
+
bit: bool = self.wm_bits[i % self.wm_size]
|
233 |
+
|
234 |
+
block_dct = dct(block) # Дискретное косинус-преобразование
|
235 |
+
|
236 |
+
u: np.ndarray[np.ndarray[np.float32]]
|
237 |
+
s: np.ndarray[np.float32]
|
238 |
+
v: np.ndarray[np.ndarray[np.float32]]
|
239 |
+
u, s, v = svd(block_dct) # Сингулярное разложение
|
240 |
+
|
241 |
+
s[0] = quantization(s[0], d1, bit)
|
242 |
+
s[1] = quantization(s[1], d2, bit)
|
243 |
+
|
244 |
+
inverse_block_dct = isvd(u, s, v)
|
245 |
+
inverse_block = idct(
|
246 |
+
inverse_block_dct
|
247 |
+
) # Обратное дискретное косинус-преобразование
|
248 |
+
|
249 |
+
return inverse_block
|
250 |
+
|
251 |
+
def block_get_wm(self, args):
|
252 |
+
"""Извлечь информацию о водяном знаке из блока.
|
253 |
+
d1, d2 ∊ N; d1, d2 > 1"""
|
254 |
+
|
255 |
+
block: np.ndarray[np.ndarray[np.float64]]
|
256 |
+
d1: int
|
257 |
+
d2: int
|
258 |
+
block, d1, d2 = args
|
259 |
+
|
260 |
+
dct_block = dct(block)
|
261 |
+
|
262 |
+
s: np.ndarray[np.float32]
|
263 |
+
_, s, _ = svd(dct_block)
|
264 |
+
|
265 |
+
bit1 = reverse_quantization(s[0], d1)
|
266 |
+
bit2 = reverse_quantization(s[1], d2)
|
267 |
+
|
268 |
+
# Первый столбец более устойчив к помехам
|
269 |
+
# и имеет больший коэффициент, чем второй (3 к 1)
|
270 |
+
bit = (bit1 * 3 + bit2 * 1) / 4
|
271 |
+
|
272 |
+
return bit
|
273 |
+
|
274 |
+
|
275 |
+
def quantization(num: np.float32, d: int, bit: bool) -> np.float32:
|
276 |
+
"""Квантовать сигнал в число с заданным коэффициентом"""
|
277 |
+
return (num // d + 0.25 + 0.5 * bit) * d
|
278 |
+
|
279 |
+
|
280 |
+
def reverse_quantization(num: np.float32, d: int) -> bool:
|
281 |
+
"""Восстановить квантованный сигнал из числа с заданным коэффициентом"""
|
282 |
+
return num % d > d / 2
|
283 |
+
|
284 |
+
|
285 |
+
def isvd(u, s, v):
|
286 |
+
"""Обратное сингулярное разложение"""
|
287 |
+
return np.dot(u, np.dot(np.diag(s), v))
|
288 |
+
|
289 |
+
|
290 |
+
def extract_avg(array: np.ndarray[np.float64], wm_size: int) -> np.ndarray[np.float64]:
|
291 |
+
"""Извлечь массив средних арифметических дубликатов битов водяного знака.
|
292 |
+
Каждый элемент массива является средним арифметическим между дубликатами
|
293 |
+
квантованного числа от отдельного бита исходного водяного знака из каждого канала(2),
|
294 |
+
каждого блока (Кол-во блоков // размер водяного знака)
|
295 |
+
и каждого из используемых коэффициентов сингулярного разложения (2)"""
|
296 |
+
|
297 |
+
wm_bits_avg = np.zeros(np.int32(wm_size))
|
298 |
+
|
299 |
+
for i in range(wm_size):
|
300 |
+
repeated_wm_bit = array[:, i::wm_size]
|
301 |
+
if WaterMarkCore.DUPLICATE_NUM is not None:
|
302 |
+
repeated_wm_bit = repeated_wm_bit[:, : WaterMarkCore.DUPLICATE_NUM]
|
303 |
+
wm_bits_avg[i] = repeated_wm_bit.mean()
|
304 |
+
|
305 |
+
return wm_bits_avg
|
306 |
+
|
307 |
+
|
308 |
+
def one_dim_kmeans(inputs: np.ndarray[np.float64], iter_num=300) -> np.ndarray[bool]:
|
309 |
+
"""Кластеризировать входные точки (метод k-средних)"""
|
310 |
+
|
311 |
+
threshold = 0
|
312 |
+
e_tol = 10 ** (-6)
|
313 |
+
center = [inputs.min(), inputs.max()] # 1. Инициализация центральной точки
|
314 |
+
|
315 |
+
for _ in range(iter_num):
|
316 |
+
threshold = (center[0] + center[1]) / 2
|
317 |
+
|
318 |
+
# 2. Проверка расстояния между всеми точками и этими k точками,
|
319 |
+
# каждая из которых классифицирована до ближайшего центра
|
320 |
+
is_class01 = inputs > threshold
|
321 |
+
|
322 |
+
# 3. Вычисление новой центральной точки
|
323 |
+
center = [inputs[~is_class01].mean(), inputs[is_class01].mean()]
|
324 |
+
|
325 |
+
# 4. Условие остановки
|
326 |
+
if np.abs((center[0] + center[1]) / 2 - threshold) < e_tol:
|
327 |
+
threshold = (center[0] + center[1]) / 2
|
328 |
+
break
|
329 |
+
|
330 |
+
is_class01 = inputs > threshold
|
331 |
+
|
332 |
+
return is_class01
|