PATH: //proc/thread-self/root/opt/alt/python311/lib64/python3.11/test/test_asyncio
FILE_BARU
CREATE
FOLDER_BARU
MKDIR
UPLOAD_FILE
GO
[ .. KEMBALI ]
📄 __init__.py
↓
X
📄 __main__.py
↓
X
📁 __pycache__/
X
📄 echo.py
↓
X
📄 echo2.py
↓
X
📄 echo3.py
↓
X
📄 functional.py
↓
X
📄 test_base_events.py
↓
X
📄 test_buffered_proto.py
↓
X
📄 test_context.py
↓
X
📄 test_events.py
↓
X
📄 test_futures.py
↓
X
📄 test_futures2.py
↓
X
📄 test_locks.py
↓
X
📄 test_pep492.py
↓
X
📄 test_proactor_events.py
↓
X
📄 test_protocols.py
↓
X
📄 test_queues.py
↓
X
📄 test_runners.py
↓
X
📄 test_selector_events.py
↓
X
📄 test_sendfile.py
↓
X
📄 test_server.py
↓
X
📄 test_sock_lowlevel.py
↓
X
📄 test_ssl.py
↓
X
📄 test_sslproto.py
↓
X
📄 test_streams.py
↓
X
📄 test_subprocess.py
↓
X
📄 test_taskgroups.py
↓
X
📄 test_tasks.py
↓
X
📄 test_threads.py
↓
X
📄 test_timeouts.py
↓
X
📄 test_transports.py
↓
X
📄 test_unix_events.py
↓
X
📄 test_waitfor.py
↓
X
📄 test_windows_events.py
↓
X
📄 test_windows_utils.py
↓
X
📄 utils.py
↓
X
SAVING...
BERHASIL DIUBAH!
EDITING: test_threads.py
"""Tests for asyncio/threads.py""" import asyncio import unittest from contextvars import ContextVar from unittest import mock def tearDownModule(): asyncio.set_event_loop_policy(None) class ToThreadTests(unittest.IsolatedAsyncioTestCase): async def test_to_thread(self): result = await asyncio.to_thread(sum, [40, 2]) self.assertEqual(result, 42) async def test_to_thread_exception(self): def raise_runtime(): raise RuntimeError("test") with self.assertRaisesRegex(RuntimeError, "test"): await asyncio.to_thread(raise_runtime) async def test_to_thread_once(self): func = mock.Mock() await asyncio.to_thread(func) func.assert_called_once() async def test_to_thread_concurrent(self): func = mock.Mock() futs = [] for _ in range(10): fut = asyncio.to_thread(func) futs.append(fut) await asyncio.gather(*futs) self.assertEqual(func.call_count, 10) async def test_to_thread_args_kwargs(self): # Unlike run_in_executor(), to_thread() should directly accept kwargs. func = mock.Mock() await asyncio.to_thread(func, 'test', something=True) func.assert_called_once_with('test', something=True) async def test_to_thread_contextvars(self): test_ctx = ContextVar('test_ctx') def get_ctx(): return test_ctx.get() test_ctx.set('parrot') result = await asyncio.to_thread(get_ctx) self.assertEqual(result, 'parrot') if __name__ == "__main__": unittest.main()
SIMPAN PERUBAHAN