Sometimes you want to do some background processing in reaction to some new entry in a database table. Here I provide one possible solution.
Schema
The schema is flexible in that you can add a new subscription without changing the table trigger. Also, if there are no subscriptions, no event log entries are made. You can also use the Firebird event ‘new_event’ to trigger a check for events in your code.
-- Example business table whose changes we want to react to.
create table languages
(
language_id integer GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
name varchar(20),
year_released integer
);
-- One row per (table, action) a consumer wants to be notified about.
-- action: 'I' = insert, 'U' = update, 'D' = delete (matched by the table trigger).
-- tag: free-form label identifying the consumer (e.g. 'send_email').
create table event_subscriber
(
sub_id integer GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
target_table varchar(100),
action varchar(2),
tag varchar(20)
);
-- Work queue: one row per pending event per subscription.
-- pk holds the primary key of the changed row in the target table.
-- work_update stays NULL while unclaimed; a worker stamps it to claim the row.
create table event_log
(
log_id integer GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
sub_id integer,
pk integer,
work_update TIMESTAMP
);
-- Workers always filter by sub_id, so index it.
CREATE INDEX IDX_event_log_sub_id
ON event_log (sub_id);
-- Post the Firebird event 'new_event' whenever work is queued, so client
-- code can wait on the event instead of polling event_log.
CREATE TRIGGER event_log_insert FOR event_log
ACTIVE AFTER INSERT POSITION 0
AS
BEGIN
POST_EVENT 'new_event';
END;
-- Queue one event_log row per matching subscriber for each change to languages.
-- If nobody subscribed to the action, the SELECT yields no rows and nothing is logged.
-- Note: each IF below governs exactly the single following statement (no BEGIN/END needed).
CREATE TRIGGER languages_event FOR languages
ACTIVE AFTER INSERT OR UPDATE OR DELETE POSITION 0
AS
BEGIN
IF (DELETING) THEN
insert into event_log (sub_id,pk) select sub_id,OLD.language_id as pk from event_subscriber where target_table = 'languages' AND action = 'D';
IF (INSERTING) THEN
insert into event_log (sub_id,pk) select sub_id,NEW.language_id as pk from event_subscriber where target_table = 'languages' AND action = 'I';
IF (UPDATING) THEN
insert into event_log (sub_id,pk) select sub_id,NEW.language_id as pk from event_subscriber where target_table = 'languages' AND action = 'U';
END;
--Finally add some subscriptions
-- UPDATE OR INSERT with MATCHING on all three columns makes these idempotent:
-- re-running the script will not create duplicate subscriptions.
UPDATE OR INSERT into event_subscriber (action,target_table,tag) values ('I','languages','send_email') MATCHING (action,target_table,tag) RETURNING sub_id;
UPDATE OR INSERT into event_subscriber (action,target_table,tag) values ('I','languages','ring_bell') MATCHING (action,target_table,tag) RETURNING sub_id;
Single Threaded Worker
Here we process events for a certain subscription using one thread. Advantages are events are processed in order and code is simpler.
from util import printrs,getConnection
from schema import RecreateDatabase
import time
def processEvents():
    """Drain all pending event_log rows for the 'languages'/'ring_bell' insert subscription.

    Ensures the subscription exists (idempotent UPDATE OR INSERT), then
    repeatedly fetches a snapshot of unclaimed events, claims each by stamping
    work_update, "does the work", and deletes the row.  Exits when a snapshot
    comes back empty.

    Returns:
        int: number of events processed.
    """
    con = getConnection()
    try:  # fix: connection was never closed on return
        cur = con.cursor()
        cur.execute("UPDATE OR INSERT into event_subscriber (action,target_table,tag) values (?,?,?) MATCHING (action,target_table,tag) RETURNING sub_id",('I','languages','ring_bell'))
        subid = cur.fetchone()[0]
        con.commit()
        processed_count = 0
        while True:
            # Snapshot of every unclaimed event for this subscription.
            cur.execute("SELECT log_id,sub_id,pk FROM event_log WHERE work_update IS NULL AND sub_id = ?",(subid,))
            rs = cur.fetchall()
            con.commit()
            for log_id, sub_id, pk in rs:
                # Claim the row so other (hypothetical) workers would skip it.
                cur.execute("UPDATE event_log SET work_update = CURRENT_TIMESTAMP WHERE log_id = ?",(log_id,))
                con.commit()
                # time.sleep(0.1) #DO SOME WORK
                cur.execute("DELETE FROM event_log WHERE log_id = ?",(log_id,))
                con.commit()
                processed_count += 1
            if not rs:
                return processed_count
    finally:
        con.close()
