Back
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||

How-to: Snowflake Data Ingest & ETL with Ascend.io

This How-to will provide you with an overview of how to ingest data into Snowflake by building a production pipeline that automatically keeps data up to date, retries failures, and notifies upon any irrecoverable issues.

Ascend.io
Ascend.io
data-eng@ascend.io

This How-to will provide you with an overview of how to ingest data into Snowflake by building a production pipeline that automatically keeps data up to date, retries failures, and notifies upon any irrecoverable issues.

EXTRACT

Ascend provides pre-built read connectors for common blob stores (such as AWS S3, Azure Blob Store, and Google Cloud Storage), data warehouses (such as Redshift, BigQuery, and Snowflake), databases (such as Postgres, MySQL, and SQL Server), and APIs (such as Twitter, Facebook Ads, Google Analytics, and Zendesk). In the off-chance we don't have something you're looking for, we provide the ability to write a custom connector
as well.

In this How-to, we are going to connect to data in an S3 bucket.

  1. To get started, we create a new S3 Read Connector, name it, and provide some bucket details

  2. Once that is set, we can test our connection to see that we can find files, and ensure that everything is working as it should be.

  3. After the connection is tested, we will need to tell Ascend how to parse the data. All the common formats are already handled. If for some reason your data is not in a standard format, you're not stuck in Ascend; you can always pick “Parser Function”, and write the Python code necessary to deal with the data. (In the case of this how-to we select parquet data).

  4. After making your selection, Ascend automatically generates the schema.

  5. The last step to extract your data, is selecting the refresh schedule. In Ascend, a refresh schedule is simply when to check for new data. On the schedule, Ascend will automatically look to see if any files are new, updated, or deleted, and only propagate those detected changes through the pipeline. (This how-to selects an hourly refresh schedule, that refreshes five minutes after the hour).

  6. Hit “Create”, and we have created our read connector.

TRANSFORM

Ascend has automatically brought in the data. Once we start exploring, we can see the individual records, but we can also query the entirety of the dataset. This allows us to bring the full power of SQL to explore the data as we're ingesting it. Let's see what the first hundred records look like in this how-to:

It's looking pretty good, but one thing that's missing is a Ride_ID —
something to uniquely identify each ride. Let's generate one:

  1. We can start by adding a table alias, and concatting together the Pickup_latitude, Pickup_longitude, and the datetime of the pickup.

  2. We can also shaw this to make it look a bit more like an ID.

  3. Finally, name it, and give that a run!

  4. Now that ride_id has shown up and we are happy with our transformation, we can remove the limit, and hit “To Transform”

  5. Name it, and hit create

Ascend automatically starts running this transform component, but is not trying to redo the pipeline from the beginning.

Although we can run our transformations on Snowflake, Ascend transformations run on Spark. You get the full capabilities of also being able to code in Python or Scala, allowing for more advanced use cases, like machine learning.

LOAD DATA INTO SNOWFLAKE

The final step of this process is creating a write connector to load our data into Snowflake.

  1. Go ahead and click the component, create a new write connector, and choose Snowflake as the write connector location from the list of options.

  2. Fill in your account details and choose your credentials.

  3. We'll also select some intermediate store information-this is where the files get staged before they get copied to Snowflake-and provide some credential for that.

  4. The last piece is how we handle schema mismatches-if the table is Snowflake does not quite match the schema we have in Ascend. We could stop and display the error, but Ascend also gives us the option to adapt the schema changes, including even type mismatches, which allows us to very quickly go to our transforms and add columns, change column types, and have it immediately rematerialized in Snowflake without even having to hit run.

  5. Hit create, and Ascend has automatically started to write that data into Snowflake.

Ascend automatically loads the data into Snowflake incrementally by default, but Ascend will switch automatically to a full reload based on business logic changes.

