Design - Session based EMR-S Integration #2271

Closed · Tracked by #2089
penghuo opened this issue Oct 10, 2023 · 0 comments

Labels: enhancement (New feature or request), Flint, v2.12.0 (Issues targeting release v2.12.0)

penghuo commented Oct 10, 2023

Overview

  • The design could be extended to support the Livy session API without changes on the client side.
  • The design should integrate easily with Juno.

At a high level, we introduce the following concepts:

  • Session: A session defines the statement execution context. Each session is bound to one Spark job.
  • Statement: A statement represents a query to execute in a session. Each statement maps to one session.
(diagram omitted)

Use Cases

Select

Client - Plugin

(sequence diagram omitted)

Plugin - Spark

(sequence diagram omitted)

Design

API - Todo

  • Create Session API
  • Close Session API

Statement

A statement represents a single query executed within a session.

  • Statements are stored in the flint_ql_sessions index
    • the type field is statement
    • the docId is the statementId
  • statement-type doc mapping:
{
    "flint_ql_sessions": {
        "mappings": {
            "properties": {
                "type": {
                    "type": "keyword" // statement
                },
                "state": {
                    "type": "keyword"
                },
                "statementId": {
                    "type": "keyword"
                },
                "sessionId": {
                    "type": "keyword"
                },
                "error": {
                    "type": "text"
                },
                "lang": {
                    "type": "keyword"
                },                    
                "query": {
                    "type": "keyword"
                },
                "submitTime": {
                    "type": "date",
                    "format": "strict_date_time||epoch_millis"
                },
                "queryId": {
                    "type": "keyword"
                }             
            }
        }
    }
}
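
For illustration, here is a minimal sketch of how the Plugin might persist a new statement as a waiting document in this index, using the OpenSearch IndexRequest API. The field values and the helper class are assumptions for the sketch, not part of this design.

import java.util.Map;
import org.opensearch.action.index.IndexRequest;

class StatementStore {
    // Sketch: persist a new statement in the waiting state.
    // Per the design above, docId is the statementId.
    IndexRequest newWaitingStatement(String sessionId, String statementId,
                                     String queryId, String query) {
        return new IndexRequest("flint_ql_sessions")
            .id(statementId)
            .source(Map.of(
                "type", "statement",
                "state", "waiting",
                "statementId", statementId,
                "sessionId", sessionId,
                "queryId", queryId,
                "lang", "sql",
                "query", query,
                "submitTime", System.currentTimeMillis())); // epoch_millis
    }
}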

Statement State

(state diagram omitted)

  • waiting("waiting"): the Plugin sets the statement to the waiting state at the init stage.
  • running("running"): Spark sets the statement to the running state before executing it; Spark picks statements that are in the waiting state, ordered by submitTime.
  • success("success"): Spark sets the statement to the success state if it runs successfully.
  • failed("failed"): Spark sets the statement to the failed state if execution fails.
  • cancelled("cancelled"): the Plugin sets the statement to cancelled only if its state is waiting.
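
As a sketch, these states and the allowed transitions could be encoded as a Java enum; the guard method is illustrative and not part of the design.

enum StatementState {
    WAITING("waiting"),
    RUNNING("running"),
    SUCCESS("success"),
    FAILED("failed"),
    CANCELLED("cancelled");

    private final String value;

    StatementState(String value) {
        this.value = value;
    }

    String value() {
        return value;
    }

    // Encodes the rules above: Spark only picks up waiting statements,
    // and the Plugin only cancels waiting statements.
    boolean canTransitionTo(StatementState next) {
        if (this == WAITING) {
            return next == RUNNING || next == CANCELLED;
        }
        if (this == RUNNING) {
            return next == SUCCESS || next == FAILED;
        }
        return false; // success, failed, and cancelled are terminal
    }
}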

Failure Handling

  • If a statement runs longer than 10 minutes, the Spark job (1) sets the statement state to failed with error = "timeout" and (2) cancels the running statement.
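
A sketch of how the Spark job could enforce this rule; the statement interface here is a stand-in, since the design does not specify the statement API.

import java.time.Duration;

// Stand-in for the statement handle; the actual API is not specified here.
interface RunningStatement {
    void setState(String state);
    void setError(String error);
    void cancel();
}

class StatementTimeout {
    static final long TIMEOUT_MILLIS = Duration.ofMinutes(10).toMillis();

    // Called periodically by the Spark job while a statement is running.
    void enforce(RunningStatement st, long startTimeMillis) {
        if (System.currentTimeMillis() - startTimeMillis > TIMEOUT_MILLIS) {
            st.setState("failed");  // (1) set state to failed
            st.setError("timeout"); //     with error = "timeout"
            st.cancel();            // (2) cancel the running statement
        }
    }
}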

Session

A session defines the statement execution context. There are three types of sessions: interactive, batch, and streaming.

SessionManager

SessionManager provides the following APIs for Rest/Transport:

  • Session allocateSession(CreateSessionRequest req)
  • Optional<Session> getSession(SessionId id)
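
In Java terms, the two APIs might look like the sketch below. CreateSessionRequest and SessionId come from the signatures above; their shapes, and the Session interface, are assumptions.

import java.util.Optional;

// Placeholder types; their fields are assumptions for this sketch.
class CreateSessionRequest { String dataSourceName; String lang; }
class SessionId { String id; }
interface Session { SessionId id(); void close(); }

interface SessionManager {
    // Allocate a new session and start its backing Spark job.
    Session allocateSession(CreateSessionRequest req);

    // Look up an existing session; empty if the id is unknown.
    Optional<Session> getSession(SessionId id);
}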

Session state store

  • Sessions are stored in the flint_ql_sessions index.
    • the type field is session
    • the docId is the sessionId
{
    "flint_ql_sessions": {
        "mappings": {
            "properties": {
                "type": {
                    "type": "keyword" // session
                },
                "state": {
                    "type": "keyword" // not_started running dead fail
                },
                "sessionId": {
                    "type": "keyword"
                },
                "error": {
                    "type": "text"
                },
                "applicationId": {
                    "type": "keyword"
                },
                "jobId": {
                    "type": "keyword"
                },
                "dataSourceName": {
                    "type": "keyword"
                },                
                "lastUpdateTime": {
                    "type": "date",
                    "format": "strict_date_time||epoch_millis"
                }                                             
            }
        }
    }
}

Session State

(state diagram omitted)

  • NOT_STARTED:
    • The Plugin creates the Session.
    • The Plugin creates a doc in flint_ql_sessions.
    • The Plugin sets the state to NOT_STARTED.
    • The Plugin starts the Spark job.
  • RUNNING: once the Spark job is running, it sets the state to RUNNING.
  • DEAD: the client closes the session and the Plugin cancels the job.
    • If the Spark job has already exited, there is no side effect.
    • If the Spark job is running, it will be closed: the Spark job receives the shutdown hook and sets the state to DEAD.
  • FAIL: the Spark job sets the state to FAIL on any failure.
  • HEARTBEAT: the Plugin checks lastUpdateTime; if it has not been updated for more than 15 minutes, the Plugin calls the EMR-S job state API (see the sketch after this list):
    • If the job is running, the Plugin cancels the EMR-S job and sets the state to FAIL.
    • If the job is cancelled, the Plugin sets the state to FAIL.
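
A sketch of the heartbeat check. The 15-minute threshold comes from the rule above; the EMR-S client interface is a stand-in, not the actual AWS SDK surface.

import java.time.Duration;

// Stand-in for the EMR-S job state API; not the real AWS SDK.
interface EmrServerlessJobs {
    String getJobState(String applicationId, String jobId); // e.g. "running", "cancelled"
    void cancelJob(String applicationId, String jobId);
}

class SessionHeartbeat {
    static final long STALE_MILLIS = Duration.ofMinutes(15).toMillis();

    // Plugin-side check: if lastUpdateTime is stale, reconcile with EMR-S.
    // Returns the session state the Plugin should record.
    String check(long lastUpdateTime, String applicationId, String jobId,
                 EmrServerlessJobs emrs) {
        if (System.currentTimeMillis() - lastUpdateTime <= STALE_MILLIS) {
            return "running"; // heartbeat is fresh; leave the session as-is
        }
        if ("running".equals(emrs.getJobState(applicationId, jobId))) {
            emrs.cancelJob(applicationId, jobId); // job lost its heartbeat: cancel it
        }
        return "fail"; // in both cases the session state becomes FAIL
    }
}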

Spark Job

The Plugin composes the Spark job parameters and submits the Spark job to EMR-S. The submit parameters are:

FlintREPLJob arguments:
  SQL: "select 1"
  QueryResultIndex: queryResultIndex
  SessionId: sessionId
  
mainclass = FlintREPL
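
A sketch of how the Plugin might compose and submit these parameters. The submitter interface is a stand-in for the EMR-S StartJobRun call, whose exact shape this design does not specify.

import java.util.List;

// Stand-in for the EMR-S job submission API.
interface EmrServerlessSubmitter {
    String startJobRun(String applicationId, String mainClass, List<String> args);
}

class SparkJobLauncher {
    // Compose the FlintREPL arguments listed above and submit the job.
    // Returns the jobId assigned by EMR-S.
    String launchReplJob(EmrServerlessSubmitter emrs, String applicationId,
                         String sql, String queryResultIndex, String sessionId) {
        List<String> args = List.of(sql, queryResultIndex, sessionId);
        return emrs.startJobRun(applicationId, "FlintREPL", args);
    }
}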

Flint Session Index Mapping

{
    "mappings": {
        "properties": {
            "type": {
                "type": "keyword" // session,statement
            },
            "state": {
                "type": "keyword"
            },
            "statementId": {
                "type": "keyword"
            },
            "applicationId": {
                "type": "keyword"
            },
            "sessionId": {
                "type": "keyword"
            },
            "error": {
                "type": "text"
            },
            "lang": {
                "type": "keyword"
            },
            "query": {
                "type": "text"
            },
            "dataSourceName": {
                "type": "keyword"
            },
            "submitTime": {
                "type": "date",
                "format": "strict_date_time||epoch_millis"
            },
            "jobId": {
                "type": "keyword"
            },
            "lastUpdateTime": {
                "type": "date",
                "format": "strict_date_time||epoch_millis"
            },
            "queryId": {
                "type": "keyword"
            }
        }
    }
}

Fault Tolerance

Backpressure

The system is configured to handle up to 15 queries per queue and 1,000 queries at the domain level. Exceeding these thresholds triggers throttling, and the system responds to additional requests with exceptions. The SessionManager tracks active sessions to enforce these limits (a sketch follows).
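
A sketch of how the SessionManager could enforce the domain-level limit; the counter wiring is an assumption, and the per-queue limit would be enforced the same way.

import java.util.concurrent.atomic.AtomicInteger;

class DomainBackpressure {
    static final int MAX_DOMAIN_QUERIES = 1000;

    private final AtomicInteger activeQueries = new AtomicInteger();

    // Called before accepting a new query; throws to signal throttling.
    void acquire() {
        if (activeQueries.incrementAndGet() > MAX_DOMAIN_QUERIES) {
            activeQueries.decrementAndGet();
            throw new IllegalStateException("domain query limit exceeded");
        }
    }

    // Called when a query reaches a terminal state.
    void release() {
        activeQueries.decrementAndGet();
    }
}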

Spark Application Failure Handling

  • Happy case - The Spark job running in EMR-S updates the connection state. The connection running in the Plugin listens for connection state changes and notifies registered Sessions.
  • No Statement Exit - If the Spark job does not receive a statement for 10 minutes, it should exit (see the sketch after this list).
  • Heartbeat - The connection uses lastUpdateTime to detect a Spark job whose connection is lost. If the lastUpdateTime check fails once, (1) trigger connection close, and (2) trigger a session state change with AppState.ERROR and set sessionState = ERROR.
  • LSE - In case of an EMR-S LSE (large-scale event), the Plugin does not take any action and depends on the CP assigning a new application ID.
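
A sketch of the no-statement exit rule; the polling loop structure is assumed.

import java.time.Duration;

class ReplIdleExit {
    static final long IDLE_EXIT_MILLIS = Duration.ofMinutes(10).toMillis();

    private long lastStatementTime = System.currentTimeMillis();

    // Called on each REPL poll of flint_ql_sessions for waiting statements.
    boolean shouldExit(boolean foundStatement) {
        long now = System.currentTimeMillis();
        if (foundStatement) {
            lastStatementTime = now;
            return false;
        }
        return now - lastStatementTime > IDLE_EXIT_MILLIS; // exit the Spark job
    }
}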

Plugin Failure Handling

  • Datasource metadata index unavailable: Flint async query does not work. Error response is “no datasource exist”. Todo: SOP needed.
  • Session index unavailable: Flint async query does not work. Error response is “no session store available”. Todo: SOP needed.
  • Result index unavailable: Flint async query result fetch does not work. Error response is “result write failed”. Todo: SOP needed.
  • Data index unavailable: Flint skipping index / covering index / MV does not work. (1) Queries cannot be rewritten. (2) Refresh fails with the error message “update index failed”.

Interactive Session Recovery

The Plugin could trigger interactive session recovery if a session is in the dead state. The concern is that during an LSE, the Plugin would keep creating new sessions and recovering statements. Instead, the Plugin should emit metrics to the CP, and the CP decides when to trigger statement recovery.

// 1. List all waiting statements in the dead session.
List<Statement> stList = sql(
    "SELECT statement " +
    "FROM flint_ql_sessions " +
    "WHERE type = 'statement' " +
    "  AND sessionId = 'deadSessionId' " +
    "  AND state = 'waiting'");

// 2. Create a new session.
Session newSession = sessionManager.allocateSession();

// 3. Recover each statement in the new session.
for (Statement st : stList) {
    st.recover(newSession);
}

Deployment

Migrate to new Application

Opt-1

  • The CP updates the DNS record with the new application ID.
  • The CP lists active sessions in the domain:
    • For non-streaming sessions, call DELETE async_query/sessions/{sessionId}/?wait=30mins to close the session.
      • An InteractiveSession waits up to 30 minutes or until all statements finish.
      • A BatchSession waits up to 30 minutes or until the statement finishes.
    • For streaming sessions:
      • call DELETE async_query/sessions/{sessionId} to close the session;
      • call POST async_query/sessions/{sessionId}/?recover=true&applicationId=newApp to recover the session.

Opt-2 - Preferred

  • The CP updates the DNS record with the new ApplicationId and the old ApplicationId.
  • The DP lists active sessions in the domain:
    • For non-streaming sessions, call Session.close().
      • An InteractiveSession waits up to 30 minutes or until all statements finish.
      • A BatchSession waits up to 30 minutes or until the statement finishes.
    • For streaming sessions, call Session.close() to close the current session and Session.recover() to recover the session in the new ApplicationId (see the sketch below).
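
A sketch of the Opt-2 migration loop on the DP side. Session.close() and Session.recover() are named above; the session listing and the isStreaming flag are assumptions.

import java.util.List;

class ApplicationMigration {
    // Assumed migration handle; only close()/recover() are named in this design.
    interface MigratableSession {
        boolean isStreaming();
        void close();                       // waits up to 30 minutes as described above
        void recover(String applicationId); // restart the session in the new application
    }

    void migrate(List<MigratableSession> activeSessions, String newApplicationId) {
        for (MigratableSession session : activeSessions) {
            if (session.isStreaming()) {
                session.close();                   // stop the job on the old application
                session.recover(newApplicationId); // restart it on the new application
            } else {
                session.close(); // interactive/batch: waits for statements to finish
            }
        }
    }
}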

Limitation

Metrics & Monitoring

Statement

  • statement count
  • successful statement count
  • failed statement count

Session

  • active session count
  • dead session count
  • error session count
  • finished session count
penghuo closed this as completed Oct 31, 2023