Skip to content

Video sampler

EntropyByffer

Bases: FrameBuffer

Uses image entropy as a measure of image usability.

Source code in video_sampler/buffer.py
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
class EntropyByffer(FrameBuffer):
    """Score frames by image entropy and keep the best ones in a sliding buffer.

    Each frame is scored with ``Image.entropy()``. The negated score is stored
    under the ``"index"`` metadata key. The frame is then handed to an internal
    ``SlidingTopKBuffer``, which does all the buffering. Every other method
    just delegates to that inner buffer.

    Args:
        size: Maximum size forwarded to the inner sliding buffer.
        expiry: Expiry count forwarded to the inner sliding buffer.
        debug_flag: Whether the inner buffer prints debug information.
        hash_size: Hash size the inner buffer uses for de-duplication.
    """

    def __init__(
        self, size: int, expiry: int, debug_flag: bool = False, hash_size: int = 8
    ) -> None:
        inner = SlidingTopKBuffer(
            size=size,
            expiry=expiry,
            debug_flag=debug_flag,
            hash_size=hash_size,
        )
        self.sliding_top_k_buffer = inner

    def get_buffer_state(self) -> list[str]:
        """Return whatever state the inner sliding buffer reports."""
        inner = self.sliding_top_k_buffer
        return inner.get_buffer_state()

    def add(self, item: Image.Image, metadata: dict[str, Any]):
        """Score ``item`` by negated entropy and offer it to the inner buffer.

        The caller's ``metadata`` dict is copied, not mutated.
        """
        scored_metadata = dict(metadata)
        # Negated because the inner buffer is a min-heap: the most negative
        # index (highest entropy) is popped first.
        scored_metadata["index"] = -item.entropy()
        return self.sliding_top_k_buffer.add(item, scored_metadata)

    def final_flush(self) -> Iterable[tuple[Image.Image | None, dict]]:
        """Delegate the final flush to the inner sliding buffer."""
        inner = self.sliding_top_k_buffer
        return inner.final_flush()

    def clear(self):
        """Empty the inner sliding buffer."""
        inner = self.sliding_top_k_buffer
        inner.clear()

FrameBuffer

Bases: ABC

Source code in video_sampler/buffer.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class FrameBuffer(ABC):
    """Abstract interface implemented by every frame buffer in this module."""

    @abstractmethod
    def add(self, item: Image.Image, metadata: dict[str, Any]) -> None | tuple:
        """Offer a frame and its metadata to the buffer."""

    @abstractmethod
    def final_flush(self) -> Iterable[tuple[Image.Image | None, dict]]:
        """Flush the buffer and return the remaining items"""

    @abstractmethod
    def get_buffer_state(self) -> list[str]:
        """Return the current state of the buffer"""

    @abstractmethod
    def clear(self):
        """Clear the buffer"""

clear() abstractmethod

Clear the buffer

Source code in video_sampler/buffer.py
30
31
32
33
@abstractmethod
def clear(self):
    """Abstract hook: remove every item currently held by the buffer."""

final_flush() abstractmethod

Flush the buffer and return the remaining items

Source code in video_sampler/buffer.py
20
21
22
23
@abstractmethod
def final_flush(self) -> Iterable[tuple[Image.Image | None, dict]]:
    """Abstract hook: empty the buffer, handing back whatever items remain."""

get_buffer_state() abstractmethod

Return the current state of the buffer

Source code in video_sampler/buffer.py
25
26
27
28
@abstractmethod
def get_buffer_state(self) -> list[str]:
    """Abstract hook: describe what the buffer currently holds."""

GridBuffer

Bases: HashBuffer

A class representing a grid-based buffer for images. Splits the image into a grid and stores the hashes of the grid cells in a mosaic buffer.

Parameters:

Name Type Description Default
size int

The maximum size of the buffer.

required
debug_flag bool

A flag indicating whether debug information should be printed.

False
hash_size int

The size of the hash.

4
grid_x int

The number of grid cells in the x-axis.

4
grid_y int

The number of grid cells in the y-axis.

4
max_hits int

The maximum number of hits allowed for a hash.

1

Attributes:

Name Type Description
grid_x int

The number of grid cells in the x-axis.

grid_y int

The number of grid cells in the y-axis.

