o
    i                     @   sX   d Z ddlZddlZddlZddlZddlmZ G dd deZedkr*e	  dS dS )u  
Tests for greenlet behavior during interpreter shutdown (Py_FinalizeEx).

During interpreter shutdown, several greenlet code paths can access
partially-destroyed Python state, leading to SIGSEGV.  Two independent
guards protect against this on ALL Python versions:

  1. g_greenlet_shutting_down — set by an atexit handler registered at
     greenlet import time (LIFO = runs before other cleanup).  Covers
     the atexit phase of Py_FinalizeEx, where _Py_IsFinalizing() is
     still False on all Python versions.

  2. Py_IsFinalizing() — covers the GC collection and later phases of
     Py_FinalizeEx, where __del__ methods and destructor code run.

These tests are organized into four groups:

  A. Core safety (smoke): no crashes with active greenlets at shutdown.
  B. Cleanup semantics: GreenletExit / finally still works during
     normal thread exit (the standard production path).
  C. Atexit "still works" tests: getcurrent() / greenlet construction
     during atexit handlers registered AFTER greenlet import (i.e.
     BEFORE greenlet's cleanup handler in LIFO order) must return
     valid objects — verifies the guards don't over-block.
  D. TDD-certified regression tests: getcurrent() must return None
     when called AFTER greenlet's cleanup (GC finalization phase
     or late atexit phase).  These tests fail on greenlet 3.3.2
     and pass with the fix across Python 3.10-3.14.
    N)TestCasec                   @   s   e Zd Zdd Zdd Zdd Zdd Zd	d
 Zdd Zdd Z	dd Z
dd Zdd Zdd Zdd Zdd Zdd Zdd Zdd  Zd!d" Zd#d$ Zd%d& Zd'd( Zd)d* Zd+d, Zd-S ).TestInterpreterShutdownc                 C   s6   t |}tjtjd|gddddd}|j|j|jfS )z
        Run a Python script in a subprocess that exercises greenlet
        during interpreter shutdown. Returns (returncode, stdout, stderr).
        z-cT   F)capture_outputtexttimeoutcheck)	textwrapdedent
subprocessrunsys
executable
returncodestdoutstderr)selfscript_bodyfull_scriptresult r   h/var/www/html/arapca_proje/venv/lib/python3.10/site-packages/greenlet/tests/test_interpreter_shutdown.py_run_shutdown_script)   s   

z,TestInterpreterShutdown._run_shutdown_scriptc              	   C   >   |  d\}}}| |dd| d| |  | d| dS )a8  
        An active (suspended) greenlet that is deallocated during
        interpreter shutdown should not crash the process.

        Before the fix, this would SIGSEGV on Python < 3.11 because
        _green_dealloc_kill_started_non_main_greenlet tried to call
        g_switch() during Py_FinalizeEx.
        aT              import greenlet

            def worker():
                greenlet.getcurrent().parent.switch("from worker")
                return "done"

            g = greenlet.greenlet(worker)
            result = g.switch()
            assert result == "from worker", result
            print("OK: exiting with active greenlet")
        r   Process crashed (rc=):
