Buckets:
| """Stuff to parse Sun and NeXT audio files. | |
| An audio file consists of a header followed by the data. The structure | |
| of the header is as follows. | |
| +---------------+ | |
| | magic word | | |
| +---------------+ | |
| | header size | | |
| +---------------+ | |
| | data size | | |
| +---------------+ | |
| | encoding | | |
| +---------------+ | |
| | sample rate | | |
| +---------------+ | |
| | # of channels | | |
| +---------------+ | |
| | info | | |
| | | | |
| +---------------+ | |
| The magic word consists of the 4 characters '.snd'. Apart from the | |
| info field, all header fields are 4 bytes in size. They are all | |
| 32-bit unsigned integers encoded in big-endian byte order. | |
| The header size really gives the start of the data. | |
| The data size is the physical size of the data. From the other | |
| parameters the number of frames can be calculated. | |
| The encoding gives the way in which audio samples are encoded. | |
| Possible values are listed below. | |
| The info field currently consists of an ASCII string giving a | |
| human-readable description of the audio file. The info field is | |
| padded with NUL bytes to the header size. | |
| Usage. | |
| Reading audio files: | |
| f = sunau.open(file, 'r') | |
| where file is either the name of a file or an open file pointer. | |
| The open file pointer must have methods read(), seek(), and close(). | |
| When the setpos() and rewind() methods are not used, the seek() | |
| method is not necessary. | |
| This returns an instance of a class with the following public methods: | |
| getnchannels() -- returns number of audio channels (1 for | |
| mono, 2 for stereo) | |
| getsampwidth() -- returns sample width in bytes | |
| getframerate() -- returns sampling frequency | |
| getnframes() -- returns number of audio frames | |
| getcomptype() -- returns compression type ('NONE' or 'ULAW') | |
| getcompname() -- returns human-readable version of | |
| compression type ('not compressed' matches 'NONE') | |
| getparams() -- returns a namedtuple consisting of all of the | |
| above in the above order | |
| getmarkers() -- returns None (for compatibility with the | |
| aifc module) | |
| getmark(id) -- raises an error since the mark does not | |
| exist (for compatibility with the aifc module) | |
| readframes(n) -- returns at most n frames of audio | |
| rewind() -- rewind to the beginning of the audio stream | |
| setpos(pos) -- seek to the specified position | |
| tell() -- return the current position | |
| close() -- close the instance (make it unusable) | |
| The position returned by tell() and the position given to setpos() | |
| are compatible and have nothing to do with the actual position in the | |
| file. | |
| The close() method is called automatically when the class instance | |
| is destroyed. | |
| Writing audio files: | |
| f = sunau.open(file, 'w') | |
| where file is either the name of a file or an open file pointer. | |
| The open file pointer must have methods write(), tell(), seek(), and | |
| close(). | |
| This returns an instance of a class with the following public methods: | |
| setnchannels(n) -- set the number of channels | |
| setsampwidth(n) -- set the sample width | |
| setframerate(n) -- set the frame rate | |
| setnframes(n) -- set the number of frames | |
| setcomptype(type, name) | |
| -- set the compression type and the | |
| human-readable compression type | |
| setparams(tuple)-- set all parameters at once | |
| tell() -- return current position in output file | |
| writeframesraw(data) | |
| -- write audio frames without pathing up the | |
| file header | |
| writeframes(data) | |
| -- write audio frames and patch up the file header | |
| close() -- patch up the file header and close the | |
| output file | |
| You should set the parameters before the first writeframesraw or | |
| writeframes. The total number of frames does not need to be set, | |
| but when it is set to the correct value, the header does not have to | |
| be patched up. | |
| It is best to first set all parameters, perhaps possibly the | |
| compression type, and then write audio frames using writeframesraw. | |
| When all frames have been written, either call writeframes(b'') or | |
| close() to patch up the sizes in the header. | |
| The close() method is called automatically when the class instance | |
| is destroyed. | |
| """ | |
| from collections import namedtuple | |
| _sunau_params = namedtuple('_sunau_params', | |
| 'nchannels sampwidth framerate nframes comptype compname') | |
| # from <multimedia/audio_filehdr.h> | |
| AUDIO_FILE_MAGIC = 0x2e736e64 | |
| AUDIO_FILE_ENCODING_MULAW_8 = 1 | |
| AUDIO_FILE_ENCODING_LINEAR_8 = 2 | |
| AUDIO_FILE_ENCODING_LINEAR_16 = 3 | |
| AUDIO_FILE_ENCODING_LINEAR_24 = 4 | |
| AUDIO_FILE_ENCODING_LINEAR_32 = 5 | |
| AUDIO_FILE_ENCODING_FLOAT = 6 | |
| AUDIO_FILE_ENCODING_DOUBLE = 7 | |
| AUDIO_FILE_ENCODING_ADPCM_G721 = 23 | |
| AUDIO_FILE_ENCODING_ADPCM_G722 = 24 | |
| AUDIO_FILE_ENCODING_ADPCM_G723_3 = 25 | |
| AUDIO_FILE_ENCODING_ADPCM_G723_5 = 26 | |
| AUDIO_FILE_ENCODING_ALAW_8 = 27 | |
| # from <multimedia/audio_hdr.h> | |
| AUDIO_UNKNOWN_SIZE = 0xFFFFFFFF # ((unsigned)(~0)) | |
| _simple_encodings = [AUDIO_FILE_ENCODING_MULAW_8, | |
| AUDIO_FILE_ENCODING_LINEAR_8, | |
| AUDIO_FILE_ENCODING_LINEAR_16, | |
| AUDIO_FILE_ENCODING_LINEAR_24, | |
| AUDIO_FILE_ENCODING_LINEAR_32, | |
| AUDIO_FILE_ENCODING_ALAW_8] | |
| class Error(Exception): | |
| pass | |
| def _read_u32(file): | |
| x = 0 | |
| for i in range(4): | |
| byte = file.read(1) | |
| if not byte: | |
| raise EOFError | |
| x = x*256 + ord(byte) | |
| return x | |
| def _write_u32(file, x): | |
| data = [] | |
| for i in range(4): | |
| d, m = divmod(x, 256) | |
| data.insert(0, int(m)) | |
| x = d | |
| file.write(bytes(data)) | |
| class Au_read: | |
| def __init__(self, f): | |
| if type(f) == type(''): | |
| import builtins | |
| f = builtins.open(f, 'rb') | |
| self._opened = True | |
| else: | |
| self._opened = False | |
| self.initfp(f) | |
| def __del__(self): | |
| if self._file: | |
| self.close() | |
| def __enter__(self): | |
| return self | |
| def __exit__(self, *args): | |
| self.close() | |
| def initfp(self, file): | |
| self._file = file | |
| self._soundpos = 0 | |
| magic = int(_read_u32(file)) | |
| if magic != AUDIO_FILE_MAGIC: | |
| raise Error('bad magic number') | |
| self._hdr_size = int(_read_u32(file)) | |
| if self._hdr_size < 24: | |
| raise Error('header size too small') | |
| if self._hdr_size > 100: | |
| raise Error('header size ridiculously large') | |
| self._data_size = _read_u32(file) | |
| if self._data_size != AUDIO_UNKNOWN_SIZE: | |
| self._data_size = int(self._data_size) | |
| self._encoding = int(_read_u32(file)) | |
| if self._encoding not in _simple_encodings: | |
| raise Error('encoding not (yet) supported') | |
| if self._encoding in (AUDIO_FILE_ENCODING_MULAW_8, | |
| AUDIO_FILE_ENCODING_ALAW_8): | |
| self._sampwidth = 2 | |
| self._framesize = 1 | |
| elif self._encoding == AUDIO_FILE_ENCODING_LINEAR_8: | |
| self._framesize = self._sampwidth = 1 | |
| elif self._encoding == AUDIO_FILE_ENCODING_LINEAR_16: | |
| self._framesize = self._sampwidth = 2 | |
| elif self._encoding == AUDIO_FILE_ENCODING_LINEAR_24: | |
| self._framesize = self._sampwidth = 3 | |
| elif self._encoding == AUDIO_FILE_ENCODING_LINEAR_32: | |
| self._framesize = self._sampwidth = 4 | |
| else: | |
| raise Error('unknown encoding') | |
| self._framerate = int(_read_u32(file)) | |
| self._nchannels = int(_read_u32(file)) | |
| if not self._nchannels: | |
| raise Error('bad # of channels') | |
| self._framesize = self._framesize * self._nchannels | |
| if self._hdr_size > 24: | |
| self._info = file.read(self._hdr_size - 24) | |
| self._info, _, _ = self._info.partition(b'\0') | |
| else: | |
| self._info = b'' | |
| try: | |
| self._data_pos = file.tell() | |
| except (AttributeError, OSError): | |
| self._data_pos = None | |
| def getfp(self): | |
| return self._file | |
| def getnchannels(self): | |
| return self._nchannels | |
| def getsampwidth(self): | |
| return self._sampwidth | |
| def getframerate(self): | |
| return self._framerate | |
| def getnframes(self): | |
| if self._data_size == AUDIO_UNKNOWN_SIZE: | |
| return AUDIO_UNKNOWN_SIZE | |
| if self._encoding in _simple_encodings: | |
| return self._data_size // self._framesize | |
| return 0 # XXX--must do some arithmetic here | |
| def getcomptype(self): | |
| if self._encoding == AUDIO_FILE_ENCODING_MULAW_8: | |
| return 'ULAW' | |
| elif self._encoding == AUDIO_FILE_ENCODING_ALAW_8: | |
| return 'ALAW' | |
| else: | |
| return 'NONE' | |
| def getcompname(self): | |
| if self._encoding == AUDIO_FILE_ENCODING_MULAW_8: | |
| return 'CCITT G.711 u-law' | |
| elif self._encoding == AUDIO_FILE_ENCODING_ALAW_8: | |
| return 'CCITT G.711 A-law' | |
| else: | |
| return 'not compressed' | |
| def getparams(self): | |
| return _sunau_params(self.getnchannels(), self.getsampwidth(), | |
| self.getframerate(), self.getnframes(), | |
| self.getcomptype(), self.getcompname()) | |
| def getmarkers(self): | |
| return None | |
| def getmark(self, id): | |
| raise Error('no marks') | |
| def readframes(self, nframes): | |
| if self._encoding in _simple_encodings: | |
| if nframes == AUDIO_UNKNOWN_SIZE: | |
| data = self._file.read() | |
| else: | |
| data = self._file.read(nframes * self._framesize) | |
| self._soundpos += len(data) // self._framesize | |
| if self._encoding == AUDIO_FILE_ENCODING_MULAW_8: | |
| import audioop | |
| data = audioop.ulaw2lin(data, self._sampwidth) | |
| return data | |
| return None # XXX--not implemented yet | |
| def rewind(self): | |
| if self._data_pos is None: | |
| raise OSError('cannot seek') | |
| self._file.seek(self._data_pos) | |
| self._soundpos = 0 | |
| def tell(self): | |
| return self._soundpos | |
| def setpos(self, pos): | |
| if pos < 0 or pos > self.getnframes(): | |
| raise Error('position not in range') | |
| if self._data_pos is None: | |
| raise OSError('cannot seek') | |
| self._file.seek(self._data_pos + pos * self._framesize) | |
| self._soundpos = pos | |
| def close(self): | |
| file = self._file | |
| if file: | |
| self._file = None | |
| if self._opened: | |
| file.close() | |
| class Au_write: | |
| def __init__(self, f): | |
| if type(f) == type(''): | |
| import builtins | |
| f = builtins.open(f, 'wb') | |
| self._opened = True | |
| else: | |
| self._opened = False | |
| self.initfp(f) | |
| def __del__(self): | |
| if self._file: | |
| self.close() | |
| self._file = None | |
| def __enter__(self): | |
| return self | |
| def __exit__(self, *args): | |
| self.close() | |
| def initfp(self, file): | |
| self._file = file | |
| self._framerate = 0 | |
| self._nchannels = 0 | |
| self._sampwidth = 0 | |
| self._framesize = 0 | |
| self._nframes = AUDIO_UNKNOWN_SIZE | |
| self._nframeswritten = 0 | |
| self._datawritten = 0 | |
| self._datalength = 0 | |
| self._info = b'' | |
| self._comptype = 'ULAW' # default is U-law | |
| def setnchannels(self, nchannels): | |
| if self._nframeswritten: | |
| raise Error('cannot change parameters after starting to write') | |
| if nchannels not in (1, 2, 4): | |
| raise Error('only 1, 2, or 4 channels supported') | |
| self._nchannels = nchannels | |
| def getnchannels(self): | |
| if not self._nchannels: | |
| raise Error('number of channels not set') | |
| return self._nchannels | |
| def setsampwidth(self, sampwidth): | |
| if self._nframeswritten: | |
| raise Error('cannot change parameters after starting to write') | |
| if sampwidth not in (1, 2, 3, 4): | |
| raise Error('bad sample width') | |
| self._sampwidth = sampwidth | |
| def getsampwidth(self): | |
| if not self._framerate: | |
| raise Error('sample width not specified') | |
| return self._sampwidth | |
| def setframerate(self, framerate): | |
| if self._nframeswritten: | |
| raise Error('cannot change parameters after starting to write') | |
| self._framerate = framerate | |
| def getframerate(self): | |
| if not self._framerate: | |
| raise Error('frame rate not set') | |
| return self._framerate | |
| def setnframes(self, nframes): | |
| if self._nframeswritten: | |
| raise Error('cannot change parameters after starting to write') | |
| if nframes < 0: | |
| raise Error('# of frames cannot be negative') | |
| self._nframes = nframes | |
| def getnframes(self): | |
| return self._nframeswritten | |
| def setcomptype(self, type, name): | |
| if type in ('NONE', 'ULAW'): | |
| self._comptype = type | |
| else: | |
| raise Error('unknown compression type') | |
| def getcomptype(self): | |
| return self._comptype | |
| def getcompname(self): | |
| if self._comptype == 'ULAW': | |
| return 'CCITT G.711 u-law' | |
| elif self._comptype == 'ALAW': | |
| return 'CCITT G.711 A-law' | |
| else: | |
| return 'not compressed' | |
| def setparams(self, params): | |
| nchannels, sampwidth, framerate, nframes, comptype, compname = params | |
| self.setnchannels(nchannels) | |
| self.setsampwidth(sampwidth) | |
| self.setframerate(framerate) | |
| self.setnframes(nframes) | |
| self.setcomptype(comptype, compname) | |
| def getparams(self): | |
| return _sunau_params(self.getnchannels(), self.getsampwidth(), | |
| self.getframerate(), self.getnframes(), | |
| self.getcomptype(), self.getcompname()) | |
| def tell(self): | |
| return self._nframeswritten | |
| def writeframesraw(self, data): | |
| if not isinstance(data, (bytes, bytearray)): | |
| data = memoryview(data).cast('B') | |
| self._ensure_header_written() | |
| if self._comptype == 'ULAW': | |
| import audioop | |
| data = audioop.lin2ulaw(data, self._sampwidth) | |
| nframes = len(data) // self._framesize | |
| self._file.write(data) | |
| self._nframeswritten = self._nframeswritten + nframes | |
| self._datawritten = self._datawritten + len(data) | |
| def writeframes(self, data): | |
| self.writeframesraw(data) | |
| if self._nframeswritten != self._nframes or \ | |
| self._datalength != self._datawritten: | |
| self._patchheader() | |
| def close(self): | |
| if self._file: | |
| try: | |
| self._ensure_header_written() | |
| if self._nframeswritten != self._nframes or \ | |
| self._datalength != self._datawritten: | |
| self._patchheader() | |
| self._file.flush() | |
| finally: | |
| file = self._file | |
| self._file = None | |
| if self._opened: | |
| file.close() | |
| # | |
| # private methods | |
| # | |
| def _ensure_header_written(self): | |
| if not self._nframeswritten: | |
| if not self._nchannels: | |
| raise Error('# of channels not specified') | |
| if not self._sampwidth: | |
| raise Error('sample width not specified') | |
| if not self._framerate: | |
| raise Error('frame rate not specified') | |
| self._write_header() | |
| def _write_header(self): | |
| if self._comptype == 'NONE': | |
| if self._sampwidth == 1: | |
| encoding = AUDIO_FILE_ENCODING_LINEAR_8 | |
| self._framesize = 1 | |
| elif self._sampwidth == 2: | |
| encoding = AUDIO_FILE_ENCODING_LINEAR_16 | |
| self._framesize = 2 | |
| elif self._sampwidth == 3: | |
| encoding = AUDIO_FILE_ENCODING_LINEAR_24 | |
| self._framesize = 3 | |
| elif self._sampwidth == 4: | |
| encoding = AUDIO_FILE_ENCODING_LINEAR_32 | |
| self._framesize = 4 | |
| else: | |
| raise Error('internal error') | |
| elif self._comptype == 'ULAW': | |
| encoding = AUDIO_FILE_ENCODING_MULAW_8 | |
| self._framesize = 1 | |
| else: | |
| raise Error('internal error') | |
| self._framesize = self._framesize * self._nchannels | |
| _write_u32(self._file, AUDIO_FILE_MAGIC) | |
| header_size = 25 + len(self._info) | |
| header_size = (header_size + 7) & ~7 | |
| _write_u32(self._file, header_size) | |
| if self._nframes == AUDIO_UNKNOWN_SIZE: | |
| length = AUDIO_UNKNOWN_SIZE | |
| else: | |
| length = self._nframes * self._framesize | |
| try: | |
| self._form_length_pos = self._file.tell() | |
| except (AttributeError, OSError): | |
| self._form_length_pos = None | |
| _write_u32(self._file, length) | |
| self._datalength = length | |
| _write_u32(self._file, encoding) | |
| _write_u32(self._file, self._framerate) | |
| _write_u32(self._file, self._nchannels) | |
| self._file.write(self._info) | |
| self._file.write(b'\0'*(header_size - len(self._info) - 24)) | |
| def _patchheader(self): | |
| if self._form_length_pos is None: | |
| raise OSError('cannot seek') | |
| self._file.seek(self._form_length_pos) | |
| _write_u32(self._file, self._datawritten) | |
| self._datalength = self._datawritten | |
| self._file.seek(0, 2) | |
| def open(f, mode=None): | |
| if mode is None: | |
| if hasattr(f, 'mode'): | |
| mode = f.mode | |
| else: | |
| mode = 'rb' | |
| if mode in ('r', 'rb'): | |
| return Au_read(f) | |
| elif mode in ('w', 'wb'): | |
| return Au_write(f) | |
| else: | |
| raise Error("mode must be 'r', 'rb', 'w', or 'wb'") | |
Xet Storage Details
- Size:
- 18.2 kB
- Xet hash:
- 25552ec6d340088697a6a59679a7469aab41e74f67872889f0d489cc9b5fb996
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.