量化致富 发布于2023-11-02
回复 92
浏览 2208
39
## 基于redis stream的消费者
- 因qmt这一段没封装好,故隐去了qmt及email发送的逻辑,可自行实现
- 首次运行,放开最后面的新增consume group的代码注释即可
> [基于redis stream下单函数](https://www.joinquant.com/view/community/detail/44545)
``` python
import re
import redis
#import miniqmt
from threading import Thread
#import mail
import signal
import time
class RedisService(object):
host = "ip or domain"
#host='127.0.0.1'
port = 12142
pwd = "password"
stream_name = "order"
consumer_group = "order_group"
consumer_name = 'single'
path = r'D:\国金证券QMT交易端\userdata_mini'
acc = 'stock account'
receivers = ['xxx@qq.com']
def __init__(self):
self.redis = redis.Redis(
host=self.host,
port=self.port,
password=self.pwd)
self.stk_code_pattern = re.compile(r'(\d{6}).*')
self.thread = Thread(target=self.loop_consuming, daemon=True)
def start_background(self):
self.thread.start()
def xgroup_create(self,id):
self.redis.xgroup_create(self.stream_name, self.consumer_group, id=id, mkstream=True)
def __get_data(self,data):
if not data or not data[0]:
return None, None
msgId = str(data[0], 'utf-8')
data = {str(key, 'utf-8'): str(val, 'utf-8') for key, val in data[1].items()}
return msgId, data
def loop_consuming(self):
while True:
self.consume(self.consumer_name, count=3, target=self.biz_execute)
def consume(self,consumer_name,id=">",block=60000, count=1,target=None):
"""
消费数据
:param consumer_name: 消费者名称,建议传递ip
:param id: 从哪开始消费
:param block: 无消息阻塞时间,毫秒,默认60秒,在60秒内有消息直接消费,0 时阻塞等待
:param count: 消费多少条,默认1
:param target: 业务处理回调方法
:return:
"""
streams = {self.stream_name: id}
rst = self.redis.xreadgroup(self.consumer_group, consumer_name, streams, block=block, count=count)
print(f'消费到的数据 {rst}')
if not rst or not rst[0] or not rst[0][1]:
return
# 遍历获取到的列表信息(可以消费多条,根据count)
for item in rst[0][1]:
try:
msgId, data = self.__get_data(item)
if target and target(msgId,data):
self.redis.xack( self.stream_name, self.consumer_group, msgId)
except Exception as e:
# 消费失败,下次从头消费(消费成功的都已经提交ack了,可以先不处理,以后再处理)
print("consumer is error:",repr(e))
def biz_execute(self, msgId, data):
print(f'业务执行msgId={msgId} bizData={data}')
if not data.get('code'):
return True
try:
match_res = self.stk_code_pattern.search(data['code'])
if not match_res:
print("illegal param code %s" % data['code'])
return True
code = match_res.group(1)
code = (code + '.SH') if code.startswith(('5','6')) else (code + '.SZ')
amt = int(data['amt'])
remark = data.get('remark', '')
stg = data.get('stg', '')
print(f"{data['type']} {code}, price is { data['price'] if data['price'] != '0' else 'market'}, amt {amt} ")
return True
except Exception as e:
print(repr(e))
return False
if __name__ == '__main__':
stream = RedisService()
#注意:第一次运行需要创建消费组,取消下面这一句注释即可
#stream.xgroup_create(0)
stream.start_background()
input("press enter to exit...\n")
```
评论
@夏刚华 应该可以 我还是万三
2023-11-02