z OK: exiting with active greenletNr   assertEqualassertInr   rcr   r   r   r   r   )test_active_greenlet_at_shutdown_no_crash<   s   	zATestInterpreterShutdown.test_active_greenlet_at_shutdown_no_crashc              	   C   r   )zm
        Multiple suspended greenlets at shutdown should all be cleaned
        up without crashing.
        a              import greenlet

            def worker(name):
                greenlet.getcurrent().parent.switch(f"hello from {name}")
                return "done"

            greenlets = []
            for i in range(10):
                g = greenlet.greenlet(worker)
                result = g.switch(f"g{i}")
                greenlets.append(g)

            print(f"OK: {len(greenlets)} active greenlets at shutdown")
        r   r   r   z#OK: 10 active greenlets at shutdownNr   r   r   r   r   *test_multiple_active_greenlets_at_shutdownT   s   zBTestInterpreterShutdown.test_multiple_active_greenlets_at_shutdownc              	   C   r   )zQ
        Nested (chained parent) greenlets at shutdown should not crash.
        a              import greenlet

            def inner():
                greenlet.getcurrent().parent.switch("inner done")

            def outer():
                g_inner = greenlet.greenlet(inner)
                g_inner.switch()
                greenlet.getcurrent().parent.switch("outer done")

            g = greenlet.greenlet(outer)
            result = g.switch()
            assert result == "outer done", result
            print("OK: nested greenlets at shutdown")
        r   r   r   z OK: nested greenlets at shutdownNr   r   r   r   r   !test_nested_greenlets_at_shutdownk   s   z9TestInterpreterShutdown.test_nested_greenlets_at_shutdownc              	   C   r   )zm
        Greenlets in worker threads that are still referenced at
        shutdown should not crash.
        a              import greenlet
            import threading

            results = []

            def thread_worker():
                def greenlet_func():
                    greenlet.getcurrent().parent.switch("from thread greenlet")
                    return "done"

                g = greenlet.greenlet(greenlet_func)
                val = g.switch()
                results.append((g, val))

            threads = []
            for _ in range(3):
                t = threading.Thread(target=thread_worker)
                t.start()
                threads.append(t)

            for t in threads:
                t.join()

            print(f"OK: {len(results)} threaded greenlets at shutdown")
        r   r   r   z$OK: 3 threaded greenlets at shutdownNr   r   r   r   r   #test_threaded_greenlets_at_shutdown   s   z;TestInterpreterShutdown.test_threaded_greenlets_at_shutdownc              	   C   J   |  d\}}}| |dd| d| |  | d| | d| dS )z
        When a thread exits normally while holding active greenlets,
        GreenletExit IS thrown and cleanup code runs.  This is the
        standard cleanup path used in production (e.g. uWSGI worker
        threads finishing a request).
        a4              import os
            import threading
            import greenlet

            _write = os.write

            def thread_func():
                def worker(_w=_write,
                           _GreenletExit=greenlet.GreenletExit):
                    try:
                        greenlet.getcurrent().parent.switch("suspended")
                    except _GreenletExit:
                        _w(1, b"CLEANUP: GreenletExit caught\n")
                        raise

                g = greenlet.greenlet(worker)
                g.switch()
                # Thread exits with active greenlet -> thread-state
                # cleanup triggers GreenletExit

            t = threading.Thread(target=thread_func)
            t.start()
            t.join()
            print("OK: thread cleanup done")
        r   r   r   OK: thread cleanup donezCLEANUP: GreenletExit caughtNr   r   r   r   r   (test_greenlet_cleanup_during_thread_exit   s   z@TestInterpreterShutdown.test_greenlet_cleanup_during_thread_exitc              	   C   r%   )zl
        try/finally blocks in active greenlets run correctly when the
        owning thread exits.
        aR              import os
            import threading
            import greenlet

            _write = os.write

            def thread_func():
                def worker(_w=_write):
                    try:
                        greenlet.getcurrent().parent.switch("suspended")
                    finally:
                        _w(1, b"FINALLY: cleanup executed\n")

                g = greenlet.greenlet(worker)
                g.switch()

            t = threading.Thread(target=thread_func)
            t.start()
            t.join()
            print("OK: thread cleanup done")
        r   r   r   r&   zFINALLY: cleanup executedNr   r   r   r   r   %test_finally_block_during_thread_exit   s   z=TestInterpreterShutdown.test_finally_block_during_thread_exitc              	   C   r   )z
        Stress test: many active greenlets with cleanup code at shutdown.
        Ensures no crashes regardless of deallocation order.
        a              import sys
            import greenlet

            cleanup_count = 0

            def worker(idx):
                global cleanup_count
                try:
                    greenlet.getcurrent().parent.switch(f"ready-{idx}")
                except greenlet.GreenletExit:
                    cleanup_count += 1
                    raise

            greenlets = []
            for i in range(50):
                g = greenlet.greenlet(worker)
                result = g.switch(i)
                greenlets.append(g)

            print(f"OK: {len(greenlets)} greenlets about to shut down")
            # Note: we can't easily print cleanup_count during shutdown
            # since it happens after the main module's code runs.
        r   r   r   z#OK: 50 greenlets about to shut downNr   r   r   r   r   ,test_many_greenlets_with_cleanup_at_shutdown   s   zDTestInterpreterShutdown.test_many_greenlets_with_cleanup_at_shutdownc              	   C   r   )z
        Deeply nested greenlet parent chains at shutdown.
        Tests that the deallocation order doesn't cause issues.
        a              import greenlet

            def level(depth, max_depth):
                if depth < max_depth:
                    g = greenlet.greenlet(level)
                    g.switch(depth + 1, max_depth)
                greenlet.getcurrent().parent.switch(f"depth-{depth}")

            g = greenlet.greenlet(level)
            result = g.switch(0, 10)
            print(f"OK: nested to depth 10, got {result}")
        r   r   r   zOK: nested to depth 10Nr   r   r   r   r   (test_deeply_nested_greenlets_at_shutdown  s   z@TestInterpreterShutdown.test_deeply_nested_greenlets_at_shutdownc              	   C   r   )z
        A greenlet that has an active exception context when it's
        suspended should not crash during shutdown cleanup.
        a              import greenlet

            def worker():
                try:
                    raise ValueError("test error")
                except ValueError:
                    # Suspend while an exception is active on the stack
                    greenlet.getcurrent().parent.switch("suspended with exc")
                return "done"

            g = greenlet.greenlet(worker)
            result = g.switch()
            assert result == "suspended with exc"
            print("OK: greenlet with active exception at shutdown")
        r   r   r   z.OK: greenlet with active exception at shutdownNr   r   r   r   r   (test_greenlet_with_traceback_at_shutdown&  s   z@TestInterpreterShutdown.test_greenlet_with_traceback_at_shutdownc              	   C   L   |  d\}}}| |dd| d| |  | d| | d|d dS )	z
        getcurrent() in an atexit handler registered AFTER greenlet
        import must return a valid greenlet (not None), because LIFO
        ordering means this handler runs BEFORE greenlet's cleanup.
        a              import atexit
            import greenlet

            def call_getcurrent_at_exit():
                try:
                    g = greenlet.getcurrent()
                    if g is not None and type(g).__name__ == 'greenlet':
                        print(f"OK: getcurrent returned valid greenlet")
                    elif g is None:
                        print("FAIL: getcurrent returned None (over-blocked)")
                    else:
                        print(f"FAIL: unexpected {g!r}")
                except Exception as e:
                    print(f"OK: getcurrent raised {type(e).__name__}: {e}")

            atexit.register(call_getcurrent_at_exit)
            print("OK: atexit registered")
        r   r   r   OK: atexit registered&OK: getcurrent returned valid greenletigetcurrent() should return a valid greenlet when called before greenlet's cleanup handler (LIFO ordering)Nr   r   r   r   r   &test_getcurrent_during_atexit_no_crashJ  s   z>TestInterpreterShutdown.test_getcurrent_during_atexit_no_crashc              	   C   r   )zK
        Calling greenlet.gettrace() during atexit must not crash.
        a              import atexit
            import greenlet

            def check_at_exit():
                try:
                    result = greenlet.gettrace()
                    print(f"OK: gettrace returned {result!r}")
                except Exception as e:
                    print(f"OK: gettrace raised {type(e).__name__}: {e}")

            atexit.register(check_at_exit)
            print("OK: registered")
        r   r   r   OK: registeredNr   r   r   r   r   $test_gettrace_during_atexit_no_crashi     z<TestInterpreterShutdown.test_gettrace_during_atexit_no_crashc              	   C   r   )zK
        Calling greenlet.settrace() during atexit must not crash.
        a              import atexit
            import greenlet

            def check_at_exit():
                try:
                    greenlet.settrace(lambda *args: None)
                    print("OK: settrace succeeded")
                except Exception as e:
                    print(f"OK: settrace raised {type(e).__name__}: {e}")

            atexit.register(check_at_exit)
            print("OK: registered")
        r   r   r   r1   Nr   r   r   r   r   $test_settrace_during_atexit_no_crash~  r3   z<TestInterpreterShutdown.test_settrace_during_atexit_no_crashc              	   C   r,   )	z
        getcurrent() during atexit (registered after import) with active
        greenlets must still return a valid greenlet, since LIFO means
        this runs before greenlet's cleanup.
        a              import atexit
            import greenlet

            def worker():
                greenlet.getcurrent().parent.switch("ready")

            greenlets = []
            for i in range(5):
                g = greenlet.greenlet(worker)
                result = g.switch()
                greenlets.append(g)

            def check_at_exit():
                try:
                    g = greenlet.getcurrent()
                    if g is not None and type(g).__name__ == 'greenlet':
                        print(f"OK: getcurrent returned valid greenlet")
                    elif g is None:
                        print("FAIL: getcurrent returned None (over-blocked)")
                    else:
                        print(f"FAIL: unexpected {g!r}")
                except Exception as e:
                    print(f"OK: getcurrent raised {type(e).__name__}: {e}")

            atexit.register(check_at_exit)
            print(f"OK: {len(greenlets)} active greenlets, atexit registered")
        r   r   r   z)OK: 5 active greenlets, atexit registeredr.   r/   Nr   r   r   r   r   3test_getcurrent_with_active_greenlets_during_atexit  s   zKTestInterpreterShutdown.test_getcurrent_with_active_greenlets_during_atexitc              	   C   r%   )z
        Constructing a new greenlet during atexit (registered after
        import) must succeed, since this runs before greenlet's cleanup.
        a              import atexit
            import greenlet

            def create_greenlets_at_exit():
                try:
                    def noop():
                        pass
                    g = greenlet.greenlet(noop)
                    if g is not None:
                        print(f"OK: created greenlet successfully")
                    else:
                        print("FAIL: greenlet() returned None")
                except Exception as e:
                    print(f"OK: construction raised {type(e).__name__}: {e}")

            atexit.register(create_greenlets_at_exit)
            print("OK: atexit registered")
        r   r   r   r-   z!OK: created greenlet successfullyNr   r   r   r   r   1test_greenlet_construction_during_atexit_no_crash  s   zITestInterpreterShutdown.test_greenlet_construction_during_atexit_no_crashc              	   C   r   )z
        Constructing new greenlets during atexit when other active
        greenlets already exist (maximizes the chance of a non-empty
        deleteme list).
        af              import atexit
            import greenlet

            def worker():
                greenlet.getcurrent().parent.switch("ready")

            greenlets = []
            for i in range(10):
                g = greenlet.greenlet(worker)
                g.switch()
                greenlets.append(g)

            def create_at_exit():
                try:
                    new_greenlets = []
                    for i in range(5):
                        g = greenlet.greenlet(lambda: None)
                        new_greenlets.append(g)
                    print(f"OK: created {len(new_greenlets)} greenlets at exit")
                except Exception as e:
                    print(f"OK: raised {type(e).__name__}: {e}")

            atexit.register(create_at_exit)
            print(f"OK: {len(greenlets)} active greenlets, atexit registered")
        r   r   r   *OK: 10 active greenlets, atexit registeredNr   r   r   r   r   >test_greenlet_construction_with_active_greenlets_during_atexit  s   zVTestInterpreterShutdown.test_greenlet_construction_with_active_greenlets_during_atexitc              	   C   r   )u  
        Create greenlets in a worker thread, transfer them to the main
        thread, then drop them — populating the deleteme list. Then
        construct a new greenlet during atexit. On Python < 3.11
        clear_deleteme_list() could previously crash if the
        PythonAllocator vector copy failed during early Py_FinalizeEx;
        using std::swap eliminates that allocation.
        a              import atexit
            import greenlet
            import threading

            cross_thread_refs = []

            def thread_worker():
                # Create greenlets in this thread
                def gl_body():
                    greenlet.getcurrent().parent.switch("ready")
                for _ in range(20):
                    g = greenlet.greenlet(gl_body)
                    g.switch()
                    cross_thread_refs.append(g)

            t = threading.Thread(target=thread_worker)
            t.start()
            t.join()

            # Dropping these references in the main thread
            # causes them to be added to the main thread's
            # deleteme list (deferred cross-thread dealloc).
            cross_thread_refs.clear()

            def create_at_exit():
                try:
                    g = greenlet.greenlet(lambda: None)
                    print(f"OK: created greenlet at exit {g!r}")
                except Exception as e:
                    print(f"OK: raised {type(e).__name__}: {e}")

            atexit.register(create_at_exit)
            print("OK: cross-thread setup done, atexit registered")
        r   r   r   z.OK: cross-thread setup done, atexit registeredNr   r   r   r   r   Ctest_greenlet_construction_with_cross_thread_deleteme_during_atexit  s   	#z[TestInterpreterShutdown.test_greenlet_construction_with_cross_thread_deleteme_during_atexitc              	   C   r,   )	u  
        greenlet.getcurrent() must return None when called from a
        __del__ method during Py_FinalizeEx's GC collection pass.

        On Python >= 3.11, _Py_IsFinalizing() is True during this
        phase.  Without the Py_IsFinalizing() guard in mod_getcurrent,
        this would return a greenlet — the same unguarded code path
        that leads to SIGSEGV in production (uWSGI worker recycling).
        ax              import gc
            import os
            import greenlet

            gc.disable()

            class CleanupChecker:
                def __del__(self):
                    try:
                        cur = greenlet.getcurrent()
                        if cur is None:
                            os.write(1, b"GUARDED: getcurrent=None\n")
                        else:
                            os.write(1, b"UNGUARDED: getcurrent="
                                     + type(cur).__name__.encode() + b"\n")
                    except Exception as e:
                        os.write(1, b"EXCEPTION: " + str(e).encode() + b"\n")

            # Reference cycle: only collected during Py_FinalizeEx GC pass
            a = CleanupChecker()
            b = {"ref": a}
            a._cycle = b
            del a, b

            print("OK: deferred cycle created")
        r   r   r   zOK: deferred cycle createdGUARDED: getcurrent=Nonetgetcurrent() must return None during GC finalization; returned a live object instead (missing Py_IsFinalizing guard)Nr   r   r   r   r   3test_getcurrent_returns_none_during_gc_finalization:  s   
