python 实现一个自定义上下文管理器
1、什么是上下文管理器?
上下文管理器(context manager)是Python2.5开始支持的一种语法,用于规定某个对象的使用范围。一旦进入或者离开该使用范围,会有特殊操作被调用 (比如为对象分配或者释放内存)。它的语法形式是,with...as... 使用关键字 with 和 as; 上下文管理器是指在一段代码执行之前执行一段代码作预处理工作;执行之后再执行一段代码,用于一些清理工作。比如打开文件进行读写,读写完之后需要将文件关闭。又比如在数据库操作中,操作之前需要连接数据库,操作之后需要关闭数据库。在上下文管理协议中,有两个方法__enter__和__exit__,分别实现上述两个功能;
links:
https://www.geeksforgeeks.org/context-manager-in-python/
http://book.pythontips.com/en/latest/context_managers.html
https://www.python.org/dev/peps/pep-0343/ 官方文档
2、实现一个自定义上下文
任何定义了__enter__()和__exit__()方法的对象都可以用于上下文管理器。
话不多说,上代码 : demo1
# -*- coding: utf-8 -*-
"""
@author: sato
@file: context.py
@date: 2019/9/12 18:25
"""
import os
import subprocess
import datetime
class CustomFile(object):
"""自定义一个上下文 文件类"""
def __init__(self, file_name, file_model, *args, **kwargs):
self.file_name = file_name
self.file_model = file_model
def __enter__(self, *args, **kwargs):
self.my_print("before open file, do something!")
self.f = open(self.file_name, self.file_model)
return self.f
def __exit__(self, *args, **kwargs):
"""
__exit__方法有三个参数:exc_type, exc_val, exc_tb。如果代码块BLOCK发生异常并退出,那么分别对应异常的type、value 和 traceback。否则三个参数全为None。
__exit__方法的返回值可以为True或者False。如果为True,那么表示异常被忽视,相当于进行了try-except操作;如果为False,则该异常会被重新raise。
"""
self.my_print(args)
self.my_print("before close , do something!")
self.f.close()
@classmethod
def my_print(cls, *args):
print('当前时间:{}'.format(datetime.datetime.now()), *args)
temp_file = './temp_file'
if not os.path.exists(temp_file):
subprocess.getoutput('echo "hello world" >> {}'.format(temp_file))
CustomFile.my_print('start')
with CustomFile(temp_file, 'r') as file:
data = file.read()
CustomFile.my_print(data)
CustomFile.my_print('end')
输出:
/Users/sato/.virtualenvs/review/bin/python3.6 /Users/sato/Desktop/review/context.py
当前时间:2019-09-12 19:08:04.933938 start
当前时间:2019-09-12 19:08:04.933993 before open file, do something!
当前时间:2019-09-12 19:08:04.934072 hello world
当前时间:2019-09-12 19:08:04.934091 (None, None, None)
当前时间:2019-09-12 19:08:04.934103 before close , do something!
当前时间:2019-09-12 19:08:04.934123 end
Process finished with exit code 0
demo2:
from contextlib import contextmanager
temp_file = './temp_file'
if not os.path.exists(temp_file):
subprocess.getoutput('echo "hello world" >> {}'.format(temp_file))
def demo2():
from contextlib import contextmanager
@contextmanager
# 该装饰器将一个函数中yield语句之前的代码当做__enter__方法执行,yield语句之后的代码当做__exit__方法执行。
# 同时yield返回值赋值给as后的变量。
def custom_file(_temp_file, model):
# __enter__ start
f = open(_temp_file, model)
try:
yield f
# __enter__ end
# __exit__ start
finally:
f.close()
return custom_file
with demo2()(temp_file, 'r') as file: # 我也不知道为什么要这么写
data = file.read()
CustomFile.my_print(data)
装饰器contextmanager 源码:
def contextmanager(func):
"""@contextmanager decorator.
Typical usage:
@contextmanager
def some_generator(<arguments>):
<setup>
try:
yield <value>
finally:
<cleanup>
This makes this:
with some_generator(<arguments>) as <variable>:
<body>
equivalent to this:
<setup>
try:
<variable> = <value>
<body>
finally:
<cleanup>
"""
@wraps(func)
def helper(*args, **kwds):
return _GeneratorContextManager(func, args, kwds)
return helper
_GeneratorContextManager 源码:
class _GeneratorContextManager(ContextDecorator, AbstractContextManager):
"""Helper for @contextmanager decorator."""
def __init__(self, func, args, kwds): # 被装饰的函数 func
self.gen = func(*args, **kwds)
self.func, self.args, self.kwds = func, args, kwds
# Issue 19330: ensure context manager instances have good docstrings
doc = getattr(func, "__doc__", None)
if doc is None:
doc = type(self).__doc__
self.__doc__ = doc
# Unfortunately, this still doesn't provide good help output when
# inspecting the created context manager instances, since pydoc
# currently bypasses the instance docstring and shows the docstring
# for the class instead.
# See http://bugs.python.org/issue19404 for more details.
def _recreate_cm(self):
# _GCM instances are one-shot context managers, so the
# CM must be recreated each time a decorated function is
# called
return self.__class__(self.func, self.args, self.kwds)
def __enter__(self): # 也是实现了__enter__方法的
try:
return next(self.gen)
except StopIteration:
raise RuntimeError("generator didn't yield") from None
def __exit__(self, type, value, traceback): # 也是实现了__exit__方法的
if type is None:
try:
next(self.gen)
except StopIteration:
return False
else:
raise RuntimeError("generator didn't stop")
else:
if value is None:
# Need to force instantiation so we can reliably
# tell if we get the same exception back
value = type()
try:
self.gen.throw(type, value, traceback)
except StopIteration as exc:
# Suppress StopIteration *unless* it's the same exception that
# was passed to throw(). This prevents a StopIteration
# raised inside the "with" statement from being suppressed.
return exc is not value
except RuntimeError as exc:
# Don't re-raise the passed in exception. (issue27122)
if exc is value:
return False
# Likewise, avoid suppressing if a StopIteration exception
# was passed to throw() and later wrapped into a RuntimeError
# (see PEP 479).
if type is StopIteration and exc.__cause__ is value:
return False
raise
except:
# only re-raise if it's *not* the exception that was
# passed to throw(), because __exit__() must not raise
# an exception unless __exit__() itself failed. But throw()
# has to raise the exception to signal propagation, so this
# fixes the impedance mismatch between the throw() protocol
# and the __exit__() protocol.
#
if sys.exc_info()[1] is value:
return False
raise
raise RuntimeError("generator didn't stop after throw()")
如果我重写装饰器和content类代码如下:
# -*- coding: utf-8 -*-
"""
@author: sato
@file: context.py
@date: 2019/9/12 18:25
"""
import os
import subprocess
import datetime
import sys
from contextlib import ContextDecorator, AbstractContextManager
from functools import wraps
temp_file = './temp_file'
if not os.path.exists(temp_file):
subprocess.getoutput('echo "hello world" >> {}'.format(temp_file))
class CustomFile(object):
"""自定义一个上下文 文件类"""
def __init__(self, file_name, file_model, *args, **kwargs):
self.file_name = file_name
self.file_model = file_model
def __enter__(self, *args, **kwargs):
self.my_print("before open file, do something!")
self.f = open(self.file_name, self.file_model)
return self.f
def __exit__(self, *args, **kwargs):
"""
__exit__方法有三个参数:exc_type, exc_val, exc_tb。如果代码块BLOCK发生异常并退出,那么分别对应异常的type、value 和 traceback。否则三个参数全为None。
__exit__方法的返回值可以为True或者False。如果为True,那么表示异常被忽视,相当于进行了try-except操作;如果为False,则该异常会被重新raise。
"""
self.my_print(args)
self.my_print("before close , do something!")
self.f.close()
@classmethod
def my_print(cls, *args):
print('当前时间:{}'.format(datetime.datetime.now()), *args)
def demo1(_temp_file):
CustomFile.my_print('start')
with CustomFile(_temp_file, 'r') as file:
data = file.read()
CustomFile.my_print(data)
CustomFile.my_print('end')
def demo2():
from contextlib import contextmanager
@contextmanager
# 该装饰器将一个函数中yield语句之前的代码当做__enter__方法执行,yield语句之后的代码当做__exit__方法执行。
# 同时yield返回值赋值给as后的变量。
def custom_file(_temp_file, model):
# __enter__ start
f = open(_temp_file, model)
try:
yield f
# __enter__ end
# __exit__ start
finally:
f.close()
return custom_file
class GeneratorContextManager(ContextDecorator, AbstractContextManager):
"""Helper for @contextmanager decorator."""
def __init__(self, func, args, kwds):
self.gen = func(*args, **kwds)
self.func, self.args, self.kwds = func, args, kwds
# Issue 19330: ensure context manager instances have good docstrings
doc = getattr(func, "__doc__", None)
if doc is None:
doc = type(self).__doc__
self.__doc__ = doc
# Unfortunately, this still doesn't provide good help output when
# inspecting the created context manager instances, since pydoc
# currently bypasses the instance docstring and shows the docstring
# for the class instead.
# See http://bugs.python.org/issue19404 for more details.
def _recreate_cm(self):
# _GCM instances are one-shot context managers, so the
# CM must be recreated each time a decorated function is
# called
return self.__class__(self.func, self.args, self.kwds)
def __enter__(self):
CustomFile.my_print('before open do something')
try:
return next(self.gen)
except StopIteration:
raise RuntimeError("generator didn't yield") from None
def __exit__(self, type, value, traceback):
CustomFile.my_print('before close do something')
if type is None:
try:
next(self.gen)
except StopIteration:
return False
else:
raise RuntimeError("generator didn't stop")
else:
if value is None:
# Need to force instantiation so we can reliably
# tell if we get the same exception back
value = type()
try:
self.gen.throw(type, value, traceback)
except StopIteration as exc:
# Suppress StopIteration *unless* it's the same exception that
# was passed to throw(). This prevents a StopIteration
# raised inside the "with" statement from being suppressed.
return exc is not value
except RuntimeError as exc:
# Don't re-raise the passed in exception. (issue27122)
if exc is value:
return False
# Likewise, avoid suppressing if a StopIteration exception
# was passed to throw() and later wrapped into a RuntimeError
# (see PEP 479).
if type is StopIteration and exc.__cause__ is value:
return False
raise
except:
# only re-raise if it's *not* the exception that was
# passed to throw(), because __exit__() must not raise
# an exception unless __exit__() itself failed. But throw()
# has to raise the exception to signal propagation, so this
# fixes the impedance mismatch between the throw() protocol
# and the __exit__() protocol.
#
if sys.exc_info()[1] is value:
return False
raise
raise RuntimeError("generator didn't stop after throw()")
def my_contextmanager(func):
"""@contextmanager decorator.
Typical usage:
@contextmanager
def some_generator(<arguments>):
<setup>
try:
yield <value>
finally:
<cleanup>
This makes this:
with some_generator(<arguments>) as <variable>:
<body>
equivalent to this:
<setup>
try:
<variable> = <value>
<body>
finally:
<cleanup>
"""
@wraps(func)
def helper(*args, **kwds):
return GeneratorContextManager(func, args, kwds)
return helper
@my_contextmanager
def custom_file(_temp_file, model):
# __enter__ start
f = open(_temp_file, model)
try:
yield f
# __enter__ end
# __exit__ start
finally:
f.close()
return custom_file
CustomFile.my_print('start')
with custom_file(temp_file, 'r') as f:
data = f.read()
CustomFile.my_print(data)
CustomFile.my_print('end')
输出:
/Users/sato/.virtualenvs/review/bin/python3.6 /Users/sato/Desktop/review/context.py
当前时间:2019-09-12 19:37:17.099099 start
当前时间:2019-09-12 19:37:17.099160 before open do something
当前时间:2019-09-12 19:37:17.099236 hello world
当前时间:2019-09-12 19:37:17.099254 before close do something
当前时间:2019-09-12 19:37:17.099279 end
Process finished with exit code 0
可以看到执行顺序同样是: 先enter 后exit
如果继续看 AbstractContextManager 会发现 :
class AbstractContextManager(abc.ABC):
"""An abstract base class for context managers."""
def __enter__(self):
"""Return `self` upon entering the runtime context."""
return self
@abc.abstractmethod
def __exit__(self, exc_type, exc_value, traceback):
"""Raise any exception triggered within the runtime context."""
return None
@classmethod
def __subclasshook__(cls, C):
if cls is AbstractContextManager:
return _collections_abc._check_methods(C, "__enter__", "__exit__")
return NotImplemented
AbstractContextManager 同样有__enter__ __exit__ 方法
这就说明:
This PEP adds a new statement "with" to the Python language to make it possible to factor out standard uses of try/finally statements.
In this PEP, context managers provide __enter__() and __exit__() methods that are invoked on entry to and exit from the body of the with statement.
https://www.python.org/dev/peps/pep-0343/#specification-the-with-statement