
Upsert Operations

Upsert operations combine INSERT and UPDATE functionality, inserting new records or updating existing ones based on unique constraints or primary keys. This is particularly useful for data synchronization and handling duplicate entries gracefully.
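
Under the hood, PostgREST implements an upsert as a POST to the table endpoint with a Prefer: resolution=merge-duplicates header, plus an optional on_conflict query parameter for non-primary-key targets. The client methods below build this request for you; as a point of reference, a minimal sketch of the equivalent raw call against the same local endpoint might look like this (payload is illustrative):

typescript
// Raw PostgREST request equivalent to a primary-key upsert
const response = await fetch('http://localhost:3000/users', {
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
    // merge-duplicates updates the existing row; ignore-duplicates skips it
    'Prefer': 'resolution=merge-duplicates, return=representation'
  },
  body: JSON.stringify({ id: '123', email: 'john@example.com', name: 'John Doe' })
});

const upserted = await response.json();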

Basic Upsert Operations

Single Record Upsert

typescript
import { createClient } from '@webcoded/pgrestify';

interface User {
  id: string;
  email: string;
  name: string;
  created_at: string;
  updated_at: string;
}

const client = createClient({ url: 'http://localhost:3000' });

// Basic upsert - will insert or update based on primary key
const user = await client
  .from<User>('users')
  .upsert({
    id: '123',
    email: 'john@example.com',
    name: 'John Doe'
  })
  .single()
  .execute();

console.log('Upserted user:', user.data);

Multiple Record Upsert

typescript
// Batch upsert multiple records
const users = await client
  .from<User>('users')
  .upsert([
    {
      id: '123',
      email: 'john@example.com',
      name: 'John Doe'
    },
    {
      id: '456',
      email: 'jane@example.com',
      name: 'Jane Smith'
    }
  ])
  .execute();

console.log('Upserted users:', users.data);

Conflict Resolution Strategies

Default Behavior

By default, PostgREST uses the primary key for conflict detection:

typescript
// Will conflict on 'id' field (primary key)
const result = await client
  .from<User>('users')
  .upsert({
    id: '123',
    email: 'newemail@example.com',
    name: 'Updated Name'
  })
  .execute();

Custom Conflict Columns

Use raw parameters to specify custom conflict resolution:

typescript
// Conflict on email field instead of primary key
const result = await client
  .from<User>('users')
  .upsert({
    email: 'john@example.com',
    name: 'John Updated'
  })
  .rawParams({
    'on_conflict': 'email'
  })
  .execute();

Multiple Conflict Columns

typescript
// Conflict on multiple columns
const result = await client
  .from<User>('users')
  .upsert({
    email: 'john@example.com',
    department: 'Engineering',
    name: 'John Doe'
  })
  .rawParams({
    'on_conflict': 'email,department'
  })
  .execute();

Advanced Upsert Patterns

Merge Duplicates

typescript
interface Product {
  id: string;
  name: string;
  price: number;
  stock: number;
  updated_at: string;
}

// Merge on conflict: the existing row is updated with the incoming values
const result = await client
  .from<Product>('products')
  .upsert({
    id: 'prod-123',
    name: 'Widget',
    price: 29.99,
    stock: 100
  })
  .rawParams({
    'on_conflict': 'id',
    'resolution': 'merge-duplicates'
  })
  .execute();

Partial Updates on Conflict

typescript
// Only update specific fields when conflict occurs
const result = await client
  .from<User>('users')
  .upsert({
    email: 'john@example.com',
    last_login: new Date().toISOString(),
    login_count: 1
  })
  .rawParams({
    'on_conflict': 'email'
  })
  .execute();

Ignore Conflicts

typescript
// Insert only, ignore if record exists
const result = await client
  .from<User>('users')
  .upsert({
    email: 'john@example.com',
    name: 'John Doe'
  })
  .rawParams({
    'on_conflict': 'email',
    'resolution': 'ignore'
  })
  .execute();

Bulk Upsert Operations

Large Dataset Processing

typescript
interface DataPoint {
  id: string;
  timestamp: string;
  value: number;
  sensor_id: string;
}

async function bulkUpsertSensorData(dataPoints: DataPoint[]) {
  const batchSize = 1000;
  const results: DataPoint[] = [];
  
  for (let i = 0; i < dataPoints.length; i += batchSize) {
    const batch = dataPoints.slice(i, i + batchSize);
    
    try {
      const result = await client
        .from<DataPoint>('sensor_data')
        .upsert(batch)
        .rawParams({
          'on_conflict': 'sensor_id,timestamp'
        })
        .execute();
      
      if (result.data) {
        results.push(...result.data);
      }
    } catch (error) {
      console.error(`Failed to upsert batch ${i / batchSize + 1}:`, error);
      throw error;
    }
  }
  
  return results;
}

// Usage
const sensorData = [
  // ... array of sensor data points
];

const upsertedData = await bulkUpsertSensorData(sensorData);
console.log(`Successfully upserted ${upsertedData.length} data points`);

Progress Tracking

typescript
async function bulkUpsertWithProgress<T>(
  tableName: string,
  records: Partial<T>[],
  options: {
    batchSize?: number;
    onProgress?: (processed: number, total: number) => void;
    conflictColumns?: string[];
  } = {}
) {
  const { 
    batchSize = 1000, 
    onProgress,
    conflictColumns = []
  } = options;
  
  const results: T[] = [];
  let processed = 0;
  
  for (let i = 0; i < records.length; i += batchSize) {
    const batch = records.slice(i, i + batchSize);
    
    let query = client.from<T>(tableName).upsert(batch);
    
    // Add conflict resolution if specified (reassign to keep the chained builder)
    if (conflictColumns.length > 0) {
      query = query.rawParams({
        'on_conflict': conflictColumns.join(',')
      });
    }
    
    const result = await query.execute();
    
    if (result.data) {
      results.push(...result.data);
    }
    
    processed += batch.length;
    onProgress?.(processed, records.length);
  }
  
  return results;
}

// Usage with progress tracking
const result = await bulkUpsertWithProgress<User>(
  'users',
  userData,
  {
    batchSize: 500,
    conflictColumns: ['email'],
    onProgress: (processed, total) => {
      console.log(`Progress: ${processed}/${total} (${Math.round(processed/total * 100)}%)`);
    }
  }
);

Return Specifications

Returning Specific Columns

typescript
// Return only specific columns after upsert
const result = await client
  .from<User>('users')
  .select('id', 'email', 'updated_at')
  .upsert({
    email: 'john@example.com',
    name: 'John Updated'
  })
  .execute();

console.log('Updated fields:', result.data);

Returning All Data

typescript
// Return complete record after upsert
const result = await client
  .from<User>('users')
  .select('*')
  .upsert({
    id: '123',
    email: 'john@example.com',
    name: 'John Doe'
  })
  .execute();

Count Information

typescript
// Get count information about upserted records
const result = await client
  .from<User>('users')
  .upsert(userData)
  .execute({ count: 'exact' });

console.log(`Upserted ${result.count} records`);
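
PostgREST also supports cheaper count strategies (planned and estimated) that avoid a full exact count on large tables. Assuming the client forwards the count option to the Prefer header unchanged, a sketch:

typescript
// Estimated count: faster on large tables, at the cost of accuracy
const estimated = await client
  .from<User>('users')
  .upsert(userData)
  .execute({ count: 'estimated' });

console.log(`Roughly ${estimated.count} records affected`);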

Error Handling

Basic Error Handling

typescript
try {
  const result = await client
    .from<User>('users')
    .upsert({
      email: 'invalid-email', // Fails if the table enforces an email format constraint
      name: 'Test User'
    })
    .execute();
    
  console.log('Success:', result.data);
} catch (error) {
  console.error('Upsert failed:', error);
}

Validation Errors

typescript
interface UpsertError {
  column: string;
  message: string;
  value: any;
}

async function safeUpsert<T>(
  tableName: string,
  data: Partial<T>[],
  conflictColumns: string[] = []
) {
  try {
    let query = client.from<T>(tableName).upsert(data);
    
    if (conflictColumns.length > 0) {
      query = query.rawParams({
        'on_conflict': conflictColumns.join(',')
      });
    }
    
    return await query.execute();
  } catch (error: any) {
    // Parse PostgreSQL constraint errors
    if (error.message?.includes('violates check constraint')) {
      const constraintMatch = error.message.match(/constraint "(.+?)"/);
      const constraint = constraintMatch?.[1] || 'unknown';
      
      throw new Error(`Validation failed: ${constraint} constraint violated`);
    }
    
    if (error.message?.includes('duplicate key')) {
      throw new Error('Duplicate key error: Record already exists');
    }
    
    throw error;
  }
}
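
A usage sketch, assuming userData is an array of user records as in the earlier examples:

typescript
// Usage
try {
  const result = await safeUpsert<User>('users', userData, ['email']);
  console.log('Upserted records:', result.data);
} catch (error) {
  console.error('Upsert rejected:', (error as Error).message);
}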

Retry Logic

typescript
async function upsertWithRetry<T>(
  tableName: string,
  data: Partial<T>[],
  maxRetries: number = 3,
  conflictColumns: string[] = []
) {
  let lastError: Error | null = null;
  
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      let query = client.from<T>(tableName).upsert(data);
      
      if (conflictColumns.length > 0) {
        query = query.rawParams({
          'on_conflict': conflictColumns.join(',')
        });
      }
      
      return await query.execute();
    } catch (error: any) {
      lastError = error;
      
      // Don't retry for validation errors
      if (error.message?.includes('violates check constraint')) {
        throw error;
      }
      
      if (attempt === maxRetries) {
        break;
      }
      
      // Exponential backoff
      const delay = Math.pow(2, attempt - 1) * 1000;
      await new Promise(resolve => setTimeout(resolve, delay));
    }
  }
  
  throw lastError || new Error('Upsert failed after all retries');
}
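
For example, to retry a transient failure up to five times while resolving conflicts on the email column:

typescript
// Usage: up to 5 attempts with exponential backoff (1s, 2s, 4s, ...)
const retried = await upsertWithRetry<User>('users', userData, 5, ['email']);
console.log('Upserted after retries:', retried.data);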

Performance Considerations

Batch Size Optimization

typescript
// Optimal batch sizes for different scenarios
const BATCH_SIZES = {
  simple: 1000,      // Simple records with few columns
  complex: 500,      // Records with many columns or relations
  withBlobs: 100,    // Records containing binary data
  highConcurrency: 200 // When many clients are writing simultaneously
};

async function optimizedUpsert<T>(
  tableName: string,
  records: Partial<T>[],
  complexity: 'simple' | 'complex' | 'withBlobs' | 'highConcurrency' = 'simple'
) {
  const batchSize = BATCH_SIZES[complexity];
  // ... implementation with appropriate batch size
}
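
One way to fill in the implementation is to delegate to the bulkUpsertWithProgress helper from the Progress Tracking section, passing the batch size chosen for the workload; this is an illustrative sketch rather than a built-in library helper:

typescript
async function optimizedUpsert<T>(
  tableName: string,
  records: Partial<T>[],
  complexity: 'simple' | 'complex' | 'withBlobs' | 'highConcurrency' = 'simple',
  conflictColumns: string[] = []
) {
  // Reuse the generic batching helper with a workload-appropriate batch size
  return bulkUpsertWithProgress<T>(tableName, records, {
    batchSize: BATCH_SIZES[complexity],
    conflictColumns
  });
}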

Index Optimization

typescript
// For frequent upserts, make sure unique indexes exist on the conflict columns.
// ON CONFLICT only works against a unique index or constraint, so create them
// in your database migrations:

// CREATE UNIQUE INDEX CONCURRENTLY idx_users_email
// ON users(email);

// CREATE UNIQUE INDEX CONCURRENTLY idx_products_sku_category
// ON products(sku, category);

// With these indexes in place, upserts resolve conflicts correctly and quickly
const result = await client
  .from<Product>('products')
  .upsert(productData)
  .rawParams({
    'on_conflict': 'sku,category'
  })
  .execute();

Memory Management

typescript
async function memoryEfficientUpsert<T>(
  tableName: string,
  dataStream: AsyncIterable<T[]>,
  conflictColumns: string[] = []
) {
  let totalProcessed = 0;
  
  for await (const batch of dataStream) {
    let query = client.from<T>(tableName).upsert(batch);
    
    if (conflictColumns.length > 0) {
      query = query.rawParams({
        'on_conflict': conflictColumns.join(',')
      });
    }
    
    await query.execute();
    totalProcessed += batch.length;
    
    // Hint the garbage collector on very large datasets
    // (global.gc is only available when Node.js runs with --expose-gc)
    if (totalProcessed % 10000 === 0) {
      if (global.gc) {
        global.gc();
      }
    }
  }
  
  return totalProcessed;
}
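
Because the helper consumes an AsyncIterable, it pairs naturally with an async generator that yields batches lazily. In this sketch the batches come from the in-memory sensorData array purely for illustration; in practice they would be read incrementally from a file or network stream:

typescript
// Turn any array into an async stream of fixed-size batches
async function* toBatches<T>(items: T[], size: number): AsyncIterable<T[]> {
  for (let i = 0; i < items.length; i += size) {
    yield items.slice(i, i + size);
  }
}

const totalRows = await memoryEfficientUpsert<DataPoint>(
  'sensor_data',
  toBatches(sensorData, 1000),
  ['sensor_id', 'timestamp']
);

console.log(`Processed ${totalRows} rows`);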

Best Practices

1. Choose Appropriate Conflict Columns

typescript
// Good: Use natural unique constraints
await client.from<User>('users')
  .upsert(userData)
  .rawParams({ 'on_conflict': 'email' })
  .execute();

// Avoid: using columns without a unique constraint for conflict detection.
// PostgreSQL resolves ON CONFLICT against a unique index, so the request
// fails if no matching constraint exists.
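
If the conflict target has no unique constraint, PostgreSQL rejects the statement with "there is no unique or exclusion constraint matching the ON CONFLICT specification". A migration sketch that adds one (names are illustrative):

typescript
// Run as a database migration, not through the client:
// ALTER TABLE users ADD CONSTRAINT users_email_key UNIQUE (email);
// -- or, to avoid locking the table while the index builds:
// CREATE UNIQUE INDEX CONCURRENTLY idx_users_email_unique ON users(email);

// With the constraint in place, conflicts on email resolve instead of erroring
await client.from<User>('users')
  .upsert(userData)
  .rawParams({ 'on_conflict': 'email' })
  .execute();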

2. Validate Data Before Upsert

typescript
// Minimal email format check used below; swap in a stricter validator if needed
const isValidEmail = (email: string): boolean => /^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(email);

function validateUserData(userData: Partial<User>[]): Partial<User>[] {
  return userData.filter(user => {
    if (!user.email || !isValidEmail(user.email)) {
      console.warn('Skipping invalid user:', user);
      return false;
    }
    return true;
  });
}

const validatedData = validateUserData(rawUserData);
await client.from<User>('users').upsert(validatedData).execute();

3. Use Transactions for Related Data

typescript
// For related data that must stay consistent, remember that PostgREST runs
// each HTTP request in its own transaction. The two upserts below are not
// atomic; for true atomicity, wrap them in a database function called via RPC.
async function upsertUserWithProfile(userData: User, profileData: Profile) {
  const user = await client
    .from<User>('users')
    .upsert(userData)
    .single()
    .execute();
  
  if (user.data) {
    await client
      .from<Profile>('profiles')
      .upsert({
        ...profileData,
        user_id: user.data.id
      })
      .execute();
  }
}

4. Monitor Performance

typescript
async function monitoredUpsert<T>(
  tableName: string,
  data: Partial<T>[],
  conflictColumns: string[] = []
) {
  const startTime = Date.now();
  
  try {
    let query = client.from<T>(tableName).upsert(data);
    
    if (conflictColumns.length > 0) {
      query = query.rawParams({
        'on_conflict': conflictColumns.join(',')
      });
    }
    
    const result = await query.execute();
    
    const duration = Date.now() - startTime;
    console.log(`Upserted ${data.length} records in ${duration}ms`);
    
    return result;
  } catch (error) {
    const duration = Date.now() - startTime;
    console.error(`Upsert failed after ${duration}ms:`, error);
    throw error;
  }
}

Common Use Cases

1. Data Synchronization

typescript
// Synchronizing external data with local database
async function syncExternalUsers(externalUsers: ExternalUser[]) {
  const localUsers = externalUsers.map(external => ({
    external_id: external.id,
    email: external.email,
    name: external.full_name,
    last_sync: new Date().toISOString()
  }));
  
  return await client
    .from<User>('users')
    .upsert(localUsers)
    .rawParams({
      'on_conflict': 'external_id'
    })
    .execute();
}

2. Cache Updates

typescript
// Updating cached computed values
async function updateUserStats(userId: string, newStats: UserStats) {
  return await client
    .from<UserStats>('user_stats')
    .upsert({
      user_id: userId,
      ...newStats,
      updated_at: new Date().toISOString()
    })
    .rawParams({
      'on_conflict': 'user_id'
    })
    .execute();
}

3. Configuration Management

typescript
// Managing application settings
async function updateSettings(settings: AppSetting[]) {
  return await client
    .from<AppSetting>('app_settings')
    .upsert(settings)
    .rawParams({
      'on_conflict': 'key'
    })
    .execute();
}

Summary

Upsert operations in PGRestify provide a powerful way to handle INSERT/UPDATE scenarios gracefully. Key points:

  • Flexibility: Support for single and batch operations with customizable conflict resolution
  • Performance: Optimized for large datasets with proper batching and error handling
  • Safety: Built-in error handling and validation support
  • Integration: Seamless integration with PostgREST's native upsert capabilities

Choose upsert operations when you need to handle data that may or may not already exist, such as data synchronization, cache updates, or configuration management scenarios.
