SQLAlchemy是Python的一款開源軟體,提供 SQL 工具包及物件關係對映(ORM)的功能,以 MIT 授權。SQLAlchemy 的設計概念比較接近 Java 的 Hibernate。
另一個常用的 ORM 套件為 Django ORM,他的設計概念跟 SQLAlchemy 不同,是使用 Active Record 的方式。
安裝套件
因我們是以 MariaDB 測試,除了要安裝 SQLAlchemy,還要安裝 mysql 的 driver,我們是用 python 的 driver: PyMySQL 進行測試
使用 python 3 的版本,是用 pip3 的指令,如果是 python 2 是用 pip。
sudo pip3 install SQLAlchemy
sudo pip3 install PyMySQL
DBEngine.py
以 Table: status 為例,在 DBEngine 中,以 create_engine
產生 SQLAlchemy 最基本的 engine,再由 engine 產生 session_maker
備用,待會要在 DAO 裡面使用。
後面的部分,用 engine.dialect.has_table
判斷 table 是否存在,如果不存在,就建立該 table
另外注意 updatedate 是在每次更新 record 時,要更新時間,createdate 則是在記錄產生 record 的時間。
from sqlalchemy import *
from sqlalchemy.orm import sessionmaker
db_user = 'root'
db_password = 'password'
db_host = 'localhost'
db_port = 3306
db_name = 'testdb'
engine = create_engine('mysql+pymysql://'+db_user+':'+db_password+'@'+db_host+':'+str(db_port)+'/'+db_name, encoding="utf8", echo=False, pool_pre_ping=True, pool_recycle=3600)
# echo: 是否顯示SQL的指令與相關訊息
DBSession = sessionmaker( bind=engine )
if not engine.dialect.has_table(engine, "status"):
metadata = MetaData(engine)
radiostatus_table = Table('status', metadata,
Column('rseq', Integer, primary_key=True, autoincrement=True),
Column('statuscode', Integer, nullable=False),
Column('updatedate', TIMESTAMP, default=func.now(), onupdate=func.now()),
Column('createdate', TIMESTAMP, default=func.now())
)
metadata.create_all(engine)
Data Value Object
以 Base = declarative_base()
建立 Status 物件,作為 ORM 的物件定義,在該物件中,分別定義剛剛產生的 table: status的欄位對應。
from sqlalchemy import *
from sqlalchemy.orm import *
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class Status(Base):
__tablename__='status'
rseq = Column(Integer, primary_key=True, autoincrement=True)
statuscode = Column(Integer, nullable=False)
updatedate = Column(TIMESTAMP, default=func.now(), onupdate=func.now)
createdate = Column(TIMESTAMP, default=func.now())
def __init__(self):
pass
def __init__(self, rseq, statuscode, updatedate, createdate):
self.rseq = rseq
self.statuscode = statuscode
self.updatedate = updatedate
self.createdate = createdate
@classmethod
def to_string(cls, vo):
result = ''
result += "(rseq, statuscode, updatedate, createdate)=({rseq}, {statuscode}, {updatedate}, {createdate})\n".format(rseq=str(vo.rseq), statuscode=str(vo.statuscode), updatedate=str(vo.updatedate), createdate=str(vo.createdate))
return result
Data Access Object
最簡單的是 create data 的 function,首先透過 DBSession() 產生 database session,然後在該 function 中,將要處理的工作做完,完成時要呼叫 commit,發生 error 時要呼叫 rollback,在結束前,以 close 回收 db session。
import logging
from DBEngine import *
from sqlalchemy import *
from sqlalchemy.orm import *
class StatusDao():
def __init__(self):
pass
def create(self, vo):
session = DBSession()
try:
logging.debug("vo="+str(vo))
session.add(vo)
session.commit()
except Exception as e:
logging.error(str(e))
session.rollback()
raise
finally:
session.close()
這是 create_or_update
的 function,如果檢查該 rseq 的資料不存在,再 create record ,否則就以 update 方式更新該 record
def create_or_update(self, vo):
session = DBSession()
try:
oldvo = session.query(Status).filter_by(rseq=vo.rseq).first()
if oldvo:
oldvo.statuscode = vo.statuscode
oldvo.updatedate = datetime.datetime.now()
logging.debug("updating "+str(RadioStatus.to_string(oldvo)))
else:
logging.debug("creating "+str(RadioStatus.to_string(vo)) )
session.add(vo)
session.commit()
newvo = session.query(Status).filter_by(rseq=vo.rseq).first()
return newvo
except Exception as e:
logging.error(str(e))
raise
finally:
session.close()
這是將 status 裡面所有資料都刪除的 function
def get_all(self):
session = DBSession()
try:
radiostatus_list = session.query(Status).all()
return radiostatus_list
except Exception as e:
logging.error(str(e))
raise
finally:
session.close()
取得某個 record 的資料
def get_by_rseq(self, rseq):
session = DBSession()
try:
vo = session.query(Status).filter_by(rseq=rseq).first()
return vo
except Exception as e:
logging.error(str(e))
raise
finally:
session.close()
取得所有 record 的資料
def get_all(self):
session = DBSession()
try:
status_list = session.query(Status).all()
return status_list
except Exception as e:
logging.error(str(e))
raise
finally:
session.close()
使用 DAO
產生 dao 物件後,再直接呼叫 function 即可
dao = StatusDao()
status = Status(None, 0, None, None)
vo = dao.create_or_update(status)
dao.delete_all()
這裡的 DAO 使用的 database session 作用範圍,並沒有超過 function 的呼叫範圍,因此這邊的 DAO 比較像是 business method 的功能,可以將一次 database task 要處理的所有工作,放到同一個 DAO function 裡面,在該 function 結束時,再 close db session,這樣的做法,對於簡單的 project 來說,比較容易管理 DAO 及 db session,也比較不會發生忘記 close db session 的問題。
沒有留言:
張貼留言