Python SQL Alchemy LOCK meselesi

Bir kayıt üzerinde işlem yaparken başka bir kaydın onu değiştirememesi!

Posted on December 29, 2022

Bir veritabanında bir tabloda "gönderilmesi gereken maillerin" tutulduğunu varsayalım. Bu tabloya "gönderilecek olan mailler" sürekli olarak insert edilecek. Varsayılan olarak her kaydın statüsü "Active" olsun.

Diğer taraftada birden fazla "worker" ın bu kayıtları işlemesi gerekiyor. Şöyleki "worker" bir kaydı alacak ve kaydın statüsünü "Running" e çekecek. Kaydı commitleyecek ve mail gönderim sürecini başlatacak. Mail gönderme işlemi normal olarak "async" bir metodla yapılacak. Gönderim başarılı olursa kaydın durumu "Sent" olarak güncellenecek.

Burada mesele şu. İki farklı "worker"ın aynı anda bu kaydı okuması ve iki ayrı mail gönderilmesi olasıdır. Bunu engellemek için kaydı okurken "with_for_update()" komutu ile kaydı okursak, SQL Alchemy okunan kayda "bu işlem commitlenene kadar" geçici lock koymuş olacak. Lock koyulduğunda bu kaydı "with_for_update" ile okumak isteyen başka bir "transaction" lock ortadan kalkana kadar beklemek zorunda kalacak. Bu bekleme süresi "belirli bir süre limiti" geçerse zaten bu işlem (yani ikincil işlemden bahsediyorum) hata almış olacaktır.

Örnek verilerimiz şu şekilde

# Name Status Created At
1 John Active 2022-12-28 00:00:00
2 Ilkay Active 2022-12-28 17:47:54
3 Erhan Active 2022-12-28 17:48:07

Şimdi bu yazılanları test edebileceğimiz kodları paylaşıyorum.

Veritabanı bağlantısı aşağıdaki gibi olacak. Bu test esnasında MYSQL DB kullanıldı. Farklı DB ler için CREATE_ENGINE içindeki "text" değiştirilmelidir.

# db_connect.py

from sqlalchemy import create_engine

def get_engine():
    try:
        # Veritabanı bağlantısını oluşturun
        engine = create_engine('mysql://root:ILK123@localhost/test')
        return engine
    except Exception as e:
        print(f"Error occured while connecting to database: {e}")
        return None

Test adında bir model yaratacağız. Veritabanında da bu tablo yaratılmış olacak

# models.py

from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class Test(Base):
    __tablename__ = 'test'
    id = Column(Integer, primary_key=True)
    name = Column(String)
    status = Column(String)
    created_at = Column(DateTime)

Aşağıdaki kod ile DB ye kayıt insert edilecektir.

# addrow.py
from sqlalchemy.orm import sessionmaker
import datetime
from models import *
from db_connect import get_engine

engine = get_engine()

if engine:
    # Oturum oluşturucuyu başlatın
    Session = sessionmaker(bind=engine)

    # Oturum oluşturun
    session = Session()

    # Kayıt ekleyin
    record = Test(name='Beren', status='Active', created_at=datetime.datetime.now())
    session.add(record)
    record = Test(name='Ceren', status='Active', created_at=datetime.datetime.now())
    session.add(record)

    # Değişiklikleri veritabanına kaydedin
    session.commit()

    # Oturumu kapatın
    session.close()
else:
    print("Failed to connect to database")


Aşağıdaki kod bir SQLAlchemy veritabanı oturumu oluşturur ve belirli bir test kaydının durumunu günceller. update_status() adlı bir fonksiyon kullanarak, belirli bir testin id numarasını ve yeni durumunu alır. Ardından, with_for_update() metodu kullanılarak, kayıt okunur ve kilitleme yapılır, böylece aynı kayıt başka bir işlem tarafından güncellenemez.

Daha sonra, for döngüsü kullanarak, 20 saniye boyunca yapay bir gecikme oluşturur ve süre sonunda kaydın durumunu günceller. Bu işlem commit edilerek veritabanına yansıtılır ve veritabanına bağlantı kapatılır. Eğer güncelleme işlemi başarısız olursa, veritabanı işlemleri geri alınır ve hata mesajı yazdırılır.