zKTestInterpreterShutdown.test_getcurrent_returns_none_during_gc_finalizationc              	   C   r,   )	z
        Same as above but with active greenlets at shutdown, which
        increases the amount of C++ destructor work during finalization.
        a              import gc
            import os
            import greenlet

            gc.disable()

            class CleanupChecker:
                def __del__(self):
                    try:
                        cur = greenlet.getcurrent()
                        if cur is None:
                            os.write(1, b"GUARDED: getcurrent=None\n")
                        else:
                            os.write(1, b"UNGUARDED: getcurrent="
                                     + type(cur).__name__.encode() + b"\n")
                    except Exception as e:
                        os.write(1, b"EXCEPTION: " + str(e).encode() + b"\n")

            # Create active greenlets
            greenlets = []
            for _ in range(10):
                def worker():
                    greenlet.getcurrent().parent.switch("suspended")
                g = greenlet.greenlet(worker)
                g.switch()
                greenlets.append(g)

            # Reference cycle deferred to Py_FinalizeEx
            a = CleanupChecker()
            b = {"ref": a}
            a._cycle = b
            del a, b

            print(f"OK: {len(greenlets)} active greenlets, cycle deferred")
        r   r   r   z'OK: 10 active greenlets, cycle deferredr:   r;   Nr   r   r   r   r   Itest_getcurrent_returns_none_during_gc_finalization_with_active_greenletse  s   $zaTestInterpreterShutdown.test_getcurrent_returns_none_during_gc_finalization_with_active_greenletsc              	   C   r,   )	a0  
        Combines cross-thread greenlet deallocation (deleteme list)
        with the GC finalization check.  This simulates the production
        scenario where uWSGI worker threads create greenlets that are
        transferred to the main thread, then cleaned up during
        Py_FinalizeEx.
        a{              import gc
            import os
            import threading
            import greenlet

            gc.disable()

            class CleanupChecker:
                def __del__(self):
                    try:
                        cur = greenlet.getcurrent()
                        if cur is None:
                            os.write(1, b"GUARDED: getcurrent=None\n")
                        else:
                            os.write(1, b"UNGUARDED: getcurrent="
                                     + type(cur).__name__.encode() + b"\n")
                    except Exception as e:
                        os.write(1, b"EXCEPTION: " + str(e).encode() + b"\n")

            # Create cross-thread greenlet references
            cross_refs = []
            def thread_fn():
                for _ in range(20):
                    def body():
                        greenlet.getcurrent().parent.switch("x")
                    g = greenlet.greenlet(body)
                    g.switch()
                    cross_refs.append(g)
            t = threading.Thread(target=thread_fn)
            t.start()
            t.join()
            cross_refs.clear()

            # Reference cycle deferred to Py_FinalizeEx
            a = CleanupChecker()
            b = {"ref": a}
            a._cycle = b
            del a, b

            print("OK: cross-thread cleanup + cycle deferred")
        r   r   r   z)OK: cross-thread cleanup + cycle deferredr:   r;   Nr   r   r   r   r   @test_getcurrent_returns_none_during_gc_finalization_cross_thread  s   *zXTestInterpreterShutdown.test_getcurrent_returns_none_during_gc_finalization_cross_threadc              	   C   r,   )	a  
        greenlet.getcurrent() must return None when called from an
        atexit handler that runs AFTER greenlet's own atexit handler.

        This tests the g_greenlet_shutting_down flag, which is needed
        because _Py_IsFinalizing() is still False during the atexit
        phase on ALL Python versions.  Without g_greenlet_shutting_down,
        getcurrent() proceeds unguarded into partially-torn-down state.
        u              import atexit
            import os

            def late_checker():
                try:
                    import greenlet
                    cur = greenlet.getcurrent()
                    if cur is None:
                        os.write(1, b"GUARDED: getcurrent=None\n")
                    else:
                        os.write(1, b"UNGUARDED: getcurrent="
                                 + type(cur).__name__.encode() + b"\n")
                except Exception as e:
                    os.write(1, b"EXCEPTION: " + str(e).encode() + b"\n")

            # Register BEFORE importing greenlet.  LIFO order:
            # greenlet's handler (registered at import) runs FIRST,
            # late_checker runs SECOND — seeing the flag already set.
            atexit.register(late_checker)

            import greenlet
            print("OK: atexit registered before greenlet import")
        r   r   r   z,OK: atexit registered before greenlet importr:   getcurrent() must return None during atexit phase; returned a live object instead (missing g_greenlet_shutting_down atexit handler)Nr   r   r   r   r   0test_getcurrent_returns_none_during_atexit_phase  s   
