1. Integrate
  2. Background Jobs

Integrate

Background Jobs

Setting up background async processes, i.e email notifications for new user signups using Supabase queue and cron with a Sveltekit endpoint

  1. Enable the Supabase Integrations

    • Go to your Supabase project dashboard
    • Navigate to Database -> Extensions
    • Enable the pg_cron extension (required to schedule jobs)
    • Enable the pg_net extension (required for making HTTP requests)
    • Enable the pg_queue extension (required for background message processing)
  2. Configure Queue Settings

    • Go to Queue Settings in your Supabase dashboard
    • Enable "Expose Queues via PostgREST" to allow interaction with queues using the Supabase client SDK
    • Navigate to the Queue Dashboard
    • Click "Create a Queue" and provide a name (e.g., "email_signups")
    • Select "Basic" for the queue type
    • Click "Create Queue"
  3. Modify Existing User Handler Function

    In the SQL Editor update your existing user handler function to include queueing:

            create or replace function public.handle_new_user()
    returns trigger as $$
    begin
      -- Create profile
      insert into public.profiles (id, full_name, avatar_url,email)
      values (new.id, new.email,new.raw_user_meta_data->>'full_name', new.raw_user_meta_data->>'avatar_url');
      
      -- Queue email notification using Supabase Queue
      perform pgmq_public.send(
        'email_signups',
        jsonb_build_object(
          'type', 'new_user',
          'email', new.email,
          'timestamp', now()
        )
      );
      
      return new;
    end;
    $$ language plpgsql security definer;
    
    -- The trigger is already created, no need to create it again
    
          
  4. Create API Endpoint

    Create a new API endpoint to process the queue:

    /src/routes/api/cron/email-signups/+server.ts
            import { PRIVATE_MAILGUN_API_KEY, PRIVATE_MAILGUN_DOMAIN } from '$env/static/private';
    import { supabaseAdmin } from '$lib/server/supabase/supabase-admin';
    import { json } from '@sveltejs/kit';
    
    export async function POST({ request }) {
        console.log('📬 Attempting to read messages from email_signups queue...');
        
        let messages = [];
        let queueError = null;
        
        try {
            // Read messages from the queue
            const result = await supabaseAdmin
                .schema('pgmq_public')
                .rpc('read', {
                    queue_name: 'email_signups',
                    sleep_seconds: 0,
                    n: 10
                });
                
            messages = result.data || [];
            queueError = result.error;
            
            console.log(`Found ${messages.length} messages in the queue`);
        } catch (err) {
            console.error('Exception accessing queue:', err);
            queueError = err;
        }
    
        if (queueError) {
            console.error('Error reading from queue:', queueError);
            return json({ success: false, error: 'Failed to read from queue', details: queueError }, { status: 500 });
        }
    
        // If no messages found, return early
        if (!messages || messages.length === 0) {
            return json({ message: 'No pending messages' }, { status: 200 });
        }
    
        for (const msg of messages) {
            try {
                console.log('Processing message:', JSON.stringify(msg, null, 2));
                const { type, email, timestamp } = msg.message;
                
                if (type === 'new_user') {
                    // Send email using Mailgun
                    const response = await fetch(`https://api.mailgun.net/v3/${PRIVATE_MAILGUN_DOMAIN}/messages`, {
                        method: 'POST',
                        headers: {
                            'Content-Type': 'application/x-www-form-urlencoded',
                            'Authorization': `Basic ${btoa(`api:${PRIVATE_MAILGUN_API_KEY}`)}`
                        },
                        body: new URLSearchParams({
                            from: `Demo Svelteship <noreply@${PRIVATE_MAILGUN_DOMAIN}>`,
                            to: "[email protected]",
                            subject: 'New User Sign Up',
                            text: `New user signed up to demo svelteship: ${email} at ${timestamp}`
                        })
                    });
    
                    if (!response.ok) {
                        const errorText = await response.text();
                        console.error('Failed to send email for message:', msg.msg_id, 'Error:', errorText);
                    } else {
                        // Delete the message from queue only after successful publish
                        await supabaseAdmin
                            .schema('pgmq_public')
                            .rpc('delete', {
                                queue_name: 'email_signups',
                                message_id: msg.msg_id
                            });
                    }
                }
            } catch (error) {
                console.error('Error processing message:', error);
            }
        }
    
        return json({ success: true }, { status: 200 });
    
          
  5. Setup Cron Job for Queue Processing

    Create a cron job and execute it to call your API endpoint every 5 minutes:

            SELECT cron.schedule(
        'email-signups',    -- job name
        '*/5 * * * *',     -- run every 5 minutes
        $$ SELECT net.http_post(
            url := 'https://your-domain-name/api/cron/email_signups',
            headers := jsonb_build_object(
                'Authorization', 
                'Bearer YOUR-SUPABASE-ANON-KEY'
            ), 
            timeout_milliseconds := 3000
        )$$
    );
    
          
  6. Environment Variables

    Ensure you updated these variables in your .env file:

            PRIVATE_MAILGUN_API_KEY=your_mailgun_api_key
    PRIVATE_MAILGUN_DOMAIN=your_mailgun_domain
    
          
  7. TIP

    You can view any unprocessed queue messages in the Supabase dashboard under Integrations -> Queues.