Firebird Table Subscriptions

Sometimes you want to do background processing in reaction to a new entry in a database table. Here I provide one possible solution.

Schema

The schema is flexible: you can add a new subscription without changing the table trigger, and if there are no subscriptions, no event log entries are made. You can also use the Firebird event ‘new_event’ to trigger a check for new log entries in your code (a sketch of this follows the schema).

    create table languages
    (
        language_id    integer GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
        name           varchar(20),
        year_released  integer
    );

    create table event_subscriber
    (
        sub_id         integer GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
        target_table   varchar(100),
        action         varchar(2),
        tag            varchar(20)
    );

    create table event_log
    (
        log_id         integer GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
        sub_id         integer,
        pk             integer,
        work_update    TIMESTAMP
    );

    CREATE INDEX IDX_event_log_sub_id
    ON event_log (sub_id);

    CREATE TRIGGER event_log_insert FOR event_log
    ACTIVE AFTER INSERT POSITION 0
    AS
    BEGIN
        POST_EVENT 'new_event';
    END;

    CREATE TRIGGER languages_event FOR languages
    ACTIVE AFTER INSERT OR UPDATE OR DELETE POSITION 0
    AS
    BEGIN
        IF (DELETING) THEN
            insert into event_log (sub_id, pk)
            select sub_id, OLD.language_id as pk from event_subscriber
            where target_table = 'languages' AND action = 'D';

        IF (INSERTING) THEN
            insert into event_log (sub_id, pk)
            select sub_id, NEW.language_id as pk from event_subscriber
            where target_table = 'languages' AND action = 'I';

        IF (UPDATING) THEN
            insert into event_log (sub_id, pk)
            select sub_id, NEW.language_id as pk from event_subscriber
            where target_table = 'languages' AND action = 'U';
    END;

    -- Finally, add some subscriptions
    UPDATE OR INSERT into event_subscriber (action,target_table,tag)
    values ('I','languages','send_email')
    MATCHING (action,target_table,tag) RETURNING sub_id;

    UPDATE OR INSERT into event_subscriber (action,target_table,tag)
    values ('I','languages','ring_bell')
    MATCHING (action,target_table,tag) RETURNING sub_id;
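
If you would rather react immediately than poll, a worker can listen for the ‘new_event’ event posted by the event_log_insert trigger. Below is a minimal sketch of such a listener (the waitForNewEvents helper name is just for illustration); it assumes the getConnection() helper from util used in the workers below and firebird-driver's event conduit API (Connection.event_conduit() and EventConduit.wait()), so treat it as a starting point rather than a drop-in piece.

from util import getConnection

def waitForNewEvents(timeout=60):
    # Block until the event_log_insert trigger posts 'new_event',
    # or until the timeout (in seconds) expires.
    con = getConnection()
    conduit = con.event_conduit(['new_event'])
    try:
        # wait() returns a dict of event name -> count since the conduit was opened
        counts = conduit.wait(timeout) or {}
        return counts.get('new_event', 0) > 0
    finally:
        conduit.close()
        con.close()

A worker can call this between passes over event_log so it only queries the table when the trigger has actually fired.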

Single Threaded Worker

Here we process events for a certain subscription using a single thread. The advantages are that events are processed in order and the code is simpler.

from util import printrs,getConnection
from schema import RecreateDatabase
import time

def processEvents():
    con = getConnection()

    cur = con.cursor()

    cur.execute("UPDATE OR INSERT into event_subscriber (action,target_table,tag) values (?,?,?) MATCHING (action,target_table,tag) RETURNING sub_id",('I','languages','ring_bell'))
    subid = cur.fetchone()[0]
    con.commit()

    processed_count = 0

    while True:
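        # fetch all unprocessed events for this subscription in one batch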
        cur.execute("SELECT log_id,sub_id,pk FROM event_log WHERE work_update IS NULL AND sub_id = ?",(subid,)) 

        rs = cur.fetchall()
        con.commit()

        for row in rs:
            log_id,sub_id,pk = row
            cur.execute("UPDATE event_log SET work_update = CURRENT_TIMESTAMP WHERE log_id = ?",(log_id,))
            con.commit()

            # time.sleep(0.1) #DO SOME WORK
            
            cur.execute("DELETE FROM event_log WHERE log_id = ?",(log_id,))
            con.commit()
            processed_count += 1
        if not rs:
            return processed_count



if __name__ == "__main__":
    print("Process Single Thread")

    print("generate database")
    RecreateDatabase()

    N = 500
    print("Generating",N,"Entries")
    con = getConnection()
    cur = con.cursor()

    con.begin()
    for x in range(0,N):
        cur.execute("insert into languages (name, year_released) values (?, ?)",('Dylan', x))
    con.commit()

    print("processEvents")
    start_time = time.time()
    processed_count = processEvents()
    end_time = time.time()

    time_lapsed = end_time - start_time
    print(time_lapsed,"seconds",(N/time_lapsed),'rps')
    print("processed_count",processed_count)
    
    

Multi Threaded / Process

Here we process events for a certain subscription using multiple processes. This can be much faster if processing an event takes a bit of time. The disadvantage is that events are not processed in order.

Of interest here is the SQL used to claim one event to process:

    SELECT log_id,sub_id,pk FROM event_log WHERE work_update IS NULL AND sub_id = ? ORDER BY RAND() ROWS 1 FOR UPDATE WITH LOCK

work_update IS NULL – skip rows that are already being processed
ORDER BY RAND() ROWS 1 – fetch one row in random order so workers are less likely to compete for the same row
FOR UPDATE WITH LOCK – take the row lock at fetch time so Firebird reports a lock conflict as soon as possible

See the Firebird Language Reference for more details.

from multiprocessing import Process,Pool,Queue
from firebird.driver.types import DatabaseError
import os
from util import printrs,getConnection
import time
import sys, traceback
from schema import RecreateDatabase
from firebird.driver import tpb, TPB, Isolation, TraAccessMode, TableShareMode, TableAccessMode

# NO WAIT write transaction used to claim rows: lock conflicts are reported immediately instead of waiting
locking_transaction_options = tpb(Isolation.CONCURRENCY,0,TraAccessMode.WRITE)

def nextLogRow(con,cur,subid):
    try:
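        # grab one random unclaimed row, locking it so competing workers fail fast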
        cur.execute("SELECT log_id,sub_id,pk FROM event_log WHERE work_update IS NULL AND sub_id = ? ORDER BY RAND() ROWS 1 FOR UPDATE WITH LOCK",(subid,)) 

        row = cur.fetchone()
        if row:
            log_id,sub_id,pk = row
            cur.execute("UPDATE event_log SET work_update = CURRENT_TIMESTAMP WHERE log_id = ?",(log_id,))
            con.commit()

            return row
        else:
            return False
    except DatabaseError as ex:
        con.rollback()
        if (ex.sqlcode == -913 or ex.sqlcode == -901):
            pass #deadlock / lock conflict on no wait transaction
        else:
            print('sqlcode',ex.sqlcode,'sqlstate',ex.sqlstate)
            print(ex)
            traceback.print_exc(file=sys.stdout)        
        return None

def getSUBID(con,cur):
    try:
        cur.execute("UPDATE OR INSERT into event_subscriber (action,target_table,tag) values (?,?,?) MATCHING (action,target_table,tag) RETURNING sub_id",('I','languages','ring_bell'))
        subid = cur.fetchone()[0]
        con.commit()
        return subid
    except DatabaseError as ex:
        con.rollback()
        if (ex.sqlcode == -913 or ex.sqlcode == -901):
            return getSUBID(con,cur)
        else:
            raise ex
    


def processEvents(q,tag):
    pid = os.getpid()
    print('tag',tag,'parent process:', os.getppid(),'process id:', pid)
    con = getConnection()

    cur = con.cursor()

    subid = getSUBID(con,cur)

    deadlockcount = 0
    update_contention = 0
    processed_count = 0
    while True:
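        # start a write transaction and try to claim one pending event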
        con.begin(locking_transaction_options)
        row = nextLogRow(con,cur,subid)
        try:
            if row:
                log_id,sub_id,pk = row

                # time.sleep(0.1) #DO SOME WORK

                con.begin()
                cur.execute("DELETE FROM event_log WHERE log_id = ?",(log_id,))
                con.commit()
                processed_count += 1
            else:
                if row is False:
                    # no more unprocessed events for this subscription: report counters and exit
                    q.put((pid,deadlockcount,update_contention,processed_count))
                    return
                if row is None:
                    # a lock conflict / deadlock was hit while claiming a row: back off and retry
                    time.sleep(0.3)
                    deadlockcount += 1

        except DatabaseError as ex:
            con.rollback()
            if (ex.sqlcode == -913):
                print("UPDATE/DELETE contention @ pid",pid)
                time.sleep(0.3)
                update_contention += 1
            else:
                print('sqlcode',ex.sqlcode,'sqlstate',ex.sqlstate)
                print(ex)
                traceback.print_exc(file=sys.stdout)

            


if __name__ == '__main__':
    print("Process Multi Process")

    print("generate database")
    RecreateDatabase()

    N = 500
    print("Generating",N,"Entries")
    con = getConnection()
    cur = con.cursor()

    con.begin()
    for x in range(0,N):
        cur.execute("insert into languages (name, year_released) values (?, ?)",('Lisp',  x))
    con.commit()
    con.close()
    print("processEvents")

    q = Queue()
    procs = []
    for x in range(0,5):
        p = Process(target=processEvents, args=(q,'ring_bell',))
        p.start()
        procs.append(p)

    start_time = time.time()
    for p in procs:
        p.join()
    end_time = time.time()

    time_lapsed = end_time - start_time
    print(time_lapsed,"seconds",(N/time_lapsed),'rps')

    deadlockcount_total = 0
    update_contention_total = 0
    processed_count_total = 0
    while not q.empty():
        r = q.get()
        print(r)
        pid,deadlockcount,update_contention,processed_count = r
        deadlockcount_total += deadlockcount
        update_contention_total += update_contention
        processed_count_total += processed_count
    
    print('deadlockcount_total',deadlockcount_total)
    print('update_contention_total',update_contention_total)
    print('processed_count_total',processed_count_total)

Performance

I am sure it’s not the fastest approach, since a database is not really built for this kind of queueing. Still, it is very useful to avoid adding another piece of technology for simple needs. There is a cost for the trigger on the table and its select against the subscription table, but since that table is small and hopefully sits in the cache, the impact should be small.

Some numbers from my machine:
single threaded worker – 500 events in 1.40 seconds (356 events/second)
multi process worker (5 processes) – 500 events in 3.34 seconds (149 events/second)