def fetch_commit_history(
  repos: Union[str, List[str], pathlib.Path],
  timeout_seconds: int = 120,
  since_date: Optional[str] = None,
  from_ref: Optional[str] = None,
  to_ref: Optional[str] = None,
) -> Dict[str, List[Dict[str, Any]]]:
  """
  Fetches commit history from one or multiple GitHub repositories using the GitHub CLI.
  Works with both public and private repositories, provided the authenticated user has access.
  """
  # Check GitHub CLI is installed
  subprocess.run(
    ["gh", "--version"],
    capture_output=True,
    check=True,
    timeout=timeout_seconds,
  )

  # Process the repos input to handle various formats
  if isinstance(repos, pathlib.Path) or (isinstance(repos, str) and os.path.exists(repos) and repos.endswith(".json")):
    with open(repos, "r") as f:
      repos = json.load(f)
  elif isinstance(repos, str):
    repos = [repo.strip() for repo in repos.split(",")]

  results = {}
  for repo in repos:
    # Get repository info and default branch
    default_branch_cmd = subprocess.run(
      ["gh", "api", f"/repos/{repo}"],
      capture_output=True,
      text=True,
      check=True,
      timeout=timeout_seconds,
    )
    repo_info = json.loads(default_branch_cmd.stdout)
    default_branch = repo_info.get("default_branch", "main")

    # Build API query with parameters
    api_path = f"/repos/{repo}/commits"
    query_params = ["per_page=100"]

    if since_date:
      query_params.append(f"since={since_date}T00:00:00Z")

    target_ref = to_ref or default_branch
    query_params.append(f"sha={target_ref}")

    api_url = f"{api_path}?{'&'.join(query_params)}"

    # Fetch commits using GitHub CLI
    result = subprocess.run(
      ["gh", "api", api_url],
      capture_output=True,
      text=True,
      check=True,
      timeout=timeout_seconds,
    )

    commits = json.loads(result.stdout)
    results[repo] = commits

  return results

Key implementation details:

  • GitHub CLI integration: Uses the `gh` command-line tool for authenticated API access to both public and private repositories
  • Flexible input handling: Accepts single repos, comma-separated lists, or JSON files containing repository lists
  • Robust error handling: Validates GitHub CLI installation and repository access before attempting to fetch commits
  • Configurable date filtering: Supports both date-based and ref-based commit filtering

AI-Powered Summarization

def summarize_text(content: str, api_key: Optional[str] = None) -> str:
  """
  Summarize provided text content (e.g., commit messages) using OpenAI API.
  """
  if not content.strip():
    return "No commit data found to summarize"

  # Get API key from parameter or environment
  api_key = api_key or os.getenv("OPENAI_API_KEY")
  if not api_key:
    raise RuntimeError("OpenAI API key not found. Set the OPENAI_API_KEY environment variable.")

  client = OpenAI(api_key=api_key)
  response = client.chat.completions.create(
    model="gpt-4o",
    messages=[
      {"role": "system", "content": SYSTEM_PROMPT},
      {"role": "user", "content": content},
    ],
    temperature=0.1,
    max_tokens=1000,
  )
  return response.choices[0].message.content.strip()

def summarize_commits(content: str, add_date_header: bool = True) -> str:
  """
  Summarize commit content and optionally add a date header.
  """
  summary_body = summarize_text(content)

  if add_date_header:
    # Add header with week date
    now_iso = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
    monday = get_monday_of_week(now_iso)
    return f"## 🗓️ Week of {monday}\n\n{summary_body}"

  return summary_body

Our initial system prompt for consistent categorization:

You are a commit message organizer. Analyze the commit messages and organize them into a clear summary.

Group similar commits and format as bullet points under these categories:
- 🚀 Features
- ⚠️ Breaking changes
- 🌟 Improvements
- 🛠️ Bug fixes
- 📝 Additional changes
...
Within the Improvements section, do not simply say "Improved X" or "Fixed Y" or "Added Z" or "Removed W".
Instead, provide a more detailed and user-relevant description of the improvement or fix.

Convert technical commit messages to user-friendly descriptions and remove PR numbers and other technical IDs.
Focus on changes that would be relevant to users and skip internal technical changes.

Format specifications:
- Format entries as bullet points: "- [Feature description]"
- Use clear, user-friendly language while preserving technical terms
- For each item, convert technical commit messages to user-friendly descriptions:
   - "add line" → "New line functionality has been added"
   - "fix css overflow" → "CSS overflow issue has been fixed"
- Capitalize Ascend-specific terms in bullet points such as "Components"

Strictly exclude the following from your output:
- Any mentions of branches (main, master, develop, feature, etc.)
- Any mentions of AI rules such as "Added the ability to specify keywords for rules"
- Any references to branch integration or merges
- Any language about "added to branch" or "integrated into branch"
- Dependency upgrades and version bumps

Prompt engineering:

  • Structured categorization: Our prompt enforces specific emoji-categorized sections for consistent output formatting
  • User-focused translation: Explicitly instructs the AI to convert technical commits into user-friendly language
  • Content filtering: Automatically excludes dependency updates, test changes, and internal technical modifications
  • Low temperature setting: Uses 0.1 temperature for consistent, factual output rather than creative interpretation

