@量化致富 改那里可以区分出来,我这样改可以不,是不是独立的二个子线程中运行,stream_names = ["smallgo", "smallgo1"]
def consume(self, stream_names, consumer_name, id=">", block=60000, count=1, target=None):
for stream_name in stream_names:
streams = {stream_name: id}
rst = self.redis.xreadgroup(self.consumer_group, consumer_name, streams, block=block, count=count)
if not rst or not rst[0] or not rst[0][1]:
continue
for item in rst[0][1]:
try:
msgId, data = self.__get_data(item)
if target and target(msgId, data):
self.redis.xack(stream_name, self.consumer_group, msgId)
except Exception as e:
print("consumer is error:", repr(e))