max_hits int

The maximum number of hits allowed for a hash.

mosaic_buffer dict

A dictionary storing the mosaic buffer.

Methods:

Name Description
add

Adds an image to the buffer along with its metadata.

clear

Clears the buffer and the mosaic buffer.

update_ttl_buffer

Evicts the oldest image once the buffer exceeds its maximum size, and removes that image's cell hashes from the mosaic buffer.

Source code in video_sampler/buffer.py
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
class GridBuffer(HashBuffer):
    """
    A grid-based de-duplicating buffer for images.

    Every incoming image is cut into a ``grid_x`` x ``grid_y`` grid. The
    perceptual hash of each cell is recorded in ``mosaic_buffer``, which maps
    each cell hash to the hash of the whole image that contributed it. An
    image is rejected when at least ``max_hits`` of its cells are already
    present in the mosaic. Otherwise it is stored in the inherited
    insertion-ordered buffer.

    Args:
        size (int): The maximum size of the buffer.
        debug_flag (bool, optional): Whether debug information should be printed.
        hash_size (int, optional): The size of the hash.
        grid_x (int, optional): The number of grid cells along the x-axis.
        grid_y (int, optional): The number of grid cells along the y-axis.
        max_hits (int, optional): The number of already-seen cells at which an image is rejected.

    Attributes:
        grid_x (int): The number of grid cells along the x-axis.
        grid_y (int): The number of grid cells along the y-axis.
        max_hits (int): The rejection threshold on already-seen cells.
        mosaic_buffer (dict): Maps cell hash -> hash of the owning image.

    Methods:
        add(item, metadata):
            Adds an image to the buffer along with its metadata.
        clear():
            Clears the buffer and the mosaic buffer.
        update_ttl_buffer():
            Evicts the oldest image once over capacity, along with its mosaic cells.
    """

    def __init__(
        self,
        size: int,
        debug_flag: bool = False,
        hash_size: int = 4,
        grid_x: int = 4,
        grid_y: int = 4,
        max_hits: int = 1,
    ) -> None:
        super().__init__(size, debug_flag, hash_size)
        self.grid_x = grid_x
        self.grid_y = grid_y
        self.max_hits = max_hits
        self.mosaic_buffer = {}

    def __get_grid_hash(self, item: Image.Image) -> Iterable[str]:
        """Lazily yield the perceptual hash of every grid cell (column-major)."""
        cell_boxes = (
            (
                col * item.width / self.grid_x,
                row * item.height / self.grid_y,
                (col + 1) * item.width / self.grid_x,
                (row + 1) * item.height / self.grid_y,
            )
            for col in range(self.grid_x)
            for row in range(self.grid_y)
        )
        for box in cell_boxes:
            cell = item.crop(box)
            yield str(phash(cell, hash_size=self.hash_size))

    def _check_mosaic(self, mosaic_hash: str):
        """Return True when ``mosaic_hash`` is already recorded in the mosaic."""
        return mosaic_hash in self.mosaic_buffer

    def update_ttl_buffer(self):
        """Evict and return the oldest entry once the buffer is over capacity.

        Mosaic cells owned by the evicted image are forgotten as well. Returns
        None when nothing is evicted.
        """
        if len(self.ordered_buffer) <= self.max_size:
            return None
        evicted_hash, evicted_data = self.ordered_buffer.popitem(last=False)
        if evicted_hash is not None:
            stale_cells = [
                cell_hash
                for cell_hash, owner_hash in self.mosaic_buffer.items()
                if owner_hash == evicted_hash
            ]
            for cell_hash in stale_cells:
                self.mosaic_buffer.pop(cell_hash)
        return evicted_data

    def add(self, item: Image.Image, metadata: dict[str, Any]):
        """Offer an image to the buffer; return any entry evicted as a result."""
        image_hash = str(phash(item, hash_size=self.hash_size))
        if self._check_duplicate(image_hash):
            # exact duplicate: its validity was renewed, nothing new to store
            return self.update_ttl_buffer()

        cell_hashes = list(self.__get_grid_hash(item))
        hits = sum(1 for cell_hash in cell_hashes if cell_hash in self.mosaic_buffer)

        if hits < self.max_hits:
            self.ordered_buffer[image_hash] = (item, metadata)
            # re-point every cell at this image; this also takes over cells
            # previously owned by other images
            self.mosaic_buffer.update(
                {cell_hash: image_hash for cell_hash in cell_hashes}
            )

        if self.debug_flag:
            console.print(
                f"\tHash hits: {hits}"
                f"\tHash sets: {len(cell_hashes)}"
                f"\tHash buffer: {len(self.get_buffer_state())}"
                f"\tMosaic buffer: {len(self.mosaic_buffer)}"
            )
        return self.update_ttl_buffer()

    def clear(self):
        """Empty both the ordered buffer and the mosaic buffer."""
        super().clear()
        self.mosaic_buffer = {}

