| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| """Flex data pipelines.""" |
|
|
| import multiprocessing |
|
|
| import cv2 |
| import numpy.random as npr |
|
|
| from diffnext.data import flex_transforms |
|
|
|
|
class Worker(multiprocessing.Process):
    """Daemon process that moves items from a reader queue to a worker queue.

    The class does not define ``get_outputs`` itself; ``run`` calls
    ``self.get_outputs``, so a subclass or mixin has to provide it.
    Both queues start as ``None`` and must be assigned before the
    process is started.
    """

    def __init__(self):
        super().__init__(daemon=True)
        # Queues are attached by whoever creates the worker.
        self.reader_queue = self.worker_queue = None
        # Seed applied to ``numpy.random`` when the process starts running.
        self.seed = 1337

    def run(self):
        """Transform items from ``reader_queue`` into ``worker_queue`` forever."""
        # Presumably keeps several workers from oversubscribing CPU cores.
        cv2.setNumThreads(1)
        npr.seed(self.seed)
        while True:
            # Look up the sink before blocking on the reader, so an
            # unassigned worker queue fails before any item is consumed.
            emit = self.worker_queue.put
            emit(self.get_outputs(self.reader_queue.get()))
|
|
|
|
class FeaturePipe(object):
    """Pipeline that converts one input record into a dict of features."""

    def __init__(self):
        super().__init__()
        self.parse_latents = flex_transforms.ParseLatents()
        self.parse_annotations = flex_transforms.ParseAnnotations()

    def get_outputs(self, inputs):
        """Build the feature dict for ``inputs``.

        The result always has ``"latents"``. ``"prompt"`` is added when
        the parsed label or caption is not ``None`` (the label wins when
        both are present). ``"motion"`` is added when ``inputs`` has a
        ``"flow"`` entry. Every value is wrapped in a one-element list.
        """
        features = {"latents": [self.parse_latents(inputs)]}
        label, caption = self.parse_annotations(inputs)
        # The label takes priority; the caption is only a fallback.
        prompt = caption if label is None else label
        if prompt is not None:
            features["prompt"] = [prompt]
        if "flow" in inputs:
            features["motion"] = [inputs["flow"]]
        return features
|
|
|
|
class FeatureWorker(FeaturePipe, Worker):
    """Worker process that applies the feature pipeline to each item.

    ``FeaturePipe`` supplies ``get_outputs`` and ``Worker`` supplies the
    process loop that calls it. No behaviour is added here.
    """
|
|