Elron committed on
Commit
62fd389
1 Parent(s): 5b43d27

Upload stream.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. stream.py +29 -55
stream.py CHANGED
@@ -3,10 +3,11 @@ from typing import Dict, Iterable
3
 
4
  from datasets import Dataset, DatasetDict, IterableDataset, IterableDatasetDict
5
 
6
- from .generator_utils import ReusableGenerator
 
7
 
8
 
9
- class Stream:
10
  """A class for handling streaming data in a customizable way.
11
 
12
  This class provides methods for generating, caching, and manipulating streaming data.
@@ -14,24 +15,13 @@ class Stream:
14
  Attributes:
15
  generator (function): A generator function for streaming data.
16
  gen_kwargs (dict, optional): A dictionary of keyword arguments for the generator function.
17
- streaming (bool): Whether the data is streaming or not.
18
  caching (bool): Whether the data is cached or not.
19
  """
20
 
21
- def __init__(self, generator, gen_kwargs=None, streaming=True, caching=False):
22
- """Initializes the Stream with the provided parameters.
23
-
24
- Args:
25
- generator (function): A generator function for streaming data.
26
- gen_kwargs (dict, optional): A dictionary of keyword arguments for the generator function. Defaults to None.
27
- streaming (bool, optional): Whether the data is streaming or not. Defaults to True.
28
- caching (bool, optional): Whether the data is cached or not. Defaults to False.
29
- """
30
-
31
- self.generator = generator
32
- self.gen_kwargs = gen_kwargs if gen_kwargs is not None else {}
33
- self.streaming = streaming
34
- self.caching = caching
35
 
36
  def _get_initator(self):
37
  """Private method to get the correct initiator based on the streaming and caching attributes.
@@ -39,16 +29,13 @@ class Stream:
39
  Returns:
40
  function: The correct initiator function.
41
  """
42
- if self.streaming:
43
- if self.caching:
44
- return IterableDataset.from_generator
45
- else:
46
- return ReusableGenerator
47
  else:
48
- if self.caching:
49
- return Dataset.from_generator
50
  else:
51
- raise ValueError("Cannot create non-streaming non-caching stream")
52
 
53
  def _get_stream(self):
54
  """Private method to get the stream based on the initiator function.
@@ -58,18 +45,9 @@ class Stream:
58
  """
59
  return self._get_initator()(self.generator, gen_kwargs=self.gen_kwargs)
60
 
61
- def set_caching(self, caching):
62
- self.caching = caching
63
-
64
- def set_streaming(self, streaming):
65
- self.streaming = streaming
66
-
67
  def __iter__(self):
68
  return iter(self._get_stream())
69
 
70
- def unwrap(self):
71
- return self._get_stream()
72
-
73
  def peak(self):
74
  return next(iter(self))
75
 
@@ -79,17 +57,6 @@ class Stream:
79
  break
80
  yield instance
81
 
82
- def __repr__(self):
83
- return f"{self.__class__.__name__}(generator={self.generator.__name__}, gen_kwargs={self.gen_kwargs}, streaming={self.streaming}, caching={self.caching})"
84
-
85
-
86
- def is_stream(obj):
87
- return isinstance(obj, IterableDataset) or isinstance(obj, Stream) or isinstance(obj, Dataset)
88
-
89
-
90
- def iterable_starter(iterable):
91
- return iter(deepcopy(iterable))
92
-
93
 
94
  class MultiStream(dict):
95
  """A class for handling multiple streams of data in a dictionary-like format.
@@ -125,8 +92,13 @@ class MultiStream(dict):
125
  """
126
  yield from self[key]
127
 
128
- def unwrap(self, cls):
129
- return cls({key: value.unwrap() for key, value in self.items()})
 
 
 
 
 
130
 
131
  def to_dataset(self) -> DatasetDict:
132
  return DatasetDict(
@@ -144,12 +116,11 @@ class MultiStream(dict):
144
  super().__setitem__(key, value)
145
 
146
  @classmethod
147
- def from_generators(cls, generators: Dict[str, ReusableGenerator], streaming=True, caching=False):
148
  """Creates a MultiStream from a dictionary of ReusableGenerators.
149
 
150
  Args:
151
  generators (Dict[str, ReusableGenerator]): A dictionary of ReusableGenerators.
152
- streaming (bool, optional): Whether the data should be streaming or not. Defaults to True.
153
  caching (bool, optional): Whether the data should be cached or not. Defaults to False.
154
 
155
  Returns:
@@ -160,22 +131,21 @@ class MultiStream(dict):
160
  return cls(
161
  {
162
  key: Stream(
163
- generator.get_generator(),
164
- gen_kwargs=generator.get_gen_kwargs(),
165
- streaming=streaming,
166
  caching=caching,
 
167
  )
168
  for key, generator in generators.items()
169
  }
170
  )
171
 
172
  @classmethod
173
- def from_iterables(cls, iterables: Dict[str, Iterable], streaming=True, caching=False):
174
  """Creates a MultiStream from a dictionary of iterables.
175
 
176
  Args:
177
  iterables (Dict[str, Iterable]): A dictionary of iterables.
178
- streaming (bool, optional): Whether the data should be streaming or not. Defaults to True.
179
  caching (bool, optional): Whether the data should be cached or not. Defaults to False.
180
 
181
  Returns:
@@ -184,7 +154,11 @@ class MultiStream(dict):
184
 
185
  return cls(
186
  {
187
- key: Stream(iterable_starter, gen_kwargs={"iterable": iterable}, streaming=streaming, caching=caching)
 
 
 
 
188
  for key, iterable in iterables.items()
189
  }
190
  )
 
3
 
4
  from datasets import Dataset, DatasetDict, IterableDataset, IterableDatasetDict
5
 
6
+ from .dataclass import Dataclass, OptionalField
7
+ from .generator_utils import CopyingReusableGenerator, ReusableGenerator
8
 
9
 
10
+ class Stream(Dataclass):
11
  """A class for handling streaming data in a customizable way.
12
 
13
  This class provides methods for generating, caching, and manipulating streaming data.
 
15
  Attributes:
16
  generator (function): A generator function for streaming data.
17
  gen_kwargs (dict, optional): A dictionary of keyword arguments for the generator function.
 
18
  caching (bool): Whether the data is cached or not.
19
  """
20
 
21
+ generator: callable
22
+ gen_kwargs: Dict[str, any] = OptionalField(default_factory=dict)
23
+ caching: bool = False
24
+ copying: bool = False
 
 
 
 
 
 
 
 
 
 
25
 
26
  def _get_initator(self):
27
  """Private method to get the correct initiator based on the streaming and caching attributes.
 
29
  Returns:
30
  function: The correct initiator function.
31
  """
32
+ if self.caching:
33
+ return Dataset.from_generator
 
 
 
34
  else:
35
+ if self.copying:
36
+ return CopyingReusableGenerator
37
  else:
38
+ return ReusableGenerator
39
 
40
  def _get_stream(self):
41
  """Private method to get the stream based on the initiator function.
 
45
  """
46
  return self._get_initator()(self.generator, gen_kwargs=self.gen_kwargs)
47
 
 
 
 
 
 
 
48
  def __iter__(self):
49
  return iter(self._get_stream())
50
 
 
 
 
51
  def peak(self):
52
  return next(iter(self))
53
 
 
57
  break
58
  yield instance
59
 
 
 
 
 
 
 
 
 
 
 
 
60
 
61
  class MultiStream(dict):
62
  """A class for handling multiple streams of data in a dictionary-like format.
 
92
  """
93
  yield from self[key]
94
 
95
+ def set_caching(self, caching: bool):
96
+ for stream in self.values():
97
+ stream.caching = caching
98
+
99
+ def set_copying(self, copying: bool):
100
+ for stream in self.values():
101
+ stream.copying = copying
102
 
103
  def to_dataset(self) -> DatasetDict:
104
  return DatasetDict(
 
116
  super().__setitem__(key, value)
117
 
118
  @classmethod
119
+ def from_generators(cls, generators: Dict[str, ReusableGenerator], caching=False, copying=False):
120
  """Creates a MultiStream from a dictionary of ReusableGenerators.
121
 
122
  Args:
123
  generators (Dict[str, ReusableGenerator]): A dictionary of ReusableGenerators.
 
124
  caching (bool, optional): Whether the data should be cached or not. Defaults to False.
125
 
126
  Returns:
 
131
  return cls(
132
  {
133
  key: Stream(
134
+ generator.generator,
135
+ gen_kwargs=generator.gen_kwargs,
 
136
  caching=caching,
137
+ copying=copying,
138
  )
139
  for key, generator in generators.items()
140
  }
141
  )
142
 
143
  @classmethod
144
+ def from_iterables(cls, iterables: Dict[str, Iterable], caching=False, copying=False):
145
  """Creates a MultiStream from a dictionary of iterables.
146
 
147
  Args:
148
  iterables (Dict[str, Iterable]): A dictionary of iterables.
 
149
  caching (bool, optional): Whether the data should be cached or not. Defaults to False.
150
 
151
  Returns:
 
154
 
155
  return cls(
156
  {
157
+ key: Stream(
158
+ iterable.__iter__,
159
+ caching=caching,
160
+ copying=copying,
161
+ )
162
  for key, iterable in iterables.items()
163
  }
164
  )