__get_grid_hash(item)

Compute grid hashes for a given image

Source code in video_sampler/buffer.py
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
def __get_grid_hash(self, item: Image.Image) -> Iterable[str]:
    """Lazily yield the perceptual hash of every grid cell (column-major)."""
    cell_boxes = (
        (
            col * item.width / self.grid_x,
            row * item.height / self.grid_y,
            (col + 1) * item.width / self.grid_x,
            (row + 1) * item.height / self.grid_y,
        )
        for col in range(self.grid_x)
        for row in range(self.grid_y)
    )
    for box in cell_boxes:
        cell = item.crop(box)
        yield str(phash(cell, hash_size=self.hash_size))

GzipBuffer

Bases: FrameBuffer

Uses the gzip-compressed size of an image as a measure of image usability.

Source code in video_sampler/buffer.py
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
class GzipBuffer(FrameBuffer):
    """Score frames by gzip-compressed size and keep the best in a sliding buffer.

    The raw pixel bytes of each frame (``item.tobytes()``) are gzip-compressed.
    The negated compressed length is stored under the ``"index"`` metadata key.
    The frame is then handed to an internal ``SlidingTopKBuffer``; every other
    method just delegates to that inner buffer.

    Args:
        size: Maximum size forwarded to the inner sliding buffer.
        expiry: Expiry count forwarded to the inner sliding buffer.
        debug_flag: Whether the inner buffer prints debug information.
        hash_size: Hash size the inner buffer uses for de-duplication.
    """

    def __init__(
        self, size: int, expiry: int, debug_flag: bool = False, hash_size: int = 8
    ) -> None:
        inner = SlidingTopKBuffer(
            size=size,
            expiry=expiry,
            debug_flag=debug_flag,
            hash_size=hash_size,
        )
        self.sliding_top_k_buffer = inner

    def get_buffer_state(self) -> list[str]:
        """Return whatever state the inner sliding buffer reports."""
        inner = self.sliding_top_k_buffer
        return inner.get_buffer_state()

    def add(self, item: Image.Image, metadata: dict[str, Any]):
        """Score ``item`` by negated compressed size and offer it to the inner buffer.

        The caller's ``metadata`` dict is copied, not mutated.
        """
        raw_bytes = item.tobytes()
        compressed_size = len(gzip.compress(raw_bytes))
        scored_metadata = dict(metadata)
        # Negated because the inner buffer is a min-heap: the frame that
        # compresses to the most bytes is popped first.
        scored_metadata["index"] = -compressed_size
        return self.sliding_top_k_buffer.add(item, scored_metadata)

    def final_flush(self) -> Iterable[tuple[Image.Image | None, dict]]:
        """Delegate the final flush to the inner sliding buffer."""
        inner = self.sliding_top_k_buffer
        return inner.final_flush()

    def clear(self):
        """Empty the inner sliding buffer."""
        inner = self.sliding_top_k_buffer
        inner.clear()

HashBuffer

Bases: FrameBuffer

A buffer that stores frames with their corresponding metadata and checks for duplicates based on image hashes.

Args:

- size (int): The maximum size of the buffer.
- debug_flag (bool, optional): Flag indicating whether to enable debug mode. Defaults to False.
- hash_size (int, optional): The size of the image hash. Defaults to 4.

Methods:

Name Description
get_buffer_state

Returns the current state of the buffer as a list of image hashes.

add

add(item: Image.Image, metadata: dict[str, Any]): Adds an item to the buffer along with its metadata.

final_flush

Yields the stored items and their metadata in the buffer.

Private Methods

