Add InfluxDB export/import + add new InfluxDB instance for prod
scripts/influxdb/import-csv-data.sh (new executable file, 378 lines)
@@ -0,0 +1,378 @@
#!/bin/bash

# InfluxDB CSV Data Import Script
# Usage: ./import-csv-data.sh

set -e  # Exit on any error

# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color

# Get the directory where the script is located
SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )"
PROJECT_ROOT="$(dirname "$(dirname "$SCRIPT_DIR")")"
SRC_DIR="$PROJECT_ROOT/src"
EXPORTS_BASE_DIR="$SCRIPT_DIR/exports"

# Logging functions
log() {
    echo -e "${GREEN}[$(date +'%Y-%m-%d %H:%M:%S')] $1${NC}"
}

warn() {
    echo -e "${YELLOW}[$(date +'%Y-%m-%d %H:%M:%S')] WARNING: $1${NC}"
}

error() {
    echo -e "${RED}[$(date +'%Y-%m-%d %H:%M:%S')] ERROR: $1${NC}"
    exit 1
}

info() {
    echo -e "${BLUE}[$(date +'%Y-%m-%d %H:%M:%S')] INFO: $1${NC}"
}

# Check if influx CLI is installed
command -v influx >/dev/null 2>&1 || error "InfluxDB CLI is not installed. Please install it first: brew install influxdb-cli"

# Check if jq is installed for JSON parsing
if ! command -v jq >/dev/null 2>&1; then
    warn "jq is not installed. Installing it for JSON parsing..."
    if command -v brew >/dev/null 2>&1; then
        brew install jq || error "Failed to install jq. Please install it manually: brew install jq"
    else
        error "jq is not installed and brew is not available. Please install jq manually."
    fi
fi

echo ""
echo "============================================"
echo " InfluxDB CSV Data Import"
echo "============================================"
echo ""

# Check if exports directory exists
if [ ! -d "$EXPORTS_BASE_DIR" ]; then
    error "Exports directory not found: $EXPORTS_BASE_DIR"
fi
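
# Sketch of the directory layout this script assumes (as produced by the
# companion export script in this commit; names below are illustrative):
#
#   exports/
#     Production/                 # source environment
#       2024-01-15_103000/        # one subdirectory per export timestamp
#         prices.csv              # annotated CSV from `influx query`
#         export-metadata.txt     # optional, shown in the menu below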

# List available source environments
echo "Available export source environments:"
ENVIRONMENTS=($(ls -d "$EXPORTS_BASE_DIR"/*/ 2>/dev/null | xargs -n 1 basename))

if [ ${#ENVIRONMENTS[@]} -eq 0 ]; then
    error "No export environments found in: $EXPORTS_BASE_DIR"
fi

for i in "${!ENVIRONMENTS[@]}"; do
    echo "$((i+1))) ${ENVIRONMENTS[$i]}"
done
echo ""
read -p "Select source environment (1-${#ENVIRONMENTS[@]}): " ENV_CHOICE

# Reject non-numeric input before the arithmetic comparisons below
if ! [[ "$ENV_CHOICE" =~ ^[0-9]+$ ]] || [ "$ENV_CHOICE" -lt 1 ] || [ "$ENV_CHOICE" -gt ${#ENVIRONMENTS[@]} ]; then
    error "Invalid choice"
fi

SOURCE_ENV="${ENVIRONMENTS[$((ENV_CHOICE-1))]}"
ENV_EXPORT_DIR="$EXPORTS_BASE_DIR/$SOURCE_ENV"

log "Selected source environment: $SOURCE_ENV"

# List available export timestamps
echo ""
echo "Available exports for $SOURCE_ENV:"
EXPORTS=($(ls -d "$ENV_EXPORT_DIR"/*/ 2>/dev/null | xargs -n 1 basename | sort -r))

if [ ${#EXPORTS[@]} -eq 0 ]; then
    error "No exports found for environment: $SOURCE_ENV"
fi

for i in "${!EXPORTS[@]}"; do
    EXPORT_PATH="$ENV_EXPORT_DIR/${EXPORTS[$i]}"
    METADATA_FILE="$EXPORT_PATH/export-metadata.txt"

    if [ -f "$METADATA_FILE" ]; then
        EXPORT_SIZE=$(grep "Export Size:" "$METADATA_FILE" | cut -d: -f2 | xargs)
        DATA_POINTS=$(grep "Data Points:" "$METADATA_FILE" | cut -d: -f2 | xargs)
        EXPORT_DATE=$(grep "Date:" "$METADATA_FILE" | cut -d: -f2- | xargs)
        echo "$((i+1))) ${EXPORTS[$i]} - $EXPORT_DATE ($EXPORT_SIZE, $DATA_POINTS points)"
    else
        echo "$((i+1))) ${EXPORTS[$i]}"
    fi
done
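
# Illustrative export-metadata.txt; only the three labels grep'd above are
# assumed, and the values here are made up:
#
#   Date: 2024-01-15 10:30:00
#   Export Size: 1.2G
#   Data Points: 4500000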
echo ""
read -p "Select export to import (1-${#EXPORTS[@]}): " EXPORT_CHOICE

# Same numeric guard as for the environment choice
if ! [[ "$EXPORT_CHOICE" =~ ^[0-9]+$ ]] || [ "$EXPORT_CHOICE" -lt 1 ] || [ "$EXPORT_CHOICE" -gt ${#EXPORTS[@]} ]; then
    error "Invalid choice"
fi

SELECTED_EXPORT="${EXPORTS[$((EXPORT_CHOICE-1))]}"
IMPORT_FROM_DIR="$ENV_EXPORT_DIR/$SELECTED_EXPORT"

log "Selected export: $SELECTED_EXPORT"
info "Export location: $IMPORT_FROM_DIR"

# Find CSV file
CSV_FILE=$(find "$IMPORT_FROM_DIR" -name "*.csv" | head -1)

if [ ! -f "$CSV_FILE" ]; then
    error "No CSV file found in: $IMPORT_FROM_DIR"
fi

CSV_SIZE=$(du -sh "$CSV_FILE" | cut -f1)
info "CSV file: $(basename "$CSV_FILE") ($CSV_SIZE)"

# Select target environment for import
echo ""
echo "Select TARGET environment for import:"
echo "1) SandboxLocal"
echo "2) ProductionLocal"
echo ""
read -p "Enter your choice (1 or 2): " TARGET_ENV_CHOICE

case $TARGET_ENV_CHOICE in
    1)
        TARGET_ENVIRONMENT="SandboxLocal"
        APPSETTINGS_FILE="$SRC_DIR/Managing.Api/appsettings.SandboxLocal.json"
        ;;
    2)
        TARGET_ENVIRONMENT="ProductionLocal"
        APPSETTINGS_FILE="$SRC_DIR/Managing.Api/appsettings.ProductionLocal.json"
        ;;
    *)
        error "Invalid choice. Please run the script again and select 1 or 2."
        ;;
esac

log "Target environment: $TARGET_ENVIRONMENT"

# Check if appsettings file exists
if [ ! -f "$APPSETTINGS_FILE" ]; then
    error "Configuration file not found: $APPSETTINGS_FILE"
fi

log "Reading configuration from: $APPSETTINGS_FILE"

# Parse InfluxDB settings from JSON
INFLUX_URL=$(jq -r '.InfluxDb.Url' "$APPSETTINGS_FILE")
INFLUX_ORG=$(jq -r '.InfluxDb.Organization' "$APPSETTINGS_FILE")
INFLUX_TOKEN=$(jq -r '.InfluxDb.Token' "$APPSETTINGS_FILE")
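
# The jq paths above assume an appsettings section shaped like this
# (illustrative values only):
#
#   "InfluxDb": {
#     "Url": "http://localhost:8086",
#     "Organization": "my-org",
#     "Token": "<api token>"
#   }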

# Validate parsed values
if [ "$INFLUX_URL" = "null" ] || [ -z "$INFLUX_URL" ]; then
    error "Failed to parse InfluxDb.Url from configuration file"
fi

if [ "$INFLUX_ORG" = "null" ] || [ -z "$INFLUX_ORG" ]; then
    error "Failed to parse InfluxDb.Organization from configuration file"
fi

if [ "$INFLUX_TOKEN" = "null" ] || [ -z "$INFLUX_TOKEN" ]; then
    error "Failed to parse InfluxDb.Token from configuration file"
fi

info "Target InfluxDB URL: $INFLUX_URL"
info "Organization: $INFLUX_ORG"

# Get bucket name
BUCKET_NAME="prices-bucket"

# Check if bucket exists
info "Checking if bucket '$BUCKET_NAME' exists..."
if influx bucket list --host "$INFLUX_URL" --org "$INFLUX_ORG" --token "$INFLUX_TOKEN" --name "$BUCKET_NAME" &>/dev/null; then
    log "✅ Bucket '$BUCKET_NAME' exists"
else
    warn "Bucket '$BUCKET_NAME' does not exist!"
    read -p "Create the bucket now? (yes/no): " CREATE_BUCKET
    if [ "$CREATE_BUCKET" = "yes" ]; then
        influx bucket create \
            --name "$BUCKET_NAME" \
            --retention 0 \
            --host "$INFLUX_URL" \
            --org "$INFLUX_ORG" \
            --token "$INFLUX_TOKEN" || error "Failed to create bucket"
        log "✅ Bucket created successfully"
    else
        error "Cannot proceed without target bucket"
    fi
fi
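
# Note: the existence check above leans on `influx bucket list --name`
# exiting non-zero when no bucket matches; that holds for recent influx
# CLI v2 releases, but verify against the CLI version you actually ship.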

# Final confirmation
echo ""
warn "⚠️ IMPORTANT INFORMATION:"
echo "  Source: $SOURCE_ENV/$SELECTED_EXPORT"
echo "  Target: $TARGET_ENVIRONMENT ($INFLUX_URL)"
echo "  Bucket: $BUCKET_NAME"
echo "  Data Size: $CSV_SIZE"
warn "  This will ADD data to the bucket (existing data will be preserved)"
warn "  Points with the same tags and timestamp will overwrite existing values"
echo ""
read -p "Are you sure you want to continue? (yes/no): " CONFIRM

if [ "$CONFIRM" != "yes" ]; then
    log "Import cancelled by user"
    exit 0
fi

# Perform import
echo ""
log "🚀 Starting import operation..."
log "This may take several minutes for large files..."
echo ""

# Create a temporary file for line protocol conversion
TEMP_LP_FILE=$(mktemp)
# Single quotes defer expansion until the trap fires; inner quotes guard spaces
trap 'rm -f "$TEMP_LP_FILE"' EXIT

info "Converting CSV to line protocol format..."

# Convert annotated CSV to line protocol using awk
# Skip annotation lines (starting with #) and empty lines
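# The converter below assumes an annotated CSV export from `influx query`,
# roughly shaped like this (columns and values illustrative; only the
# columns named in the awk script are required):
#
#   #datatype,string,long,dateTime:RFC3339,string,string,string,string,double
#   ,result,table,_time,_measurement,exchange,ticker,timeframe,close
#   ,,0,2024-01-15T10:00:00Z,prices,Binance,BTCUSDT,1m,42810.1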
awk -F',' '
BEGIN {OFS=","}

# Convert an RFC3339 timestamp (e.g. 2024-01-15T10:00:00Z) to epoch seconds
# using the civil-date-to-days algorithm, so no gawk-only mktime() is needed
function rfc3339_to_epoch(t,    y, mo, d, h, mi, s, a, yy, era, yoe, doy, doe, days) {
    y  = substr(t, 1, 4) + 0
    mo = substr(t, 6, 2) + 0
    d  = substr(t, 9, 2) + 0
    h  = substr(t, 12, 2) + 0
    mi = substr(t, 15, 2) + 0
    s  = substr(t, 18, 2) + 0
    a = (mo <= 2) ? 1 : 0
    yy = y - a
    era = int(yy / 400)
    yoe = yy - era * 400
    doy = int((153 * (mo + (a ? 9 : -3)) + 2) / 5) + d - 1
    doe = yoe * 365 + int(yoe / 4) - int(yoe / 100) + doy
    days = era * 146097 + doe - 719468
    return ((days * 24 + h) * 60 + mi) * 60 + s
}

# Skip annotation lines
/^#/ {next}

# Skip empty lines
/^[[:space:]]*$/ {next}

# Process the first non-annotation, non-empty line as the header.
# (NR==1 would never match here: line 1 of an annotated CSV is a # annotation.)
!header_done {
    for (i=1; i<=NF; i++) {
        field[$i] = i
    }
    header_done = 1
    next
}

# Process data rows
{
    # Extract values
    time = $field["_time"]
    measurement = $field["_measurement"]
    exchange = $field["exchange"]
    ticker = $field["ticker"]
    timeframe = $field["timeframe"]

    # Skip repeated header rows (multi-table exports) and rows missing essentials
    if (time == "_time") next
    if (time == "" || measurement == "" || exchange == "" || ticker == "" || timeframe == "") next

    # Build line protocol
    # Format: measurement,tag1=value1,tag2=value2 field1=value1,field2=value2 timestamp
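    # e.g. (illustrative): prices,exchange=Binance,ticker=BTCUSDT,timeframe=1m close=42810.1 1705312800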
    printf "%s,exchange=%s,ticker=%s,timeframe=%s ", measurement, exchange, ticker, timeframe

    # Add fields
    first = 1
    for (fname in field) {
        if (fname != "_time" && fname != "_start" && fname != "_stop" && fname != "_measurement" &&
            fname != "exchange" && fname != "ticker" && fname != "timeframe" &&
            fname != "result" && fname != "table" && fname != "") {
            val = $field[fname]
            if (val != "" && val != "NaN") {
                if (!first) printf ","
                # Integers get an "i" suffix, floats stay bare, and anything
                # else must be written as a quoted string in line protocol
                if (val ~ /^-?[0-9]+$/) {
                    printf "%s=%si", fname, val
                } else if (val ~ /^-?([0-9]+\.?[0-9]*|\.[0-9]+)([eE][+-]?[0-9]+)?$/) {
                    printf "%s=%s", fname, val
                } else {
                    gsub(/"/, "\\\"", val)
                    printf "%s=\"%s\"", fname, val
                }
                first = 0
            }
        }
    }

    # Emit the timestamp in epoch seconds to match --precision s below;
    # RFC3339 values from annotated CSV exports are converted first
    if (time ~ /^[0-9]+$/) {
        printf " %s\n", time
    } else {
        printf " %s\n", rfc3339_to_epoch(time)
    }
}
' "$CSV_FILE" > "$TEMP_LP_FILE" 2>/dev/null || {
    warn "CSV parsing method 1 failed, trying direct import..."

    # Alternative: Use influx write with CSV format directly
    info "Attempting direct CSV import..."

    if influx write \
        --host "$INFLUX_URL" \
        --org "$INFLUX_ORG" \
        --token "$INFLUX_TOKEN" \
        --bucket "$BUCKET_NAME" \
        --format csv \
        --file "$CSV_FILE" 2>&1; then

        log "✅ Import completed successfully using direct CSV method!"

        echo ""
        log "📊 Import Summary"
        echo "============================================"
        info "Source: $SOURCE_ENV/$SELECTED_EXPORT"
        info "Target: $TARGET_ENVIRONMENT"
        info "Bucket: $BUCKET_NAME"
        log "Status: Success"
        echo "============================================"
        echo ""
        exit 0
    else
        error "Both import methods failed. Please check the error messages above."
    fi
}

# If line protocol was generated, import it
if [ -s "$TEMP_LP_FILE" ]; then
    LP_LINES=$(wc -l < "$TEMP_LP_FILE" | xargs)
    info "Generated $LP_LINES lines of line protocol"

    # Import in batches to avoid timeouts
    BATCH_SIZE=5000
    TOTAL_LINES=$LP_LINES
    CURRENT_LINE=0

    info "Importing in batches of $BATCH_SIZE lines..."

    while [ $CURRENT_LINE -lt $TOTAL_LINES ]; do
        END_LINE=$((CURRENT_LINE + BATCH_SIZE))
        BATCH_NUM=$((CURRENT_LINE / BATCH_SIZE + 1))
        PROGRESS=$((CURRENT_LINE * 100 / TOTAL_LINES))

        info "Processing batch $BATCH_NUM (Progress: ${PROGRESS}%)..."

        # Extract batch and import
        sed -n "$((CURRENT_LINE + 1)),${END_LINE}p" "$TEMP_LP_FILE" | \
            influx write \
                --host "$INFLUX_URL" \
                --org "$INFLUX_ORG" \
                --token "$INFLUX_TOKEN" \
                --bucket "$BUCKET_NAME" \
                --precision s 2>&1 || {
                warn "Batch $BATCH_NUM had errors, continuing..."
            }

        CURRENT_LINE=$END_LINE
    done

    log "✅ Import completed successfully!"
else
    error "Failed to generate line protocol data"
fi

# Final summary
echo ""
echo "============================================"
log "📊 Import Summary"
echo "============================================"
info "Source: $SOURCE_ENV/$SELECTED_EXPORT"
info "Target: $TARGET_ENVIRONMENT"
info "Bucket: $BUCKET_NAME"
info "File: $(basename "$CSV_FILE")"
info "Size: $CSV_SIZE"
log "Status: Complete"
echo "============================================"
echo ""

log "🎉 Data successfully imported to $TARGET_ENVIRONMENT!"
echo ""
info "Verify the import with:"
echo "  influx query 'from(bucket:\"$BUCKET_NAME\") |> range(start:-1d) |> limit(n:10)' \\"
echo "    --host \"$INFLUX_URL\" --org \"$INFLUX_ORG\" --token \"$INFLUX_TOKEN\""
echo ""