if __name__ == "__main__":
    print("Process Single Thread")
    print("generate database")
    RecreateDatabase()
    N = 500
    print("Generating",N,"Entries")
    # Each insert fires the languages_event trigger, which queues one
    # event_log row per subscriber.
    con = getConnection()
    cur = con.cursor()
    con.begin()
    for x in range(0,N):
        cur.execute("insert into languages (name, year_released) values (?, ?)",('Dylan', x))
    con.commit()
    # fix: close the generator connection (the multi-process variant does;
    # this one leaked it -- processEvents opens its own connection anyway).
    con.close()
    print("processEvents")
    start_time = time.time()
    processed_count = processEvents()
    end_time = time.time()
    time_lapsed = end_time - start_time
    print(time_lapsed,"seconds",(N/time_lapsed),'rps')
    print("processed_count",processed_count)
Multi Threaded / Process
Here we process events for a certain subscription using multiple processes. This can be much faster if processing each event takes a bit of time. The disadvantage is that events are not processed in order.
Of interest here is the sql used to get 1 event to process.
SELECT log_id,sub_id,pk FROM event_log WHERE work_update IS NULL AND sub_id = ? ORDER BY RAND() ROWS 1 FOR UPDATE WITH LOCK
work_update IS NULL – is to not select items being processed
ORDER BY RAND() ROWS 1 – get 1 row in random order to prevent competing with other workers to claim the same row.
FOR UPDATE WITH LOCK – make Firebird report a lock conflict as soon as possible, instead of waiting until the transaction tries to commit.
see the firebird language reference for more.
from multiprocessing import Process,Pool,Queue
from firebird.driver.types import DatabaseError
import os
from util import printrs,getConnection
import time
import sys, traceback
from schema import RecreateDatabase
from firebird.driver import tpb, TPB, Isolation, TraAccessMode, TableShareMode, TableAccessMode
# Transaction parameter block used when claiming event_log rows:
# CONCURRENCY isolation with WRITE access; the middle argument 0 is
# presumably the lock timeout (NO WAIT, so conflicts fail immediately)
# -- TODO confirm against the firebird-driver tpb() signature.
locking_transaction_options = tpb(Isolation.CONCURRENCY,0,TraAccessMode.WRITE)
def nextLogRow(con,cur,subid):
    """Try to claim one unprocessed event_log row for subscription *subid*.

    Must be called with a (NO WAIT, locking) transaction already begun on *con*.

    Returns:
        tuple: (log_id, sub_id, pk) -- a row was claimed; work_update was
            stamped and the transaction committed.
        False: no unclaimed rows remain; the transaction was committed.
        None: lock conflict/deadlock with another worker (expected under
            contention), or an unexpected DatabaseError (printed); the
            transaction was rolled back either way.
    """
    try:
        # ORDER BY RAND() ROWS 1: pick one random candidate so concurrent
        # workers rarely race for the same row; FOR UPDATE WITH LOCK makes
        # any conflict surface immediately under the NO WAIT transaction.
        cur.execute("SELECT log_id,sub_id,pk FROM event_log WHERE work_update IS NULL AND sub_id = ? ORDER BY RAND() ROWS 1 FOR UPDATE WITH LOCK",(subid,))
        row = cur.fetchone()
        if row:
            log_id,sub_id,pk = row
            # Stamp the row so other workers' "work_update IS NULL" filter skips it.
            cur.execute("UPDATE event_log SET work_update = CURRENT_TIMESTAMP WHERE log_id = ?",(log_id,))
            con.commit()
            return row
        # fix: end the transaction here -- previously it was left open and the
        # caller returned without ever committing or rolling it back.
        con.commit()
        return False
    except DatabaseError as ex:
        con.rollback()
        if (ex.sqlcode == -913 or ex.sqlcode == -901):
            pass #deadlock / lock conflict on no wait transaction
        else:
            print('sqlcode',ex.sqlcode,'sqlstate',ex.sqlstate)
            print(ex)
            traceback.print_exc(file=sys.stdout)
        return None