__add(hash_: str, item: Image.Image, metadata: dict): Adds an item to the buffer with the given hash and metadata.

_check_duplicate(hash_: str) -> bool: Checks if the given hash already exists in the buffer and renews its validity if found.

Source code in video_sampler/buffer.py
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
class HashBuffer(FrameBuffer):
    """
    A buffer that stores frames with their corresponding metadata and
    checks for duplicates based on image hashes.

    Frames are keyed by the string form of their perceptual hash in an
    insertion-ordered dict. Re-adding a known hash moves it to the
    most-recent end ("renews" it). Once the dict holds more than ``size``
    entries, the oldest entry is evicted and returned to the caller.

    Args:
        size (int): The maximum size of the buffer.
        debug_flag (bool, optional): Flag indicating whether to enable debug mode. Defaults to False.
        hash_size (int, optional): The size of the image hash. Defaults to 4.

    Methods:
        get_buffer_state() -> list[str]:
            Returns the current state of the buffer as a list of image hashes, oldest first.

        add(item: Image.Image, metadata: dict[str, Any])
            Adds an item to the buffer along with its metadata. Returns the
            evicted ``(item, metadata)`` pair when the buffer overflows,
            otherwise None.

        final_flush() -> Iterable[tuple[Image.Image | None, dict]]:
            Yields the stored ``(item, metadata)`` pairs, oldest first.

        clear()
            Clears the buffer.

    Private Methods:
        __add(hash_: str, item: Image.Image, metadata: dict)
            Adds an item to the buffer with the given hash and metadata.

        _check_duplicate(hash_: str) -> bool:
            Checks if the given hash already exists in the buffer and renews its validity if found.
    """

    def __init__(self, size: int, debug_flag: bool = False, hash_size: int = 4) -> None:
        self.ordered_buffer = OrderedDict()
        self.max_size = size
        self.debug_flag = debug_flag
        self.hash_size = hash_size

    def get_buffer_state(self) -> list[str]:
        """Return the stored hashes, oldest first."""
        return list(self.ordered_buffer.keys())

    def add(self, item: Image.Image, metadata: dict[str, Any]):
        """Store ``item`` unless its hash is already buffered.

        Returns:
            The evicted ``(item, metadata)`` pair if the buffer overflowed,
            otherwise None. Duplicates are renewed and also return None.
        """
        hash_ = str(phash(item, hash_size=self.hash_size))
        if not self._check_duplicate(hash_):
            return self.__add(hash_, item, metadata)
        return None

    def __add(self, hash_: str, item: Image.Image, metadata: dict):
        """Insert under ``hash_``; evict and return the oldest entry on overflow."""
        self.ordered_buffer[hash_] = (item, metadata)
        if len(self.ordered_buffer) > self.max_size:
            return self.ordered_buffer.popitem(last=False)[1]
        return None

    def _check_duplicate(self, hash_: str) -> bool:
        """Return True if ``hash_`` is buffered, moving it to the most-recent end."""
        if hash_ in self.ordered_buffer:
            # renew the hash validity
            if self.debug_flag:
                console.print(
                    f"Renewing {hash_}",
                    style=f"bold {Color.red.value}",
                )
            self.ordered_buffer.move_to_end(hash_)
            return True
        return False

    def final_flush(self) -> Iterable[tuple[Image.Image | None, dict]]:
        """Yield every stored ``(item, metadata)`` pair without clearing the buffer."""
        yield from self.ordered_buffer.values()

    def clear(self):
        """Remove every stored entry."""
        self.ordered_buffer.clear()

    def __len__(self):
        return len(self.ordered_buffer)

    def __iter__(self):
        # iterates over the stored hashes, oldest first
        return iter(self.ordered_buffer)

    def __getitem__(self, key: str):
        return self.ordered_buffer[key]

    def popitem(self, last: bool = True):
        """Pop a ``(hash, (item, metadata))`` pair; newest when ``last`` else oldest."""
        return self.ordered_buffer.popitem(last=last)

SlidingTopKBuffer

Bases: FrameBuffer

A class representing a sliding top-k buffer for frames.

Parameters:

Name Type Description Default
size int

The maximum size of the buffer.

required
debug_flag bool

A flag indicating whether debug information should be printed.

False
expiry int

The expiry count for frames.

