diff --git a/Doc/library/multiprocessing.rst b/Doc/library/multiprocessing.rst index 2b67d10d7bf1b7..23aee321a6a7e6 100644 --- a/Doc/library/multiprocessing.rst +++ b/Doc/library/multiprocessing.rst @@ -869,10 +869,13 @@ Note that one can also create a shared queue by using a manager object -- see bother you then you can instead use a queue created with a :ref:`manager `. - (1) After putting an object on an empty queue there may be an - infinitesimal delay before the queue's :meth:`~Queue.empty` - method returns :const:`False` and :meth:`~Queue.get_nowait` can - return without raising :exc:`queue.Empty`. + (1) After putting an object on an empty queue there may be a delay + before :meth:`~Queue.get_nowait` can return without raising + :exc:`queue.Empty`, because the feeder thread flushes objects to + the underlying pipe asynchronously. On platforms where + ``sem_getvalue()`` is not implemented (for example macOS), the + queue's :meth:`~Queue.empty` method may also remain :const:`True` + during this delay. (2) If multiple processes are enqueuing objects, it is possible for the objects to be received at the other end out-of-order. @@ -947,8 +950,17 @@ For an example of the usage of queues for interprocess communication see Return ``True`` if the queue is empty, ``False`` otherwise. Because of multithreading/multiprocessing semantics, this is not reliable. + On platforms where ``sem_getvalue()`` is implemented, this method + uses the same approximate size accounting as :meth:`~Queue.qsize`. + Otherwise, it may report ``True`` while items are still buffered and + waiting to be flushed to the underlying pipe. + May raise an :exc:`OSError` on closed queues. (not guaranteed) + .. versionchanged:: 3.15 + On platforms where ``sem_getvalue()`` is implemented, this method + now uses semaphore-based queue size accounting. + .. method:: full() Return ``True`` if the queue is full, ``False`` otherwise. Because of diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index 981599acf5ef26..d7bfc6ca0665f0 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -124,7 +124,17 @@ def qsize(self): return self._maxsize - self._sem.get_value() def empty(self): - return not self._poll() + # Preserve the historical "closed queue may raise OSError" behavior. + # q.close() is a no-op for unused queues, so this only raises once the + # reader end has actually been closed. + if self._closed: + self._poll() + + try: + return self._sem.get_value() == self._maxsize + except NotImplementedError: + # Fallback for platforms without sem_getvalue() (for example macOS). + return not self._poll() def full(self): return self._sem._semlock._is_zero() diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index cc07062eee6f98..9ba81be2c1b5fa 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -1235,7 +1235,18 @@ def test_get(self): break self.assertEqual(queue_empty(queue), False) - self.assertEqual(queue.get_nowait(), 1) + for _ in support.sleeping_retry(support.SHORT_TIMEOUT): + try: + value = queue.get_nowait() + except pyqueue.Empty: + # Queue.empty() may become false before the feeder thread + # flushes objects to the pipe. + continue + else: + break + else: + self.fail("queue.get_nowait() unexpectedly raised Empty") + self.assertEqual(value, 1) self.assertEqual(queue.get(True, None), 2) self.assertEqual(queue.get(True), 3) self.assertEqual(queue.get(timeout=1), 4) @@ -1320,6 +1331,29 @@ def test_qsize(self): self.assertEqual(q.qsize(), 0) close_queue(q) + def test_empty_uses_semaphore_count(self): + if self.TYPE != 'processes': + self.skipTest(f'test not appropriate for {self.TYPE}') + + q = self.Queue() + try: + q._sem.get_value() + except NotImplementedError: + close_queue(q) + self.skipTest('sem_getvalue not implemented on this platform') + + q.put('sentinel') + original_poll = q._poll + q._poll = lambda timeout=0.0: False + try: + self.assertFalse(q.empty()) + finally: + q._poll = original_poll + + self.assertEqual(q.get(timeout=support.SHORT_TIMEOUT), 'sentinel') + self.assertTrue(q.empty()) + close_queue(q) + @classmethod def _test_task_done(cls, q): for obj in iter(q.get, None): diff --git a/Misc/NEWS.d/next/Library/2026-02-16-11-20-00.gh-issue-142837.xP4kLm.rst b/Misc/NEWS.d/next/Library/2026-02-16-11-20-00.gh-issue-142837.xP4kLm.rst new file mode 100644 index 00000000000000..69bc73c5e5c282 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2026-02-16-11-20-00.gh-issue-142837.xP4kLm.rst @@ -0,0 +1,4 @@ +Fixed :meth:`multiprocessing.Queue.empty` to use semaphore-based size +accounting on platforms that support ``sem_getvalue()``, making its behavior +consistent with :meth:`multiprocessing.Queue.qsize` and +:meth:`multiprocessing.Queue.full`.