Bu kod bir bir konsoldan run edilince bu kodun __main__ bloğu, kullanıcıdan güncellenecek kaydın id numarasını alır ve bu numarayı update_status() fonsiyonuna gönderir. Bu esnada güncellenecek kaydın ID si girilmeden önce diğer konsoldan bir alttaki "update.py" dosyası çalıştırılmalıdır. Her iki konsolda güncellenecek kaydın ID numarasını soracaktır.

İlk önce ID girilip enter a basılması gereken consol bu kodun çalıştırıldığı konsol olmalıdır

# update_with_delay.py
from sqlalchemy.orm import sessionmaker
import time
import models
from db_connect import get_engine


def update_status(engine, id, new_status='Running'):
    Session = sessionmaker(bind=engine)
    session = Session()

    try:
        record = session.query(models.Test).with_for_update().filter(models.Test.id == id,
                                                                     models.Test.status == "Active").first()
        if record:
            # Burada 20 saniyelik yapay bir gecikme yaratılıyor.
            for i in range(20):
                print(i + 1)
                time.sleep(1)

            record.status = new_status
            session.commit()
            print("Record was updated")
        else:
            print(f"No record found with id {id} and status 'Active'")

    except Exception as e:
        session.rollback()
        print("Update Error", str(e))
    finally:
        session.close()


if __name__ == "__main__":
    engine = get_engine()
    if engine:
        id = int(input("Enter the id of the record to update: "))
        update_status(engine, id)
    else:
        print("Failed to connect to database")

Aşağıdaki kod'da "LOCK Süresini" de test etmiş olacağız. Şöyleki MYSQL'in varsayılan bekleme süresi bendeki kurulumda 50 saniyeydi. Bunu 5 saniyeye çekeceğiz. Ancak bu değişiklik sadece ilgili "DB Engine Instance" için geçerlidir. Yani MYSQL in GLOBAL "lock_wait_timeout" süresini değiştirmiş olmuyoruz.

Aşağıdaki kodda neden "sessionmaker" yerine "EXECUTE" methodu ile doğrudan raw SQL çalıştırıldı diye sormak yerinde bir sorudur. Sebebi ise "innodb_lock_wait_timeout" süresini ancak bu şekilde güncelleyebiliyoruz. "Sqlalchemy Session" ile bu yapılamıyor malesef.

# update.py

from sqlalchemy import text
from db_connect import get_engine

def update_status(engine, id, new_status='Running'):
    try:
        with engine.connect() as conn:
            result = conn.execute(text("SELECT @@innodb_lock_wait_timeout"))
            default_value = result.scalar()
            print("The default value of innodb_lock_wait_timeout is:", default_value)
            # Burada Lock için bekleme süresini 5 sanye olarak set ediyoruz.
            # Bu ayar sadece ilgili db session için geçerlidir.
            conn.execute("SET innodb_lock_wait_timeout = 5")

            result = conn.execute(text("SELECT @@innodb_lock_wait_timeout"))
            default_value = result.scalar()
            print("The new value of innodb_lock_wait_timeout is:", default_value)
            rowcount = 0
            with conn.begin():
                result = conn.execute("""UPDATE test.test SET status = %s
                                WHERE status='Active' and  id=%s""", [new_status, id])
                rowcount = result.rowcount
            # COMMIT işlemi gerçekleştir
            conn.execute("COMMIT")
            if rowcount == 1:
                print(f"{id} nolu Kayıt güncellendi")
            else:
                print(f"{id} nolu Kayıt güncellenemedi")
    except Exception as e:
        print("Update Error", str(e))

if __name__ == "__main__":
    engine = get_engine()

    if engine:
        id = int(input("Enter the id of the record to update: "))
        update_status(engine, id)
    else:
        print("Failed to connect to database")

3 ayrı test yapıldı.

İlk testte "lock_wait_timeout" süresini 5 saniye olarak güncelledim. Bu şartlarda kayıt güncellenmek istendiğinde şu sonuçlar alındı

python update.py
Enter the id of the record to update: 1
The default value of innodb_lock_wait_timeout is: 50
The new value of innodb_lock_wait_timeout is: 5
Update Error (MySQLdb._exceptions.OperationalError) (1205,
        'Lock wait timeout exceeded; try restarting transaction')