Content Integration and File Management

def get_monday_of_week(date_str: str) -> str:
  """
  Get the Monday of the week containing the given date, except for Sunday which returns the next Monday.
  """
  date = datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%SZ")

  # For Sunday (weekday 6), get the following Monday
  if date.weekday() == 6:  # Sunday
    days_ahead = 1
  else:  # For all other days, get the Monday of the current week
    days_behind = date.weekday()
    days_ahead = -days_behind

  target_monday = date + timedelta(days=days_ahead)
  return target_monday.strftime("%Y-%m-%d")

File handling considerations:

  • Consistent date formatting: Automatically calculates the Monday of the current week for consistent release note headers
  • Encoding safety: Properly handles Unicode characters in commit messages from international contributors
  • Atomic file operations: Uses temporary files during processing to prevent corruption if the process is interrupted

GitHub Actions: Orchestrating the Automation

Our workflow ties everything together with robust automation that handles the complexities of CI/CD environments.

Workflow Triggers and Inputs

name: Weekly Release Notes Update
on:
  workflow_dispatch:
    inputs:
      year:
        description: 'Year (YYYY) of date to start collecting releases from'
        default: '2025'
      month:
        description: 'Month (MM) of date to start collecting releases from'
        default: '01'
      day:
        description: 'Day (DD) of date to start collecting releases from'
        default: '01'
      repo_filters:
        description: 'JSON string defining filters for specific repos'
        required: false
      timeout_seconds:
        description: 'Timeout in seconds for API calls'
        default: '45'

Flexible triggering options:

  • Manual dispatch with granular date control: Separate year, month, day inputs for precise date filtering
  • Repository-specific filtering: JSON configuration allows different filtering strategies per repository
  • Configurable timeouts: Adjustable API timeout settings for different network conditions

Secure Authentication Flow

- uses: actions/create-github-app-token@v2
  id: app-token
  with:
    app-id: <YOUR-APP-ID>
    private-key: ${{ secrets.GHA_DOCS_PRIVATE_KEY }}
    owner: ascend-io
    repositories: ascend-docs,ascend-core,ascend-ui,ascend-backend

Security best practices:

  • GitHub App with specific repository access: Explicitly lists only the repositories that need access
  • Scoped permissions: App configured with minimal necessary permissions for the specific repositories
  • Secret management: Private key stored securely in GitHub Secrets

Repository Configuration Processing

- name: Prepare repository filter configuration
  run: |
    CONFIG_FILE=$(mktemp)
    echo "{}" > "$CONFIG_FILE"

    if [ -n "${{ github.event.inputs.repo_filters }}" ]; then
      echo '${{ github.event.inputs.repo_filters }}' > "$CONFIG_FILE"
    else
      DATE_STRING="${{ github.event.inputs.year }}-${{ github.event.inputs.month }}-${{ github.event.inputs.day }}"
      jq -r '.[]' bin/release_notes/input_repos.json | while read -r REPO; do
        FILTER="since:$DATE_STRING"
        jq --arg repo "$REPO" --arg filter "$FILTER" '. + {($repo): $filter}' "$CONFIG_FILE" > "${CONFIG_FILE}.tmp" && mv "${CONFIG_FILE}.tmp" "$CONFIG_FILE"
      done
    fi

    CONFIG_JSON=$(cat "$CONFIG_FILE")
    echo "config_json<<EOF" >> $GITHUB_OUTPUT
    echo "$CONFIG_JSON" >> $GITHUB_OUTPUT
    echo "EOF" >> $GITHUB_OUTPUT

Data Processing and File Management

- name: Generate release notes
  env:
    OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
    GITHUB_TOKEN: ${{ steps.app-token.outputs.token }}
  run: |
    CONFIG_JSON='${{ steps.repo_config.outputs.config_json }}'
    CONFIG_FILE=$(mktemp)
    echo "$CONFIG_JSON" > "$CONFIG_FILE"

    RAW_OUTPUT=$(python bin/release_notes/generate_release_notes.py \
      --repo-config-string "$(cat "$CONFIG_FILE")" \
      --timeout "${{ github.event.inputs.timeout_seconds }}")

    # Split summary and commits using delimiter
    SUMMARY=$(echo "$RAW_OUTPUT" | sed -n '1,/^### END SUMMARY ###$/p' | sed '$d')
    MONDAY_DATE=$(echo "$SUMMARY" | head -n1 | grep -oE "[0-9]{4}-[0-9]{2}-[0-9]{2}")

    echo "monday_date=$MONDAY_DATE" >> $GITHUB_OUTPUT
    echo 'summary<<EOF' >> $GITHUB_OUTPUT
    echo "$SUMMARY" >> $GITHUB_OUTPUT
    echo 'EOF' >> $GITHUB_OUTPUT

