![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiYWan5SZykjMmljZyYzYjNjY3ImY1YzY0MTMwQjNxYTYlZzM28CX5d2bs92Yl1iclB3bsVmdlR2LcNWaw9CXt92Yu4GZjlGbh5yYjV3Lc9CX6MHc0RHaiojIsJye.gif)
消息隊列應用場景廣泛,使用SQL實作簡易隊列原理也很簡單,實作起來就是 消息 帶
狀态
和
版本号
字段。
更新時用
版本号
做樂觀鎖。操作邏輯就是個狀态機。
UPDATE mq SET mq.status=new_status mq.version = mq.version + 1 WHERE mq.version = old_version
實作
mysql mq 表結構設計
CREATE TABLE `mq` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
`msg` varchar(1024) DEFAULT NULL,
`status` varchar(100) DEFAULT 'ready',
`version` bigint(20) unsigned DEFAULT 0,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8mb4;
生産者
測試,向隊列插入兩條消息。
insert into mq(msg) values('第一條消息 hello world! 1024');
insert into mq(msg) values('第二條消息 歡迎通路 https://spaceack.com');
消費者
擷取隊列中的消息, 此時不會改變隊列(mq表)中的資料。
select * from mq where status='ready' limit 1;
消息确認(關鍵!)
update mq set mq.status = 'ack', mq.version = mq.version + 1 WHERE mq.version = {query_version} and id = {query_id}
确認後的狀态:
再次擷取資料僅能擷取第二條資料。
這樣的一個好處就是消息都是可見的。可以直接查資料庫。
不用加悲觀鎖,因為update成功後就帶鎖。其它同版本的更新都會失敗。
還有個好處就是減少元件依賴。簡單的服務資料庫就能搞定。不用再起個rabbitmq等隊列服務。節約運維成本。
版本号 另一個小作用:InnoDB 如果更新語句沒有改變任何字段值時,影響行數會傳回0,那麼是沒找到記錄還是沒改變值的0呢?前者是bug, 後者是正常情況但區分不了。加版本号後每次修改+1,影響行數一定不為0。
隐藏福利:生産者自動批量測試腳本
import random
import subprocess
def runcmd(command):
ret = subprocess.run(command,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE,encoding="utf-8",timeout=1)
if ret.returncode == 0:
# print("success:",ret, ret.stdout)
return ret.stdout
else:
print("error:",ret)
def producer():
sql = "\"insert into mq(msg) values('%s');\"" % (str(random.randint(1,99999)))
cmd = """mysql -uroot -ppassword test -e %s """ % (sql)
runcmd(cmd)