sqlachemy 是python的orm框架,在使用一段时间后,我们通常会出现事务嵌套的情况,看到很多人写代码的时候,居然是session到处传递,这无疑是加大了代码之间的耦合度。 案例: def save(session): # TODO def update(session): # TODO def service(): session = getSession(); try: save(session); update(session); session.commit(); except Exception as e: session.rollback(); finally: if not session: session.close(); 假设save和update是同一个事务,但是上述的实践缺强制了save和update的 session相耦合来达成,好的实践应该是:save和update无任何关系,只是在实现业务逻辑时,组合到一个事务,确保事务性即可,即: def service(): try: save(); update(); except Exception as e: # TODO 因此如何解决这个问题是本文需要阐述的主题。
解决方案是:
ACID是事务的四个基本特征,通常我们的理解事务是一个操作单元,要么一起成功要么一起失败(原子性);通过一个例子来直接说明如何解决的。 场景:注册用户(添加一个用户,会未用户送10个积分)
创建表
create table user( id int not null auto_increment, name varchar(255) not null default '' comment '用户名', created_at datetime not null default current_timestamp comment '创建时间', updated_at datetime not null default current_timestamp comment '更新时间', primary key (`id`) )engine innodb charset=utf8 comment '用户表'; create table user_credits( id int not null auto_increment, user_id int not null default 0 comment '用户ID', user_name varchar(255) not null default '' comment '用户名', score int not null default 0 comment '积分', created_at datetime not null default current_timestamp comment '创建时间', updated_at datetime not null default current_timestamp comment '更新时间', primary key (`id`), unique key `uk_user_id` (`user_id`) )engine innodb charset=utf8 comment '用户积分表';
sqlalchemy 实现
# -*- coding:utf-8 -*-import datetimefrom sqlalchemy import create_engine, Column, Integer, String, DateTimefrom sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy.orm import sessionmaker, scoped_sessionengine = create_engine("mysql+pymysql://root:root@localhost:3306/csdn", echo=True)# 必须使用scoped_session,域session可以将session进行共享DBSession = scoped_session(sessionmaker(bind=engine))BaseModel = declarative_base()# ----------- Relation Model Object---------------- #class User(BaseModel): __tablename__ = "user" id = Column(Integer, primary_key=True) name = Column(String) created_at = Column(DateTime, default=datetime.datetime.now) updated_at = Column(DateTime, default=datetime.datetime.now)class UserCredits(BaseModel): __tablename__ = "user_credits" id = Column(Integer, primary_key=True) user_id = Column(Integer) user_name = Column(String) score = Column(Integer) created_at = Column(DateTime, default=datetime.datetime.now) updated_at = Column(DateTime, default=datetime.datetime.now)# ----------- Service implements---------------- #def add_user(user): " 添加用户 " session = DBSession() try: session.add(user) session.commit() except Exception as e: session.rollback() print("AddUser: ======={}=======".format(e)) finally: if not session: session.close()def add_user_credits(userCredits, interrupt=True): " 添加用户积分记录 " session = DBSession() try: if interrupt: raise Exception("--- interrupt ---") session.add(userCredits) session.commit() except Exception as e: session.rollback() print("AddUserCredits: ======={}=======".format(e)) finally: if not session: session.close()def regist_user(): session = DBSession() try: # 开启子事务 session.begin(subtransactions=True) # TODO Service user = User(name='wangzhiping') add_user(user) add_user_credits(UserCredits( user_id=user.id, user_name=user.name, score=10 ), False) session.commit() except Exception as e: session.rollback() print("AddUserCredits: ======={}=======".format(e)) finally: if not session: session.close()# ---------- exec -----------regist_user()
1,设置session时,需要指定为scoped_session,目的是session可以共享(ThreadLocal); 2,session.begin(subtransactions=True) 开启子事务管理; 这是实际上regist_user是在同一个线程中的session,这是add_user,add_user_credits实际上session是同一个,所以可以实现。其实这个可以更进一步扩展,把事务隔离级别,传播属性,这里不做介绍---------------------作者:紫守笨 来源:CSDN 原文:https://blog.csdn.net/program_red/article/details/55194130?utm_source=copy 版权声明:本文为博主原创文章,转载请附上博文链接!
转载:https://blog.csdn.net/program_red/article/details/55194130