Key implementation lessons:

  • Temporary file strategy: We learned the hard way that GitHub Actions environments can lose data between steps. Writing to temporary files solved reliability issues where data would appear blank in subsequent steps.
  • Complex JSON handling: Uses `jq` for safe JSON manipulation and temporary files to avoid shell quoting issues with complex JSON strings
  • Output parsing: Logic to split AI-generated summaries from raw commit data using delimiter markers
  • Robust error handling: `set -euo pipefail` ensures the script fails fast on any error, preventing silent failures

File Integration and Pull Request Creation

- name: Update whats-new.mdx with release notes
  run: |
    FILE="website/docs/whats-new.mdx"
    BRANCH_NAME="notes-${{ steps.generate_notes.outputs.monday_date }}"
    git branch $BRANCH_NAME main
    git switch $BRANCH_NAME

    TEMP_SUMMARY_FILE=$(mktemp)
    echo '${{ steps.generate_notes.outputs.summary }}' > "$TEMP_SUMMARY_FILE"
    cat "$TEMP_SUMMARY_FILE" "$FILE" > "${FILE}.new"
    mv "${FILE}.new" "$FILE"
    rm -f "$TEMP_SUMMARY_FILE"

File management features:

  • Atomic file operations: Uses temporary files and atomic moves to prevent file corruption
  • Branch management: Creates date-based branches for organized PR tracking
  • Content preservation: Carefully prepends new content while preserving existing documentation structure

Lessons Learned and Best Practices

Building this pipeline taught us valuable lessons about documentation automation that go beyond the technical implementation.

Technical Insights

File persistence matters in CI/CD environments. GitHub Actions environments can be unpredictable—always write important data to files rather than relying on environment variables or memory. We learned this the hard way when release notes would mysteriously appear blank in PRs.

API reliability requires defensive programming. Build retry logic and fallbacks for external API calls (OpenAI, GitHub). Network issues and rate limits are inevitable, especially as your usage scales.

Prompt engineering is crucial for consistent output. Spend time crafting prompts that consistently produce the format and tone you want. Small changes in wording can dramatically affect AI output quality and consistency.

Human review is essential, even with AI generation. Having team members review PRs catches edge cases, ensures quality, and builds confidence in the automated system. The goal isn't to eliminate human oversight—it's to make it more efficient and focused.

Historical tracking and product evolution insights. Automated generation creates a consistent record of product evolution that's valuable for retrospectives, planning, and onboarding new team members.

Results and Impact

The automation has fundamentally transformed our release process and team dynamics:

Quantifiable Improvements

Dramatic time savings: Reduced release note creation from 2-3 hours of writing time to 15 minutes of review time. That's a 90% reduction in effort while improving quality and consistency.

Perfect consistency: Every release now has properly formatted, comprehensive notes. No more missed releases or inconsistent formatting across different team members.

Increased frequency: We can now generate release notes weekly, providing users with more timely updates about product improvements.

Complete coverage: Captures changes across all repositories without manual coordination, eliminating the risk of missing important updates.

Next Steps and Future Enhancements

We're continuously improving the pipeline based on team feedback and evolving needs:

Immediate Roadmap

Slack integration: Building a Slackbot to automatically share release notes with our community channels, extending the reach beyond just documentation updates.

Repository tracing: Categorize the raw commits by repository and add links so it's easy to (literally) double-click into each PR for additional context.

Future Possibilities

Multi-language support: Generating release notes in different languages for global audiences as we expand internationally.

Ready to automate your own release notes? Start with the requirements above and build incrementally. Begin with a single repository, get the basic workflow running, then expand to multiple repos and add advanced features. Your future self (and your team) will thank you for eliminating this manual drudgery and creating a more consistent, professional release process.

The investment in automation pays dividends immediately—not just in time saved, but in the improved quality and consistency of your user communication. In a time where software moves fast, automated release notes ensure your documentation keeps pace.

Try it out. Your future self will thank you :)