Spaces:
Sleeping
Sleeping
#! /usr/bin/env python | |
# -*- coding: utf-8 -*- | |
# Copyright 2016 Google Inc. All rights reserved. | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
"""MPEG processing classes.

Tool for loading mpeg4 files and manipulating atoms.
"""
import io | |
import struct | |
from spatialmedia.mpeg import constants | |
def load(fh, position, end):
    """Loads the box located at a position in a mp4 file.

    Only the header is parsed: a 32-bit big-endian size followed by a
    four-byte name. When the 32-bit size field equals 1, the real size is
    read from the 64-bit field that follows and the header is 16 bytes long
    instead of 8. The box contents are not read.

    Args:
      fh: file handle, input file handle.
      position: int or None, file offset of the box header. None means the
          current position of fh.
      end: int, file offset the box is not allowed to extend past.

    Returns:
      box: box, box from loaded file location or None when the declared
          size is smaller than the box header or the box runs past end.
    """
    if position is None:
        position = fh.tell()

    fh.seek(position)
    header_size = 8
    size = struct.unpack(">I", fh.read(4))[0]
    name = fh.read(4)

    if size == 1:
        # A size field of 1 signals that a 64-bit size follows the name.
        size = struct.unpack(">Q", fh.read(8))[0]
        header_size = 16

    # The declared size includes the header itself, so it can never be
    # smaller than the header that was just read (8 or 16 bytes). Checking
    # against a fixed 8 would let a 64-bit box through with a negative
    # content size.
    if size < header_size:
        print("Error, invalid size {} in {} at {}".format(size, name, position))
        return None

    if (position + size) > end:
        print("Error: Leaf box size exceeds bounds.")
        return None

    new_box = Box()
    new_box.name = name
    new_box.position = position
    new_box.header_size = header_size
    new_box.content_size = size - header_size
    new_box.contents = None

    return new_box
class Box(object):
    """MPEG4 box contents and behaviour true for all boxes.

    A box is described by where it sits in the source file (position), how
    long its header is (8 bytes, or 16 for the 64-bit size form) and how many
    content bytes follow. The content is normally left in the source file and
    copied on save; it is only held in memory once set() has been called.
    """

    def __init__(self):
        self.name = ""         # box type tag written right after the size
        self.position = 0      # offset of the box header in the source file
        self.header_size = 0   # 8 or 16; any other value writes no header
        self.content_size = 0  # number of content bytes after the header
        self.contents = None   # in-memory replacement contents, if any

    def content_start(self):
        """Returns the source-file offset of the first content byte."""
        return self.position + self.header_size

    def save(self, in_fh, out_fh, delta):
        """Save box contents prioritizing set contents.

        Args:
          in_fh: file handle, source to read box contents from.
          out_fh: file handle, destination for written box contents.
          delta: int, index update amount.
        """
        if self.header_size == 16:
            # 64-bit form: 32-bit size field of 1, name, then the real size.
            out_fh.write(struct.pack(">I", 1))
            out_fh.write(self.name)
            out_fh.write(struct.pack(">Q", self.size()))
        elif self.header_size == 8:
            out_fh.write(struct.pack(">I", self.size()))
            out_fh.write(self.name)

        if self.content_start():
            in_fh.seek(self.content_start())

        # Chunk offset tables are rewritten with delta applied; everything
        # else is emitted verbatim, from memory if contents were set and
        # from the source file otherwise.
        if self.name == constants.TAG_STCO:
            stco_copy(in_fh, out_fh, self, delta)
        elif self.name == constants.TAG_CO64:
            co64_copy(in_fh, out_fh, self, delta)
        elif self.contents:
            out_fh.write(self.contents)
        else:
            tag_copy(in_fh, out_fh, self.content_size)

    def set(self, new_contents):
        """Sets / overwrites the box contents.

        Args:
          new_contents: bytes, replacement contents; content_size is updated
              to its length.
        """
        self.contents = new_contents
        self.content_size = len(new_contents)

    def size(self):
        """Total size of a box.

        Returns:
          Int, total size in bytes of the box.
        """
        return self.header_size + self.content_size

    def print_structure(self, indent=""):
        """Prints the box structure.

        Args:
          indent: string, prefix printed before the box name.
        """
        size1 = self.header_size
        size2 = self.content_size
        print("{0} {1} [{2}, {3}]".format(indent, self.name, size1, size2))
def tag_copy(in_fh, out_fh, size):
    """Streams size bytes from in_fh to out_fh in bounded chunks.

    Args:
      in_fh: file handle, source positioned at the first byte to copy.
      out_fh: file handle, destination for the copied bytes.
      size: int, number of bytes to copy.
    """
    # A single read / write is capped at 2GB on 32-bit systems, so the
    # transfer is performed in pieces of at most 64 MB.
    chunk_limit = 64 * 1024 * 1024
    remaining = size

    while True:
        step = chunk_limit if remaining > chunk_limit else remaining
        out_fh.write(in_fh.read(step))
        remaining -= step
        if remaining <= 0:
            break
def index_copy(in_fh, out_fh, box, mode, mode_length, delta=0):
    """Copies an stco/co64 index table, shifting every entry by delta.

    The table is read from box.contents when the box carries in-memory
    contents, otherwise from in_fh at the start of the box contents. The
    rewritten table is emitted with a single write.

    Args:
      in_fh: file handle, source to read index table from.
      out_fh: file handle, destination for index file.
      box: box, stco/co64 box to copy.
      mode: string, bit packing mode for index entries.
      mode_length: int, number of bytes for index entries.
      delta: int, offset change for index entries.
    """
    if box.contents:
        source = io.BytesIO(box.contents)
    else:
        source = in_fh
        source.seek(box.content_start())

    # Two 32-bit words precede the entries; the second is the entry count.
    header = struct.unpack(">I", source.read(4))[0]
    entry_count = struct.unpack(">I", source.read(4))[0]

    pieces = [struct.pack(">I", header), struct.pack(">I", entry_count)]
    for _ in range(entry_count):
        raw_entry = source.read(mode_length)
        shifted = struct.unpack(mode, raw_entry)[0] + delta
        pieces.append(struct.pack(mode, shifted))

    out_fh.write(b"".join(pieces))
def stco_copy(in_fh, out_fh, box, delta=0):
    """Copies an stco box, whose index entries are 32-bit big-endian values.

    Args:
      in_fh: file handle, source to read index table from.
      out_fh: file handle, destination for index file.
      box: box, stco box to copy.
      delta: int, offset change for index entries.
    """
    index_copy(in_fh, out_fh, box, mode=">I", mode_length=4, delta=delta)
def co64_copy(in_fh, out_fh, box, delta=0):
    """Copies a co64 box, whose index entries are 64-bit big-endian values.

    Args:
      in_fh: file handle, source to read index table from.
      out_fh: file handle, destination for index file.
      box: box, co64 box to copy.
      delta: int, offset change for index entries.
    """
    index_copy(in_fh, out_fh, box, mode=">Q", mode_length=8, delta=delta)