[SQL: UPDATE test.test SET status = %s WHERE status='Active' and  id=%s]
[parameters: ('Running', 1)]
(Background on this error at: https://sqlalche.me/e/14/e3q8)

İkinci testte "lock_wait_timeout" süresini güncellemedim ve varsayılan değer olan 50 saniye olarak bıraktım. Bu şartlarda kayıt güncellenmek istendiğinde şu sonuçlar alındı (Testte aynı kayıt kullanılacaksa manuel olarak DB'den durumu tekrar "Active" olarak güncellenmelidir)

python update.py
Enter the id of the record to update: 1
The default value of innodb_lock_wait_timeout is: 50
The new value of innodb_lock_wait_timeout is: 50
1 nolu Kayıt güncellenemedi

Üçüncü bir test ise en önemlisi. "update_with_delay.py" dosyasında "with_for_update()" kısmını sildim. Bu şekilde iki kodu da çalıştırdım. Önce "update_with_delay.py" dosyası için kaydın numarasını seçtim. Bu esnada 20 saniyelilk döngü başladı. Sonrasında "update.py" için aynı kayıt numarası girdim. Enter a basar basmaz kaydın başarılı bir şekilde güncellendiğini gördüm.

Buradaki temel soru şu. Peki "update_with_delay.py" dosyasındaki döngü bittiğinde ne olacak? Cevap: Kayıt bir kez daha güncellenecek. Böylece aslında aynı kayıt 2 farklı session'a konu olacak ve bu işlemin bir "bakiye" güncellemesi olduğunu varsayarsak bu durumda işler bayağı karışmış olacaktır.

Son olarak önemli bir not daha. "Update.py" dosyasında bağlantı açarken " with engine.connect() as conn:" şeklinde WITH ile bağlantı açıyoruz.

Burada, with ifadesi bir context manager kullanarak oturum açma işleminin yönetilmesini sağlıyor. With ifadesi, bu durumda, oturum açma işlemi bittiğinde otomatik olarak conn nesnesinin kapatılmasını ve kaynakların serbest bırakılmasını sağlıyor.

Ayrıca 'with conn.begin()' bloğu ile de , bir veritabanı işlemi (transaction) başlatılır ve transaction'ın başarısız olması durumunda geri alma (rollback) işlemi yapılır. Bu şekilde, veritabanındaki güncelleme işlemi güvenli bir şekilde gerçekleştirilir. Bloktan çıktıktan sonra işlemin "conn.execute("COMMIT")" ile komitlendiğine dikkat edin.

Tekrar söylmek gerekirse doğrudan RAW sql kullanılmasının sebebi "innodb_lock_wait_timeout" süresini güncellemek içindir. Yoksa zaten bu yöntemi kullanmak normalde önerilmez.


SQLAlchemy olmadan doğrudan SQL Kulllanımı için de DB lerin kendine özel paketleri oluyor. MYSQL için "mysql-connector-python" kütüphanesi bu iş için uygundur.

pip install mysql-connector-python ile kurulum sağlanır.

Aşağıdaki kod ile yukarıda anlatılan işlemler benzer şekilde yapılabilir.

Burada "conn.autocommit = False" kısmına dikkat. Yani "cursor = conn.cursor()" dedikten sonra "conn.commit()" diyene kadar yapılan işlemler DB ye yansıtıllmaz. Ayrıca güncellenmekte olan kayda lock koyulur. Bu esnada hata alınırsa "conn.rollback()" çağrısı ile veritabanındaki değişiklikler geri alınacaktır.

# mysqldb.py
import mysql.connector
import time

conn=None
cursor=None
try:
    conn = mysql.connector.connect(host='localhost',
                                   database='test',
                                   user='USER',
                                   password='PASSWORD')

    conn.autocommit = False
    cursor = conn.cursor()
    # withdraw from account A
    sql_update_query = """Update test set name = "ILKAY" where id = 1"""
    cursor.execute(sql_update_query)

    for i in range(20):
        print(i + 1)
        time.sleep(1)

    # Commit your changes
    conn.commit()

except Exception as error:
    print("Failed to update record to database : {}".format(error))
    # reverting changes because of exception
    if conn:
        conn.rollback()
finally:
    # closing database connection.
    if conn:
        if conn.is_connected():
            if cursor:
                cursor.close()
            conn.close()
            print("connection is closed")