zHTestInterpreterShutdown.test_getcurrent_returns_none_during_atexit_phasec              	   C   r,   )	z
        Same as above but with active greenlets, ensuring the atexit
        guard works even when there is greenlet state to clean up.
        a              import atexit
            import os

            def late_checker():
                try:
                    import greenlet
                    cur = greenlet.getcurrent()
                    if cur is None:
                        os.write(1, b"GUARDED: getcurrent=None\n")
                    else:
                        os.write(1, b"UNGUARDED: getcurrent="
                                 + type(cur).__name__.encode() + b"\n")
                except Exception as e:
                    os.write(1, b"EXCEPTION: " + str(e).encode() + b"\n")

            atexit.register(late_checker)

            import greenlet

            greenlets = []
            for _ in range(10):
                def worker():
                    greenlet.getcurrent().parent.switch("parked")
                g = greenlet.greenlet(worker)
                g.switch()
                greenlets.append(g)

            print(f"OK: {len(greenlets)} active greenlets, atexit registered")
        r   r   r   r7   r:   r?   Nr   r   r   r   r   Ftest_getcurrent_returns_none_during_atexit_phase_with_active_greenlets
  s   z^TestInterpreterShutdown.test_getcurrent_returns_none_during_atexit_phase_with_active_greenletsN)__name__
__module____qualname__r   r!   r"   r#   r$   r'   r(   r)   r*   r+   r0   r2   r4   r5   r6   r8   r9   r<   r=   r>   r@   rA   r   r   r   r   r   '   s.    +% $(#@+/M)r   __main__)
__doc__r   r   unittestr	   greenlet.testsr   r   rB   mainr   r   r   r   <module>   s          