#!/bin/bash
# InfluxDB CSV Data Import Script
# Usage: ./import-csv-data.sh

set -e # Exit on any error

# ANSI escape sequences used by the logging helpers below.
# Stored as literal '\033…' strings; `echo -e` interprets them at print time.
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
RED='\033[0;31m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color

# Resolve all paths relative to this script's own location so the script
# works no matter which directory it is invoked from.
SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )"
PROJECT_ROOT="$(dirname "$(dirname "$SCRIPT_DIR")")"
SRC_DIR="$PROJECT_ROOT/src"
EXPORTS_BASE_DIR="$SCRIPT_DIR/exports"
# --- Logging helpers ---------------------------------------------------
# Each prints a colored, timestamped line to stdout; error() also aborts.

# Shared timestamp formatter so all helpers stay consistent.
_timestamp() {
    date +'%Y-%m-%d %H:%M:%S'
}

# Success / progress message (green).
log() {
    echo -e "${GREEN}[$(_timestamp)] $1${NC}"
}

# Non-fatal problem (yellow).
warn() {
    echo -e "${YELLOW}[$(_timestamp)] WARNING: $1${NC}"
}

# Fatal problem (red); terminates the script with status 1.
error() {
    echo -e "${RED}[$(_timestamp)] ERROR: $1${NC}"
    exit 1
}

# Informational detail (blue).
info() {
    echo -e "${BLUE}[$(_timestamp)] INFO: $1${NC}"
}
# --- Prerequisite tooling ----------------------------------------------

# The InfluxDB CLI is mandatory; abort immediately when it is absent.
if ! command -v influx >/dev/null 2>&1; then
    error "InfluxDB CLI is not installed. Please install it first: brew install influxdb-cli"
fi

# jq is required to read the appsettings JSON; attempt a Homebrew install
# on the fly, otherwise ask the user to install it manually.
if ! command -v jq >/dev/null 2>&1; then
    warn "jq is not installed. Installing it for JSON parsing..."
    if command -v brew >/dev/null 2>&1; then
        brew install jq || error "Failed to install jq. Please install it manually: brew install jq"
    else
        error "jq is not installed and brew is not available. Please install jq manually."
    fi
fi
echo ""
echo "============================================"
echo " InfluxDB CSV Data Import"
echo "============================================"
echo ""

# Check if exports directory exists
if [ ! -d "$EXPORTS_BASE_DIR" ]; then
    error "Exports directory not found: $EXPORTS_BASE_DIR"
fi

# List available source environments.
# A glob is used instead of parsing `ls` output so directory names
# containing spaces survive intact (the old `ls | xargs basename` split them).
echo "Available export source environments:"
ENVIRONMENTS=()
for dir in "$EXPORTS_BASE_DIR"/*/; do
    [ -d "$dir" ] || continue   # unmatched glob leaves the literal pattern
    ENVIRONMENTS+=("$(basename "$dir")")
done

if [ ${#ENVIRONMENTS[@]} -eq 0 ]; then
    error "No export environments found in: $EXPORTS_BASE_DIR"
fi

# Present a 1-based menu of environments.
for i in "${!ENVIRONMENTS[@]}"; do
    echo "$((i+1))) ${ENVIRONMENTS[$i]}"
done
echo ""
read -r -p "Select source environment (1-${#ENVIRONMENTS[@]}): " ENV_CHOICE

# Validate that the input is numeric BEFORE the arithmetic comparison;
# previously a non-numeric answer made `[` itself error out.
if ! [[ "$ENV_CHOICE" =~ ^[0-9]+$ ]] || [ "$ENV_CHOICE" -lt 1 ] || [ "$ENV_CHOICE" -gt ${#ENVIRONMENTS[@]} ]; then
    error "Invalid choice"
fi

SOURCE_ENV="${ENVIRONMENTS[$((ENV_CHOICE-1))]}"
ENV_EXPORT_DIR="$EXPORTS_BASE_DIR/$SOURCE_ENV"

log "Selected source environment: $SOURCE_ENV"
# List available export timestamps for the chosen environment, newest first.
echo ""
echo "Available exports for $SOURCE_ENV:"
# Collect directory names via a glob (not `ls` parsing) and sort descending
# so the most recent timestamped export appears first.
EXPORTS=()
while IFS= read -r name; do
    [ -n "$name" ] && EXPORTS+=("$name")
done < <(
    for dir in "$ENV_EXPORT_DIR"/*/; do
        [ -d "$dir" ] && basename "$dir"
    done | sort -r
)

