Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions Doc/library/multiprocessing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -869,10 +869,13 @@ Note that one can also create a shared queue by using a manager object -- see
bother you then you can instead use a queue created with a
:ref:`manager <multiprocessing-managers>`.

(1) After putting an object on an empty queue there may be an
infinitesimal delay before the queue's :meth:`~Queue.empty`
method returns :const:`False` and :meth:`~Queue.get_nowait` can
return without raising :exc:`queue.Empty`.
(1) After putting an object on an empty queue there may be a delay
before :meth:`~Queue.get_nowait` can return without raising
:exc:`queue.Empty`, because the feeder thread flushes objects to
the underlying pipe asynchronously. On platforms where
``sem_getvalue()`` is not implemented (for example macOS), the
queue's :meth:`~Queue.empty` method may also remain :const:`True`
during this delay.

(2) If multiple processes are enqueuing objects, it is possible for
the objects to be received at the other end out-of-order.
Expand Down Expand Up @@ -947,8 +950,17 @@ For an example of the usage of queues for interprocess communication see
Return ``True`` if the queue is empty, ``False`` otherwise. Because of
multithreading/multiprocessing semantics, this is not reliable.

On platforms where ``sem_getvalue()`` is implemented, this method
uses the same approximate size accounting as :meth:`~Queue.qsize`.
Otherwise, it may report ``True`` while items are still buffered and
waiting to be flushed to the underlying pipe.

May raise an :exc:`OSError` on closed queues. (not guaranteed)

.. versionchanged:: 3.15
On platforms where ``sem_getvalue()`` is implemented, this method
now uses semaphore-based queue size accounting.

.. method:: full()

Return ``True`` if the queue is full, ``False`` otherwise. Because of
Expand Down
12 changes: 11 additions & 1 deletion Lib/multiprocessing/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,17 @@ def qsize(self):
return self._maxsize - self._sem.get_value()

def empty(self):
return not self._poll()
# Preserve the historical "closed queue may raise OSError" behavior.
# q.close() is a no-op for unused queues, so this only raises once the
# reader end has actually been closed.
if self._closed:
self._poll()

try:
return self._sem.get_value() == self._maxsize
except NotImplementedError:
# Fallback for platforms without sem_getvalue() (for example macOS).
return not self._poll()

def full(self):
return self._sem._semlock._is_zero()
Expand Down
36 changes: 35 additions & 1 deletion Lib/test/_test_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -1235,7 +1235,18 @@ def test_get(self):
break
self.assertEqual(queue_empty(queue), False)

self.assertEqual(queue.get_nowait(), 1)
for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
try:
value = queue.get_nowait()
except pyqueue.Empty:
# Queue.empty() may become false before the feeder thread
# flushes objects to the pipe.
continue
else:
break
else:
self.fail("queue.get_nowait() unexpectedly raised Empty")
self.assertEqual(value, 1)
self.assertEqual(queue.get(True, None), 2)
self.assertEqual(queue.get(True), 3)
self.assertEqual(queue.get(timeout=1), 4)
Expand Down Expand Up @@ -1320,6 +1331,29 @@ def test_qsize(self):
self.assertEqual(q.qsize(), 0)
close_queue(q)

def test_empty_uses_semaphore_count(self):
if self.TYPE != 'processes':
self.skipTest(f'test not appropriate for {self.TYPE}')

q = self.Queue()
try:
q._sem.get_value()
except NotImplementedError:
close_queue(q)
self.skipTest('sem_getvalue not implemented on this platform')

q.put('sentinel')
original_poll = q._poll
q._poll = lambda timeout=0.0: False
try:
self.assertFalse(q.empty())
finally:
q._poll = original_poll

self.assertEqual(q.get(timeout=support.SHORT_TIMEOUT), 'sentinel')
self.assertTrue(q.empty())
close_queue(q)

@classmethod
def _test_task_done(cls, q):
for obj in iter(q.get, None):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Fixed :meth:`multiprocessing.Queue.empty` to use semaphore-based size
accounting on platforms that support ``sem_getvalue()``, making its behavior
consistent with :meth:`multiprocessing.Queue.qsize` and
:meth:`multiprocessing.Queue.full`.
Loading