30
hash_size int

The size of the hash.

8

Attributes:

Name Type Description
sliding_buffer list

The sliding buffer implemented as a min heap.

max_size int

The maximum size of the buffer.

debug_flag bool

A flag indicating whether debug information should be printed.

expiry_count int

The expiry count for frames.

hash_size int

The size of the hash.

Methods:

Name Description
get_buffer_state

Returns the current state of the buffer.

add

Adds a frame to the buffer along with its metadata.

final_flush

Pops the highest-priority (smallest-index) remaining frame, if any, and then yields a (None, {}) sentinel.

clear

Clears the buffer.

Source code in video_sampler/buffer.py
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
class SlidingTopKBuffer(FrameBuffer):
    """
    A class representing a sliding top-k buffer for frames.

    Frames are kept in a min-heap of ``[index, age, hash, item, metadata]``
    entries, ordered by the caller-supplied ``metadata["index"]``. Ties are
    broken by age, then by hash; hashes are unique because duplicates are
    rejected. Every call to ``add`` ages all entries by one step, and entries
    whose age reaches ``expiry`` are dropped silently. When the heap reaches
    ``size`` entries, the smallest-index entry is popped and returned.

    Args:
        size (int): The maximum size of the buffer.
        debug_flag (bool, optional): A flag indicating whether debug information should be printed.
        expiry (int, optional): The expiry count for frames. Must be greater than ``size``.
        hash_size (int, optional): The size of the hash.

    Attributes:
        sliding_buffer (list): The sliding buffer implemented as a min heap.
        max_size (int): The maximum size of the buffer.
        debug_flag (bool): A flag indicating whether debug information should be printed.
        expiry_count (int): The expiry count for frames.
        hash_size (int): The size of the hash.

    Methods:
        get_buffer_state() -> list[str]:
            Returns the ``[index, age, hash]`` prefix of every heap entry.
        add(item, metadata):
            Adds a frame to the buffer along with its metadata.
        final_flush() -> Iterable[tuple[Image.Image | None, dict]]:
            Yields the smallest-index remaining frame (if any), then a
            ``(None, {})`` sentinel.
        clear():
            Clears the buffer.
    """

    def __init__(
        self, size: int, debug_flag: bool = False, expiry: int = 30, hash_size: int = 8
    ) -> None:
        # it's a min heap with a fixed size
        self.sliding_buffer = []
        self.max_size = size
        self.debug_flag = debug_flag
        self.expiry_count = expiry
        self.hash_size = hash_size
        assert (
            self.expiry_count > self.max_size
        ), "expiry count must be greater than max size"
        console.print(
            f"Creating sliding buffer of size {self.max_size} and expiry {expiry}",
            style=f"bold {Color.red.value}",
        )

    def get_buffer_state(self) -> list[str]:
        """Return the ``[index, age, hash]`` prefix of every entry, in heap order."""
        return [item[:3] for item in self.sliding_buffer]

    def add(self, item: Image.Image, metadata: dict[str, Any]):
        """Offer a frame scored by ``metadata["index"]`` (smaller = popped first).

        Returns:
            ``[item, metadata]`` of the smallest-index entry when the heap is
            full after insertion, otherwise None.

        Raises:
            AssertionError: If ``metadata`` has no ``"index"`` key.
        """
        assert "index" in metadata, "metadata must have index key for sliding buffer"
        average_hash_ = str(average_hash(item, hash_size=self.hash_size))
        to_return = None
        if not self.__check_duplicate(average_hash_):
            heapq.heappush(
                self.sliding_buffer,
                [metadata["index"], 0, average_hash_, item, metadata],
            )
            if len(self.sliding_buffer) >= self.max_size:
                to_return = heapq.heappop(self.sliding_buffer)[-2:]
        self._age_and_expire()
        return to_return

    def _age_and_expire(self) -> None:
        """Age every entry by one step and drop entries that reached expiry."""
        # A uniform +1 on the age field preserves the relative order of
        # entries, so the heap stays valid after this loop.
        for entry in self.sliding_buffer:
            entry[1] += 1
        survivors = [
            entry for entry in self.sliding_buffer if entry[1] < self.expiry_count
        ]
        if len(survivors) != len(self.sliding_buffer):
            # Removing arbitrary positions breaks the heap invariant; rebuild
            # it in place so later heappush/heappop return the true minimum.
            self.sliding_buffer[:] = survivors
            heapq.heapify(self.sliding_buffer)

    def __check_duplicate(self, hash_: str) -> bool:
        """Return True if ``hash_`` is buffered, resetting that entry's age to 0."""
        for item in self.sliding_buffer:
            if item[2] == hash_:
                # renew the hash validity
                if self.debug_flag:
                    console.print(
                        f"Renewing {hash_}",
                        style=f"bold {Color.red.value}",
                    )
                item[1] = 0
                # Age is the tie-breaker for equal indices, so resetting it in
                # place can put this entry out of heap order; restore it.
                heapq.heapify(self.sliding_buffer)
                return True
        return False

    def final_flush(self) -> Iterable[tuple[Image.Image | None, dict]]:
        """Yield the smallest-index entry (if any), then a ``(None, {})`` sentinel.

        Only one buffered frame is emitted; any others are left in the heap.
        """
        if self.sliding_buffer:
            yield heapq.heappop(self.sliding_buffer)[-2:]
        yield None, {}

    def clear(self):
        """Remove every entry from the heap."""
        self.sliding_buffer.clear()