if [ ${#EXPORTS[@]} -eq 0 ]; then
    error "No exports found for environment: $SOURCE_ENV"
fi

# Show each export; enrich the menu line with metadata when available.
for i in "${!EXPORTS[@]}"; do
    EXPORT_PATH="$ENV_EXPORT_DIR/${EXPORTS[$i]}"
    METADATA_FILE="$EXPORT_PATH/export-metadata.txt"

    if [ -f "$METADATA_FILE" ]; then
        # `xargs` trims the whitespace cut leaves around the value.
        # (grep may find nothing; the pipeline still exits 0 via xargs.)
        EXPORT_SIZE=$(grep "Export Size:" "$METADATA_FILE" | cut -d: -f2 | xargs)
        DATA_POINTS=$(grep "Data Points:" "$METADATA_FILE" | cut -d: -f2 | xargs)
        EXPORT_DATE=$(grep "Date:" "$METADATA_FILE" | cut -d: -f2- | xargs)
        echo "$((i+1))) ${EXPORTS[$i]} - $EXPORT_DATE ($EXPORT_SIZE, $DATA_POINTS points)"
    else
        echo "$((i+1))) ${EXPORTS[$i]}"
    fi
done
echo ""
read -r -p "Select export to import (1-${#EXPORTS[@]}): " EXPORT_CHOICE

# Reject non-numeric or out-of-range answers with a clean error message
# instead of letting `[` fail on a malformed operand.
if ! [[ "$EXPORT_CHOICE" =~ ^[0-9]+$ ]] || [ "$EXPORT_CHOICE" -lt 1 ] || [ "$EXPORT_CHOICE" -gt ${#EXPORTS[@]} ]; then
    error "Invalid choice"
fi

SELECTED_EXPORT="${EXPORTS[$((EXPORT_CHOICE-1))]}"
IMPORT_FROM_DIR="$ENV_EXPORT_DIR/$SELECTED_EXPORT"

log "Selected export: $SELECTED_EXPORT"
info "Export location: $IMPORT_FROM_DIR"
# Locate the exported CSV. -type f guards against a directory that happens
# to be named *.csv, and -print -quit stops the walk at the first match
# (cheaper than piping everything through `head -1`).
CSV_FILE=$(find "$IMPORT_FROM_DIR" -type f -name "*.csv" -print -quit)

if [ ! -f "$CSV_FILE" ]; then
    error "No CSV file found in: $IMPORT_FROM_DIR"
fi

# Human-readable size for the confirmation summary.
CSV_SIZE=$(du -sh "$CSV_FILE" | cut -f1)
info "CSV file: $(basename "$CSV_FILE") ($CSV_SIZE)"
# --- Target environment selection --------------------------------------
# Maps the user's numeric choice to an environment name and the matching
# appsettings file that holds its InfluxDB connection settings.
echo ""
echo "Select TARGET environment for import:"
echo "1) SandboxLocal"
echo "2) ProductionLocal"
echo ""
read -p "Enter your choice (1 or 2): " TARGET_ENV_CHOICE

if [ "$TARGET_ENV_CHOICE" = "1" ]; then
    TARGET_ENVIRONMENT="SandboxLocal"
    APPSETTINGS_FILE="$SRC_DIR/Managing.Api/appsettings.SandboxLocal.json"
elif [ "$TARGET_ENV_CHOICE" = "2" ]; then
    TARGET_ENVIRONMENT="ProductionLocal"
    APPSETTINGS_FILE="$SRC_DIR/Managing.Api/appsettings.ProductionLocal.json"
else
    error "Invalid choice. Please run the script again and select 1 or 2."
fi

log "Target environment: $TARGET_ENVIRONMENT"
# --- Read InfluxDB connection settings from appsettings -----------------
if [ ! -f "$APPSETTINGS_FILE" ]; then
    error "Configuration file not found: $APPSETTINGS_FILE"
fi

log "Reading configuration from: $APPSETTINGS_FILE"

# require_setting <value> <key-name>
# Aborts when a setting is absent (jq -r prints the literal string "null"
# for missing keys) or empty. Replaces three copy-pasted validation blocks.
require_setting() {
    if [ "$1" = "null" ] || [ -z "$1" ]; then
        error "Failed to parse $2 from configuration file"
    fi
}

# Parse InfluxDB settings from JSON
INFLUX_URL=$(jq -r '.InfluxDb.Url' "$APPSETTINGS_FILE")
INFLUX_ORG=$(jq -r '.InfluxDb.Organization' "$APPSETTINGS_FILE")
INFLUX_TOKEN=$(jq -r '.InfluxDb.Token' "$APPSETTINGS_FILE")

# Validate parsed values
require_setting "$INFLUX_URL" "InfluxDb.Url"
require_setting "$INFLUX_ORG" "InfluxDb.Organization"
require_setting "$INFLUX_TOKEN" "InfluxDb.Token"

info "Target InfluxDB URL: $INFLUX_URL"
info "Organization: $INFLUX_ORG"
# --- Target bucket ------------------------------------------------------
BUCKET_NAME="prices-bucket"

# Verify the bucket exists before importing; offer to create it otherwise.
info "Checking if bucket '$BUCKET_NAME' exists..."
if influx bucket list --host "$INFLUX_URL" --org "$INFLUX_ORG" --token "$INFLUX_TOKEN" --name "$BUCKET_NAME" &>/dev/null; then
    log "✅ Bucket '$BUCKET_NAME' exists"
else
    warn "Bucket '$BUCKET_NAME' does not exist!"
    # -r keeps read from mangling any backslashes in the answer.
    read -r -p "Create the bucket now? (yes/no): " CREATE_BUCKET
    if [ "$CREATE_BUCKET" = "yes" ]; then
        # --retention 0 means "keep data forever".
        influx bucket create \
            --name "$BUCKET_NAME" \
            --retention 0 \
            --host "$INFLUX_URL" \
            --org "$INFLUX_ORG" \
            --token "$INFLUX_TOKEN" || error "Failed to create bucket"
        log "✅ Bucket created successfully"
    else
        error "Cannot proceed without target bucket"
    fi
fi
# --- Final confirmation -------------------------------------------------
# Summarize source, target, and size; only an exact "yes" proceeds.
echo ""
warn "⚠️ IMPORTANT INFORMATION:"
echo " Source: $SOURCE_ENV/$SELECTED_EXPORT"
echo " Target: $TARGET_ENVIRONMENT ($INFLUX_URL)"
echo " Bucket: $BUCKET_NAME"
echo " Data Size: $CSV_SIZE"
warn " This will ADD data to the bucket (existing data will be preserved)"
warn " Duplicate timestamps may cause overwrites"
echo ""
# -r keeps read from interpreting backslashes in the answer.
read -r -p "Are you sure you want to continue? (yes/no): " CONFIRM

if [ "$CONFIRM" != "yes" ]; then
    log "Import cancelled by user"
    exit 0
fi
# --- Perform import -----------------------------------------------------
echo ""
log "🚀 Starting import operation..."
log "This may take several minutes for large files..."
echo ""

# Temp file holding the CSV converted to line protocol; removed on exit.
TEMP_LP_FILE=$(mktemp)
# Single quotes defer expansion until the trap fires, and the inner double
# quotes keep a path containing spaces intact (the old double-quoted trap
# expanded the unquoted variable at definition time — SC2064).
trap 'rm -f "$TEMP_LP_FILE"' EXIT

info "Converting CSV to line protocol format..."
# Convert annotated CSV to line protocol using awk.
# Annotation lines (#group/#datatype/#default) and blank lines are skipped;
# the first surviving row is the header mapping column names to positions.
# FIX: the header rule used to be NR==1, but in an annotated CSV row 1 is an
# annotation that the /^#/ rule has already consumed, so the real header
# (NR > 1) was never captured and field[] lookups degraded to $0. A
# header_done flag captures the first non-annotation row instead, which is
# also correct when the CSV has no annotations at all.
awk -F',' '
BEGIN {OFS=","}
# Skip annotation lines
/^#/ {next}
# Skip empty lines
/^[[:space:]]*$/ {next}
# First non-annotation, non-empty row is the header
!header_done {
    for (i=1; i<=NF; i++) {
        field[$i] = i
    }
    header_done = 1
    next
}
# Process data rows
{
    # Extract values
    time = $field["_time"]
    measurement = $field["_measurement"]
    exchange = $field["exchange"]
    ticker = $field["ticker"]
    timeframe = $field["timeframe"]

    # Skip if essential fields are missing
    if (time == "" || measurement == "" || exchange == "" || ticker == "" || timeframe == "") next

    # Build line protocol
    # Format: measurement,tag1=value1,tag2=value2 field1=value1,field2=value2 timestamp
    printf "%s,exchange=%s,ticker=%s,timeframe=%s ", measurement, exchange, ticker, timeframe

    # Add fields (awk "for (x in arr)" iterates in unspecified order; line
    # protocol does not require a particular field order)
    first = 1
    for (fname in field) {
        if (fname != "_time" && fname != "_start" && fname != "_stop" && fname != "_measurement" &&
            fname != "exchange" && fname != "ticker" && fname != "timeframe" &&
            fname != "result" && fname != "table" && fname != "") {
            val = $field[fname]
            if (val != "" && val != "NaN") {
                if (!first) printf ","
                # Integers get an "i" suffix; anything else is emitted bare.
                # NOTE(review): non-numeric (string) field values would need
                # surrounding double quotes to be valid line protocol —
                # presumably all exported fields are numeric; verify against
                # the export side.
                if (val ~ /^[0-9]+$/) {
                    printf "%s=%si", fname, val
                } else {
                    printf "%s=%s", fname, val
                }
                first = 0
            }
        }
    }

    # Emit the timestamp as-is.
    # NOTE(review): the batch importer below writes with --precision s,
    # which expects epoch seconds; RFC3339 _time values would be rejected
    # per line — confirm the export format (the direct-CSV fallback below
    # handles annotated RFC3339 exports).
    printf " %s\n", time
}
' "$CSV_FILE" > "$TEMP_LP_FILE" 2>/dev/null || {
    # awk failed outright — fall back to letting the influx CLI parse the
    # annotated CSV itself.
    warn "CSV parsing method 1 failed, trying direct import..."

    # Alternative: Use influx write with CSV format directly
    info "Attempting direct CSV import..."

    if influx write \
        --host "$INFLUX_URL" \
        --org "$INFLUX_ORG" \
        --token "$INFLUX_TOKEN" \
        --bucket "$BUCKET_NAME" \
        --format csv \
        --file "$CSV_FILE" 2>&1; then

        log "✅ Import completed successfully using direct CSV method!"

        echo ""
        log "📊 Import Summary"
        echo "============================================"
        info "Source: $SOURCE_ENV/$SELECTED_EXPORT"
        info "Target: $TARGET_ENVIRONMENT"
        info "Bucket: $BUCKET_NAME"
        log "Status: Success"
        echo "============================================"
        echo ""
        # Success via the fallback path — the batch importer is skipped.
        exit 0
    else
        error "Both import methods failed. Please check the error messages above."
    fi
}
# Import the generated line protocol, if the conversion produced anything.
if [ -s "$TEMP_LP_FILE" ]; then
    LP_LINES=$(wc -l < "$TEMP_LP_FILE" | xargs)
    info "Generated $LP_LINES lines of line protocol"

    # Upload in fixed-size slices so a single huge request cannot time out.
    BATCH_SIZE=5000
    offset=0

    info "Importing in batches of $BATCH_SIZE lines..."

    while [ "$offset" -lt "$LP_LINES" ]; do
        slice_end=$((offset + BATCH_SIZE))
        batch_no=$((offset / BATCH_SIZE + 1))
        pct=$((offset * 100 / LP_LINES))

        info "Processing batch $batch_no (Progress: ${pct}%)..."

        # Stream the current slice straight into `influx write`.
        # A failed batch is reported but does not abort the remaining ones.
        sed -n "$((offset + 1)),${slice_end}p" "$TEMP_LP_FILE" |
            influx write \
                --host "$INFLUX_URL" \
                --org "$INFLUX_ORG" \
                --token "$INFLUX_TOKEN" \
                --bucket "$BUCKET_NAME" \
                --precision s 2>&1 || {
            warn "Batch $batch_no had errors, continuing..."
        }

        offset=$slice_end
    done

    log "✅ Import completed successfully!"
else
    error "Failed to generate line protocol data"
fi
# --- Final summary ------------------------------------------------------
divider="============================================"

printf '\n'
echo "$divider"
log "📊 Import Summary"
echo "$divider"
info "Source: $SOURCE_ENV/$SELECTED_EXPORT"
info "Target: $TARGET_ENVIRONMENT"
info "Bucket: $BUCKET_NAME"
info "File: $(basename "$CSV_FILE")"
info "Size: $CSV_SIZE"
log "Status: Complete"
echo "$divider"
printf '\n'

log "🎉 Data successfully imported to $TARGET_ENVIRONMENT!"
printf '\n'

# Show the user a ready-made query to spot-check the imported points.
info "Verify the import with:"
echo " influx query 'from(bucket:\"$BUCKET_NAME\") |> range(start:-1d) |> limit(n:10)' \\"
echo " --host \"$INFLUX_URL\" --org \"$INFLUX_ORG\" --token \"$INFLUX_TOKEN\""
printf '\n'