chore: ⬆️ updated deps
This commit is contained in:
+146
-18
@@ -8,6 +8,7 @@ import (
|
||||
"io"
|
||||
"net"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/golang-sql/sqlexp"
|
||||
"github.com/microsoft/go-mssqldb/aecmk"
|
||||
@@ -26,6 +27,8 @@ type token byte
|
||||
const (
|
||||
tokenReturnStatus token = 121 // 0x79
|
||||
tokenColMetadata token = 129 // 0x81
|
||||
tokenTabName token = 164 // 0xA4
|
||||
tokenColInfo token = 165 // 0xA5
|
||||
tokenOrder token = 169 // 0xA9
|
||||
tokenError token = 170 // 0xAA
|
||||
tokenInfo token = 171 // 0xAB
|
||||
@@ -109,9 +112,38 @@ const (
|
||||
// TODO implement more flags
|
||||
)
|
||||
|
||||
// cancelDrainTimeout bounds how long to wait for the server's cancel confirmation.
|
||||
// If the drain fails for any reason (timeout, I/O error, or context cancellation),
|
||||
// the connection is marked bad via checkBadConn.
|
||||
const cancelDrainTimeout = 5 * time.Second
|
||||
|
||||
type cancelConfirmationResult uint8
|
||||
|
||||
const (
|
||||
cancelConfirmationReceived cancelConfirmationResult = iota
|
||||
cancelConfirmationChannelClosed
|
||||
cancelConfirmationUnavailable
|
||||
)
|
||||
|
||||
// interface for all tokens
|
||||
type tokenStruct interface{}
|
||||
|
||||
// cancelDrainError builds a StreamError for cancel-drain failures.
|
||||
// StreamError is used instead of ServerError because this is a client-side
|
||||
// drain failure, not a server internal error, and StreamError.Error()
|
||||
// surfaces the diagnostic message whereas ServerError.Error() is a fixed string.
|
||||
func cancelDrainError(phase string, drainCtx context.Context, tokErr error) error {
|
||||
msg := "did not get cancellation confirmation from the server"
|
||||
cause := tokErr
|
||||
if cause == nil {
|
||||
cause = drainCtx.Err()
|
||||
}
|
||||
if cause != nil {
|
||||
return StreamError{InnerError: fmt.Errorf("%s (%s: %w)", msg, phase, cause)}
|
||||
}
|
||||
return StreamError{InnerError: fmt.Errorf("%s (%s)", msg, phase)}
|
||||
}
|
||||
|
||||
type orderStruct struct {
|
||||
ColIds []uint16
|
||||
}
|
||||
@@ -411,6 +443,28 @@ func parseOrder(r *tdsBuffer) (res orderStruct) {
|
||||
return res
|
||||
}
|
||||
|
||||
// https://learn.microsoft.com/en-us/openspecs/windows_protocols/ms-tds/9b5c1e40-b6ce-4e5f-9ce2-2284cc44a38d
|
||||
func parseTabName(r *tdsBuffer) {
|
||||
// The TABNAME token describes the table names associated with a result set.
|
||||
// It is sent in response to browse-mode queries and INSERT/UPDATE/DELETE on tables with triggers.
|
||||
// We read and discard the data since it is informational only.
|
||||
size := r.uint16()
|
||||
if _, err := io.CopyN(io.Discard, r, int64(size)); err != nil {
|
||||
badStreamPanic(err)
|
||||
}
|
||||
}
|
||||
|
||||
// https://learn.microsoft.com/en-us/openspecs/windows_protocols/ms-tds/7ec86c73-d57e-4a0d-b945-d660ef8e5bf8
|
||||
func parseColInfo(r *tdsBuffer) {
|
||||
// The COLINFO token describes the source of the column data in browse mode and
|
||||
// for INSERT/UPDATE/DELETE on tables with triggers.
|
||||
// We read and discard the data since it is informational only.
|
||||
size := r.uint16()
|
||||
if _, err := io.CopyN(io.Discard, r, int64(size)); err != nil {
|
||||
badStreamPanic(err)
|
||||
}
|
||||
}
|
||||
|
||||
// https://msdn.microsoft.com/en-us/library/dd340421.aspx
|
||||
func parseDone(r *tdsBuffer) (res doneStruct) {
|
||||
res.Status = r.uint16()
|
||||
@@ -779,7 +833,8 @@ func readCekTableEntry(r *tdsBuffer) cekTableEntry {
|
||||
|
||||
// http://msdn.microsoft.com/en-us/library/dd357254.aspx
|
||||
func parseRow(ctx context.Context, r *tdsBuffer, s *tdsSession, columns []columnStruct, row []interface{}) error {
|
||||
for i, column := range columns {
|
||||
for i := range columns {
|
||||
column := &columns[i]
|
||||
columnContent := column.ti.Reader(&column.ti, r, nil, s.encoding)
|
||||
if columnContent == nil {
|
||||
row[i] = columnContent
|
||||
@@ -816,7 +871,7 @@ func (R RWCBuffer) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func decryptColumn(ctx context.Context, column columnStruct, s *tdsSession, columnContent interface{}) (*tdsBuffer, error) {
|
||||
func decryptColumn(ctx context.Context, column *columnStruct, s *tdsSession, columnContent interface{}) (*tdsBuffer, error) {
|
||||
encType := encryption.From(column.cryptoMeta.encType)
|
||||
cekValue := column.cryptoMeta.entry.cekValues[column.cryptoMeta.ordinal]
|
||||
if (s.logFlags & uint64(msdsn.LogDebug)) == uint64(msdsn.LogDebug) {
|
||||
@@ -860,7 +915,8 @@ func parseNbcRow(ctx context.Context, r *tdsBuffer, s *tdsSession, columns []col
|
||||
bitlen := (len(columns) + 7) / 8
|
||||
pres := make([]byte, bitlen)
|
||||
r.ReadFull(pres)
|
||||
for i, col := range columns {
|
||||
for i := range columns {
|
||||
col := &columns[i]
|
||||
if pres[i/8]&(1<<(uint(i)%8)) != 0 {
|
||||
row[i] = nil
|
||||
continue
|
||||
@@ -995,6 +1051,10 @@ func processSingleResponse(ctx context.Context, sess *tdsSession, ch chan tokenS
|
||||
case tokenFeatureExtAck:
|
||||
featureExtAck := parseFeatureExtAck(sess.buf)
|
||||
ch <- featureExtAck
|
||||
case tokenTabName:
|
||||
parseTabName(sess.buf)
|
||||
case tokenColInfo:
|
||||
parseColInfo(sess.buf)
|
||||
case tokenOrder:
|
||||
order := parseOrder(sess.buf)
|
||||
ch <- order
|
||||
@@ -1139,9 +1199,23 @@ type tokenProcessor struct {
|
||||
noAttn bool
|
||||
}
|
||||
|
||||
// startResponseReader waits for any previous reader goroutine to finish,
|
||||
// then launches a new one that writes tokens to tokChan.
|
||||
func (sess *tdsSession) startResponseReader(ctx context.Context, tokChan chan tokenStruct, outs outputs) {
|
||||
if sess.readDone != nil {
|
||||
<-sess.readDone
|
||||
}
|
||||
readDone := make(chan struct{})
|
||||
sess.readDone = readDone
|
||||
go func() {
|
||||
defer close(readDone)
|
||||
processSingleResponse(ctx, sess, tokChan, outs)
|
||||
}()
|
||||
}
|
||||
|
||||
func startReading(sess *tdsSession, ctx context.Context, outs outputs) *tokenProcessor {
|
||||
tokChan := make(chan tokenStruct, 5)
|
||||
go processSingleResponse(ctx, sess, tokChan, outs)
|
||||
sess.startResponseReader(ctx, tokChan, outs)
|
||||
return &tokenProcessor{
|
||||
tokChan: tokChan,
|
||||
ctx: ctx,
|
||||
@@ -1236,36 +1310,90 @@ func (t tokenProcessor) nextToken() (tokenStruct, error) {
|
||||
// in this case current response would not contain confirmation
|
||||
// and we would need to read one more response
|
||||
|
||||
// t.ctx is already cancelled; use a separate context to drain.
|
||||
drainCtx, drainCancel := context.WithTimeout(context.Background(), cancelDrainTimeout)
|
||||
defer drainCancel()
|
||||
|
||||
// first lets finish reading current response and look
|
||||
// for confirmation in it
|
||||
if readCancelConfirmation(t.tokChan) {
|
||||
result, tokErr := readCancelConfirmation(drainCtx, t.tokChan)
|
||||
switch result {
|
||||
case cancelConfirmationReceived:
|
||||
// we got confirmation in current response
|
||||
return nil, t.ctx.Err()
|
||||
case cancelConfirmationUnavailable:
|
||||
// Drain tokChan in the background so processSingleResponse
|
||||
// can finish sending and exit once the connection closes.
|
||||
go func() {
|
||||
for range t.tokChan {
|
||||
}
|
||||
}()
|
||||
return nil, cancelDrainError("current response", drainCtx, tokErr)
|
||||
}
|
||||
// we did not get cancellation confirmation in the current response
|
||||
// read one more response, it must be there
|
||||
t.tokChan = make(chan tokenStruct, 5)
|
||||
go processSingleResponse(t.ctx, t.sess, t.tokChan, t.outs)
|
||||
if readCancelConfirmation(t.tokChan) {
|
||||
// Use t.ctx (already cancelled) for processSingleResponse so that
|
||||
// ReturnMessageEnqueue calls return immediately via ctx.Done()
|
||||
// instead of blocking on a full message queue, which would stall
|
||||
// the goroutine and prevent it from delivering the DONE_ATTN token.
|
||||
t.sess.startResponseReader(t.ctx, t.tokChan, t.outs)
|
||||
// Fresh timeout for second drain so the first attempt's elapsed
|
||||
// time does not reduce the budget for the second response.
|
||||
drainCtx2, drainCancel2 := context.WithTimeout(context.Background(), cancelDrainTimeout)
|
||||
defer drainCancel2()
|
||||
result2, tokErr2 := readCancelConfirmation(drainCtx2, t.tokChan)
|
||||
if result2 == cancelConfirmationReceived {
|
||||
return nil, t.ctx.Err()
|
||||
}
|
||||
// we did not get cancellation confirmation, something is not
|
||||
// right, this connection is not usable anymore
|
||||
return nil, ServerError{Error{Message: "did not get cancellation confirmation from the server"}}
|
||||
// Drain tokChan in the background so processSingleResponse
|
||||
// can finish sending and exit once the connection closes.
|
||||
go func() {
|
||||
for range t.tokChan {
|
||||
}
|
||||
}()
|
||||
return nil, cancelDrainError("second response", drainCtx2, tokErr2)
|
||||
}
|
||||
}
|
||||
|
||||
func readCancelConfirmation(tokChan chan tokenStruct) bool {
|
||||
for tok := range tokChan {
|
||||
switch tok := tok.(type) {
|
||||
default:
|
||||
// just skip token
|
||||
case doneStruct:
|
||||
if tok.Status&doneAttn != 0 {
|
||||
// got cancellation confirmation, exit
|
||||
return true
|
||||
func readCancelConfirmation(ctx context.Context, tokChan chan tokenStruct) (cancelConfirmationResult, error) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
// ctx.Done may win the select even when tokChan is also
|
||||
// ready (Go select is pseudo-random). Drain any buffered
|
||||
// tokens so we don't miss a just-arrived DONE_ATTN.
|
||||
for {
|
||||
select {
|
||||
case tok, ok := <-tokChan:
|
||||
if !ok {
|
||||
return cancelConfirmationChannelClosed, nil
|
||||
}
|
||||
if done, isDone := tok.(doneStruct); isDone && done.Status&doneAttn != 0 {
|
||||
return cancelConfirmationReceived, nil
|
||||
}
|
||||
if tokErr, isErr := tok.(error); isErr {
|
||||
return cancelConfirmationUnavailable, tokErr
|
||||
}
|
||||
continue
|
||||
default:
|
||||
return cancelConfirmationUnavailable, nil
|
||||
}
|
||||
}
|
||||
case tok, ok := <-tokChan:
|
||||
if !ok {
|
||||
return cancelConfirmationChannelClosed, nil
|
||||
}
|
||||
switch tok := tok.(type) {
|
||||
case doneStruct:
|
||||
if tok.Status&doneAttn != 0 {
|
||||
return cancelConfirmationReceived, nil
|
||||
}
|
||||
case error:
|
||||
return cancelConfirmationUnavailable, tok
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user