
Sampler

SegmentSampler

Bases: VideoSampler

A class for sampling video frames based on subtitle segments.

Parameters:

cfg (SamplerConfig): The configuration for the video sampler. Required.
segment_generator (Iterable[subtitle_line]): An iterable of subtitle segments. Required.

Methods:

sample: Generates sample frames from a video.
write_queue: Writes sampled frames to a queue.
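
A minimal usage sketch (hedged: cfg is assumed to be a SamplerConfig built elsewhere with a suitable extractor_config, and the file paths are placeholders). Note that, per the source below, the constructor takes only cfg; subtitle segments are produced internally from cfg.extractor_config once sample() is called with subs.

from video_sampler.sampler import SegmentSampler

sampler = SegmentSampler(cfg=cfg)  # cfg: a pre-built SamplerConfig (assumed)

# "clip.srt" / "clip.mp4" are placeholder paths; subs is assumed to be the raw subtitle text
with open("clip.srt") as f:
    subs = f.read()

for frames in sampler.sample("clip.mp4", subs=subs):
    for frame_obj in frames:
        ...  # each frame_obj is a FrameObject sampled inside a subtitle segment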

Source code in video_sampler/sampler.py
class SegmentSampler(VideoSampler):
    """
    A class for sampling video frames based on subtitle segments.

    Args:
        cfg (SamplerConfig): The configuration for the video sampler.
        segment_generator (Iterable[subtitle_line]): An iterable of subtitle segments.

    Methods:
        sample(video_path) -> Iterable[list[FrameObject]]:
            Generates sample frames from a video.
        write_queue(video_path, q):
            Writes sampled frames to a queue.
    """

    def __init__(self, cfg: SamplerConfig) -> None:
        super().__init__(cfg)
        self.extractor = create_extractor(cfg.extractor_config)

    def sample(self, video_path: str, subs: str = None) -> Iterable[list[FrameObject]]:
        """Generate sample frames from a video.

        Args:
            video_path (str): The path to the video file.
            subs (str): Subtitles for the video file.

        Yields:
            Iterable[list[FrameObject]]: A generator that yields a list of FrameObjects representing sampled frames.
        """
        segment_generator: Iterable[subtitle_line] = self.extractor.generate_segments(
            subs
        )
        self._init_sampler()
        next_segment = next(segment_generator)
        segment_boundary_end_sec = next_segment.end_time / 1000
        segment_boundary_start_sec = next_segment.start_time / 1000
        absolute_stop = False
        with av.open(video_path) as container:
            stream = container.streams.video[0]
            if self.cfg.keyframes_only:
                stream.codec_context.skip_frame = "NONKEY"
            prev_time = -10
            for frame_indx, frame in enumerate(container.decode(stream)):
                if frame is None:
                    continue
                try:
                    ftime = frame.time
                except AttributeError:
                    continue
                reiters = 0
                # advance segments until one ends at or after the current frame time
                while ftime > segment_boundary_end_sec:
                    console.print(
                        f"Seeking to next segment: {segment_boundary_end_sec}/{ftime}",
                        style=f"bold {Color.yellow.value}",
                    )
                    try:
                        next_segment = next(segment_generator)
                        reiters += 1
                        segment_boundary_end_sec = next_segment.end_time / 1000
                        segment_boundary_start_sec = next_segment.start_time / 1000
                    except StopIteration:
                        absolute_stop = True
                        break
                if reiters > 0:
                    console.print(
                        f"Skipped {reiters} segments!",
                        style=f"bold {Color.red.value}",
                    )
                if absolute_stop:
                    break
                # the frame comes before the current segment starts, so skip it;
                # frames past the segment end are handled by the while loop above
                if ftime <= segment_boundary_start_sec:
                    continue

                self.stats["total"] += 1
                time_diff = ftime - prev_time
                if time_diff < self.cfg.min_frame_interval_sec:
                    continue
                prev_time = ftime

                yield from self._process_frame(frame_indx, frame, ftime)
        # flush buffer
        yield from self.flush_buffer()

    def write_queue(self, video_path: str, q: Queue, subs: str = None):
        super().write_queue(video_path, q, subs=subs)

sample(video_path, subs=None)

Generate sample frames from a video.

Parameters:

video_path (str): The path to the video file. Required.
subs (str): Subtitles for the video file. Default: None.

Yields:

Iterable[list[FrameObject]]: A generator that yields lists of FrameObject instances representing sampled frames.
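
The generator yields batches of FrameObject instances and, after the buffer is flushed, ends with a terminal sentinel batch (PROCESSING_DONE_ITERABLE), which queue_reader detects via the "end" metadata flag. A consumption sketch, with placeholder paths and a sampler assumed to be constructed as above:

for frames in sampler.sample("clip.mp4", subs=subs):
    for frame_obj in frames:
        if frame_obj.metadata.get("end", False):
            break  # terminal sentinel; no image payload
        if frame_obj.frame is not None:
            # frame is a PIL image; frame_time comes from the decoder clock
            frame_obj.frame.save(f"{frame_obj.metadata['frame_time']}.jpg")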

Source code in video_sampler/sampler.py
def sample(self, video_path: str, subs: str = None) -> Iterable[list[FrameObject]]:
    """Generate sample frames from a video.

    Args:
        video_path (str): The path to the video file.
        subs (str): Subtitles for the video file.

    Yields:
        Iterable[list[FrameObject]]: A generator that yields a list of FrameObjects representing sampled frames.
    """
    segment_generator: Iterable[subtitle_line] = self.extractor.generate_segments(
        subs
    )
    self._init_sampler()
    next_segment = next(segment_generator)
    segment_boundary_end_sec = next_segment.end_time / 1000
    segment_boundary_start_sec = next_segment.start_time / 1000
    absolute_stop = False
    with av.open(video_path) as container:
        stream = container.streams.video[0]
        if self.cfg.keyframes_only:
            stream.codec_context.skip_frame = "NONKEY"
        prev_time = -10
        for frame_indx, frame in enumerate(container.decode(stream)):
            if frame is None:
                continue
            try:
                ftime = frame.time
            except AttributeError:
                continue
            reiters = 0
            # advance segments until one ends at or after the current frame time
            while ftime > segment_boundary_end_sec:
                console.print(
                    f"Seeking to next segment: {segment_boundary_end_sec}/{ftime}",
                    style=f"bold {Color.yellow.value}",
                )
                try:
                    next_segment = next(segment_generator)
                    reiters += 1
                    segment_boundary_end_sec = next_segment.end_time / 1000
                    segment_boundary_start_sec = next_segment.start_time / 1000
                except StopIteration:
                    absolute_stop = True
                    break
            if reiters > 0:
                console.print(
                    f"Skipped {reiters} segments!",
                    style=f"bold {Color.red.value}",
                )
            if absolute_stop:
                break
            # the frame comes before the current segment starts, so skip it;
            # frames past the segment end are handled by the while loop above
            if ftime <= segment_boundary_start_sec:
                continue

            self.stats["total"] += 1
            time_diff = ftime - prev_time
            if time_diff < self.cfg.min_frame_interval_sec:
                continue
            prev_time = ftime

            yield from self._process_frame(frame_indx, frame, ftime)
    # flush buffer
    yield from self.flush_buffer()

VideoSampler

The fundamental class for sampling video frames.

Parameters:

cfg (SamplerConfig): The configuration for the video sampler. Required.

Attributes:

cfg (SamplerConfig): The configuration for the video sampler.
frame_buffer (FrameBuffer): The frame buffer used for sampling frames.
gate (Gate): The gate used for filtering frames.
stats (Counter): A counter for tracking statistics.

Methods:

sample: Generates sample frames from a video.
write_queue: Writes sampled frames to a queue.
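
A minimal usage sketch (hedged: cfg is assumed to be a SamplerConfig built elsewhere; the path is a placeholder). After the run, sampler.stats holds the counters reported by Worker.launch ('total', 'decoded', 'produced', 'gated'):

from video_sampler.sampler import VideoSampler

sampler = VideoSampler(cfg=cfg)  # cfg: a pre-built SamplerConfig (assumed)
for frames in sampler.sample("clip.mp4"):
    for frame_obj in frames:
        ...  # consume FrameObject instances

print(dict(sampler.stats))  # counts for 'total', 'decoded', 'produced', 'gated'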

Source code in video_sampler/sampler.py
class VideoSampler:
    """
    The fundamental class for sampling video frames.

    Args:
        cfg (SamplerConfig): The configuration for the video sampler.

    Attributes:
        cfg (SamplerConfig): The configuration for the video sampler.
        frame_buffer (FrameBuffer): The frame buffer used for sampling frames.
        gate (Gate): The gate used for filtering frames.
        stats (Counter): A counter for tracking statistics.

    Methods:
        sample(video_path) -> Iterable[list[FrameObject]]:
            Generates sample frames from a video.
        write_queue(video_path, q):
            Writes sampled frames to a queue.

    """

    def __init__(self, cfg: SamplerConfig) -> None:
        self.cfg: SamplerConfig = deepcopy(cfg)
        self.frame_buffer = create_buffer(self.cfg.buffer_config)
        self.gate = create_gate(self.cfg.gate_config)
        self.stats = Counter()

    def flush_buffer(self):
        """Flushes the frame buffer and yields gated frames"""
        for res in self.frame_buffer.final_flush():
            if res:
                self.stats["produced"] += 1
                gated_obj = self.gate(*res)
                self.stats["gated"] += gated_obj.N
                if gated_obj.frames:
                    yield gated_obj.frames
        gated_obj = self.gate.flush()
        self.stats["gated"] += gated_obj.N
        if gated_obj.frames:
            yield gated_obj.frames
        yield PROCESSING_DONE_ITERABLE

    def _init_sampler(self):
        self.stats.clear()
        self.frame_buffer.clear()

    def _process_frame(self, frame_indx, frame, ftime):
        frame_pil: Image = frame.to_image()
        if self.cfg.debug:
            buf = self.frame_buffer.get_buffer_state()
            console.print(
                f"Frame {frame_indx}\ttime: {ftime}",
                f"\t Buffer ({len(buf)}): {buf}",
                style=f"bold {Color.green.value}",
            )
        frame_meta = {"frame_time": ftime, "frame_indx": frame_indx}
        self.stats["decoded"] += 1
        if res := self.frame_buffer.add(
            frame_pil,
            metadata=frame_meta,
        ):
            gated_obj = self.gate(*res)
            self.stats["produced"] += 1
            self.stats["gated"] += gated_obj.N
            if gated_obj.frames:
                yield gated_obj.frames

    def sample(self, video_path: str, subs: str = None) -> Iterable[list[FrameObject]]:
        """Generate sample frames from a video.

        Args:
            video_path (str): The path to the video file.
            subs (str): Unused in video sampler

        Yields:
            Iterable[list[FrameObject]]: A generator that yields a list of FrameObjects representing sampled frames.
        """
        self._init_sampler()
        with av.open(video_path) as container:
            stream = container.streams.video[0]
            if self.cfg.keyframes_only:
                stream.codec_context.skip_frame = "NONKEY"
            prev_time = -10
            for frame_indx, frame in enumerate(container.decode(stream)):
                if frame is None:
                    continue
                try:
                    ftime = frame.time
                except AttributeError:
                    continue
                # enforce the minimum interval between sampled frames
                time_diff = ftime - prev_time
                self.stats["total"] += 1
                if time_diff < self.cfg.min_frame_interval_sec:
                    continue
                prev_time = ftime

                yield from self._process_frame(frame_indx, frame, ftime)

        # flush buffer
        yield from self.flush_buffer()

    def write_queue(self, video_path: str, q: Queue, subs: str = None):
        try:
            item: tuple[FrameObject, int]
            for item in self.sample(video_path=video_path, subs=subs):
                q.put(item)
        except (av.IsADirectoryError, av.InvalidDataError) as e:
            console.print(
                f"Error while processing {video_path}",
                f"\n\t{e}",
                style=f"bold {Color.red.value}",
            )
            q.put(PROCESSING_DONE_ITERABLE)

flush_buffer()

Flushes the frame buffer and yields gated frames

Source code in video_sampler/sampler.py
def flush_buffer(self):
    """Flushes the frame buffer and yields gated frames"""
    for res in self.frame_buffer.final_flush():
        if res:
            self.stats["produced"] += 1
            gated_obj = self.gate(*res)
            self.stats["gated"] += gated_obj.N
            if gated_obj.frames:
                yield gated_obj.frames
    gated_obj = self.gate.flush()
    self.stats["gated"] += gated_obj.N
    if gated_obj.frames:
        yield gated_obj.frames
    yield PROCESSING_DONE_ITERABLE

sample(video_path, subs=None)

Generate sample frames from a video.

Parameters:

video_path (str): The path to the video file. Required.
subs (str): Unused in video sampler. Default: None.

Yields:

Iterable[list[FrameObject]]: A generator that yields lists of FrameObject instances representing sampled frames.

Source code in video_sampler/sampler.py
def sample(self, video_path: str, subs: str = None) -> Iterable[list[FrameObject]]:
    """Generate sample frames from a video.

    Args:
        video_path (str): The path to the video file.
        subs (str): Unused in video sampler

    Yields:
        Iterable[list[FrameObject]]: A generator that yields a list of FrameObjects representing sampled frames.
    """
    self._init_sampler()
    with av.open(video_path) as container:
        stream = container.streams.video[0]
        if self.cfg.keyframes_only:
            stream.codec_context.skip_frame = "NONKEY"
        prev_time = -10
        for frame_indx, frame in enumerate(container.decode(stream)):
            if frame is None:
                continue
            try:
                ftime = frame.time
            except AttributeError:
                continue
            # enforce the minimum interval between sampled frames
            time_diff = ftime - prev_time
            self.stats["total"] += 1
            if time_diff < self.cfg.min_frame_interval_sec:
                continue
            prev_time = ftime

            yield from self._process_frame(frame_indx, frame, ftime)

    # flush buffer
    yield from self.flush_buffer()

Worker

Source code in video_sampler/sampler.py
class Worker:
    def __init__(
        self,
        cfg: SamplerConfig,
        devnull: bool = False,
        sampler_cls: VideoSampler = VideoSampler,
        extra_sampler_args: dict = None,
    ) -> None:
        if extra_sampler_args is None:
            extra_sampler_args = {}
        self.cfg: SamplerConfig = cfg
        self.sampler: VideoSampler = sampler_cls(cfg=cfg, **extra_sampler_args)
        self.q = Queue()
        self.devnull = devnull
        self.__initialise_summary_objs()

    def __initialise_summary_objs(self):
        self.pool = None
        self.futures = {}
        if self.cfg.summary_config:
            from concurrent.futures import ThreadPoolExecutor

            from .integrations.llava_chat import ImageDescriptionDefault

            console.print("Initialising summary pool...", style="bold yellow")
            self.pool = ThreadPoolExecutor(
                max_workers=self.cfg.summary_config.get("max_workers", 2)
            )
            self.desc_client = ImageDescriptionDefault(
                url=self.cfg.summary_config.get("url")
            )

    def collect_summaries(self, savepath: str):
        if not self.pool:
            return
        console.print(
            f"Waiting for summary pool to finish: [{len(self.futures)}] items...",
            style="bold yellow",
        )
        summary_info = []
        for k, v in self.futures.items():
            if summary := v.result():
                summary_info.append({"time": k, "summary": summary})
                if self.cfg.debug:
                    console.print(
                        f"Summary for frame {k}",
                        f"\t{summary}",
                        style="bold green",
                    )
        import json

        # save as a jsonl
        try:
            with open(os.path.join(savepath, "summaries.jsonl"), "w") as f:
                for item in summary_info:
                    f.write(json.dumps(item) + "\n")
        except OSError as e:
            console.print(f"Failed to write to file: {e}", style="bold red")

    def launch(
        self,
        video_path: str,
        output_path: str = "",
        pretty_video_name: str = "",
        subs: str = None,
    ) -> None:
        """
        Launch the worker.

        Args:
            video_path (str): Path to the video file.
            output_path (str, optional): Path to the output folder. Defaults to "".
            pretty_video_name (str, optional): Name of the video file for pretty printing (useful for urls).
                                                Defaults to "".
            subs (str, optional): Subtitles for the video file. Defaults to None.
        """
        if not pretty_video_name:
            pretty_video_name = os.path.basename(video_path)
        if output_path and self.devnull:
            raise ValueError("Cannot write to disk when devnull is True")
        if output_path:
            os.makedirs(output_path, exist_ok=True)

        proc_thread = Thread(
            target=self.sampler.write_queue, args=(video_path, self.q, subs)
        )
        proc_thread.start()
        self.queue_reader(output_path, read_interval=self.cfg.queue_wait)
        proc_thread.join()
        self.collect_summaries(output_path)
        if self.cfg.print_stats:
            console.print(
                f"Stats for: {pretty_video_name}",
                f"\n\tTotal frames: {self.sampler.stats['total']}",
                f"\n\tDecoded frames: {self.sampler.stats['decoded']}",
                f"\n\tProduced frames: {self.sampler.stats['produced']}",
                f"\n\tGated frames: {self.sampler.stats['gated']}",
                style=f"bold {Color.magenta.value}",
            )

    def queue_reader(self, output_path, read_interval=0.1) -> None:
        """
        Reads frames from the queue and saves them as JPEG images.

        Args:
            output_path (str): The directory path where the frames will be saved.
            read_interval (float, optional): The time interval between reading frames from the queue.
                    Defaults to 0.1 seconds.
        """
        last_summary_time = -10
        self.futures = {}  # clear futures
        while True:
            if not self.q.empty():
                frame_object: FrameObject
                for frame_object in self.q.get():
                    if frame_object.metadata.get("end", False):
                        return
                    if frame_object.frame is not None and (
                        not self.devnull and isinstance(frame_object.frame, Image.Image)
                    ):
                        frame_object.frame.save(
                            os.path.join(
                                output_path,
                                f"{frame_object.metadata['frame_time']}.jpg",
                            )
                        )
                        if self.pool:
                            ftime = frame_object.metadata["frame_time"]
                            if ftime - last_summary_time < self.cfg.summary_config.get(
                                "min_sum_interval", 30
                            ):  # seconds
                                continue

                            future = self.pool.submit(
                                self.desc_client.summarise_image, frame_object.frame
                            )
                            if self.cfg.debug:
                                console.print(
                                    f"Submitting summary for frame {ftime}",
                                    style="bold yellow",
                                )
                            self.futures[ftime] = future
                            last_summary_time = ftime
            time.sleep(read_interval)

launch(video_path, output_path='', pretty_video_name='', subs=None)

Launch the worker.

Parameters:

video_path (str): Path to the video file. Required.
output_path (str): Path to the output folder. Default: ''.
pretty_video_name (str): Name of the video file for pretty printing (useful for urls). Default: ''.
subs (str): Subtitles for the video file. Default: None.
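
A usage sketch for the worker (hedged: cfg is assumed to be a SamplerConfig built elsewhere; paths are placeholders). Note that output_path and devnull=True are mutually exclusive: launch raises a ValueError if both are set.

from video_sampler.sampler import Worker

worker = Worker(cfg=cfg)  # cfg: a pre-built SamplerConfig (assumed)
worker.launch(
    video_path="clip.mp4",   # placeholder input path
    output_path="sampled/",  # frames are written here as <frame_time>.jpg
    subs=None,               # optional subtitle text, forwarded to the sampler
)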
Source code in video_sampler/sampler.py
def launch(
    self,
    video_path: str,
    output_path: str = "",
    pretty_video_name: str = "",
    subs: str = None,
) -> None:
    """
    Launch the worker.

    Args:
        video_path (str): Path to the video file.
        output_path (str, optional): Path to the output folder. Defaults to "".
        pretty_video_name (str, optional): Name of the video file for pretty printing (useful for urls).
                                            Defaults to "".
        subs (str, optional): Subtitles for the video file. Defaults to None.
    """
    if not pretty_video_name:
        pretty_video_name = os.path.basename(video_path)
    if output_path and self.devnull:
        raise ValueError("Cannot write to disk when devnull is True")
    if output_path:
        os.makedirs(output_path, exist_ok=True)

    proc_thread = Thread(
        target=self.sampler.write_queue, args=(video_path, self.q, subs)
    )
    proc_thread.start()
    self.queue_reader(output_path, read_interval=self.cfg.queue_wait)
    proc_thread.join()
    self.collect_summaries(output_path)
    if self.cfg.print_stats:
        console.print(
            f"Stats for: {pretty_video_name}",
            f"\n\tTotal frames: {self.sampler.stats['total']}",
            f"\n\tDecoded frames: {self.sampler.stats['decoded']}",
            f"\n\tProduced frames: {self.sampler.stats['produced']}",
            f"\n\tGated frames: {self.sampler.stats['gated']}",
            style=f"bold {Color.magenta.value}",
        )

queue_reader(output_path, read_interval=0.1)

Reads frames from the queue and saves them as JPEG images.

Parameters:

output_path (str): The directory path where the frames will be saved. Required.
read_interval (float): The time interval between reading frames from the queue. Default: 0.1 seconds.
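
When cfg.summary_config is set, the worker builds a summary thread pool and queue_reader throttles submissions to it. The keys read in the source are "url", "max_workers", and "min_sum_interval"; a hedged sketch of such a dict (the endpoint URL is a placeholder, and how SamplerConfig stores it is assumed):

summary_config = {
    "url": "http://localhost:8080",  # placeholder endpoint for the llava_chat image-description client
    "max_workers": 2,                # thread pool size used for summary requests
    "min_sum_interval": 30,          # minimum seconds between summarised frames
}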
Source code in video_sampler/sampler.py
def queue_reader(self, output_path, read_interval=0.1) -> None:
    """
    Reads frames from the queue and saves them as JPEG images.

    Args:
        output_path (str): The directory path where the frames will be saved.
        read_interval (float, optional): The time interval between reading frames from the queue.
                Defaults to 0.1 seconds.
    """
    last_summary_time = -10
    self.futures = {}  # clear futures
    while True:
        if not self.q.empty():
            frame_object: FrameObject
            for frame_object in self.q.get():
                if frame_object.metadata.get("end", False):
                    return
                if frame_object.frame is not None and (
                    not self.devnull and isinstance(frame_object.frame, Image.Image)
                ):
                    frame_object.frame.save(
                        os.path.join(
                            output_path,
                            f"{frame_object.metadata['frame_time']}.jpg",
                        )
                    )
                    if self.pool:
                        ftime = frame_object.metadata["frame_time"]
                        if ftime - last_summary_time < self.cfg.summary_config.get(
                            "min_sum_interval", 30
                        ):  # seconds
                            continue

                        future = self.pool.submit(
                            self.desc_client.summarise_image, frame_object.frame
                        )
                        if self.cfg.debug:
                            console.print(
                                f"Submitting summary for frame {ftime}",
                                style="bold yellow",
                            )
                        self.futures[ftime] = future
                        last_summary_time = ftime
        time.sleep(read_interval)