Multi-Tenancy in Python, FastAPI and SqlAlchemy using Postgres Row Level Security

Photo by Ed Hardie on Unsplash

Multi-Tenancy in Python, FastAPI and SqlAlchemy using Postgres Row Level Security

Multi-Tenancy is a software architecture in which a shared instance of a software serves multiple customers or tenants. A tenant is a group of users that belong together. For example, Salesforce is a multi-tenant Saas Provider, and every customer who opens an account on Salesforce is a tenant. Most Software as a Service(SaaS) companies choose to architect their applications as multi-tenant to save on costs. After all, it becomes prohibitively expensive to provision a new database/server for every customer.

The challenge with multi-tenancy applications is data isolation. Since all tenants share the same database, we need to ensure that users belonging to one tenant cannot access data belonging to another tenant and vice-versa. There are several approaches to achieving multi-tenancy in cloud-based applications, each with pros and cons. Today we will explore achieving multi-tenancy using Postgres Row Level Security for Python apps.

What is Row Level Security

Row Level Security is an [RBAC](https://en.wikipedia.org/wiki/Role-based_access_control) system in Postgresql. It allows us to define a set of [policies](https://www.postgresql.org/docs/current/ddl-rowsecurity.html) which restrict which rows are accessible by users. It allows individual tenants to have full SQL access to the database while hiding each tenant’s information from other tenants.

Enabling Row Level Security on Python Apps

The first thing we need to enable Row level security is at least two database users. One user creates the tables, run migrations, etc., and another accesses data on behalf of tenants. This is because, in Postgres, the table owner isn’t restricted by security policies by default. Furthermore, superusers and users created with the BYPASSRLS permission aren't subject to the table's policies.

Database changes

So let's start by creating an app called toy_app that has a database called toy_db

CREATE DATABASE toy_db;
CREATE ROLE admin with ENCRYPTED PASSWORD 'password';
GRANT ALL ON DATABASE toy_db TO admin;

We also need another user that will be used to access the database on behalf of the tenants. Let's call this user tenant_user. We will grant this user only the permissions it needs to deal with tenant data. This can be created via an alembic migration.

def upgrade() -> None:
    op.execute(
        """
        DO $$
        BEGIN
        IF NOT EXISTS(SELECT * FROM pg_roles WHERE rolname = 'tenant_user') THEN
            CREATE ROLE tenant_user;
            GRANT tenant_user TO admin;
            GRANT USAGE ON SCHEMA public TO tenant_user;
            GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA public TO tenant_user;
            ALTER DEFAULT PRIVILEGES IN SCHEMA pyblic GRANT SELECT, INSERT, UPDATE, DELETE ON TABLES TO tenant_user;
            GRANT USAGE ON ALL SEQUENCES IN SCHEMA public TO tenant_user;
            ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT USAGE ON SEQUENCES TO tenant_user;
        END IF;
        END
        $$
        """
    )

Now that the Postgres roles are available, we need to create a table to store the list of tenants accessing our app. Let us create this in another migration.

def upgrade() -> None:
    op.create_table(
        "tenants",
        sa.Column("id", sa.INTEGER, primary_key=True),
        sa.Column("name", sqlmodel.sql.sqltypes.AutoString(), nullable=False)
    )
    op.execute(
        "ALTER TABLE tenants ENABLE ROW LEVEL SECURITY;",
    )
    op.execute(
        "CREATE POLICY tenant_isolation on tenants \
            USING (id = current_setting('app.current_tenant')::int);",
    )

You'll notice that we have two statements after the create_table operation.

ALTER TABLE tenants ENABLE ROW LEVEL SECURITY;

What this does is that it tells Postgres to enable Row Level Security on this table.

CREATE POLICY tenant_isolation on tenants USING (id = current_setting('app.current_tenant'))::int;

Over here, we are creating a Policy telling Postgres to restrict access to this table by id. We will see how to set app.current_tenant later. The ::int is just a type cast because the current_tenant is stored as a string.

Now that we have a tenants table, we can create a table that stores some information about our tenants. Let's create a table called toys.

def upgrade() -> None:
    op.create_table(
        "toys",
        sa.Column("id", sa.INTEGER, primary_key=True),
        sa.Column("tenant_id", sa.Integer, sa.ForeignKey("tenants.id")),
        sa.Column("name", sqlmodel.sql.sqltypes.AutoString(), nullable=False)
    )
    op.execute(
        "ALTER TABLE toys ENABLE ROW LEVEL SECURITY;",
    )
    op.execute(
        "CREATE POLICY tenant_isolation on toys \
            USING (tenant_id = current_setting('app.current_tenant')::int);",
    )

As you can see, we have added a tenant_id column on toys. This lets us know which toys belong to which tenant. We have also enabled row-level security on the table and created a policy restricting access by tenant id.

Note: You will have to enable row-level security and create a policy on every table to which you want to restrict access.

Now that we have our data model set up, we can see how to implement row-level security for querying data.

Middleware

We restrict access to data using a tenant_id which means every request to our service will need to have the tenant id included as part of its context. Usually, this is included via a subdomain, as part of an authorization header, or even as a request param. For our service, let's assume we get the tenant id via a header called X-Tenant-Id. Every request to our service needs to include this header to set the tenant context.

To set the tenant context in our app, we will use middleware. Since we are using FastAPI, we can create ASGI middleware.

from contextvars import ContextVar

from starlette.datastructures import Headers
from starlette.responses import JSONResponse
from starlette.responses import Response
from starlette.types import ASGIApp
from starlette.types import Receive
from starlette.types import Scope
from starlette.types import Send


global_tenant_id: ContextVar[str] = ContextVar("global_tenant_id", default=None)


class PostgresRLSMiddleware:
    def __init__(
        self,
        app: ASGIApp,
    ) -> None:
        self.app = app

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        if scope["type"] not in (
            "http",
            "websocket",
        ):
            await self.app(scope, receive, send)
            return

        headers = Headers(scope=scope)
        request_tenant_id = headers.get("X-Tenant-Id")

        response: Response
        if request_tenant_id is None:
            response = JSONResponse(
                {
                    "error": "Tenant must be provided",
                },
                status_code=401,
                headers=dict(headers),
            )
            await response(scope, receive, send)
        else:
            global_tenant_id.set(request_tenant_id)
            await self.app(scope, receive, send)


def get_global_tenant_id():
    return global_tenant_id.get()

What this does it it saves the tenant id as a [contextvar](https://docs.python.org/3/library/contextvars.html). In case you haven't used them, contextvars allow you to to save global vars inside a specific runtime context. Using this library, we can create a new running context for every incoming request, and save a value on a global var, which will only be available within that context. This means that we can save the tenant id specific to particular request.

Establishing a database connection

Now that we have the tenant id saved, we need to figure out how to pass it to the data layer so that our queries are restricted by tenant id. Fortunately, we can accomplish this easily by creating a connection using SQLAlchemy.

When creating a connection via SQLAlchemy, we can set tenant context as follows.

from middleware import get_global_tenant_id

async def get_session() -> AsyncSession:
    async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
    async with async_session() as session:
        try:
            tenant_id = int(get_global_tenant_id())
            query = text(f"SET app.current_tenant={tenant_id};")
            await session.execute(text("SET SESSION ROLE tenant_user;"))
            await session.execute(query)
            yield session
        except:
            await session.rollback()
            raise
        finally:
            await session.execute(text("RESET ROLE;"))
            await session.commit()
            pass

Pay attention to this line. SET app.current_tenant='{tenant_id}'

We are setting the tenant context for this particular request here. This ensures that row level security will only return values associated with that particular tenant.

SET SESSION ROLE tenant_user; This line is used because we connected to the database as a superuser role in Postgres. But you can also connect to the DB using this role instead.

You can now use this method to get a connection and use it to query all data

@app.get("/toys", response_model=list[Toy])
async def get_songs(session: AsyncSession = Depends(get_session)):
    result = await session.execute(select(Toy))
    toys = result.scalars().all()
    return [Toy(name=toy.name, id=toy.id) for toy in toys]

That's it. You now have row level security enabled on your app.

Source code for this project can be found here: https://github.com/theundeadmonk/postgres-rls-fastapi-demo