Grouping an iterable by a predicate in Python

Published 2019-09-19 12:30


I'm parsing a file like this:

--header--
DATA1
DATA2
--header--
DATA3
DATA4
DATA5
--header--
--header--
...

I want groups like this:

[ [header, data1, data2], [header, data3, data4, data5], [header], [header], ... ]

so that I can iterate over them like this:

for grp in group(open('file.txt'), lambda line: 'header' in line):
    for item in grp:
        process(item)

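and keep the detect-a-group logic separate from the process-a-group logic.

But I need an iterable of iterables, because the groups can be arbitrarily large and I don't want to store them. That is, I want to split an iterable into subgroups each time a "sentinel" or "header" item is encountered, as indicated by a predicate. This seems like it would be a common task, but I can't find an efficient Pythonic implementation.

Here's a dumb append-to-a-list implementation: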

def group(iterable, isstart=lambda x: x):
    """Group `iterable` into groups starting with items where `isstart(item)` is true.

    Start items are included in the group.  The first group may or may not have a 
    start item.  An empty `iterable` results in an empty result (zero groups)."""
    items = []
    for item in iterable:
        if isstart(item) and items:
            yield iter(items)
            items = []
        items.append(item)
    if items:
        yield iter(items) 
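
As a quick check of the intended behaviour, the list-buffering version can be run on the sample lines from the top of the question. This is only an illustrative sketch: the `lines` list stands in for the file, and the function body is repeated so that the snippet is self-contained (it runs unchanged on Python 3).

```python
def group(iterable, isstart=lambda x: x):
    """List-buffering grouper, same logic as the implementation above."""
    items = []
    for item in iterable:
        if isstart(item) and items:
            yield iter(items)
            items = []
        items.append(item)
    if items:
        yield iter(items)

# Stand-in for the lines of file.txt (assumption: one item per line).
lines = ["--header--", "DATA1", "DATA2",
         "--header--", "DATA3", "DATA4", "DATA5",
         "--header--", "--header--"]

groups = [list(grp) for grp in group(lines, lambda line: "header" in line)]
for grp in groups:
    print(grp)
```

Adjacent headers each end up in a group of their own, which is the behaviour any lazier replacement has to preserve.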

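It feels like there ought to be a nice itertools version, but it eludes me. The "obvious" (?!) groupby solution doesn't seem to work, because there can be adjacent headers and they need to go into separate groups. The best I can come up with is to (ab)use groupby with a key function that keeps a counter: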

def igroup(iterable, isstart=lambda x: x):
    def keyfunc(item):
        if isstart(item):
            keyfunc.groupnum += 1       # Python 2's closures leave something to be desired
        return keyfunc.groupnum
    keyfunc.groupnum = 0
    return (group for _, group in itertools.groupby(iterable, keyfunc))

But I feel like Python can do better, and sadly this is even slower than the dumb list version:

# ipython
%time deque(group(xrange(10 ** 7), lambda x: x % 1000 == 0), maxlen=0)
CPU times: user 4.20 s, sys: 0.03 s, total: 4.23 s

%time deque(igroup(xrange(10 ** 7), lambda x: x % 1000 == 0), maxlen=0)
CPU times: user 5.45 s, sys: 0.01 s, total: 5.46 s
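
To make the groupby pitfall mentioned above concrete, here is a small sketch that compares keying groupby directly on the predicate with the counter-based key function. It assumes Python 3, where `nonlocal` replaces the function-attribute workaround; the names `naive_groups` and `igroup_nonlocal` are made up for this illustration.

```python
import itertools

def naive_groups(iterable, isstart):
    # Keying directly on the predicate: consecutive items that produce the
    # same key value land in the same group, so adjacent headers are merged.
    return [list(g) for _, g in itertools.groupby(iterable, isstart)]

def igroup_nonlocal(iterable, isstart):
    # Counter-based key function, written with `nonlocal` (Python 3 only).
    groupnum = 0

    def keyfunc(item):
        nonlocal groupnum
        if isstart(item):
            groupnum += 1
        return groupnum

    return (grp for _, grp in itertools.groupby(iterable, keyfunc))

lines = ["h", "a", "h", "h", "b"]
is_header = lambda s: s == "h"

print(naive_groups(lines, is_header))
# [['h'], ['a'], ['h', 'h'], ['b']]
print([list(g) for g in igroup_nonlocal(lines, is_header)])
# [['h', 'a'], ['h'], ['h', 'b']]
```

The naive version both splits a header from its data and merges the two adjacent headers, while the counter gives every header a fresh key.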

For your convenience, here's some unit test code:

import itertools
import random
import unittest

class Test(unittest.TestCase):
    def test_group(self):
        MAXINT, MAXLEN, NUMTRIALS = 100, 100000, 21
        isstart = lambda x: x == 0
        self.assertEqual(next(igroup([], isstart), None), None)
        self.assertEqual([list(grp) for grp in igroup([0] * 3, isstart)], [[0]] * 3)
        self.assertEqual([list(grp) for grp in igroup([1] * 3, isstart)], [[1] * 3])
        self.assertEqual(len(list(igroup([0,1,2] * 3, isstart))), 3)        # Catch hangs when groups are not consumed
        for _ in xrange(NUMTRIALS):
            expected, items = itertools.tee(itertools.starmap(random.randint, itertools.repeat((0, MAXINT), random.randint(0, MAXLEN))))
            for grpnum, grp in enumerate(igroup(items, isstart)):
                start = next(grp)
                self.assertTrue(isstart(start) or grpnum == 0)
                self.assertEqual(start, next(expected))
                for item in grp:
                    self.assertFalse(isstart(item))
                    self.assertEqual(item, next(expected))

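So: how can I subgroup an iterable by a predicate elegantly and efficiently in Python?


Solution


how can I subgroup an iterable by a predicate elegantly and efficiently in Python?

Here's a concise, memory-efficient implementation that is very similar to the one from your question: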

from itertools import groupby, imap
from operator import itemgetter

def igroup(iterable, isstart):
    def key(item, count=[False]):
        if isstart(item):
            count[0] = not count[0] # start new group
        return count[0]
    return imap(itemgetter(1), groupby(iterable, key))

It supports infinite groups.
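
The code above targets Python 2 (`imap`). As a sketch of how the same toggle trick might look on Python 3, the version below uses the built-in lazy `map` and a `nonlocal` flag instead of the mutable default argument; the name `igroup_py3` is hypothetical. It also shows the infinite-group case: the predicate never fires, yet the first items of the group are available immediately.

```python
from itertools import count, groupby, islice
from operator import itemgetter

def igroup_py3(iterable, isstart):
    flag = False

    def key(item):
        nonlocal flag
        if isstart(item):
            flag = not flag  # flip the key so groupby starts a new group
        return flag

    # map() is lazy on Python 3, so nothing is read until a group is requested.
    return map(itemgetter(1), groupby(iterable, key))

# Adjacent start items still get separate groups.
print([list(g) for g in igroup_py3([0, 0, 0], lambda x: x == 0)])
# [[0], [0], [0]]

# count(1) never yields 0, so there is a single group that never ends.
groups = igroup_py3(count(1), lambda x: x == 0)
first = next(groups)
print(list(islice(first, 5)))
# [1, 2, 3, 4, 5]
```

Because this is built on groupby, each group shares the underlying iterator and should be consumed before the next group is requested.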

A tee-based solution is slightly faster, but it consumes memory for the current group (similar to the list-based solution from the question):

from itertools import islice, tee

def group(iterable, isstart):
    it, it2 = tee(iterable)
    count = 0
    for item in it:
        if isstart(item) and count:
            gr = islice(it2, count)
            yield gr
            for _ in gr:  # skip to the next group
                pass
            count = 0
        count += 1
    if count:
        gr = islice(it2, count)
        yield gr
        for _ in gr:  # skip to the next group
            pass

The groupby solution could be implemented in pure Python:

def igroup_inline_key(iterable, isstart):
    it = iter(iterable)

    def grouper():
        """Yield items from a single group."""
        while not p[START]:
            yield p[VALUE]  # each group has at least one element (a header)
            p[VALUE] = next(it)
            p[START] = isstart(p[VALUE])

    p = [None]*2 # workaround the absence of `nonlocal` keyword in Python 2.x
    START, VALUE = 0, 1
    p[VALUE] = next(it)
    while True:
        p[START] = False # to distinguish EOF and a start of new group
        yield grouper()
        while not p[START]: # skip to the next group
            p[VALUE] = next(it)
            p[START] = isstart(p[VALUE])

To avoid repeating code, the while True loop could be written as:

while True:
    p[START] = False  # to distinguish EOF and a start of new group
    g = grouper()
    yield g
    if not p[START]:  # skip to the next group
        for _ in g:
            pass
        if not p[START]:  # EOF
            break

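Though the previous variant might be more explicit and readable.

I think a general memory-efficient solution in pure Python won't be significantly faster than the groupby-based one.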
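
The pure-Python code above relies on `next(it)` raising StopIteration inside a generator to end it. Since PEP 479 (the default behaviour from Python 3.7), that exception is turned into a RuntimeError, so the code needs adapting on current interpreters. The following is one possible Python 3 sketch of the same lookahead idea, not a drop-in port: it uses `nonlocal` and an end-of-input sentinel, and the names `igroup_inline_py3`, `_END` and `advance` are assumptions made for this example.

```python
def igroup_inline_py3(iterable, isstart):
    it = iter(iterable)
    _END = object()            # sentinel marking exhausted input
    value = next(it, _END)     # one-item lookahead
    new_group = False          # True when `value` opens the next group

    def advance():
        nonlocal value, new_group
        value = next(it, _END)
        new_group = value is not _END and bool(isstart(value))

    def grouper():
        """Yield items from a single group."""
        while value is not _END and not new_group:
            yield value
            advance()

    while value is not _END:
        new_group = False      # the lookahead item belongs to this group
        g = grouper()
        yield g
        for _ in g:            # skip whatever the caller left unconsumed
            pass

print([list(g) for g in igroup_inline_py3([1, 2, 0, 3, 0, 0], lambda x: x == 0)])
# [[1, 2], [0, 3], [0], [0]]
```

Only the single lookahead item is held at any time, and groups that the caller never iterates are drained before the next one is produced.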

If process(item) is fast compared to igroup() and a header could be efficiently found in a string (e.g., for a fixed static header) then you could improve performance by reading your file in large chunks and splitting on the header value. It should make your task IO-bound.
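
A rough sketch of that chunked approach, assuming a fixed header string and a text-mode file object (Python 3). The function name `iter_group_bodies`, its parameters and the default chunk size are made up for illustration. Note that, unlike the lazy iterators above, it buffers the text of one group at a time, so it only suits inputs whose individual groups fit in memory.

```python
import io

def iter_group_bodies(fileobj, header, chunk_size=1 << 16):
    """Yield the text that follows each occurrence of `header`.

    Non-empty text before the first header is yielded as a leading group.
    """
    tail = ""             # text after the last complete header seen so far
    seen_header = False
    while True:
        chunk = fileobj.read(chunk_size)
        if not chunk:
            break
        # A header cut in half by a chunk boundary stays in `tail`
        # and is completed by the next chunk.
        parts = (tail + chunk).split(header)
        tail = parts.pop()
        for part in parts:
            if seen_header or part:
                yield part
            seen_header = True
    if seen_header or tail:
        yield tail

text = "--header--\nDATA1\nDATA2\n--header--\nDATA3\n--header--\n--header--\n"
# A tiny chunk size is used here only to exercise the boundary handling.
bodies = list(iter_group_bodies(io.StringIO(text), "--header--\n", chunk_size=7))
print(bodies)
# ['DATA1\nDATA2\n', 'DATA3\n', '', '']
```

Each yielded body can then be split into lines and passed to process() without running the predicate on every line.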




Author: 黑洞官方问答小能手

Link: https://www.pythonheidong.com/blog/article/115047/86306276bb48bbe212f9/

Source: python黑洞网

Any reproduction must credit the source. Infringement will be pursued under the law once discovered.
