Pipes in Python
The pipe operator is one of the best features of Linux shells: it lets you combine different operations with ease. Here we will try to implement the same syntax in Python.
I’ve always wanted a way to build data processing pipelines in Python using pipes, like this: range(10) | F(is_odd) | P(lambda x: x * 2), instead of functions and generators and maps and loops. So I’ve tried …
The idea is pretty simple: let’s create a class that implements the __or__ and __ror__ operators, which give us the pipe syntax.
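As a quick illustration of why both operators are needed, here is a small sketch of how Python dispatches the | operator. The Stage class and its calls list are hypothetical names used only for this demonstration; they are not part of the pipe implementation itself.

```python
class Stage:
    """Hypothetical stand-in for a pipe stage; records which hook Python called."""

    def __init__(self, name):
        self.name = name
        self.calls = []

    def __or__(self, other):
        # Called for `stage | something`: self is the left operand.
        other.calls = self.calls + [f"{self.name}.__or__"]
        return other

    def __ror__(self, other):
        # Called for `something | stage` when the left operand does not
        # support `|` with a Stage (range, list and generators do not).
        self.calls = [f"{self.name}.__ror__"]
        return self


a, b = Stage("a"), Stage("b")
result = range(3) | a | b
print(result.calls)  # ['a.__ror__', 'a.__or__']
```

The first | falls back to a.__ror__ because range does not know how to handle it, while the second | is an ordinary a.__or__(b) call. The expression evaluates to the last stage in the chain.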

def __or__(self, other):
    other.source = self
    return other

def __ror__(self, other):
    self.source = (
        iter(other)
        if not isinstance(other, (str, bytes)) and hasattr(other, "__iter__")
        else other
    )
    return self

The tricky part was the implementation of __next__, since I wanted it to be a lazy operation. After some trial and error I ended up with a pretty simple approach: the class implementing the pipe calls next on its source (set by __or__ or __ror__), applies a transformation, and returns the result of that transformation.

def __next__(self):
    if self.source is None:
        raise StopIteration
    value = next(self.source)
    result = self.operator(value)
    return result

It did the trick for standard transformations but not for filters like is_odd, since those skip some data rather than return it the way a transformation does. To make filters easy to add, I’ve implemented a wrapper that works similarly to the pipe class but applies a filter to the data going through it.

def __next__(self):
    while True:
        if self.source is None:
            raise StopIteration
        value = next(self.source)
        if self.predicate(value):
            return value

The resulting code allowed me to do things like this:

# Example usage with filtering
pipe = range(10) | P(lambda x: x + 3)
print("range(10) | P(lambda x: x + 3):", list(pipe))

pipe = range(10) | P(lambda x: x + 3) | P(lambda x: x * 2)
print("range(10) | P(lambda x: x + 3) | P(lambda x: x * 2):", list(pipe))

pipe = range(10) | F(is_odd)
print("range(10) | F(is_odd):", list(pipe))

pipe = range(10) | F(is_odd) | P(lambda x: x * 2)
print("range(10) | F(is_odd) | P(lambda x: x * 2):", list(pipe))

pipe = list(range(10)) | F(is_odd) | P(lambda x: x * 2)
print("list(range(10)) | F(is_odd) | P(lambda x: x * 2):", list(pipe))

def gen_fn():
    for i in range(10):
        yield i

pipe = gen_fn() | F(is_odd) | P(lambda x: x * 2)
print("gen_fn() | F(is_odd) | P(lambda x: x * 2):", list(pipe))

Results:

range(10) | P(lambda x: x + 3): [3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
range(10) | P(lambda x: x + 3) | P(lambda x: x * 2): [6, 8, 10, 12, 14, 16, 18, 20, 22, 24]
range(10) | F(is_odd): [1, 3, 5, 7, 9]
range(10) | F(is_odd) | P(lambda x: x * 2): [2, 6, 10, 14, 18]
list(range(10)) | F(is_odd) | P(lambda x: x * 2): [2, 6, 10, 14, 18]
gen_fn() | F(is_odd) | P(lambda x: x * 2): [2, 6, 10, 14, 18]

Moving from this:

result = [x * 2 for x in range(10) if is_odd(x)]

to this:

result = list(range(10) | F(is_odd) | P(lambda x: x * 2))

This may not seem like a big deal, but if you try to implement something like

range(10) | F(f1) | P(op1) | P(op2) | P(op3) | F(f2)

and make it work lazily, you will discover rather quickly that it is neither easy nor intuitive. There are other ways, such as Queue, but they require a lot of boilerplate code.
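
For reference, here is a minimal self-contained sketch of how the pieces shown above might fit together, along with a small check that evaluation really is lazy. It is not necessarily identical to pipe.py: the _Stage base class, the constructor signatures, the __iter__ method and the trace helper are assumptions made for illustration, and __ror__ is simplified to always call iter() on the left operand.

```python
from itertools import count, islice


class _Stage:
    """Assumed shared base: stores a callable and the upstream source."""

    def __init__(self, func):
        self.func = func
        self.source = None

    def __or__(self, other):
        # stage | next_stage: the next stage pulls its values from this one.
        other.source = self
        return other

    def __ror__(self, other):
        # iterable | stage: wrap the left operand in an iterator.
        self.source = iter(other)
        return self

    def __iter__(self):
        # Returning self lets list(), for loops and islice() consume the pipe.
        return self


class P(_Stage):
    """Transformation stage: applies func to each value, one at a time."""

    def __next__(self):
        if self.source is None:
            raise StopIteration
        return self.func(next(self.source))


class F(_Stage):
    """Filter stage: keeps pulling until func accepts a value."""

    def __next__(self):
        if self.source is None:
            raise StopIteration
        while True:
            value = next(self.source)
            if self.func(value):
                return value


def is_odd(x):
    return x % 2 == 1


seen = []


def trace(x):
    # Hypothetical helper: records every value that is actually pulled.
    seen.append(x)
    return x


# count() is infinite, so this only terminates because the pipe is lazy.
pipe = count() | P(trace) | F(is_odd) | P(lambda x: x * 2)
first_three = list(islice(pipe, 3))
print(first_three)  # [2, 6, 10]
print(seen)         # [0, 1, 2, 3, 4, 5]
```

Only six values are pulled from the infinite source to produce three results, which shows that each stage asks its source for one value at a time instead of materializing the whole sequence.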
The complete code can be found here: pipe.py
Originally posted on: https://chaotic.land/posts/ideas/python-pipes/

