我最近需要从 Python 运行一些命令行程序。用 subprocess.Popen 启动它们相当简单,但我还希望能够正确地分流(tee)它们的输出。
从 Python 运行程序时,我需要做到三件事:
首先,能把 Popen 的 stdout 或 stderr 分别传给若干个文件对象(而不是只有一个);例如我应该可以把 Popen.stdout 同时传给 sys.stdout 和我自己的文件 my_file。其次,使用 Popen.communicate 或 stdout=None 时,无法在记录输出的同时实时显示输出。这些都可以通过在线程中读取 Popen.stdout 来实现,而且核心代码其实相当简单;但每次使用 Popen 都要手动创建和管理线程并不可取。此外,考虑到这些代码只是一个便利包装,我决定让它的用法尽可能简单。
"""
Teetime - adding tee like functionality to Popen.
Conveniently allow Popen to pass data to any number of sinks.
Sinks can be shared between both stdout and std err and be handled correctly.
.. code-block::
import sys
import teetime
with open('log.txt', 'wb') as f:
teetime.popen_call(
['python', 'test.py'],
stdout=(sys.stdout.buffer, f),
stderr=(sys.stderr.buffer, f),
)
The :code:`popen_call` function is a convenience over :code:`Sinks`.
If you need to interact with the process when it's live then you can
modify the following to suite your needs.
.. code-block::
import sys
import teetime
with open('log.txt', 'wb') as f:
sinks = Sinks(
(sys.stdout.buffer, f),
(sys.stderr.buffer, f),
)
process = sinks.popen(['python', 'test.py'])
with sinks.make_threads(process) as threads:
threads.join()
sinks.reset_head()
"""
from __future__ import annotations
from typing import (
Any, Callable, Iterator, List, Optional, Sequence, Tuple, Type
)
import collections
import io
import queue
import subprocess
import threading
# Placed on the merge queue to tell the draining thread to stop.
QUEUE_SENTINEL = object()
# readline() returns b'' at EOF, so iter(readline, FILE_SENTINEL) ends there.
FILE_SENTINEL = b''
# A sink callback: consumes one chunk of raw process output.
TSink = Callable[[bytes], None]
# Backing namedtuples; subclassed below to attach behaviour.
_Threads = collections.namedtuple('Threads', 'out, err, both, queue')
_Sinks = collections.namedtuple('Sinks', 'out, err, both')
class Threads(_Threads):
    """
    Thread manager and interface.

    Because we need to listen to stdout and stderr at the same time we
    need to put the listeners in threads. This is so we can handle
    changes when they happen. This comes with two benefits:

    1. We don't have to store all the print information in memory.
    2. We can display changes as soon as they happen.

    To allow stdout and stderr to merge correctly we have to use an
    atomic queue. :code:`queue.Queue` is one such example. If we have
    no need of combining output as there are no both sinks then the
    queue and associated thread won't be created.

    This object holds the three threads and the message queue.
    """

    # Any of the threads (and the queue) may be None when the matching
    # sink group is empty -- hence the Optional annotations.
    out: Optional[threading.Thread]
    err: Optional[threading.Thread]
    both: Optional[threading.Thread]
    queue: Optional[queue.Queue]

    def __new__(
        cls,
        process: subprocess.Popen,
        sinks: Sinks,
        flush: bool = True,
    ) -> Threads:
        """
        Create Threads from an active process and sinks.

        This creates the required threads to handle the output and sinks.
        These threads are created as daemons and started on creation.
        This also creates the message queue that merges both stdout and
        stderr when needed.

        :param process: Process to tee output from.
        :param sinks: Places to tee output to.
        :param flush: Whether to force flush flushable sinks.
        :return: A thread manager for the process and sinks.
        """
        o_thread = None
        e_thread = None
        b_thread = None
        _queue: Optional[queue.Queue] = None
        out, err, both = sinks.as_callbacks(flush=flush)
        if both:
            # Shared sinks need a single merge point: both stream reader
            # threads push chunks into one atomic queue, which b_thread
            # drains until QUEUE_SENTINEL arrives.
            _queue = queue.Queue()
            b_thread = cls._make_thread(
                target=cls._handle_sinks,
                args=(iter(_queue.get, QUEUE_SENTINEL), both)
            )
            # A shared sink only exists when a sink appeared in both the
            # out and err groups, so both streams must be available.
            if out is None:
                raise ValueError('both is defined, but out is None')
            if err is None:
                raise ValueError('both is defined, but err is None')
            # Tee each stream into the merge queue as well.
            out += (_queue.put,)
            err += (_queue.put,)
        if out:
            o_thread = cls._make_thread(
                target=cls._handle_sinks,
                args=(iter(process.stdout.readline, FILE_SENTINEL), out)
            )
        if err:
            e_thread = cls._make_thread(
                target=cls._handle_sinks,
                args=(iter(process.stderr.readline, FILE_SENTINEL), err)
            )
        return _Threads.__new__(
            cls,
            o_thread,
            e_thread,
            b_thread,
            _queue
        )

    def __enter__(self) -> Threads:
        """Context manager pattern."""
        return self

    def __exit__(self, _1: Any, _2: Any, _3: Any) -> None:
        """Context manager pattern: signal the merge thread to stop."""
        self.end_queue()

    @staticmethod
    def _make_thread(*args: Any, **kwargs: Any) -> threading.Thread:
        """Create, daemonize and start a thread."""
        thread = threading.Thread(*args, **kwargs)
        thread.daemon = True
        thread.start()
        return thread

    @staticmethod
    def _handle_sinks(_input: Iterator[bytes], sinks: Sequence[TSink]) -> None:
        """Threaded function to pass data between source and sink."""
        for segment in _input:
            for sink in sinks:
                sink(segment)

    def join(
        self,
        out: bool = True,
        err: bool = True,
        both: bool = False,
    ) -> None:
        """
        Conveniently join threads.

        ``both`` defaults to False because that thread only finishes
        after :meth:`end_queue` posts the sentinel; joining it before
        that would block indefinitely.
        """
        if out and self.out is not None:
            self.out.join()
        if err and self.err is not None:
            self.err.join()
        if both and self.both is not None:
            self.both.join()

    def end_queue(self) -> None:
        """Send exit signal to the both thread."""
        if self.queue is not None:
            self.queue.put(QUEUE_SENTINEL)
