
Video sampler

EntropyByffer

Bases: FrameBuffer

Measures image entropy as a proxy for image usability

Source code in video_sampler/buffer.py
class EntropyByffer(FrameBuffer):
    """Measure image entropy as a function of the image usability"""

    def __init__(
        self, size: int, expiry: int, debug_flag: bool = False, hash_size: int = 8
    ) -> None:
        self.sliding_top_k_buffer = SlidingTopKBuffer(
            size=size, expiry=expiry, debug_flag=debug_flag, hash_size=hash_size
        )

    def get_buffer_state(self) -> list[str]:
        return self.sliding_top_k_buffer.get_buffer_state()

    def add(self, item: Image.Image, metadata: dict[str, Any]):
        return self.sliding_top_k_buffer.add(
            item, {**metadata, "index": -item.entropy()}
        )

    def final_flush(self) -> Iterable[tuple[Image.Image | None, dict]]:
        return self.sliding_top_k_buffer.final_flush()

    def clear(self):
        self.sliding_top_k_buffer.clear()
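
Example

A minimal usage sketch, assuming Pillow and video_sampler are installed; the synthetic frames and the frame_time metadata key are purely illustrative. The buffer keeps the highest-entropy frames and, because it wraps SlidingTopKBuffer, expiry must be greater than size.

from PIL import Image
from video_sampler.buffer import EntropyByffer

buf = EntropyByffer(size=3, expiry=10)

flat = Image.new("L", (64, 64), 128)        # low entropy
noisy = Image.effect_noise((64, 64), 64)    # high entropy

buf.add(flat, {"frame_time": 0.0})
buf.add(noisy, {"frame_time": 1.0})

# final_flush yields the best remaining frame plus a (None, {}) sentinel
for frame, meta in buf.final_flush():
    if frame is not None:
        print(meta["frame_time"], meta["index"])  # index == -entropy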

FrameBuffer

Bases: ABC

Source code in video_sampler/buffer.py
class FrameBuffer(ABC):
    @abstractmethod
    def add(self, item: Image.Image, metadata: dict[str, Any]) -> None | tuple:
        pass

    @abstractmethod
    def final_flush(self) -> Iterable[tuple[Image.Image | None, dict]]:
        """Flush the buffer and return the remaining items"""
        pass

    @abstractmethod
    def get_buffer_state(self) -> list[str]:
        """Return the current state of the buffer"""
        pass

    @abstractmethod
    def clear(self):
        """Clear the buffer"""
        pass

clear() abstractmethod

Clear the buffer

Source code in video_sampler/buffer.py
@abstractmethod
def clear(self):
    """Clear the buffer"""
    pass

final_flush() abstractmethod

Flush the buffer and return the remaining items

Source code in video_sampler/buffer.py
@abstractmethod
def final_flush(self) -> Iterable[tuple[Image.Image | None, dict]]:
    """Flush the buffer and return the remaining items"""
    pass

get_buffer_state() abstractmethod

Return the current state of the buffer

Source code in video_sampler/buffer.py
@abstractmethod
def get_buffer_state(self) -> list[str]:
    """Return the current state of the buffer"""
    pass
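
Example

To add a custom sampling strategy, subclass FrameBuffer and implement all four abstract methods. Below is a hypothetical FIFO buffer, shown only to illustrate the interface contract (add may return an evicted (frame, metadata) pair, mirroring the built-in buffers); it is not part of the library.

from collections import OrderedDict
from typing import Any, Iterable

from PIL import Image

from video_sampler.buffer import FrameBuffer


class FIFOBuffer(FrameBuffer):
    """Hypothetical example: evicts the oldest frame once `size` is reached."""

    def __init__(self, size: int) -> None:
        self.items: OrderedDict[int, tuple[Image.Image, dict]] = OrderedDict()
        self.max_size = size
        self._counter = 0

    def add(self, item: Image.Image, metadata: dict[str, Any]) -> None | tuple:
        self.items[self._counter] = (item, metadata)
        self._counter += 1
        if len(self.items) > self.max_size:
            # return the evicted (frame, metadata) pair
            return self.items.popitem(last=False)[1]
        return None

    def final_flush(self) -> Iterable[tuple[Image.Image | None, dict]]:
        yield from self.items.values()

    def get_buffer_state(self) -> list[str]:
        return [str(k) for k in self.items]

    def clear(self):
        self.items.clear()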

GridBuffer

Bases: HashBuffer

A class representing a grid-based buffer for images. Splits the image into a grid and stores the hashes of the grid cells in a mosaic buffer.

Parameters:

size (int): The maximum size of the buffer. Required.
debug_flag (bool): A flag indicating whether debug information should be printed. Default: False.
hash_size (int): The size of the hash. Default: 4.
grid_x (int): The number of grid cells in the x-axis. Default: 4.
grid_y (int): The number of grid cells in the y-axis. Default: 4.
max_hits (int): The maximum number of hits allowed for a hash. Default: 1.

Attributes:

grid_x (int): The number of grid cells in the x-axis.
grid_y (int): The number of grid cells in the y-axis.
max_hits (int): The maximum number of hits allowed for a hash.
mosaic_buffer (dict): A dictionary storing the mosaic buffer.

Methods:

add: Adds an image to the buffer along with its metadata.
clear: Clears the buffer and the mosaic buffer.
update_ttl_buffer: Updates the buffer by expiring images that are not in the grid.

Source code in video_sampler/buffer.py
class GridBuffer(HashBuffer):
    """
    A class representing a grid-based buffer for images.
    Splits the image into a grid and stores the hashes of the grid cells in a mosaic buffer.

    Args:
        size (int): The maximum size of the buffer.
        debug_flag (bool, optional): A flag indicating whether debug information should be printed.
        hash_size (int, optional): The size of the hash.
        grid_x (int, optional): The number of grid cells in the x-axis.
        grid_y (int, optional): The number of grid cells in the y-axis.
        max_hits (int, optional): The maximum number of hits allowed for a hash.

    Attributes:
        grid_x (int): The number of grid cells in the x-axis.
        grid_y (int): The number of grid cells in the y-axis.
        max_hits (int): The maximum number of hits allowed for a hash.
        mosaic_buffer (dict): A dictionary storing the mosaic buffer.

    Methods:
        add(item, metadata):
            Adds an image to the buffer along with its metadata.
        clear():
            Clears the buffer and the mosaic buffer.
        update_ttl_buffer():
            Updates the buffer by expiring images that are not in the grid.

    """

    def __init__(
        self,
        size: int,
        debug_flag: bool = False,
        hash_size: int = 4,
        grid_x: int = 4,
        grid_y: int = 4,
        max_hits: int = 1,
    ) -> None:
        super().__init__(size, debug_flag, hash_size)
        self.grid_x = grid_x
        self.grid_y = grid_y
        self.max_hits = max_hits
        self.mosaic_buffer = {}

    def __get_grid_hash(self, item: Image.Image) -> Iterable[str]:
        """Compute grid hashes for a given image"""
        for x in range(self.grid_x):
            for y in range(self.grid_y):
                yield str(
                    phash(
                        item.crop(
                            (
                                x * item.width / self.grid_x,
                                y * item.height / self.grid_y,
                                (x + 1) * item.width / self.grid_x,
                                (y + 1) * item.height / self.grid_y,
                            )
                        ),
                        hash_size=self.hash_size,
                    )
                )

    def _check_mosaic(self, mosaic_hash: str):
        return mosaic_hash in self.mosaic_buffer

    def update_ttl_buffer(self):
        # expire the images that are not in the grid
        if len(self.ordered_buffer) >= self.max_size:
            to_return_hash, return_data = self.ordered_buffer.popitem(last=False)
            if to_return_hash is not None:
                removal_keys = [
                    img_hash
                    for img_hash, mosaic_hash in self.mosaic_buffer.items()
                    if mosaic_hash == to_return_hash
                ]
                for key in removal_keys:
                    del self.mosaic_buffer[key]
            return return_data
        return None

    def add(self, item: Image.Image, metadata: dict[str, Any]):
        hash_ = str(phash(item, hash_size=self.hash_size))
        if not self._check_duplicate(hash_):
            # not automatically rejected, check the mosaic buffer
            hash_hits = 0
            hash_sets = []
            for el_hash_ in self.__get_grid_hash(item):
                if el_hash_ in self.mosaic_buffer:
                    hash_hits += 1
                hash_sets.append(el_hash_)

            if hash_hits < self.max_hits:
                # add image hash to the ttl counter
                self.ordered_buffer[hash_] = (item, metadata)
                # add the image to the mosaic buffer
                # this also automatically overwrites the deleted hashes
                for el_hash in hash_sets:
                    self.mosaic_buffer[el_hash] = hash_

            if self.debug_flag:
                console.print(
                    f"\tHash hits: {hash_hits}"
                    f"\tHash sets: {len(hash_sets)}"
                    f"\tHash buffer: {len(self.get_buffer_state())}"
                    f"\tMosaic buffer: {len(self.mosaic_buffer)}"
                )
        return self.update_ttl_buffer()

    def clear(self):
        super().clear()
        self.mosaic_buffer = {}

__get_grid_hash(item)

Compute grid hashes for a given image

Source code in video_sampler/buffer.py
def __get_grid_hash(self, item: Image.Image) -> Iterable[str]:
    """Compute grid hashes for a given image"""
    for x in range(self.grid_x):
        for y in range(self.grid_y):
            yield str(
                phash(
                    item.crop(
                        (
                            x * item.width / self.grid_x,
                            y * item.height / self.grid_y,
                            (x + 1) * item.width / self.grid_x,
                            (y + 1) * item.height / self.grid_y,
                        )
                    ),
                    hash_size=self.hash_size,
                )
            )
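
Example

A minimal usage sketch, assuming Pillow and video_sampler are installed; the noise frame is illustrative. A frame is kept only if fewer than max_hits of its grid-cell hashes already appear in the mosaic buffer.

from PIL import Image
from video_sampler.buffer import GridBuffer

buf = GridBuffer(size=5, hash_size=4, grid_x=2, grid_y=2, max_hits=2)

frame = Image.effect_noise((128, 128), 48)
buf.add(frame, {"frame_time": 0.0})

# An identical frame is rejected by both the full-frame hash
# and the per-cell mosaic hashes.
buf.add(frame.copy(), {"frame_time": 0.5})
print(len(buf.get_buffer_state()), len(buf.mosaic_buffer))  # 1 4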

GzipBuffer

Bases: FrameBuffer

Measures gzip compression size as a proxy for image usability

Source code in video_sampler/buffer.py
class GzipBuffer(FrameBuffer):
    """Measure compression size as a function of the image usability"""

    def __init__(
        self, size: int, expiry: int, debug_flag: bool = False, hash_size: int = 8
    ) -> None:
        self.sliding_top_k_buffer = SlidingTopKBuffer(
            size=size, expiry=expiry, debug_flag=debug_flag, hash_size=hash_size
        )

    def get_buffer_state(self) -> list[str]:
        return self.sliding_top_k_buffer.get_buffer_state()

    def add(self, item: Image.Image, metadata: dict[str, Any]):
        compressed_l = len(gzip.compress(item.tobytes()))
        return self.sliding_top_k_buffer.add(item, {**metadata, "index": -compressed_l})

    def final_flush(self) -> Iterable[tuple[Image.Image | None, dict]]:
        return self.sliding_top_k_buffer.final_flush()

    def clear(self):
        self.sliding_top_k_buffer.clear()
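
Example

A minimal usage sketch, assuming Pillow and video_sampler are installed; the synthetic frames are illustrative. Frames that compress poorly (i.e. carry more detail) get a lower index and are therefore preferred by the underlying SlidingTopKBuffer.

from PIL import Image
from video_sampler.buffer import GzipBuffer

buf = GzipBuffer(size=2, expiry=6)

flat = Image.new("RGB", (64, 64), (30, 30, 30))
noisy = Image.effect_noise((64, 64), 64).convert("RGB")

buf.add(flat, {"frame_time": 0.0})
sampled = buf.add(noisy, {"frame_time": 1.0})   # buffer full: best frame is emitted
if sampled is not None:
    frame, meta = sampled
    print(meta["frame_time"], meta["index"])    # index == -len(gzip.compress(...))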

HashBuffer

Bases: FrameBuffer

A buffer that stores frames with their corresponding metadata and checks for duplicates based on image hashes.

Parameters:

size (int): The maximum size of the buffer. Required.
debug_flag (bool): Flag indicating whether to enable debug mode. Default: False.
hash_size (int): The size of the image hash. Default: 4.

Methods:

get_buffer_state: Returns the current state of the buffer as a list of image hashes.
add: Adds an item to the buffer along with its metadata.
final_flush: Yields the stored items and their metadata in the buffer.
clear: Clears the buffer.

Private Methods:

__add(hash_: str, item: Image.Image, metadata: dict): Adds an item to the buffer with the given hash and metadata.
_check_duplicate(hash_: str) -> bool: Checks if the given hash already exists in the buffer and renews its validity if found.

Source code in video_sampler/buffer.py
class HashBuffer(FrameBuffer):
    """
    A buffer that stores frames with their corresponding metadata and
    checks for duplicates based on image hashes.
    Args:
        size (int): The maximum size of the buffer.
        debug_flag (bool, optional): Flag indicating whether to enable debug mode. Defaults to False.
        hash_size (int, optional): The size of the image hash. Defaults to 4.

    Methods:
        get_buffer_state() -> list[str]:
            Returns the current state of the buffer as a list of image hashes.

        add(item: Image.Image, metadata: dict[str, Any])
            Adds an item to the buffer along with its metadata.

        final_flush() -> Iterable[tuple[Image.Image | None, dict]]:
            Yields the stored items and their metadata in the buffer.

        clear()
            Clears the buffer.

    Private Methods:
        __add(hash_: str, item: Image.Image, metadata: dict)
            Adds an item to the buffer with the given hash and metadata.

        _check_duplicate(hash_: str) -> bool:
            Checks if the given hash already exists in the buffer and renews its validity if found.

    """

    def __init__(self, size: int, debug_flag: bool = False, hash_size: int = 4) -> None:
        self.ordered_buffer = OrderedDict()
        self.max_size = size
        self.debug_flag = debug_flag
        self.hash_size = hash_size

    def get_buffer_state(self) -> list[str]:
        return list(self.ordered_buffer.keys())

    def add(self, item: Image.Image, metadata: dict[str, Any]):
        hash_ = str(phash(item, hash_size=self.hash_size))
        if not self._check_duplicate(hash_):
            return self.__add(hash_, item, metadata)
        return None

    def __add(self, hash_: str, item: Image.Image, metadata: dict):
        self.ordered_buffer[hash_] = (item, metadata)
        if len(self.ordered_buffer) >= self.max_size:
            return self.ordered_buffer.popitem(last=False)[1]
        return None

    def _check_duplicate(self, hash_: str) -> bool:
        if hash_ in self.ordered_buffer:
            # renew the hash validity
            if self.debug_flag:
                console.print(
                    f"Renewing {hash_}",
                    style=f"bold {Color.red.value}",
                )
            self.ordered_buffer.move_to_end(hash_)
            return True
        return False

    def final_flush(self) -> Iterable[tuple[Image.Image | None, dict]]:
        yield from self.ordered_buffer.values()

    def clear(self):
        self.ordered_buffer.clear()
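
Example

A minimal usage sketch, assuming Pillow, imagehash (used internally for phash), and video_sampler are installed; the noise frames are illustrative.

from PIL import Image
from video_sampler.buffer import HashBuffer

buf = HashBuffer(size=3, hash_size=8)

frame_a = Image.effect_noise((64, 64), 32)
frame_b = Image.effect_noise((64, 64), 32)

buf.add(frame_a, {"frame_time": 0.0})
buf.add(frame_a.copy(), {"frame_time": 0.5})  # perceptual duplicate: dropped, hash renewed
buf.add(frame_b, {"frame_time": 1.0})

print(buf.get_buffer_state())  # two distinct hashes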

SamplerConfig

Bases: BaseModel

Configuration options for the video sampler.

Parameters:

min_frame_interval_sec (float): The minimum time interval between sampled frames in seconds. Default: 1.
keyframes_only (bool): Flag indicating whether to sample only keyframes. Default: True.
queue_wait (float): The time to wait between checking the frame queue in seconds. Default: 0.1.
debug (bool): Flag indicating whether to enable debug mode. Default: False.
print_stats (bool): Flag indicating whether to print sampling statistics. Default: False.
buffer_config (dict[str, Any]): Configuration options for the frame buffer. Default: {"type": "hash", "hash_size": 8, "size": 15, "debug": True}.
gate_config (dict[str, Any]): Configuration options for the frame gate. Default: {"type": "pass"}.
extractor_config (dict[str, Any]): Configuration options for the extractor (keyword, audio). Default: empty dict.
summary_config (dict[str, Any]): Configuration options for the summary generator. Default: empty dict.
n_workers (int): The number of workers. Default: 1.

Methods:

__str__: Returns a string representation of the configuration.
from_yaml: Creates a SamplerConfig from a YAML file.

Source code in video_sampler/buffer.py
class SamplerConfig(BaseModel):
    """
    Configuration options for the video sampler.

    Args:
        min_frame_interval_sec (float, optional): The minimum time interval
            between sampled frames in seconds. Defaults to 1.
        keyframes_only (bool, optional): Flag indicating whether to
            sample only keyframes. Defaults to True.
        queue_wait (float, optional): The time to wait between checking
            the frame queue in seconds. Defaults to 0.1.
        debug (bool, optional): Flag indicating whether to enable debug mode.
            Defaults to False.
        print_stats (bool, optional): Flag indicating whether to print
            sampling statistics. Defaults to False.
        buffer_config (dict[str, Any], optional): Configuration options for
                the frame buffer. Defaults to {"type": "hash", "hash_size": 8,
                "size": 15, "debug": True}.
        gate_config (dict[str, Any], optional): Configuration options for
                the frame gate. Defaults to {"type": "pass"}.
        extractor_config (dict[str, Any], optional): Configuration options for
                the extractor (keyword, audio). Defaults to None.
        summary_config (dict[str, Any], optional): Configuration options for
                the summary generator. Defaults to None.
    Methods:
        __str__() -> str:
            Returns a string representation of the configuration.

    """

    min_frame_interval_sec: float = Field(default=1, ge=0)
    keyframes_only: bool = True
    queue_wait: float = Field(default=0.1, ge=1e-3)
    debug: bool = False
    print_stats: bool = False
    buffer_config: dict[str, Any] = field(
        default_factory=lambda: {
            "type": "hash",
            "hash_size": 8,
            "size": 15,
            "debug": True,
        }
    )
    gate_config: dict[str, Any] = field(
        default_factory=lambda: {
            "type": "pass",
        }
    )
    extractor_config: dict[str, Any] = field(default_factory=dict)
    summary_config: dict[str, Any] = field(default_factory=dict)
    n_workers: int = 1

    def __str__(self) -> str:
        return str(asdict(self))

    @classmethod
    def from_yaml(cls, file_path: str) -> "SamplerConfig":
        with open(file_path) as file:
            data = yaml.safe_load(file)
        return cls(**data)
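
Example

A minimal construction sketch; the values shown are illustrative and any omitted fields fall back to the defaults above. The resulting buffer_config can be handed straight to create_buffer (documented below).

from video_sampler.buffer import SamplerConfig, create_buffer

config = SamplerConfig(
    min_frame_interval_sec=2.0,
    keyframes_only=True,
    buffer_config={
        "type": "entropy",
        "size": 10,
        "expiry": 30,
        "hash_size": 8,
        "debug": False,
    },
)
buffer = create_buffer(config.buffer_config)

# Alternatively, load the same options from a YAML file
# (the path is hypothetical):
# config = SamplerConfig.from_yaml("configs/sampler.yaml")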

SlidingTopKBuffer

Bases: FrameBuffer

A class representing a sliding top-k buffer for frames.

Parameters:

size (int): The maximum size of the buffer. Required.
debug_flag (bool): A flag indicating whether debug information should be printed. Default: False.
expiry (int): The expiry count for frames. Default: 30.
hash_size (int): The size of the hash. Default: 8.

Attributes:

sliding_buffer (list): The sliding buffer, implemented as a min heap.
max_size (int): The maximum size of the buffer.
debug_flag (bool): A flag indicating whether debug information should be printed.
expiry_count (int): The expiry count for frames.
hash_size (int): The size of the hash.

Methods:

get_buffer_state: Returns the current state of the buffer.
add: Adds a frame to the buffer along with its metadata.
final_flush: Performs a final flush of the buffer and yields the remaining frames.
clear: Clears the buffer.

Source code in video_sampler/buffer.py
class SlidingTopKBuffer(FrameBuffer):
    """
    A class representing a sliding top-k buffer for frames.

    Args:
        size (int): The maximum size of the buffer.
        debug_flag (bool, optional): A flag indicating whether debug information should be printed.
        expiry (int, optional): The expiry count for frames.
        hash_size (int, optional): The size of the hash.

    Attributes:
        sliding_buffer (list): The sliding buffer implemented as a min heap.
        max_size (int): The maximum size of the buffer.
        debug_flag (bool): A flag indicating whether debug information should be printed.
        expiry_count (int): The expiry count for frames.
        hash_size (int): The size of the hash.

    Methods:
        get_buffer_state() -> list[str]:
            Returns the current state of the buffer.
        add(item, metadata):
            Adds a frame to the buffer along with its metadata.
        final_flush() -> Iterable[tuple[Image.Image | None, dict]]:
            Performs a final flush of the buffer and yields the remaining frames.
        clear():
            Clears the buffer.

    """

    def __init__(
        self, size: int, debug_flag: bool = False, expiry: int = 30, hash_size: int = 8
    ) -> None:
        # it's a min heap with a fixed size
        self.sliding_buffer = []
        self.max_size = size
        self.debug_flag = debug_flag
        self.expiry_count = expiry
        self.hash_size = hash_size
        assert (
            self.expiry_count > self.max_size
        ), "expiry count must be greater than max size"
        console.print(
            f"Creating sliding buffer of size {self.max_size} and expiry {expiry}",
            style=f"bold {Color.red.value}",
        )

    def get_buffer_state(self) -> list[str]:
        return [item[:3] for item in self.sliding_buffer]

    def add(self, item: Image.Image, metadata: dict[str, Any]):
        assert "index" in metadata, "metadata must have index key for sliding buffer"
        average_hash_ = str(average_hash(item, hash_size=self.hash_size))
        to_return = None
        if not self.__check_duplicate(average_hash_):
            heapq.heappush(
                self.sliding_buffer,
                [metadata["index"], 0, average_hash_, item, metadata],
            )
            if len(self.sliding_buffer) >= self.max_size:
                to_return = heapq.heappop(self.sliding_buffer)[-2:]
        # update the expiry count
        expired_indx = -1
        for i in range(len(self.sliding_buffer)):
            self.sliding_buffer[i][1] += 1
            if self.sliding_buffer[i][1] >= self.expiry_count:
                expired_indx = i
        # at any point only one item can be expired
        if expired_indx != -1:
            self.sliding_buffer.pop(expired_indx)  # just drop
        return to_return

    def __check_duplicate(self, hash_: str) -> bool:
        for item in self.sliding_buffer:
            if item[2] == hash_:
                # renew the hash validity
                if self.debug_flag:
                    console.print(
                        f"Renewing {hash_}",
                        style=f"bold {Color.red.value}",
                    )
                item[1] = 0
                return True
        return False

    def final_flush(self) -> Iterable[tuple[Image.Image | None, dict]]:
        if len(self.sliding_buffer):
            yield heapq.heappop(self.sliding_buffer)[-2:]
        yield None, {}

    def clear(self):
        self.sliding_buffer.clear()
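
Example

A minimal usage sketch, assuming Pillow and video_sampler are installed. Metadata must carry an "index" key and lower values win (it is a min heap), so negate any "higher is better" score before adding; the entropy score here is illustrative.

from PIL import Image
from video_sampler.buffer import SlidingTopKBuffer

buf = SlidingTopKBuffer(size=3, expiry=10)   # expiry must exceed size

for t in range(4):
    frame = Image.effect_noise((64, 64), 16 * (t + 1))
    score = frame.entropy()                  # higher is better
    out = buf.add(frame, {"frame_time": float(t), "index": -score})
    if out is not None:
        sampled, meta = out                  # best currently buffered frame is emitted
        print("sampled frame at", meta["frame_time"])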

create_buffer(buffer_config)

Create a buffer based on the config

Source code in video_sampler/buffer.py
def create_buffer(buffer_config: dict[str, Any]):
    """Create a buffer based on the config"""
    console.print(
        f"Creating buffer of type {buffer_config['type']}",
        style=f"bold {Color.red.value}",
    )
    if buffer_config["type"] == "hash":
        return HashBuffer(
            size=buffer_config["size"],
            debug_flag=buffer_config["debug"],
            hash_size=buffer_config["hash_size"],
        )
    elif buffer_config["type"] == "grid":
        return GridBuffer(
            size=buffer_config["size"],
            debug_flag=buffer_config["debug"],
            hash_size=buffer_config["hash_size"],
            grid_x=buffer_config["grid_x"],
            grid_y=buffer_config["grid_y"],
            max_hits=buffer_config["max_hits"],
        )
    elif buffer_config["type"] == "sliding_top_k":
        return SlidingTopKBuffer(
            size=buffer_config["size"],
            debug_flag=buffer_config["debug"],
            expiry=buffer_config["expiry"],
        )
    elif buffer_config["type"] == "passthrough":
        return PassThroughBuffer()
    elif buffer_config["type"] == "gzip":
        return GzipBuffer(
            size=buffer_config["size"],
            debug_flag=buffer_config["debug"],
            hash_size=buffer_config["hash_size"],
            expiry=buffer_config["expiry"],
        )
    elif buffer_config["type"] == "entropy":
        return EntropyByffer(
            size=buffer_config["size"],
            debug_flag=buffer_config["debug"],
            hash_size=buffer_config["hash_size"],
            expiry=buffer_config["expiry"],
        )
    else:
        raise ValueError(f"Unknown buffer type {buffer_config['type']}")
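
Example

Example configuration dictionaries for each buffer type; every key shown is read directly by create_buffer, and missing keys raise a KeyError. The values are illustrative.

from video_sampler.buffer import create_buffer

hash_cfg = {"type": "hash", "size": 15, "debug": False, "hash_size": 8}
grid_cfg = {
    "type": "grid", "size": 15, "debug": False, "hash_size": 4,
    "grid_x": 4, "grid_y": 4, "max_hits": 1,
}
entropy_cfg = {
    "type": "entropy", "size": 10, "debug": False,
    "hash_size": 8, "expiry": 30,
}

buffer = create_buffer(hash_cfg)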