File size: 11,051 Bytes
b7c4274 f905982 b7c4274 f905982 b7c4274 f905982 b7c4274 f905982 b7c4274 f905982 b7c4274 f905982 b7c4274 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 |
import itertools
import random
import re
from .generator_utils import ReusableGenerator
def parse_random_mix_string(input_str):
"""
Parses a string of format "source1[percentage1%]+source2[value2]+..." and returns a dictionary.
Args:
input_str (str): A string containing source names and their respective proportions. The format is
"source[proportion%]" or "source[proportion]", with multiple sources separated by "+".
The proportion can be a percentage (e.g., "90%") or a decimal number (e.g., "0.7").
If the proportion is not provided, it assumes 100%.
Returns:
dict: A dictionary where the keys are the source names and the values are the proportions converted to floats.
If the proportion was given as a percentage, the value is divided by 100.
Raises:
ValueError: If the input string is not in the correct format.
Example:
>>> parse_random_mix_string("dale[90%]+oren[0.7]+mike")
{'dale': 0.9, 'oren': 0.7, 'mike': 1.0}
"""
if not re.fullmatch(r"(([a-zA-Z]+\[\d*\.?\d*%?\]|[a-zA-Z]+)\+)*([a-zA-Z]+\[\d*\.?\d*%?\]|[a-zA-Z]+)", input_str):
raise ValueError("Invalid input format")
pattern = re.compile(r"([a-zA-Z]+)(\[\d*\.?\d*%?\])?")
matches = pattern.findall(input_str)
return {
name: float(value.strip("[]%")) / 100 if "%" in value else (float(value.strip("[]")) if value else 1.0)
for name, value in matches
}
def parse_slices_string(input_str):
"""
Parses a string of format "source1[value1:value2] + source2[value2:] + source3 + ..." and returns a dictionary:
{"source1": [(value1,value2)], "source2": [(value2, None)], "source3": [(None,None)]...}
If a source appears multiple times with different indices, all index pairs are included in the list.
Args:
input_str (str): A string containing source names and their respective indices. The format is
"source[:index]" or "source[index:]", with multiple sources separated by "+".
The index represents the items to be taken from the source.
Returns:
dict: A dictionary where the keys are the source names and the values are lists of indices as tuples.
If the index is before the colon, it is represented as (None, index),
if it's after the colon, it's represented as (index, None)
Raises:
ValueError: If the input string is not in the correct format.
Example:
>>> parse_slices_string("oren[:50]+jake[24:]+test+oren[5:10]")
{'oren': [(None, 50), (5, 10)], 'jake': [(24, None)], 'test': [(None, None)]}
"""
result_dict = {}
# Split the input string into a list of sources
sources = re.split("\+", input_str)
for source in sources:
# If the source has a slice, parse it
match = re.fullmatch(r"(\w+)\[(\d*):(\d*)\]", source)
if match:
name, start, end = match.groups()
start = int(start) if start else None
end = int(end) if end else None
elif re.fullmatch(r"\w+", source):
# If the source has no slice, use None for both start and end
name = source
start = end = None
else:
raise ValueError(f'The input string "{input_str}" is not in the correct format.')
if name not in result_dict:
result_dict[name] = [(start, end)]
else:
result_dict[name].append((start, end))
return result_dict
def slice_stream(stream, start, end):
# If start is None, consume from the beginning
if start is not None:
stream = itertools.islice(stream, start, None)
# If end is not None, consume until end
if end is not None:
stream = itertools.islice(stream, end)
for item in stream:
yield item
# return stream
def slice_streams(input_streams, mapping):
"""
Slices multiple input streams according to a mapping and chains the results together.
Args:
input_streams (dict): A dictionary where the keys are the names of the input streams
and the values are the input streams themselves.
mapping (dict): A dictionary where the keys are the names of the new streams
and the values are dictionaries mapping old stream names
to lists of tuples representing slices.
Returns:
dict: A dictionary where the keys are the names of the new streams and the values are
the new streams, which consist of parts of the old streams chained together.
Raises:
ValueError: If a stream is supposed to be sliced at an index greater than its length.
Example:
>>> old_streams = {"train": [1, 2, 3, 4, 5, 6, 7, 8, 9], "test": [10, 11, 12, 13, 14]}
>>> mapping = {"new_train": {"train": [(None, 5), (7, 9)]}, "new_test": {"test": [(2, None)]}}
>>> slice_streams(old_streams, mapping)
{"new_train": [1, 2, 3, 4, 5, 8, 9], "new_test": [12, 13, 14]}
"""
new_streams = {}
for new_stream, sources in mapping.items():
def generator(new_stream, sources):
for old_stream, slices in sources.items():
old_stream_content = input_streams[old_stream]
for start, end in slices:
yield from slice_stream(old_stream_content, start, end)
new_streams[new_stream] = ReusableGenerator(
generator, gen_kwargs={"new_stream": new_stream, "sources": sources}
)
return new_streams
def build_stream_routing(mapping):
"""
Builds the stream mapping dictionary based on the provided mapping.
The stream mapping dictionary represents the mapping of old streams to new streams
and their respective probabilities. It ensures that the probabilities for each old stream
do not sum up to more than one. If the sum of probabilities is less than one,
a null stream (None) is included to account for the remaining probability.
Args:
mapping (dict): A dictionary specifying the mapping of old streams to new streams
and their respective probabilities.
Returns:
dict: A dictionary representing the stream mapping, where each entry corresponds to an
old stream, and the value is a tuple containing the new streams and their respective
probabilities.
Example:
>>> mapping = {
'my_new_stream': {
'my_old_stream1': 0.6,
'my_old_stream2': 0.2
},
'my_new_stream2': {
'my_old_stream1': 0.4,
'my_old_stream2': 0.8
}
}
stream_mapping = build_stream_mapping(mapping)
print(stream_mapping)
# Output: {'my_old_stream1': (['my_new_stream', 'my_new_stream2'], [0.6, 0.4]),
# 'my_old_stream2': (['my_new_stream', 'my_new_stream2'], [0.2, 0.8])}
"""
stream_mapping = {}
# Calculate total weight for each old stream
total_weights = {}
for new_stream, old_streams in mapping.items():
for old_stream, weight in old_streams.items():
if old_stream not in total_weights:
total_weights[old_stream] = weight
else:
total_weights[old_stream] += weight
# Build stream_mapping with null stream included
for new_stream, old_streams in mapping.items():
for old_stream, weight in old_streams.items():
if old_stream not in stream_mapping:
stream_mapping[old_stream] = {}
stream_mapping[old_stream][new_stream] = weight
# Add null stream if total weight less than 1
if total_weights[old_stream] < 1:
stream_mapping[old_stream][None] = 1 - total_weights[old_stream]
stream_mapping = {k: (list(v.keys()), list(v.values())) for k, v in stream_mapping.items()}
return stream_mapping
def random_mix_generator(new_stream_name, new_stream_sources, stream_routing, rand, input_streams):
for old_stream_name in new_stream_sources:
optinal_streams, weights = stream_routing[old_stream_name]
rand.seed(old_stream_name)
for item in input_streams[old_stream_name]:
choice = rand.choices(optinal_streams, weights=weights, k=1)[0]
if choice == new_stream_name:
yield item
def random_mix_streams(input_streams, mapping):
"""
Creates new streams based on the provided input streams and mapping.
The create_streams function generates new streams by selectively including items from
the old streams based on the specified mapping. Each item will be included in at most
one new stream, as defined by the probabilities in the mapping and stream routing.
Args:
input_streams (dict): A dictionary containing the input streams, where each key is
the name of the stream and the value is an iterable or generator
representing the stream.
mapping (dict): A dictionary specifying the mapping of old streams to new streams
and their respective probabilities.
Returns:
dict: A dictionary containing the generated new streams, where each key is the name
of the new stream and the value is a generator representing the stream.
Example:
>>> input_streams = {
'my_old_stream1': gen1(),
'my_old_stream2': gen2(),
}
mapping = {
'my_new_stream': {
'my_old_stream1': 0.6,
'my_old_stream2': 0.2
},
'my_new_stream2': {
'my_old_stream1': 0.4,
'my_old_stream2': 0.8
}
}
new_streams = create_streams(input_streams, mapping)
for new_stream_name, new_stream in new_streams.items():
print(f"{new_stream_name}:")
for _, item in zip(range(10), new_stream):
print(item)
"""
new_streams = {}
# Build stream routing
stream_routing = build_stream_routing(mapping)
rand = random.Random()
# Create new stream generators
for new_stream_name, new_stream_sources in mapping.items():
new_streams[new_stream_name] = ReusableGenerator(
random_mix_generator,
gen_kwargs={
"new_stream_name": new_stream_name,
"new_stream_sources": new_stream_sources,
"stream_routing": stream_routing,
"rand": rand,
"input_streams": input_streams,
},
)
return new_streams
if __name__ == "__main__":
print(parse_random_mix_string("dale[90%]+oren[0.7]+mike"))
|