def _flushed_write(sink: Any) -> TSink:
    """Wrap *sink* so that every write is immediately flushed."""
    def _write(chunk: bytes) -> None:
        sink.write(chunk)
        sink.flush()
    return _write
class Sinks(_Sinks):
    """
    Ease creation and usage of sinks.

    This handles where the output should be copied to.
    It also contains a few convenience functions to ease usage.
    These convenience functions are purely optional and the raw form
    is still available to some extent.
    """

    out: Optional[Tuple[Any, ...]]
    err: Optional[Tuple[Any, ...]]
    both: Tuple[Any, ...]

    def __new__(cls, out: Sequence[Any], err: Sequence[Any]):
        """
        Create new sinks.

        Sinks appearing in both ``out`` and ``err`` are moved into
        ``both`` so a shared sink receives one merged stream.

        NOTE(review): the set-based split requires sinks to be hashable
        and does not preserve the callers' ordering.

        :param out: Sinks for stdout, or None to ignore stdout.
        :param err: Sinks for stderr, or None to ignore stderr.
        """
        both: Tuple[Any, ...] = ()
        if out and err:
            _out = set(out)
            _err = set(err)
            _both = _out & _err
            _out -= _both
            _err -= _both
            out = tuple(_out)
            err = tuple(_err)
            both = tuple(_both)
        return _Sinks.__new__(cls, out, err, both)

    def run(
        self,
        *args,
        flush: bool = True,
        threads: Type[Threads] = Threads,
        **kwargs,
    ) -> subprocess.Popen:
        """
        Conveniently create and execute a process and threads.

        Blocks until the output threads drain, then flushes and rewinds
        the sinks.

        :param flush: Whether to force flush flushable sinks.
        :param threads: Thread-manager class to use.
        :return: The created process.
        """
        process = self.popen(*args, **kwargs)
        self.run_threads(process, flush=flush, threads=threads)
        return process

    def popen(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> subprocess.Popen:
        """Conveniently create a process with out and err defined."""
        return subprocess.Popen(  # type: ignore
            *args,
            stdout=self.popen_out,
            stderr=self.popen_err,
            **kwargs,
        )

    @property
    def popen_out(self) -> Optional[int]:
        """Popen stdout argument: PIPE when stdout sinks exist."""
        return subprocess.PIPE if self.out is not None else None

    @property
    def popen_err(self) -> Optional[int]:
        """Popen stderr argument: PIPE when stderr sinks exist."""
        return subprocess.PIPE if self.err is not None else None

    def run_threads(
        self,
        process: subprocess.Popen,
        flush: bool = True,
        threads: Type[Threads] = Threads,
    ) -> None:
        """
        Conveniently create the threads and wait for output to drain.

        :param process: Active process to tee output from.
        :param flush: Whether to force flush flushable sinks.
        :param threads: Thread-manager class to use.
        """
        with self.make_threads(
            process,
            flush=flush,
            threads=threads,
        ) as _threads:
            _threads.join()
        self.flush()
        self.reset_head()

    def make_threads(
        self,
        process: subprocess.Popen,
        flush: bool = True,
        threads: Type[Threads] = Threads,
    ) -> Threads:
        """Conveniently create threads for the process and these sinks."""
        return threads(process, self, flush=flush)

    def flush(self) -> None:
        """Flush all sinks that support flushing."""
        for sinks in self:
            if sinks is None:
                continue
            for sink in sinks:
                if hasattr(sink, 'flush'):
                    sink.flush()

    def reset_head(self) -> None:
        """
        Reset the head of all seekable sinks.

        BUG FIX: streams attached to a pipe or terminal may expose a
        ``seek`` attribute yet raise ``OSError: [Errno 29] Illegal seek``
        at runtime; the original handler only caught
        ``io.UnsupportedOperation`` and let that escape. Both are caught
        (``UnsupportedOperation`` subclasses ``OSError``, but it is kept
        for explicitness).
        """
        for sinks in self:
            for sink in sinks or []:
                if hasattr(sink, 'seek'):
                    try:
                        sink.seek(0)
                    except (io.UnsupportedOperation, OSError):
                        pass

    @staticmethod
    def _to_callback(
        sinks: Optional[Sequence[Any]],
        flush: bool = True,
    ) -> Optional[Tuple[TSink, ...]]:
        """
        Convert sinks to write callbacks.

        Queues map to ``queue.put``; writable objects map to ``write``
        (flushed after every write when requested and supported).
        Annotation fixed: :meth:`as_callbacks` passes tuples, so the
        parameter is a ``Sequence``, not a ``List``.

        :raises ValueError: For a sink with neither ``put`` nor ``write``.
        """
        if sinks is None:
            return None
        callbacks: List[TSink] = []
        for sink in sinks:
            if isinstance(sink, queue.Queue):
                callbacks.append(sink.put)
            elif hasattr(sink, 'write'):
                if flush and hasattr(sink, 'flush'):
                    callbacks.append(_flushed_write(sink))
                else:
                    callbacks.append(sink.write)
            else:
                raise ValueError(f'Unknown sink type {type(sink)} for {sink}')
        return tuple(callbacks)

    def as_callbacks(
        self,
        flush: bool = True,
    ) -> Tuple[Optional[Tuple[TSink, ...]], ...]:
        """Convert all sinks to their callbacks."""
        return tuple(self._to_callback(sinks, flush=flush) for sinks in self)
def popen_call(*args, stdout=None, stderr=None, **kwargs) -> subprocess.Popen:
    """Start a process and block until all teed IO has completed."""
    sinks = Sinks(stdout, stderr)
    return sinks.run(*args, **kwargs)

# Despite the amount of code involved, usage is fairly simple. Take the
# following as an example: it logs stdout, stderr and the combined output
# into three different files, while also streaming to stdout and stderr live.
if __name__ == '__main__':
    import sys
    import tempfile

    with tempfile.TemporaryFile() as out_log, \
            tempfile.TemporaryFile() as err_log, \
            tempfile.TemporaryFile() as both_log:
        # Stream live to the console while logging stdout, stderr and the
        # combined output into three separate temporary files.
        process = popen_call(
            ['python', 'test.py'],
            stdout=(sys.stdout.buffer, out_log, both_log),
            stderr=(sys.stderr.buffer, err_log, both_log),
        )
        process.wait()
        # popen_call already rewound the log files, so reads start at 0.
        print('\n\noriginal')
        print(both_log.read().decode('utf-8'))
        print('\nstd.out')
        print(out_log.read().decode('utf-8'))
        print('\nstd.err')
        print(err_log.read().decode('utf-8'), end='')

# The child program used above is test.py, shown below.
# test.py -- companion script that writes random words to both streams.
import sys
import random
import time

random.seed(42401)
for _ in range(10):
    # Keep the RNG call order identical: choice, sample(+randrange), sleep.
    stream = random.choice([sys.stdout, sys.stderr])
    letters = random.sample('Hello World!', random.randrange(1, 12))
    stream.write(''.join(letters) + '\n')
    stream.flush()
    time.sleep(random.randrange(3))
# NOTE: the author only tested this code on Python 3.7.2.
发布于 2019-12-15 19:58:10
在对关键思想进行推理并回顾 Unix tee 命令的起源后,得出的明显结论是:Sinks 的元素实际上应该是 I/O 流(无论是二进制流、文本流、缓冲流还是操作系统级文件对象),或者可选地是 queue.Queue 实例。除此之外的所有其他类型都是无效的。
因此,合理的做法是在构造 Sinks 实例之初,就验证输入序列(out、err)中的元素实现了基本的 io.IOBase 接口,或者是 queue.Queue 类(或其子类)的实例。
这将允许消除诸如if hasattr(sink, 'flush')、hasattr(sink, 'seek')、hasattr(sink, 'write')等复杂的重复检查--假设“接收器”项是从已经实现flush/seek/write行为的任何(io.RawIOBase, io.BufferedIOBase, io.TextIOBase)派生的实例。
考虑到这一点,我会将各自的静态方法添加到Sinks类中:
@staticmethod
def _validate_sinks(sinks: Sequence[Any]):
    """Raise TypeError unless every sink is an I/O stream or a Queue."""
    allowed = (io.RawIOBase, io.BufferedIOBase, io.TextIOBase, queue.Queue)
    for sink in sinks:
        if not isinstance(sink, allowed):
            raise TypeError(f'Type `{type(sink)}` is not valid sink type')
@staticmethod
def is_iostream(sink):
    """Return True when *sink* is a raw, buffered, or text I/O stream."""
    stream_types = (io.RawIOBase, io.BufferedIOBase, io.TextIOBase)
    return isinstance(sink, stream_types)

# Now the Sinks.__new__ method should look like the following (note how the
# redundant set reassignments can be optimised away as well):
def __new__(cls, out: Sequence[Any], err: Sequence[Any]):
    """Create new sinks, validating each entry is a stream or a queue."""
    # Reject anything that is not an I/O stream or a queue up front.
    if out:
        Sinks._validate_sinks(out)
    if err:
        Sinks._validate_sinks(err)
    both: Tuple[Any, ...] = ()
    if out and err:
        _out, _err = set(out), set(err)
        both = tuple(_out & _err)
        out = tuple(_out - _err)
        err = tuple(_err - _out)
    return _Sinks.__new__(cls, out, err, both)

# Before publishing the optimised flush, reset_head and _to_callback methods,
# here are a couple of subtle issues discovered along the way:
reset_head 方法:按原样运行你的方法时,我最后得到了 OSError: [Errno 29] Illegal seek。一些原始二进制流可能是不可寻址的(not seekable):如果 seekable() 返回 False,那么 seek()、tell() 和 truncate() 都会引发 OSError。因此,让我们同时捕获两种异常 except (io.UnsupportedOperation, OSError):(请参阅下面重构后的方法版本)。_to_callback 方法:由于有了前置验证,该方法得以简化。考虑到上述问题以及其他一些次要但冗余的条件(如 if sinks is None: continue、for sink in sinks or []:),上述 3 个方法重构如下:
def flush(self) -> None:
    """Flush all I/O-stream sinks.

    BUG FIX: iterating ``self`` yields the three sink *tuples*
    (out, err, both), not individual sinks, so the posted
    ``filter(Sinks.is_iostream, self)`` never matched anything and
    flush() was a silent no-op. Filter the sinks inside each group
    instead (``or ()`` guards against a None group).
    """
    for group in self:
        for sink in filter(Sinks.is_iostream, group or ()):
            sink.flush()
def reset_head(self) -> None:
    """Seek every I/O-stream sink back to the start.

    Fixes in this version:
    * BUG FIX: ``filter(Sinks.is_iostream, self)`` filtered the sink
      *groups* (tuples), never matching, so nothing was ever rewound;
      filter the sinks inside each group instead.
    * Removed the leftover debug ``print`` and the redundant ``pass``.
    Non-seekable streams (e.g. attached to a pipe/TTY) raise OSError
    ("Illegal seek"); that is deliberately swallowed.
    """
    for group in self:
        for sink in filter(Sinks.is_iostream, group or ()):
            try:
                sink.seek(0)
            except (io.UnsupportedOperation, OSError):
                pass
@staticmethod
def _to_callback(
    sinks: Optional[List[Any]],
    flush: bool = True,
) -> Optional[Tuple[TSink, ...]]:
    """Map each sink to its callback: queues to ``put``, streams to ``write``
    (wrapped to flush after every write when requested)."""
    if sinks is None:
        return None
    callbacks: List[TSink] = []
    for sink in sinks:
        if isinstance(sink, queue.Queue):
            callbacks.append(sink.put)
        elif Sinks.is_iostream(sink):
            writer = _flushed_write(sink) if flush else sink.write
            callbacks.append(writer)
    return tuple(callbacks)

# Example run (output):
doWlloHer
HlWd
lHl o!lreo
!
oWd H
lHlWd
delWlHlor
olrdl!He
HolWlloe
!l
original
doWlloHer
HlWd
lHl o!lreo
!
oWd H
lHlWd
delWlHlor
olrdl!He
HolWlloe
!l
std.out
lHl o!lreo
!
oWd H
lHlWd
!l
std.err
doWlloHer
HlWd
delWlHlor
olrdl!He
HolWlloe

https://codereview.stackexchange.com/questions/234048
复制相似问题