Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,16 @@ describe('envValidationSchema', () => {
expect(value.PRISMA_DATABASE_URL).toContain('/teable');
});

it('accepts split meta/data env without the legacy alias', () => {
it('accepts split meta env without the legacy alias', () => {
const { error, value } = envValidationSchema.validate(
createEnv({
PRISMA_META_DATABASE_URL:
'postgresql://teable:teable@127.0.0.1:5432/teable-meta?schema=public',
PRISMA_DATA_DATABASE_URL:
'postgresql://teable:teable@127.0.0.1:5432/teable-data?schema=public',
})
);

expect(error).toBeUndefined();
expect(value.PRISMA_META_DATABASE_URL).toContain('/teable-meta');
expect(value.PRISMA_DATA_DATABASE_URL).toContain('/teable-data');
});

it('accepts DATABASE_URL as the last-resort meta fallback', () => {
Expand Down
1 change: 0 additions & 1 deletion apps/nestjs-backend/src/configs/env.validation.schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ export const envValidationSchema = Joi.object({
// database_url
PRISMA_DATABASE_URL: Joi.string(),
PRISMA_META_DATABASE_URL: Joi.string(),
PRISMA_DATA_DATABASE_URL: Joi.string(),
DATABASE_URL: Joi.string(),

STORAGE_PREFIX: Joi.string().uri().optional(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,12 @@ import { Injectable } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import type { ISelectFieldOptions } from '@teable/core';
import { FieldType, generateRecordHistoryId } from '@teable/core';
import { DataPrismaService } from '@teable/db-data-prisma';
import type { Field } from '@teable/db-main-prisma';
import { Knex } from 'knex';
import { isEqual, isObject, isString } from 'lodash';
import { InjectModel } from 'nest-knexjs';
import { BaseConfig, IBaseConfig } from '../../configs/base.config';
import { DataLoaderService } from '../../features/data-loader/data-loader.service';
import { rawField2FieldObj } from '../../features/field/model/factory';
import { DATA_KNEX } from '../../global/knex/knex.module';
import { DatabaseRouter } from '../../global/database-router.service';
import { EventEmitterService } from '../event-emitter.service';
import { Events, RecordUpdateEvent } from '../events';

Expand All @@ -21,10 +18,9 @@ const SELECT_FIELD_TYPE_SET = new Set([FieldType.SingleSelect, FieldType.Multipl
@Injectable()
export class RecordHistoryListener {
constructor(
private readonly dataPrismaService: DataPrismaService,
private readonly databaseRouter: DatabaseRouter,
private readonly eventEmitterService: EventEmitterService,
@BaseConfig() private readonly baseConfig: IBaseConfig,
@InjectModel(DATA_KNEX) private readonly knex: Knex,
private readonly dataLoaderService: DataLoaderService
) {}

Expand Down Expand Up @@ -129,9 +125,16 @@ export class RecordHistoryListener {
});

if (recordHistoryList.length) {
const query = this.knex.insert(recordHistoryList).into('record_history').toQuery();

await this.dataPrismaService.txClient().$executeRawUnsafe(query);
const dataKnex = await this.databaseRouter.dataKnexForTable(tableId);
const dataDbUrl = await this.databaseRouter.getDataDatabaseUrlForTable(tableId);
const dataDbInternalSchema = new URL(dataDbUrl).searchParams.get('schema') || 'public';
const query = dataKnex
.withSchema(dataDbInternalSchema)
.insert(recordHistoryList)
.into('record_history')
.toQuery();

await this.databaseRouter.executeDataPrismaForTable(tableId, query);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,55 @@ describe('AggregateService', () => {
it('should be defined', () => {
expect(service).toBeDefined();
});

it('should execute row count SQL through the table scoped data database client', async () => {
const queryRaw = vi.fn().mockResolvedValue([{ count: 7 }]);
const databaseRouter = {
queryDataPrismaForTable: queryRaw,
};
const recordPermissionService = {
wrapView: vi.fn().mockResolvedValue({
builder: {},
}),
};
const recordQueryBuilder = {
createRecordAggregateBuilder: vi.fn().mockResolvedValue({
qb: {
toQuery: () => 'SELECT COUNT(*)::int AS count FROM "bse1"."tbl1"',
},
alias: 'tbl1',
selectionMap: {},
}),
};
const service = new AggregationService(
{} as never,
{} as never,
{} as never,
databaseRouter as never,
{ queryBuilder: vi.fn().mockReturnValue({}) } as never,
{} as never,
{} as never,
{ get: vi.fn().mockReturnValue('usr1') } as never,
recordPermissionService as never,
recordQueryBuilder as never
);

const serviceInternals = service as unknown as {
fetchStatisticsParams: () => Promise<unknown>;
getDbTableName: () => Promise<string>;
};
vi.spyOn(serviceInternals, 'fetchStatisticsParams').mockResolvedValue({
statisticsData: {},
fieldInstanceMap: {},
});
vi.spyOn(serviceInternals, 'getDbTableName').mockResolvedValue('bse1.tbl1');

const result = await service.performRowCount('tbl1', { viewId: 'viw1' });

expect(result.rowCount).toBe(7);
expect(queryRaw).toHaveBeenCalledWith(
'tbl1',
'SELECT COUNT(*)::int AS count FROM "bse1"."tbl1"'
);
});
});
52 changes: 35 additions & 17 deletions apps/nestjs-backend/src/features/aggregation/aggregation.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import {
ViewType,
} from '@teable/core';
import type { IGridColumnMeta, IFilter, IGroup, ISortItem } from '@teable/core';
import { DataPrismaService } from '@teable/db-data-prisma';
import type { Prisma } from '@teable/db-main-prisma';
import { PrismaService } from '@teable/db-main-prisma';
import { StatisticsFunc } from '@teable/openapi';
Expand Down Expand Up @@ -41,6 +40,8 @@ import { IThresholdConfig, ThresholdConfig } from '../../configs/threshold.confi
import { CustomHttpException } from '../../custom.exception';
import { InjectDbProvider } from '../../db-provider/db.provider';
import { IDbProvider } from '../../db-provider/db.provider.interface';
import type { IDataPrismaQueryExecutor } from '../../global/database-router.service';
import { DatabaseRouter } from '../../global/database-router.service';
import { DATA_KNEX } from '../../global/knex/knex.module';
import type { IClsStore } from '../../types/cls';
import { convertValueToStringify, string2Hash } from '../../utils';
Expand All @@ -65,6 +66,7 @@ type IStatisticsData = {
// so this is undefined unless the caller is paginating.
sort?: ISortItem[];
};

/**
* Version 2 implementation of the aggregation service
* This is a placeholder implementation that will be developed in the future
Expand All @@ -77,14 +79,30 @@ export class AggregationService implements IAggregationService {
private readonly recordService: RecordService,
private readonly tableIndexService: TableIndexService,
private readonly prisma: PrismaService,
private readonly dataPrismaService: DataPrismaService,
private readonly databaseRouter: DatabaseRouter,
@InjectModel(DATA_KNEX) private readonly knex: Knex,
@InjectDbProvider() private readonly dbProvider: IDbProvider,
@ThresholdConfig() private readonly thresholdConfig: IThresholdConfig,
private readonly cls: ClsService<IClsStore>,
private readonly recordPermissionService: RecordPermissionService,
@InjectRecordQueryBuilder() private readonly recordQueryBuilder: IRecordQueryBuilder
) {}

private async queryDataPrisma<T>(
tableId: string,
query: string,
...values: unknown[]
): Promise<T> {
return await this.databaseRouter.queryDataPrismaForTable<T>(tableId, query, ...values);
}

private async withDataPrismaTransaction<T>(
tableId: string,
fn: (prisma: IDataPrismaQueryExecutor) => Promise<T>
): Promise<T> {
return await this.databaseRouter.dataPrismaTransactionForTable(tableId, fn);
}

/**
* Perform aggregation operations on table data
* @param params - Parameters for aggregation including tableId, field IDs, view settings, and search
Expand Down Expand Up @@ -127,7 +145,7 @@ export class AggregationService implements IAggregationService {
const isPaginated = take !== undefined;
const baseSort = isPaginated ? [...(groupBy ?? []), ...(resolvedSort ?? [])] : undefined;
const defaultOrderField = isPaginated
? await this.recordService.getBasicOrderIndexField(dbTableName, withView?.viewId)
? await this.recordService.getBasicOrderIndexField(tableId, dbTableName, withView?.viewId)
: undefined;

const rawAggregationData = await this.handleAggregation({
Expand Down Expand Up @@ -325,7 +343,7 @@ export class AggregationService implements IAggregationService {

const aggSql = qb.toQuery();
this.logger.debug('handleAggregation aggSql: %s', aggSql);
return this.dataPrismaService.$queryRawUnsafe<{ [field: string]: unknown }[]>(aggSql);
return this.queryDataPrisma<{ [field: string]: unknown }[]>(tableId, aggSql);
}
/**
* Perform grouped aggregation operations
Expand Down Expand Up @@ -621,7 +639,7 @@ export class AggregationService implements IAggregationService {
const rawQuery = qb.toQuery();

this.logger.debug('handleRowCount raw query: %s', rawQuery);
return await this.dataPrismaService.$queryRawUnsafe<{ count: number }[]>(rawQuery);
return await this.queryDataPrisma<{ count: number }[]>(tableId, rawQuery);
}

private async fetchStatisticsParams(params: {
Expand Down Expand Up @@ -874,7 +892,7 @@ export class AggregationService implements IAggregationService {

const sql = queryBuilder.toQuery();

const result = await this.dataPrismaService.$queryRawUnsafe<{ count: number }[] | null>(sql);
const result = await this.queryDataPrisma<{ count: number }[] | null>(tableId, sql);

return {
count: result ? Number(result[0]?.count) : 0,
Expand Down Expand Up @@ -941,7 +959,11 @@ export class AggregationService implements IAggregationService {
Object.values(fieldInstanceMap).map((f) => [f.id, `"${f.dbFieldName}"`])
);

const basicSortIndex = await this.recordService.getBasicOrderIndexField(dbTableName, viewId);
const basicSortIndex = await this.recordService.getBasicOrderIndexField(
tableId,
dbTableName,
viewId
);

const filterQuery = (qb: Knex.QueryBuilder) => {
this.dbProvider
Expand Down Expand Up @@ -993,7 +1015,7 @@ export class AggregationService implements IAggregationService {
this.logger.debug('getRecordIndexBySearchOrder sql: %s', sql);

try {
return await this.dataPrismaService.$tx(async (prisma) => {
return await this.withDataPrismaTransaction(tableId, async (prisma) => {
const result = await prisma.$queryRawUnsafe<{ __id: string; fieldId: string }[]>(sql);

// no result found
Expand Down Expand Up @@ -1035,9 +1057,7 @@ export class AggregationService implements IAggregationService {
this.logger.debug('getRecordIndexBySearchOrder indexSql: %s', indexSql);
const indexResult =
// eslint-disable-next-line @typescript-eslint/naming-convention
await this.dataPrismaService.$queryRawUnsafe<{ row_num: number; __id: string }[]>(
indexSql
);
await prisma.$queryRawUnsafe<{ row_num: number; __id: string }[]>(indexSql);

if (indexResult?.length === 0) {
return null;
Expand Down Expand Up @@ -1102,7 +1122,7 @@ export class AggregationService implements IAggregationService {
this.logger.debug('getRecordIndex sql: %s', sql);

// eslint-disable-next-line @typescript-eslint/naming-convention
const result = await this.dataPrismaService.$queryRawUnsafe<{ row_num: number }[]>(sql);
const result = await this.queryDataPrisma<{ row_num: number }[]>(tableId, sql);

if (!result?.length) {
return null;
Expand Down Expand Up @@ -1227,11 +1247,9 @@ export class AggregationService implements IAggregationService {
endField: endField as DateFieldDto,
dbTableName: viewCte || dbTableName,
});
const result = await this.dataPrismaService
.txClient()
.$queryRawUnsafe<
{ date: Date | string; count: number; ids: string[] | string }[]
>(queryBuilder.toQuery());
const result = await this.queryDataPrisma<
{ date: Date | string; count: number; ids: string[] | string }[]
>(tableId, queryBuilder.toQuery());

const countMap = result.reduce(
(map, item) => {
Expand Down
Loading
Loading