A robust, production-ready Apache Flink application for processing unstructured log data, parsing it using configurable Grok patterns, and storing it in Apache Iceberg tables.
This Flink-based data processor is designed to ingest unstructured log data from various sources, parse different log formats using customizable Grok patterns, and store the processed data in Apache Iceberg tables for further analysis. The application leverages Apache Flink for stream processing and AWS services for storage and catalog management.
NEW: Now supports generic log processing with configurable patterns! Process Apache logs, Syslog, custom application logs, or any format you define.
📚 Documentation: User Guide | Architecture | Deployment | Troubleshooting
- Continuous Monitoring: Real-time monitoring of S3 paths for new log files
- Flexible Parsing: Configurable Grok patterns for parsing various log formats
- Data Transformation: Automatic mapping of unstructured data to structured formats
- Iceberg Integration: Optimized storage in Apache Iceberg tables with ACID guarantees
- AWS Integration: Support for AWS Glue catalog and S3 storage
- Privacy Protection: Built-in PII data anonymization capabilities
- Scalable: Designed for high-throughput, distributed processing
- Resilient: Fault-tolerant with Flink checkpointing
The application follows a streaming architecture:
```
┌─────────────┐      ┌──────────────┐      ┌─────────────────┐      ┌──────────────┐
│   S3 Logs   │─────▶│ Flink Source │─────▶│   Grok Parser   │─────▶│   Iceberg    │
│   (Input)   │      │  (Monitor)   │      │   (Transform)   │      │    Tables    │
└─────────────┘      └──────────────┘      └─────────────────┘      └──────────────┘
```
Processing Flow:
- Data Ingestion: Continuously monitors and reads log files from Amazon S3
- Data Parsing: Applies configurable Grok patterns to extract structured data
- Data Transformation: Maps parsed records to Flink RowData format
- Data Storage: Writes data to Apache Iceberg tables with AWS Glue catalog
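At its core, the parsing step applies a compiled pattern to each incoming line and emits a map of named fields, which is then converted to RowData. A minimal, framework-free sketch of that idea (class and method names here are illustrative, not the project's actual API; Grok patterns ultimately compile down to regular expressions with named capture groups):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LogLineParser {
    // Simplified regex standing in for a Grok pattern like
    // "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}"
    private static final Pattern APP_LOG = Pattern.compile(
        "(?<timestamp>\\S+)\\s+(?<level>[A-Z]+)\\s+(?<message>.*)");

    public static Map<String, String> parse(String line) {
        Map<String, String> fields = new HashMap<>();
        Matcher m = APP_LOG.matcher(line);
        if (m.matches()) {
            fields.put("timestamp", m.group("timestamp"));
            fields.put("level", m.group("level"));
            fields.put("message", m.group("message"));
        }
        return fields; // an empty map signals an unparseable line
    }

    public static void main(String[] args) {
        Map<String, String> f =
            parse("2024-01-15T10:30:00.123Z INFO Application started successfully");
        System.out.println(f.get("level") + " | " + f.get("message"));
    }
}
```

In the real pipeline this per-line extraction runs inside a Flink map function, with the resulting field map projected onto the Iceberg table schema.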
- Java 11 or higher
- Apache Maven 3.6+
- AWS account with appropriate permissions
- Apache Flink 1.18.1
- Apache Iceberg 1.6.1
The application uses a JSON configuration file (application-properties-local.json) with the following sections:
```json
{
  "PropertyGroupId": "job.config",
  "PropertyMap": {
    "source.window": 1,
    "source.s3path": "s3://your-bucket/logs/"
  }
}
```

```json
{
  "PropertyGroupId": "iceberg.table.properties",
  "PropertyMap": {
    "format-version": "2",
    "write.target-file-size-bytes": "536870912",
    "write.parquet.row-group-size-bytes": "134217728",
    "write.parquet.page-size-bytes": "1048576",
    "write.metadata.delete-after-commit.enabled": "true",
    "write.metadata.previous-versions-max": "5",
    "write.distribution-mode": "hash"
  }
}
```

```json
{
  "PropertyGroupId": "iceberg.config",
  "PropertyMap": {
    "iceberg.database.name": "logs_db",
    "iceberg.table.name.default": "parsed_logs",
    "iceberg.warehouse": "s3://your-iceberg-warehouse",
    "iceberg.write.parallelism": "4"
  }
}
```

Build the application with:

```bash
mvn clean package
```

This will create a fat JAR in the target directory.
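For reference, the byte-size values in the iceberg.table.properties group above decode to round binary units (512 MiB target files, 128 MiB Parquet row groups, 1 MiB Parquet pages), which a quick check confirms:

```java
public class IcebergSizeCheck {
    public static void main(String[] args) {
        long mib = 1024L * 1024L;
        // Values from the iceberg.table.properties configuration group
        System.out.println(536870912L / mib);  // write.target-file-size-bytes      -> 512 MiB
        System.out.println(134217728L / mib);  // write.parquet.row-group-size-bytes -> 128 MiB
        System.out.println(1048576L  / mib);   // write.parquet.page-size-bytes      -> 1 MiB
    }
}
```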
```bash
java -cp target/flink-log-processor-0.1.jar \
  com.github.ghoshp83.flinklogprocessor.UnstructuredDataProcessor
```

To deploy on Amazon Kinesis Data Analytics:
- Upload the JAR to an S3 bucket
- Create a Kinesis Data Analytics application
- Configure the application to use the JAR from S3
- Set up the application properties as described in the Configuration section
The application uses Grok patterns to parse log files. Configure patterns in application-properties-generic.json:
```json
{
  "PropertyGroupId": "log.patterns",
  "PropertyMap": {
    "pattern.apache": "%{COMBINEDAPACHELOG}",
    "pattern.syslog": "%{SYSLOGBASE} %{GREEDYDATA:message}",
    "pattern.custom": "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}"
  }
}
```

Supported formats:
- ✅ Apache/Nginx access logs
- ✅ Syslog format
- ✅ Custom application logs
- ✅ JSON structured logs
- ✅ Any Grok-parseable format
Apache Access Log:

```
127.0.0.1 - - [10/Oct/2024:13:55:36 -0700] "GET /index.html HTTP/1.1" 200 2326
```

Application Log:

```
2024-01-15T10:30:00.123Z INFO Application started successfully
```

Syslog:

```
Jan 15 10:30:00 server1 app[1234]: User login successful
```
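With several patterns configured, each line must be routed to the pattern that matches it. The sketch below illustrates that routing with simplified regex stand-ins for the configured Grok patterns (the real application expands %{COMBINEDAPACHELOG}, %{SYSLOGBASE}, etc.; these class and pattern names are illustrative only):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

public class FormatDetector {
    // Simplified regexes approximating the Grok patterns in log.patterns;
    // ordered so more specific formats are tried first.
    private static final Map<String, Pattern> PATTERNS = new LinkedHashMap<>();
    static {
        PATTERNS.put("apache",
            Pattern.compile("^\\S+ \\S+ \\S+ \\[[^\\]]+\\] \"[^\"]*\" \\d{3} \\d+.*"));
        PATTERNS.put("custom",
            Pattern.compile("^\\d{4}-\\d{2}-\\d{2}T\\S+ [A-Z]+ .*"));
        PATTERNS.put("syslog",
            Pattern.compile("^[A-Z][a-z]{2} +\\d+ \\d{2}:\\d{2}:\\d{2} \\S+ \\S+: .*"));
    }

    public static String detect(String line) {
        for (Map.Entry<String, Pattern> e : PATTERNS.entrySet()) {
            if (e.getValue().matcher(line).matches()) {
                return e.getKey();
            }
        }
        return "unknown"; // unmatched lines can be routed to a dead-letter path
    }
}
```

Applied to the three sample lines above, this routes each to "apache", "custom", and "syslog" respectively.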
For a comprehensive guide, see GENERIC_LOG_GUIDE.md.
For Grok patterns, see the Grok Pattern Documentation.
The easiest way to try the application locally:
```bash
# Run the quick start script
cd examples
./quick-start.sh

# Access Flink UI
open http://localhost:8081
```

This will:
- Build the application
- Start LocalStack (S3 + Glue)
- Start Flink cluster (JobManager + TaskManager)
- Upload sample logs to S3
- Create Glue database
See examples/README.md for detailed instructions.
```bash
# Build
mvn clean package

# Run with Docker Compose
docker-compose up -d

# Access Flink UI
open http://localhost:8081
```

To run locally without Docker:

```bash
# Build
mvn clean package

# Run
java -cp target/flink-log-processor-0.1.jar \
  com.github.ghoshp83.flinklogprocessor.UnstructuredDataProcessor
```

The project includes comprehensive unit and integration tests. Run them with:
```bash
mvn test
```

- GENERIC_LOG_GUIDE.md - Comprehensive user guide
- ARCHITECTURE.md - System architecture and design
- DEPLOYMENT.md - Deployment instructions (AWS, K8s, Docker)
- TROUBLESHOOTING.md - Common issues and solutions
- PERFORMANCE.md - Performance benchmarks
- CONTRIBUTING.md - How to contribute
```
├── src
│   ├── main
│   │   ├── java
│   │   │   └── com
│   │   │       └── enterprise
│   │   │           └── department
│   │   │               ├── catalog                        # Iceberg catalog and schema management
│   │   │               ├── config                         # Configuration utilities
│   │   │               ├── functions                      # Flink mapping functions
│   │   │               ├── model                          # Data models
│   │   │               ├── parser                         # Log parsing logic
│   │   │               └── UnstructuredDataProcessor.java # Main application
│   │   └── resources
│   │       ├── application-properties-local.json          # Configuration file
│   │       └── log4j2.properties                          # Logging configuration
│   └── test
│       ├── java                                           # Unit and integration tests
│       └── resources                                      # Test resources
├── pom.xml                                                # Maven configuration
└── README.md                                              # This file
```
- Grok Pattern Validation: All Grok patterns are validated against a whitelist of allowed elements
- Log Line Length Limits: Maximum log line length is enforced (10,000 characters)
- S3 Path Validation: S3 paths are validated for proper format and security
- Log Type Validation: Log types are restricted to alphanumeric characters and underscores
- Parameterized S3 Buckets: Use organization prefix and environment variables to prevent bucket sniping
- IAM Roles: AWS credentials are handled through IAM roles, never hardcoded
- Network Security: All communication uses HTTPS/TLS encryption
- Updated Dependencies: All dependencies are kept up to date with the latest security patches
- Vulnerability Scanning: Regular dependency vulnerability scanning in CI/CD
- Minimal Dependencies: Only necessary dependencies are included
- Error Handling: Comprehensive error handling prevents information leakage
- Logging Security: Sensitive data is not logged or is properly sanitized
- Resource Limits: Memory and processing limits prevent DoS attacks
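The input-validation checks above can be sketched as a small utility class. This is a minimal illustration, not the project's actual code: the constants and regexes shown here (simplified S3 bucket-name rules, the 10,000-character line limit, alphanumeric-plus-underscore log types) mirror the behavior described, but the real method names and limits may differ.

```java
import java.util.regex.Pattern;

public class InputValidation {
    // Limit from the "Log Line Length Limits" check described above
    private static final int MAX_LINE_LENGTH = 10_000;

    // Simplified S3 URI check: s3:// scheme plus a DNS-compatible bucket name
    private static final Pattern S3_PATH =
        Pattern.compile("^s3://[a-z0-9][a-z0-9.-]{1,61}[a-z0-9](/.*)?$");

    // Log types restricted to alphanumerics and underscores
    private static final Pattern LOG_TYPE = Pattern.compile("^[A-Za-z0-9_]+$");

    public static boolean isValidS3Path(String path) {
        return path != null && S3_PATH.matcher(path).matches();
    }

    public static boolean isValidLogType(String type) {
        return type != null && LOG_TYPE.matcher(type).matches();
    }

    public static boolean isWithinLengthLimit(String line) {
        return line != null && line.length() <= MAX_LINE_LENGTH;
    }
}
```

Rejecting malformed paths and over-long lines at the boundary keeps bad input from reaching the parser or the Iceberg writer.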
```bash
# Set required environment variables
export ORGANIZATION_PREFIX="pralaydata"
export ENVIRONMENT="prod"

# Resulting S3 bucket names:
#   pralaydata-flink-logs-prod, pralaydata-iceberg-warehouse-prod
```

- Iceberg tables use hash distribution mode for optimal write performance
- Flink checkpointing is configured for fault tolerance
- Iceberg table properties are optimized for file size and row group size
- Fork the repository
- Create a feature branch
- Make your changes
- Run tests
- Submit a pull request
- Application Log Processing: Parse and analyze application logs from microservices
- Web Server Analytics: Process Apache/Nginx access logs for traffic analysis
- Security Log Analysis: Process security logs for threat detection
- System Monitoring: Analyze syslog data from servers and network devices
- IoT Data Processing: Handle sensor and device logs at scale
- Audit Trail Management: Store and query audit logs with time-travel capabilities
- Multi-Format Processing: Handle multiple log formats in a single pipeline
- Generic log format support with configurable Grok patterns
- Dynamic schema creation based on log structure
- Comprehensive documentation and examples
- Support for Kafka as input source
- Real-time alerting based on log patterns
- Built-in data quality checks
- Support for multiple output formats (Parquet, ORC)
- Web UI for monitoring and configuration
- CI/CD pipeline with GitHub Actions
- Docker and Kubernetes deployment
MIT License - See LICENSE file for details
Contributions are welcome! Please feel free to submit a Pull Request.
For questions or support, please open an issue on GitHub.