Source code for models.DataQueue
#
# TI Voxel Viewer component.
#
# Copyright (c) 2014 Texas Instruments Inc.
#
from collections import deque
from threading import Condition
import numpy as np
[docs]class DataQueue(object):
def __init__(self, maxlen = 3):
self.queue = deque(maxlen = maxlen)
self.maxlen = maxlen
self.condition = Condition()
def _put(self, item):
self.queue.append(item)
[docs] def put(self, item):
self.condition.acquire()
self._put(item)
self.condition.notify()
self.condition.release()
def _get(self):
return self.queue.popleft()
[docs] def get(self, timeout = None):
self.condition.acquire()
while not len(self.queue):
self.condition.wait(timeout)
if timeout is not None and not len(self.queue): # timed out?
self.condition.release()
return None
item = self._get()
self.condition.release()
return item
[docs] def tryGet(self):
self.condition.acquire()
if len(self.queue):
item = self._get()
else:
item = None
self.condition.release()
return item
[docs] def clear(self):
self.queue.clear()