Database Schema
The Mass Payout backend uses PostgreSQL for persistent storage.
Database Entities
Asset
Stores imported asset tokens and their lifecycle contracts.
Table: asset
CREATE TABLE asset (
id UUID PRIMARY KEY,
token_id VARCHAR NOT NULL UNIQUE, -- Asset token ID (0.0.xxxxx)
name VARCHAR NOT NULL, -- Asset name
symbol VARCHAR NOT NULL, -- Asset symbol
total_supply BIGINT NOT NULL, -- Total token supply
decimals INT NOT NULL, -- Token decimals
lifecycle_contract_id VARCHAR, -- LifeCycle contract ID
payment_token_id VARCHAR, -- Payment token ID (USDC, etc.)
last_synced_at TIMESTAMP, -- Last blockchain sync
sync_status VARCHAR, -- SYNCED, SYNCING, FAILED
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW()
);
Fields:
- token_id: Hedera token ID (e.g., 0.0.789012)
- lifecycle_contract_id: Associated LifeCycle Cash Flow contract
- payment_token_id: Token used for payments (e.g., USDC 0.0.429274)
- sync_status: Blockchain sync state
Indexes:
CREATE UNIQUE INDEX idx_asset_token_id ON asset(token_id);
CREATE INDEX idx_asset_sync_status ON asset(sync_status);
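The token_id and payment_token_id columns are plain VARCHARs, so the schema itself does not enforce the 0.0.xxxxx shape. A minimal sketch of a format check that could run before an insert, assuming IDs always use the shard.realm.num form; the helper name isHederaEntityId is hypothetical and not part of the backend:

```typescript
// Hypothetical helper: checks the shard.realm.num shape used by Hedera IDs.
// It validates the format only; it does not confirm that the entity exists.
const HEDERA_ENTITY_ID = /^\d+\.\d+\.\d+$/;

function isHederaEntityId(value: string): boolean {
  return HEDERA_ENTITY_ID.test(value);
}

console.log(isHederaEntityId("0.0.789012")); // true
console.log(isHederaEntityId("0.0.429274")); // true
console.log(isHederaEntityId("0x1234")); // false
```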
Distribution
Stores dividend/coupon distributions.
Table: distribution
CREATE TABLE distribution (
id UUID PRIMARY KEY,
asset_id UUID NOT NULL REFERENCES asset(id),
type VARCHAR NOT NULL, -- DIVIDEND, COUPON, SNAPSHOT
execution_type VARCHAR NOT NULL, -- MANUAL, SCHEDULED, RECURRING, AUTOMATIC
payout_type VARCHAR, -- FIXED, PERCENTAGE (for equities)
amount DECIMAL, -- Total distribution amount
concept VARCHAR, -- Description/label
scheduled_time TIMESTAMP, -- For scheduled distributions
frequency VARCHAR, -- HOURLY, DAILY, WEEKLY, MONTHLY
start_time TIMESTAMP, -- For recurring distributions
trigger_condition VARCHAR, -- For automatic distributions
status VARCHAR NOT NULL, -- PENDING, PROCESSING, COMPLETED, FAILED
executed_at TIMESTAMP,
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW()
);
Fields:
- type: Distribution type (DIVIDEND for equities, COUPON for bonds)
- execution_type: How the distribution is triggered
  - MANUAL: Execute immediately
  - SCHEDULED: Execute at a specific time
  - RECURRING: Execute on a recurring schedule
  - AUTOMATIC: Execute based on a trigger
- payout_type: FIXED (same amount per holder) or PERCENTAGE (proportional to holdings)
- status: Current processing state
Indexes:
CREATE INDEX idx_distribution_asset ON distribution(asset_id);
CREATE INDEX idx_distribution_status ON distribution(status);
CREATE INDEX idx_distribution_scheduled ON distribution(scheduled_time) WHERE status = 'PENDING';
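The scheduling columns are all nullable, so the table does not enforce which of them each execution_type needs. A minimal sketch of an application-level check, assuming the pairing implied by the column comments (scheduled_time for SCHEDULED, frequency and start_time for RECURRING, trigger_condition for AUTOMATIC); the SchedulingFields shape and the function name are hypothetical:

```typescript
// Values copied from the execution_type column comment.
type ExecutionType = "MANUAL" | "SCHEDULED" | "RECURRING" | "AUTOMATIC";

// Hypothetical input shape mirroring the nullable scheduling columns.
interface SchedulingFields {
  executionType: ExecutionType;
  scheduledTime?: Date;
  frequency?: string;
  startTime?: Date;
  triggerCondition?: string;
}

// Returns the column names that are missing for the given execution type.
function missingSchedulingFields(d: SchedulingFields): string[] {
  const missing: string[] = [];
  switch (d.executionType) {
    case "SCHEDULED":
      if (!d.scheduledTime) missing.push("scheduled_time");
      break;
    case "RECURRING":
      if (!d.frequency) missing.push("frequency");
      if (!d.startTime) missing.push("start_time");
      break;
    case "AUTOMATIC":
      if (!d.triggerCondition) missing.push("trigger_condition");
      break;
    case "MANUAL":
      break; // executes immediately, so no scheduling columns are needed
  }
  return missing;
}

// A recurring distribution without a start time reports one missing column.
console.log(missingSchedulingFields({ executionType: "RECURRING", frequency: "DAILY" }));
```

An empty result means the row carries every column its execution type relies on.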
Holder
Stores asset holders and their payment amounts.
Table: holder
CREATE TABLE holder (
id UUID PRIMARY KEY,
asset_id UUID NOT NULL REFERENCES asset(id),
distribution_id UUID REFERENCES distribution(id),
account_id VARCHAR NOT NULL, -- Holder account ID
balance BIGINT NOT NULL, -- Token balance
payment_amount DECIMAL, -- Payment amount for distribution
paid BOOLEAN DEFAULT FALSE, -- Payment status
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW(),
UNIQUE(asset_id, distribution_id, account_id)
);
Fields:
- account_id: Hedera account ID (e.g., 0.0.123456)
- balance: Token holdings (in smallest unit)
- payment_amount: Calculated payment for this distribution
- paid: Whether payment has been executed
Indexes:
CREATE INDEX idx_holder_asset ON holder(asset_id);
CREATE INDEX idx_holder_distribution ON holder(distribution_id);
CREATE INDEX idx_holder_account ON holder(account_id);
CREATE UNIQUE INDEX idx_holder_unique ON holder(asset_id, distribution_id, account_id);
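How payment_amount might be derived from balance can be sketched as follows. This is an illustration under stated assumptions, not the backend's actual calculation: FIXED is taken to mean every holder receives the same amount, PERCENTAGE is taken to mean a pro rata split of the distribution amount by balance, and all values are integers in the smallest unit. The names computePaymentAmounts and HolderBalance are hypothetical.

```typescript
type PayoutType = "FIXED" | "PERCENTAGE";

interface HolderBalance {
  accountId: string;
  balance: bigint; // token holdings in the smallest unit
}

// Assumption: FIXED pays `amount` to every holder, while PERCENTAGE splits
// `amount` pro rata by balance. The real backend may define both differently.
function computePaymentAmounts(
  payoutType: PayoutType,
  amount: bigint,
  holders: HolderBalance[],
): Map<string, bigint> {
  const zero = BigInt(0);
  const payments = new Map<string, bigint>();
  const totalBalance = holders.reduce((sum, h) => sum + h.balance, zero);
  for (const h of holders) {
    if (payoutType === "FIXED") {
      payments.set(h.accountId, amount);
    } else {
      // Integer division truncates, so shares can sum to slightly less than `amount`.
      const share = totalBalance === zero ? zero : (amount * h.balance) / totalBalance;
      payments.set(h.accountId, share);
    }
  }
  return payments;
}

const sampleHolders: HolderBalance[] = [
  { accountId: "0.0.123456", balance: BigInt(750) },
  { accountId: "0.0.123457", balance: BigInt(250) },
];
const shares = computePaymentAmounts("PERCENTAGE", BigInt(1000), sampleHolders);
console.log(shares.get("0.0.123456")); // 750n
console.log(shares.get("0.0.123457")); // 250n
```

Using bigint avoids floating-point rounding on token amounts; any remainder left by truncation would need an explicit policy.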
BatchPayout
Tracks payout batch execution.
Table: batch_payout
CREATE TABLE batch_payout (
id UUID PRIMARY KEY,
distribution_id UUID NOT NULL REFERENCES distribution(id),
batch_number INT NOT NULL, -- Batch sequence number
total_holders INT NOT NULL, -- Holders in this batch
successful_payments INT DEFAULT 0, -- Successful payments
failed_payments INT DEFAULT 0, -- Failed payments
transaction_id VARCHAR, -- Blockchain transaction ID
status VARCHAR NOT NULL, -- PENDING, PROCESSING, COMPLETED, FAILED
error_message TEXT, -- Error details if failed
executed_at TIMESTAMP,
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW()
);
Fields:
- batch_number: Sequence number for this batch in the distribution
- transaction_id: Hedera transaction ID (e.g., 0.0.123456@1234567890.123456789)
- status: Batch processing state
- error_message: Failure reason if status is FAILED
Indexes:
CREATE INDEX idx_batch_distribution ON batch_payout(distribution_id);
CREATE INDEX idx_batch_status ON batch_payout(status);
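The relationship between batch_number and total_holders can be illustrated by splitting a holder list into fixed-size chunks. This is a sketch only: the batch size, the 1-based numbering, and the names PlannedBatch and planBatches are assumptions, not details taken from the backend.

```typescript
// Hypothetical shape for a planned batch; field names follow the batch_payout columns.
interface PlannedBatch {
  batchNumber: number; // 1-based sequence within the distribution (assumption)
  totalHolders: number;
  accountIds: string[];
}

function planBatches(accountIds: string[], batchSize: number): PlannedBatch[] {
  if (!Number.isInteger(batchSize) || batchSize < 1) {
    throw new Error("batchSize must be a positive integer");
  }
  const batches: PlannedBatch[] = [];
  for (let i = 0; i < accountIds.length; i += batchSize) {
    const slice = accountIds.slice(i, i + batchSize);
    batches.push({
      batchNumber: batches.length + 1,
      totalHolders: slice.length,
      accountIds: slice,
    });
  }
  return batches;
}

const planned = planBatches(["0.0.1001", "0.0.1002", "0.0.1003", "0.0.1004", "0.0.1005"], 2);
console.log(planned.map((b) => `${b.batchNumber}:${b.totalHolders}`).join(" ")); // 1:2 2:2 3:1
```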
BlockchainEventListenerConfig
Configuration for blockchain event listeners.
Table: blockchain_event_listener_config
CREATE TABLE blockchain_event_listener_config (
id UUID PRIMARY KEY,
contract_id VARCHAR NOT NULL, -- Contract to listen to
last_processed_timestamp BIGINT, -- Last processed event timestamp
enabled BOOLEAN DEFAULT TRUE,
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW()
);
Fields:
- contract_id: LifeCycle Cash Flow contract being monitored
- last_processed_timestamp: Timestamp of last processed event (prevents reprocessing)
- enabled: Whether listener is active
Indexes:
CREATE UNIQUE INDEX idx_listener_contract ON blockchain_event_listener_config(contract_id);
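The way last_processed_timestamp prevents reprocessing can be sketched as a cursor filter. The ContractEvent shape and the function name are hypothetical, and the sketch assumes timestamps fit in a JavaScript number; nanosecond-precision values would need bigint or string handling instead.

```typescript
// Hypothetical event shape; the timestamp unit must match last_processed_timestamp.
interface ContractEvent {
  timestamp: number;
  payload: string;
}

// Returns events newer than the cursor, oldest first, plus the next cursor value.
function selectUnprocessed(
  events: ContractEvent[],
  lastProcessedTimestamp: number | null,
): { fresh: ContractEvent[]; nextCursor: number | null } {
  const fresh = events
    .filter((e) => lastProcessedTimestamp === null || e.timestamp > lastProcessedTimestamp)
    .sort((a, b) => a.timestamp - b.timestamp);
  const nextCursor =
    fresh.length > 0 ? fresh[fresh.length - 1].timestamp : lastProcessedTimestamp;
  return { fresh, nextCursor };
}

const polled: ContractEvent[] = [
  { timestamp: 300, payload: "c" },
  { timestamp: 100, payload: "a" },
  { timestamp: 200, payload: "b" },
];
const result = selectUnprocessed(polled, 200);
console.log(result.fresh.map((e) => e.payload).join(",")); // c
console.log(result.nextCursor); // 300
```

The strict comparison skips any event that shares the cursor's exact timestamp, so a real listener would need a tie-breaker if two events can carry the same value.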
Entity Relationships
┌──────────────┐
│    Asset     │
└──────┬───────┘
       │
       │ 1:N
       │
       ▼
┌──────────────┐
│ Distribution │
└──────┬───────┘
       │
       │ 1:N
       ├──────────────────┐
       │                  │
       ▼                  ▼
┌──────────────┐   ┌──────────────┐
│    Holder    │   │ BatchPayout  │
└──────────────┘   └──────────────┘
TypeORM Entities
Asset Entity
@Entity("asset")
export class AssetEntity {
@PrimaryGeneratedColumn("uuid")
id: string;
@Column({ unique: true })
tokenId: string;
@Column()
name: string;
@Column()
symbol: string;
@Column({ type: "bigint" })
totalSupply: string;
@Column()
decimals: number;
@Column({ nullable: true })
lifecycleContractId?: string;
@Column({ nullable: true })
paymentTokenId?: string;
@Column({ nullable: true })
lastSyncedAt?: Date;
@Column({ nullable: true })
syncStatus?: string;
@CreateDateColumn()
createdAt: Date;
@UpdateDateColumn()
updatedAt: Date;
@OneToMany(() => DistributionEntity, (distribution) => distribution.asset)
distributions: DistributionEntity[];
}
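The totalSupply property is typed as string even though the column is BIGINT, presumably because BIGINT values can exceed Number.MAX_SAFE_INTEGER. A minimal sketch of turning such a string into a display value using the decimals column; the helper name formatTokenAmount is hypothetical:

```typescript
// Hypothetical helper: renders a smallest-unit amount as a decimal string.
function formatTokenAmount(rawUnits: string, decimals: number): string {
  const zero = BigInt(0);
  const units = BigInt(rawUnits); // throws on non-integer input
  const negative = units < zero;
  const digits = (negative ? -units : units).toString().padStart(decimals + 1, "0");
  const whole = digits.slice(0, digits.length - decimals);
  const fraction = digits.slice(digits.length - decimals);
  const body = decimals > 0 ? `${whole}.${fraction}` : whole;
  return negative ? `-${body}` : body;
}

console.log(formatTokenAmount("1234567890", 6)); // 1234.567890
console.log(formatTokenAmount("5", 2)); // 0.05
console.log(formatTokenAmount("9007199254740993", 0)); // 9007199254740993
```

The last call uses a value above Number.MAX_SAFE_INTEGER, which a plain number could not represent exactly.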
Distribution Entity
@Entity("distribution")
export class DistributionEntity {
@PrimaryGeneratedColumn("uuid")
id: string;
@Column()
assetId: string;
@ManyToOne(() => AssetEntity, (asset) => asset.distributions)
@JoinColumn({ name: "assetId" })
asset: AssetEntity;
@Column()
type: string;
@Column()
executionType: string;
@Column({ nullable: true })
payoutType?: string;
@Column({ type: "decimal", nullable: true })
amount?: string;
@Column({ nullable: true })
concept?: string;
@Column({ nullable: true })
scheduledTime?: Date;
@Column({ nullable: true })
frequency?: string;
@Column({ nullable: true })
startTime?: Date;
@Column({ nullable: true })
triggerCondition?: string;
@Column()
status: string;
@Column({ nullable: true })
executedAt?: Date;
@CreateDateColumn()
createdAt: Date;
@UpdateDateColumn()
updatedAt: Date;
@OneToMany(() => HolderEntity, (holder) => holder.distribution)
holders: HolderEntity[];
@OneToMany(() => BatchPayoutEntity, (batch) => batch.distribution)
batches: BatchPayoutEntity[];
}
Migrations
The backend uses TypeORM migrations for schema management.
Creating Migrations
# Generate migration from entity changes
npm run typeorm:migration:generate -- -n MigrationName
# Create empty migration
npm run typeorm:migration:create -- -n MigrationName
Running Migrations
# Run pending migrations
npm run typeorm:migration:run
# Revert last migration
npm run typeorm:migration:revert
Example Migration
export class CreateAssetTable1234567890 implements MigrationInterface {
public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.createTable(
new Table({
name: "asset",
columns: [
{
name: "id",
type: "uuid",
isPrimary: true,
generationStrategy: "uuid",
default: "uuid_generate_v4()",
},
{
name: "token_id",
type: "varchar",
isUnique: true,
},
// ... more columns
],
}),
);
}
public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.dropTable("asset");
}
}
Repository Pattern
Repositories abstract database access:
Example Repository
@Injectable()
export class DistributionRepository {
constructor(
@InjectRepository(DistributionEntity)
private readonly repo: Repository<DistributionEntity>,
) {}
async findById(id: string): Promise<Distribution | null> {
const entity = await this.repo.findOne({
where: { id },
relations: ["asset", "holders", "batches"],
});
return entity ? this.toDomain(entity) : null;
}
async findPendingScheduled(now: Date): Promise<Distribution[]> {
const entities = await this.repo.find({
where: {
status: "PENDING",
scheduledTime: LessThanOrEqual(now),
},
});
return entities.map((e) => this.toDomain(e));
}
async save(distribution: Distribution): Promise<void> {
const entity = this.toEntity(distribution);
await this.repo.save(entity);
}
private toDomain(entity: DistributionEntity): Distribution {
// Map entity to domain model
}
private toEntity(domain: Distribution): DistributionEntity {
// Map domain model to entity
}
}
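The toDomain and toEntity bodies are left as stubs above. One possible shape for such mappers is sketched here with plain interfaces so it runs without TypeORM: DistributionRow stands in for DistributionEntity, and the Distribution domain model is an assumption, since its real definition is not shown in this document.

```typescript
// Hypothetical shapes; neither is taken from the backend.
interface DistributionRow {
  id: string;
  assetId: string;
  type: string;
  status: string;
  amount?: string | null; // the entity types the decimal column as a string
  scheduledTime?: Date | null;
}

const DISTRIBUTION_TYPES = ["DIVIDEND", "COUPON", "SNAPSHOT"] as const;
const DISTRIBUTION_STATUSES = ["PENDING", "PROCESSING", "COMPLETED", "FAILED"] as const;

interface Distribution {
  id: string;
  assetId: string;
  type: (typeof DISTRIBUTION_TYPES)[number];
  status: (typeof DISTRIBUTION_STATUSES)[number];
  amount: string | null;
  scheduledTime: Date | null;
}

// Narrows free-form VARCHAR values to the documented sets and rejects anything else.
function toDomain(row: DistributionRow): Distribution {
  const type = DISTRIBUTION_TYPES.find((t) => t === row.type);
  const status = DISTRIBUTION_STATUSES.find((s) => s === row.status);
  if (!type || !status) {
    throw new Error(`Unexpected type or status on distribution ${row.id}`);
  }
  return {
    id: row.id,
    assetId: row.assetId,
    type,
    status,
    amount: row.amount ?? null,
    scheduledTime: row.scheduledTime ?? null,
  };
}

function toRow(domain: Distribution): DistributionRow {
  return { ...domain };
}

const mapped = toDomain({ id: "d-1", assetId: "a-1", type: "COUPON", status: "PENDING", amount: "1500.50" });
console.log(mapped.status); // PENDING
```

Validating type and status at the mapping boundary keeps unexpected column values from reaching domain logic.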
Database Configuration
Location: apps/mass-payout/backend/.env
# PostgreSQL connection
DATABASE_HOST=localhost
DATABASE_PORT=5432
DATABASE_USER=postgres
DATABASE_PASSWORD=postgres
DATABASE_NAME=mass_payout
TypeORM Configuration: apps/mass-payout/backend/ormconfig.ts
export default {
type: "postgres",
host: process.env.DATABASE_HOST,
port: parseInt(process.env.DATABASE_PORT ?? "5432", 10), // fall back to the default PostgreSQL port
username: process.env.DATABASE_USER,
password: process.env.DATABASE_PASSWORD,
database: process.env.DATABASE_NAME,
entities: ["src/**/*.entity.ts"],
migrations: ["src/infrastructure/persistence/migrations/*.ts"],
synchronize: false, // Use migrations in production
logging: process.env.NODE_ENV === "development",
};
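A missing or malformed variable in .env otherwise surfaces only as a connection error at startup. A minimal sketch of validating the five DATABASE_* variables up front; the function name loadDatabaseConfig and the DatabaseConfig shape are hypothetical, and the environment is passed in as a plain object so the sketch runs on its own:

```typescript
interface DatabaseConfig {
  host: string;
  port: number;
  username: string;
  password: string;
  database: string;
}

// Hypothetical loader: fails fast with a named variable instead of a vague connection error.
function loadDatabaseConfig(env: Record<string, string | undefined>): DatabaseConfig {
  const required = (key: string): string => {
    const value = env[key];
    if (!value) throw new Error(`Missing environment variable ${key}`);
    return value;
  };
  const port = Number.parseInt(required("DATABASE_PORT"), 10);
  if (!Number.isInteger(port) || port < 1 || port > 65535) {
    throw new Error("DATABASE_PORT must be an integer between 1 and 65535");
  }
  return {
    host: required("DATABASE_HOST"),
    port,
    username: required("DATABASE_USER"),
    password: required("DATABASE_PASSWORD"),
    database: required("DATABASE_NAME"),
  };
}

const exampleConfig = loadDatabaseConfig({
  DATABASE_HOST: "localhost",
  DATABASE_PORT: "5432",
  DATABASE_USER: "postgres",
  DATABASE_PASSWORD: "postgres",
  DATABASE_NAME: "mass_payout",
});
console.log(exampleConfig.port); // 5432
```

In the backend, process.env would be passed in place of the literal object.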
Best Practices
Use Migrations
Always use migrations for schema changes:
# 1. Modify entity
# 2. Generate migration
npm run typeorm:migration:generate -- -n AddPaymentTokenToAsset
# 3. Review generated migration
# 4. Run migration
npm run typeorm:migration:run
Transaction Management
Use transactions for multi-step operations:
async createDistributionWithHolders(
distribution: Distribution,
holders: Holder[],
): Promise<void> {
await this.dataSource.transaction(async manager => {
await manager.save(DistributionEntity, this.toEntity(distribution));
await manager.save(HolderEntity, holders.map(h => this.toHolderEntity(h)));
});
}
Query Optimization
Filter in the database on indexed columns rather than loading every row and filtering in application code:
// Good: Use indexed columns
await this.repo.find({
where: { status: "PENDING" }, // Indexed
});
// Bad: Full table scan
await this.repo.find().then((all) => all.filter((d) => d.status === "PENDING"));
Pagination
Paginate large result sets:
async findAll(page: number, limit: number): Promise<[Distribution[], number]> {
const [entities, total] = await this.repo.findAndCount({
skip: (page - 1) * limit,
take: limit,
order: { createdAt: 'DESC' },
});
return [entities.map(e => this.toDomain(e)), total];
}
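The skip value above becomes negative if page is 0, and an unbounded limit defeats the purpose of paginating. A minimal sketch of normalizing both before they reach findAndCount; the helper name toSkipTake and the default and maximum page sizes are assumptions:

```typescript
const DEFAULT_PAGE_SIZE = 20; // assumed default
const MAX_PAGE_SIZE = 100; // assumed cap

// Converts 1-based page and limit into skip and take, clamping invalid input.
function toSkipTake(page: number, limit: number): { skip: number; take: number } {
  const safePage = Number.isInteger(page) && page >= 1 ? page : 1;
  const safeLimit =
    Number.isInteger(limit) && limit >= 1 ? Math.min(limit, MAX_PAGE_SIZE) : DEFAULT_PAGE_SIZE;
  return { skip: (safePage - 1) * safeLimit, take: safeLimit };
}

console.log(toSkipTake(3, 20)); // { skip: 40, take: 20 }
console.log(toSkipTake(0, 500)); // { skip: 0, take: 100 }
```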
Troubleshooting
Database Connection Failed
Problem: Cannot connect to PostgreSQL
Solutions:
- Verify PostgreSQL is running: docker-compose ps
- Check credentials in .env
- Ensure the database exists: CREATE DATABASE mass_payout;
Migration Failed
Problem: Migration execution fails
Solutions:
- Check the migration SQL for errors
- Verify the database state matches what the migration expects
- Revert the last migration: npm run typeorm:migration:revert
- Fix and regenerate the migration
Slow Queries
Problem: Database queries are slow
Solutions:
- Add indexes to frequently queried columns
- Use pagination for large result sets
- Enable query logging to identify slow queries
- Use EXPLAIN ANALYZE in PostgreSQL to inspect query plans and optimize
Next Steps
- Architecture Overview - Backend architecture and layers
- Blockchain Integration - Event sync and scheduled processing
- Running & Testing - Development and testing