def getSUBID(con,cur):
    """Return sub_id of the ('I','languages','ring_bell') subscription, creating it if absent.

    UPDATE OR INSERT makes creation idempotent.  On deadlock/lock conflict
    (sqlcode -913/-901) the statement is retried in a loop -- previously this
    recursed, which could in theory exhaust the stack under heavy contention.
    Any other DatabaseError is re-raised.
    """
    while True:
        try:
            cur.execute("UPDATE OR INSERT into event_subscriber (action,target_table,tag) values (?,?,?) MATCHING (action,target_table,tag) RETURNING sub_id",('I','languages','ring_bell'))
            subid = cur.fetchone()[0]
            con.commit()
            return subid
        except DatabaseError as ex:
            con.rollback()
            if ex.sqlcode not in (-913, -901):
                raise  # bare raise preserves the original traceback
def processEvents(q,tag):
    """Worker-process loop: claim and delete event_log rows one at a time.

    Runs until no unclaimed events remain, then puts a
    (pid, deadlockcount, update_contention, processed_count) tuple on *q*
    and returns.  *tag* is only used for the startup log line.
    """
    pid = os.getpid()
    print('tag',tag,'parent process:', os.getppid(),'process id:', pid)
    con = getConnection()
    cur = con.cursor()
    subid = getSUBID(con,cur)
    deadlockcount = 0      # claims lost to another worker inside nextLogRow
    update_contention = 0  # -913 conflicts hit while deleting a claimed row
    processed_count = 0
    while True:
        # NO WAIT locking transaction: a conflicting claim fails immediately
        # inside nextLogRow instead of blocking this worker.
        con.begin(locking_transaction_options)
        row = nextLogRow(con,cur,subid)
        try:
            if row:
                log_id,sub_id,pk = row
                # time.sleep(0.1) #DO SOME WORK
                # Delete in a fresh default transaction (claim tx already committed).
                con.begin()
                cur.execute("DELETE FROM event_log WHERE log_id = ?",(log_id,))
                con.commit()
                processed_count += 1
            else:
                if row == False:
                    # No unclaimed events left: report stats and exit.
                    q.put((pid,deadlockcount,update_contention,processed_count))
                    return
                if row == None:
                    # Lost a claim race; back off briefly before retrying.
                    time.sleep(0.3)
                    deadlockcount += 1
        except DatabaseError as ex:
            con.rollback()
            if (ex.sqlcode == -913):
                print("UPDATE/DELETE contention @ pid",pid)
                time.sleep(0.3)
                update_contention += 1
            else:
                print('sqlcode',ex.sqlcode,'sqlstate',ex.sqlstate)
                print(ex)
                traceback.print_exc(file=sys.stdout)
if __name__ == '__main__':
    # Rebuild the schema, generate N trigger-fired events, then race five
    # worker processes to drain the queue and report aggregate stats.
    print("Process Multi Process")
    print("generate database")
    RecreateDatabase()

    N = 500
    print("Generating",N,"Entries")
    con = getConnection()
    cur = con.cursor()
    con.begin()
    for i in range(N):
        cur.execute("insert into languages (name, year_released) values (?, ?)",('Lisp', i))
    con.commit()
    con.close()

    print("processEvents")
    q = Queue()
    workers = [Process(target=processEvents, args=(q,'ring_bell',)) for _ in range(5)]
    for w in workers:
        w.start()
    start_time = time.time()
    for w in workers:
        w.join()
    end_time = time.time()
    time_lapsed = end_time - start_time
    print(time_lapsed,"seconds",(N/time_lapsed),'rps')

    # Drain per-worker stat tuples and sum them.
    deadlockcount_total = 0
    update_contention_total = 0
    processed_count_total = 0
    while not q.empty():
        stats = q.get()
        print(stats)
        _pid, deadlocks, contentions, processed = stats
        deadlockcount_total += deadlocks
        update_contention_total += contentions
        processed_count_total += processed
    print('deadlockcount_total',deadlockcount_total)
    print('update_contention_total',update_contention_total)
    print('processed_count_total',processed_count_total)
Performance
I am sure it’s not the fastest approach, since a database is not really built for this, but it is very useful to avoid adding another piece of technology for simple needs. There is a cost for the trigger on the table and its select against the subscription table, but since that table is small and likely cached, the impact should be minor.
Some numbers from my machine
single threaded worker – 500 events in 1.40 seconds (356 events/second)
multi threaded worker – 500 events in 3.34 seconds (149 events/second)