Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 17 additions & 82 deletions sqlstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,17 +78,10 @@ func newSQLStore(sessionID SessionID, driver string, dataSourceName string, conn
// Reset deletes the store records and sets the seqnums back to 1
func (store *sqlStore) Reset() error {
s := store.sessionID
qr := `DELETE FROM messages
_, err := store.db.Exec(`DELETE FROM messages
WHERE beginstring=? AND session_qualifier=?
AND sendercompid=? AND sendersubid=? AND senderlocid=?
AND targetcompid=? AND targetsubid=? AND targetlocid=?`
if store.sqlDriver == "postgres" {
qr = `DELETE FROM messages
WHERE beginstring=$1 AND session_qualifier=$2
AND sendercompid=$3 AND sendersubid=$4 AND senderlocid=$5
AND targetcompid=$6 AND targetsubid=$7 AND targetlocid=$8`
}
_, err := store.db.Exec(qr,
AND targetcompid=? AND targetsubid=? AND targetlocid=?`,
s.BeginString, s.Qualifier,
s.SenderCompID, s.SenderSubID, s.SenderLocationID,
s.TargetCompID, s.TargetSubID, s.TargetLocationID)
Expand All @@ -100,20 +93,11 @@ func (store *sqlStore) Reset() error {
return err
}

qr = `UPDATE sessions
_, err = store.db.Exec(`UPDATE sessions
SET creation_time=?, incoming_seqnum=?, outgoing_seqnum=?
WHERE beginstring=? AND session_qualifier=?
AND sendercompid=? AND sendersubid=? AND senderlocid=?
AND targetcompid=? AND targetsubid=? AND targetlocid=?`
if store.sqlDriver == "postgres" {
qr = `UPDATE sessions
SET creation_time=$1, incoming_seqnum=$2, outgoing_seqnum=$3
WHERE beginstring=$4 AND session_qualifier=$5
AND sendercompid=$6 AND sendersubid=$7 AND senderlocid=$8
AND targetcompid=$9 AND targetsubid=$10 AND targetlocid=$11`
}

_, err = store.db.Exec(qr,
AND targetcompid=? AND targetsubid=? AND targetlocid=?`,
store.cache.CreationTime(), store.cache.NextTargetMsgSeqNum(), store.cache.NextSenderMsgSeqNum(),
s.BeginString, s.Qualifier,
s.SenderCompID, s.SenderSubID, s.SenderLocationID,
Expand All @@ -134,19 +118,11 @@ func (store *sqlStore) populateCache() (err error) {
s := store.sessionID
var creationTime time.Time
var incomingSeqNum, outgoingSeqNum int
qr := `SELECT creation_time, incoming_seqnum, outgoing_seqnum
FROM sessions
row := store.db.QueryRow(`SELECT creation_time, incoming_seqnum, outgoing_seqnum
FROM sessions
WHERE beginstring=? AND session_qualifier=?
AND sendercompid=? AND sendersubid=? AND senderlocid=?
AND targetcompid=? AND targetsubid=? AND targetlocid=?`
if store.sqlDriver == "postgres" {
qr = `SELECT creation_time, incoming_seqnum, outgoing_seqnum
FROM sessions
WHERE beginstring=$1 AND session_qualifier=$2
AND sendercompid=$3 AND sendersubid=$4 AND senderlocid=$5
AND targetcompid=$6 AND targetsubid=$7 AND targetlocid=$8`
}
row := store.db.QueryRow(qr,
AND targetcompid=? AND targetsubid=? AND targetlocid=?`,
s.BeginString, s.Qualifier,
s.SenderCompID, s.SenderSubID, s.SenderLocationID,
s.TargetCompID, s.TargetSubID, s.TargetLocationID)
Expand All @@ -167,21 +143,12 @@ func (store *sqlStore) populateCache() (err error) {
}

// session record not found, create it
qr = `INSERT INTO sessions (
_, err = store.db.Exec(`INSERT INTO sessions (
creation_time, incoming_seqnum, outgoing_seqnum,
beginstring, session_qualifier,
sendercompid, sendersubid, senderlocid,
targetcompid, targetsubid, targetlocid)
VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
if store.sqlDriver == "postgres" {
qr = `INSERT INTO sessions (
creation_time, incoming_seqnum, outgoing_seqnum,
beginstring, session_qualifier,
sendercompid, sendersubid, senderlocid,
targetcompid, targetsubid, targetlocid)
VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)`
}
_, err = store.db.Exec(qr,
VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
store.cache.creationTime,
store.cache.NextTargetMsgSeqNum(),
store.cache.NextSenderMsgSeqNum(),
Expand All @@ -205,17 +172,10 @@ func (store *sqlStore) NextTargetMsgSeqNum() int {
// SetNextSenderMsgSeqNum sets the next MsgSeqNum that will be sent
func (store *sqlStore) SetNextSenderMsgSeqNum(next int) error {
s := store.sessionID
qr := `UPDATE sessions SET outgoing_seqnum = ?
_, err := store.db.Exec(`UPDATE sessions SET outgoing_seqnum = ?
WHERE beginstring=? AND session_qualifier=?
AND sendercompid=? AND sendersubid=? AND senderlocid=?
AND targetcompid=? AND targetsubid=? AND targetlocid=?`
if store.sqlDriver == "postgres" {
qr = `UPDATE sessions SET outgoing_seqnum = $1
WHERE beginstring=$2 AND session_qualifier=$3
AND sendercompid=$4 AND sendersubid=$5 AND senderlocid=$6
AND targetcompid=$7 AND targetsubid=$8 AND targetlocid=$9`
}
_, err := store.db.Exec(qr,
AND targetcompid=? AND targetsubid=? AND targetlocid=?`,
next, s.BeginString, s.Qualifier,
s.SenderCompID, s.SenderSubID, s.SenderLocationID,
s.TargetCompID, s.TargetSubID, s.TargetLocationID)
Expand All @@ -228,17 +188,10 @@ func (store *sqlStore) SetNextSenderMsgSeqNum(next int) error {
// SetNextTargetMsgSeqNum sets the next MsgSeqNum that should be received
func (store *sqlStore) SetNextTargetMsgSeqNum(next int) error {
s := store.sessionID
qr := `UPDATE sessions SET incoming_seqnum = ?
_, err := store.db.Exec(`UPDATE sessions SET incoming_seqnum = ?
WHERE beginstring=? AND session_qualifier=?
AND sendercompid=? AND sendersubid=? AND senderlocid=?
AND targetcompid=? AND targetsubid=? AND targetlocid=?`
if store.sqlDriver == "postgres" {
qr = `UPDATE sessions SET incoming_seqnum = $1
WHERE beginstring=$2 AND session_qualifier=$3
AND sendercompid=$4 AND sendersubid=$5 AND senderlocid=$6
AND targetcompid=$7 AND targetsubid=$8 AND targetlocid=$9`
}
_, err := store.db.Exec(qr,
AND targetcompid=? AND targetsubid=? AND targetlocid=?`,
next, s.BeginString, s.Qualifier,
s.SenderCompID, s.SenderSubID, s.SenderLocationID,
s.TargetCompID, s.TargetSubID, s.TargetLocationID)
Expand Down Expand Up @@ -268,21 +221,12 @@ func (store *sqlStore) CreationTime() time.Time {
func (store *sqlStore) SaveMessage(seqNum int, msg []byte) error {
s := store.sessionID

qr := `INSERT INTO messages (
msgseqnum, message,
beginstring, session_qualifier,
sendercompid, sendersubid, senderlocid,
targetcompid, targetsubid, targetlocid)
VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
if store.sqlDriver == "postgres" {
qr = `INSERT INTO messages (
_, err := store.db.Exec(`INSERT INTO messages (
msgseqnum, message,
beginstring, session_qualifier,
sendercompid, sendersubid, senderlocid,
targetcompid, targetsubid, targetlocid)
VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`
}
_, err := store.db.Exec(qr,
VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
seqNum, string(msg),
s.BeginString, s.Qualifier,
s.SenderCompID, s.SenderSubID, s.SenderLocationID,
Expand All @@ -294,21 +238,12 @@ func (store *sqlStore) SaveMessage(seqNum int, msg []byte) error {
func (store *sqlStore) GetMessages(beginSeqNum, endSeqNum int) ([][]byte, error) {
s := store.sessionID
var msgs [][]byte
qr := `SELECT message FROM messages
rows, err := store.db.Query(`SELECT message FROM messages
WHERE beginstring=? AND session_qualifier=?
AND sendercompid=? AND sendersubid=? AND senderlocid=?
AND targetcompid=? AND targetsubid=? AND targetlocid=?
AND msgseqnum>=? AND msgseqnum<=?
ORDER BY msgseqnum`
if store.sqlDriver == "postgres" {
qr = `SELECT message FROM messages
WHERE beginstring=$1 AND session_qualifier=$2
AND sendercompid=$3 AND sendersubid=$4 AND senderlocid=$5
AND targetcompid=$6 AND targetsubid=$7 AND targetlocid=$8
AND msgseqnum>=$9 AND msgseqnum<=$10
ORDER BY msgseqnum`
}
rows, err := store.db.Query(qr,
ORDER BY msgseqnum`,
s.BeginString, s.Qualifier,
s.SenderCompID, s.SenderSubID, s.SenderLocationID,
s.TargetCompID, s.TargetSubID, s.TargetLocationID,
Expand Down