create_buffer(buffer_config)

Create a buffer based on the config

Source code in video_sampler/buffer.py
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
def create_buffer(buffer_config: dict[str, Any]) -> FrameBuffer:
    """Create a buffer based on the config.

    The config is passed to ``check_buffer_config`` and then dispatched on its
    ``"type"`` key:

    - ``"hash"``: ``HashBuffer`` (reads ``size``, ``hash_size``)
    - ``"grid"``: ``GridBuffer`` (reads ``size``, ``hash_size``, ``grid_x``,
      ``grid_y``, ``max_hits``)
    - ``"sliding_top_k"``: ``SlidingTopKBuffer`` (reads ``size``, ``expiry``,
      and an optional ``hash_size`` defaulting to 8)
    - ``"passthrough"``: ``PassThroughBuffer`` (no extra keys)
    - ``"gzip"`` / ``"entropy"``: ``GzipBuffer`` / ``EntropyByffer`` (read
      ``size``, ``hash_size``, ``expiry``)

    Every type except ``"passthrough"`` also reads an optional ``"debug"``
    flag (default False).

    Args:
        buffer_config: Buffer configuration mapping.

    Returns:
        The constructed buffer.

    Raises:
        ValueError: If the type is not one of the above (assuming
            ``check_buffer_config`` has not already rejected it).
    """
    check_buffer_config(buffer_config)
    console.print(
        f"Creating buffer of type {buffer_config['type']}",
        style=f"bold {Color.red.value}",
    )
    buffer_type = buffer_config["type"]
    debug = buffer_config.get("debug", False)
    if buffer_type == "hash":
        return HashBuffer(
            size=buffer_config["size"],
            debug_flag=debug,
            hash_size=buffer_config["hash_size"],
        )
    elif buffer_type == "grid":
        return GridBuffer(
            size=buffer_config["size"],
            debug_flag=debug,
            hash_size=buffer_config["hash_size"],
            grid_x=buffer_config["grid_x"],
            grid_y=buffer_config["grid_y"],
            max_hits=buffer_config["max_hits"],
        )
    elif buffer_type == "sliding_top_k":
        return SlidingTopKBuffer(
            size=buffer_config["size"],
            debug_flag=debug,
            expiry=buffer_config["expiry"],
            # previously ignored; 8 matches SlidingTopKBuffer's own default
            hash_size=buffer_config.get("hash_size", 8),
        )
    elif buffer_type == "passthrough":
        return PassThroughBuffer()
    elif buffer_type == "gzip":
        return GzipBuffer(
            size=buffer_config["size"],
            debug_flag=debug,
            hash_size=buffer_config["hash_size"],
            expiry=buffer_config["expiry"],
        )
    elif buffer_type == "entropy":
        return EntropyByffer(
            size=buffer_config["size"],
            debug_flag=debug,
            hash_size=buffer_config["hash_size"],
            expiry=buffer_config["expiry"],
        )
    else:
        raise ValueError(f"Unknown buffer type {buffer_config['type']}")