|
- // Copyright 2019 Huawei Technologies Co.,Ltd.
- // Licensed under the Apache License, Version 2.0 (the "License"); you may not use
- // this file except in compliance with the License. You may obtain a copy of the
- // License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software distributed
- // under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- // CONDITIONS OF ANY KIND, either express or implied. See the License for the
- // specific language governing permissions and limitations under the License.
-
- package obs
-
- import (
- "bytes"
- "errors"
- "fmt"
- "io"
- "math/rand"
- "net"
- "net/http"
- "net/url"
- "os"
- "strings"
- "time"
- )
-
- func prepareHeaders(headers map[string][]string, meta bool, isObs bool) map[string][]string {
- _headers := make(map[string][]string, len(headers))
- if headers != nil {
- for key, value := range headers {
- key = strings.TrimSpace(key)
- if key == "" {
- continue
- }
- _key := strings.ToLower(key)
- if _, ok := allowedRequestHTTPHeaderMetadataNames[_key]; !ok && !strings.HasPrefix(key, HEADER_PREFIX) && !strings.HasPrefix(key, HEADER_PREFIX_OBS) {
- if !meta {
- continue
- }
- if !isObs {
- _key = HEADER_PREFIX_META + _key
- } else {
- _key = HEADER_PREFIX_META_OBS + _key
- }
- } else {
- _key = key
- }
- _headers[_key] = value
- }
- }
- return _headers
- }
-
- func (obsClient ObsClient) doActionWithoutBucket(action, method string, input ISerializable, output IBaseModel, extensions []extensionOptions) error {
- return obsClient.doAction(action, method, "", "", input, output, true, true, extensions)
- }
-
- func (obsClient ObsClient) doActionWithBucketV2(action, method, bucketName string, input ISerializable, output IBaseModel, extensions []extensionOptions) error {
- if strings.TrimSpace(bucketName) == "" && !obsClient.conf.cname {
- return errors.New("Bucket is empty")
- }
- return obsClient.doAction(action, method, bucketName, "", input, output, false, true, extensions)
- }
-
- func (obsClient ObsClient) doActionWithBucket(action, method, bucketName string, input ISerializable, output IBaseModel, extensions []extensionOptions) error {
- if strings.TrimSpace(bucketName) == "" && !obsClient.conf.cname {
- return errors.New("Bucket is empty")
- }
- return obsClient.doAction(action, method, bucketName, "", input, output, true, true, extensions)
- }
-
- func (obsClient ObsClient) doActionWithBucketAndKey(action, method, bucketName, objectKey string, input ISerializable, output IBaseModel, extensions []extensionOptions) error {
- return obsClient._doActionWithBucketAndKey(action, method, bucketName, objectKey, input, output, true, extensions)
- }
-
- func (obsClient ObsClient) doActionWithBucketAndKeyV2(action, method, bucketName, objectKey string, input ISerializable, output IBaseModel, extensions []extensionOptions) error {
- if strings.TrimSpace(bucketName) == "" && !obsClient.conf.cname {
- return errors.New("Bucket is empty")
- }
- if strings.TrimSpace(objectKey) == "" {
- return errors.New("Key is empty")
- }
- return obsClient.doAction(action, method, bucketName, objectKey, input, output, false, true, extensions)
- }
-
- func (obsClient ObsClient) doActionWithBucketAndKeyUnRepeatable(action, method, bucketName, objectKey string, input ISerializable, output IBaseModel, extensions []extensionOptions) error {
- return obsClient._doActionWithBucketAndKey(action, method, bucketName, objectKey, input, output, false, extensions)
- }
-
- func (obsClient ObsClient) _doActionWithBucketAndKey(action, method, bucketName, objectKey string, input ISerializable, output IBaseModel, repeatable bool, extensions []extensionOptions) error {
- if strings.TrimSpace(bucketName) == "" && !obsClient.conf.cname {
- return errors.New("Bucket is empty")
- }
- if strings.TrimSpace(objectKey) == "" {
- return errors.New("Key is empty")
- }
- return obsClient.doAction(action, method, bucketName, objectKey, input, output, true, repeatable, extensions)
- }
-
- func (obsClient ObsClient) doAction(action, method, bucketName, objectKey string, input ISerializable, output IBaseModel, xmlResult bool, repeatable bool, extensions []extensionOptions) error {
-
- var resp *http.Response
- var respError error
- doLog(LEVEL_INFO, "Enter method %s...", action)
- start := GetCurrentTimestamp()
-
- params, headers, data, err := input.trans(obsClient.conf.signature == SignatureObs)
- if err != nil {
- return err
- }
-
- if params == nil {
- params = make(map[string]string)
- }
-
- if headers == nil {
- headers = make(map[string][]string)
- }
-
- for _, extension := range extensions {
- if extensionHeader, ok := extension.(extensionHeaders); ok {
- _err := extensionHeader(headers, obsClient.conf.signature == SignatureObs)
- if _err != nil {
- doLog(LEVEL_WARN, fmt.Sprintf("set header with error: %v", _err))
- }
- } else {
- doLog(LEVEL_WARN, "Unsupported extensionOptions")
- }
- }
-
- switch method {
- case HTTP_GET:
- resp, respError = obsClient.doHTTPGet(bucketName, objectKey, params, headers, data, repeatable)
- case HTTP_POST:
- resp, respError = obsClient.doHTTPPost(bucketName, objectKey, params, headers, data, repeatable)
- case HTTP_PUT:
- resp, respError = obsClient.doHTTPPut(bucketName, objectKey, params, headers, data, repeatable)
- case HTTP_DELETE:
- resp, respError = obsClient.doHTTPDelete(bucketName, objectKey, params, headers, data, repeatable)
- case HTTP_HEAD:
- resp, respError = obsClient.doHTTPHead(bucketName, objectKey, params, headers, data, repeatable)
- case HTTP_OPTIONS:
- resp, respError = obsClient.doHTTPOptions(bucketName, objectKey, params, headers, data, repeatable)
- default:
- respError = errors.New("Unexpect http method error")
- }
- if respError == nil && output != nil {
- respError = ParseResponseToBaseModel(resp, output, xmlResult, obsClient.conf.signature == SignatureObs)
- if respError != nil {
- doLog(LEVEL_WARN, "Parse response to BaseModel with error: %v", respError)
- }
- } else {
- doLog(LEVEL_WARN, "Do http request with error: %v", respError)
- }
-
- if isDebugLogEnabled() {
- doLog(LEVEL_DEBUG, "End method %s, obsclient cost %d ms", action, (GetCurrentTimestamp() - start))
- }
-
- return respError
- }
-
- func (obsClient ObsClient) doHTTPGet(bucketName, objectKey string, params map[string]string,
- headers map[string][]string, data interface{}, repeatable bool) (*http.Response, error) {
- return obsClient.doHTTP(HTTP_GET, bucketName, objectKey, params, prepareHeaders(headers, false, obsClient.conf.signature == SignatureObs), data, repeatable)
- }
-
- func (obsClient ObsClient) doHTTPHead(bucketName, objectKey string, params map[string]string,
- headers map[string][]string, data interface{}, repeatable bool) (*http.Response, error) {
- return obsClient.doHTTP(HTTP_HEAD, bucketName, objectKey, params, prepareHeaders(headers, false, obsClient.conf.signature == SignatureObs), data, repeatable)
- }
-
- func (obsClient ObsClient) doHTTPOptions(bucketName, objectKey string, params map[string]string,
- headers map[string][]string, data interface{}, repeatable bool) (*http.Response, error) {
- return obsClient.doHTTP(HTTP_OPTIONS, bucketName, objectKey, params, prepareHeaders(headers, false, obsClient.conf.signature == SignatureObs), data, repeatable)
- }
-
- func (obsClient ObsClient) doHTTPDelete(bucketName, objectKey string, params map[string]string,
- headers map[string][]string, data interface{}, repeatable bool) (*http.Response, error) {
- return obsClient.doHTTP(HTTP_DELETE, bucketName, objectKey, params, prepareHeaders(headers, false, obsClient.conf.signature == SignatureObs), data, repeatable)
- }
-
- func (obsClient ObsClient) doHTTPPut(bucketName, objectKey string, params map[string]string,
- headers map[string][]string, data interface{}, repeatable bool) (*http.Response, error) {
- return obsClient.doHTTP(HTTP_PUT, bucketName, objectKey, params, prepareHeaders(headers, true, obsClient.conf.signature == SignatureObs), data, repeatable)
- }
-
- func (obsClient ObsClient) doHTTPPost(bucketName, objectKey string, params map[string]string,
- headers map[string][]string, data interface{}, repeatable bool) (*http.Response, error) {
- return obsClient.doHTTP(HTTP_POST, bucketName, objectKey, params, prepareHeaders(headers, true, obsClient.conf.signature == SignatureObs), data, repeatable)
- }
-
- func (obsClient ObsClient) doHTTPWithSignedURL(action, method string, signedURL string, actualSignedRequestHeaders http.Header, data io.Reader, output IBaseModel, xmlResult bool) (respError error) {
- req, err := http.NewRequest(method, signedURL, data)
- if err != nil {
- return err
- }
- if obsClient.conf.ctx != nil {
- req = req.WithContext(obsClient.conf.ctx)
- }
- var resp *http.Response
-
- var isSecurityToken bool
- var securityToken string
- var query []string
- parmas := strings.Split(signedURL, "?")
- if len(parmas) > 1 {
- query = strings.Split(parmas[1], "&")
- for _, value := range query {
- if strings.HasPrefix(value, HEADER_STS_TOKEN_AMZ+"=") || strings.HasPrefix(value, HEADER_STS_TOKEN_OBS+"=") {
- if value[len(HEADER_STS_TOKEN_AMZ)+1:] != "" {
- securityToken = value[len(HEADER_STS_TOKEN_AMZ)+1:]
- isSecurityToken = true
- }
- }
- }
- }
- logSignedURL := signedURL
- if isSecurityToken {
- logSignedURL = strings.Replace(logSignedURL, securityToken, "******", -1)
- }
- doLog(LEVEL_INFO, "Do %s with signedUrl %s...", action, logSignedURL)
-
- req.Header = actualSignedRequestHeaders
- if value, ok := req.Header[HEADER_HOST_CAMEL]; ok {
- req.Host = value[0]
- delete(req.Header, HEADER_HOST_CAMEL)
- } else if value, ok := req.Header[HEADER_HOST]; ok {
- req.Host = value[0]
- delete(req.Header, HEADER_HOST)
- }
-
- if value, ok := req.Header[HEADER_CONTENT_LENGTH_CAMEL]; ok {
- req.ContentLength = StringToInt64(value[0], -1)
- delete(req.Header, HEADER_CONTENT_LENGTH_CAMEL)
- } else if value, ok := req.Header[HEADER_CONTENT_LENGTH]; ok {
- req.ContentLength = StringToInt64(value[0], -1)
- delete(req.Header, HEADER_CONTENT_LENGTH)
- }
-
- req.Header[HEADER_USER_AGENT_CAMEL] = []string{USER_AGENT}
- start := GetCurrentTimestamp()
- resp, err = obsClient.httpClient.Do(req)
- if isInfoLogEnabled() {
- doLog(LEVEL_INFO, "Do http request cost %d ms", (GetCurrentTimestamp() - start))
- }
-
- var msg interface{}
- if err != nil {
- respError = err
- resp = nil
- } else {
- doLog(LEVEL_DEBUG, "Response headers: %v", resp.Header)
- if resp.StatusCode >= 300 {
- respError = ParseResponseToObsError(resp, obsClient.conf.signature == SignatureObs)
- msg = resp.Status
- resp = nil
- } else {
- if output != nil {
- respError = ParseResponseToBaseModel(resp, output, xmlResult, obsClient.conf.signature == SignatureObs)
- }
- if respError != nil {
- doLog(LEVEL_WARN, "Parse response to BaseModel with error: %v", respError)
- }
- }
- }
-
- if msg != nil {
- doLog(LEVEL_ERROR, "Failed to send request with reason:%v", msg)
- }
-
- if isDebugLogEnabled() {
- doLog(LEVEL_DEBUG, "End method %s, obsclient cost %d ms", action, (GetCurrentTimestamp() - start))
- }
-
- return
- }
-
- func (obsClient ObsClient) doHTTP(method, bucketName, objectKey string, params map[string]string,
- headers map[string][]string, data interface{}, repeatable bool) (resp *http.Response, respError error) {
-
- bucketName = strings.TrimSpace(bucketName)
-
- method = strings.ToUpper(method)
-
- var redirectURL string
- var requestURL string
- maxRetryCount := obsClient.conf.maxRetryCount
- maxRedirectCount := obsClient.conf.maxRedirectCount
-
- var _data io.Reader
- if data != nil {
- if dataStr, ok := data.(string); ok {
- doLog(LEVEL_DEBUG, "Do http request with string: %s", dataStr)
- headers["Content-Length"] = []string{IntToString(len(dataStr))}
- _data = strings.NewReader(dataStr)
- } else if dataByte, ok := data.([]byte); ok {
- doLog(LEVEL_DEBUG, "Do http request with byte array")
- headers["Content-Length"] = []string{IntToString(len(dataByte))}
- _data = bytes.NewReader(dataByte)
- } else if dataReader, ok := data.(io.Reader); ok {
- _data = dataReader
- } else {
- doLog(LEVEL_WARN, "Data is not a valid io.Reader")
- return nil, errors.New("Data is not a valid io.Reader")
- }
- }
-
- var lastRequest *http.Request
- redirectFlag := false
- for i, redirectCount := 0, 0; i <= maxRetryCount; i++ {
- if redirectURL != "" {
- if !redirectFlag {
- parsedRedirectURL, err := url.Parse(redirectURL)
- if err != nil {
- return nil, err
- }
- requestURL, err = obsClient.doAuth(method, bucketName, objectKey, params, headers, parsedRedirectURL.Host)
- if err != nil {
- return nil, err
- }
- if parsedRequestURL, err := url.Parse(requestURL); err != nil {
- return nil, err
- } else if parsedRequestURL.RawQuery != "" && parsedRedirectURL.RawQuery == "" {
- redirectURL += "?" + parsedRequestURL.RawQuery
- }
- }
- requestURL = redirectURL
- } else {
- var err error
- requestURL, err = obsClient.doAuth(method, bucketName, objectKey, params, headers, "")
- if err != nil {
- return nil, err
- }
- }
-
- req, err := http.NewRequest(method, requestURL, _data)
- if obsClient.conf.ctx != nil {
- req = req.WithContext(obsClient.conf.ctx)
- }
- if err != nil {
- return nil, err
- }
- doLog(LEVEL_DEBUG, "Do request with url [%s] and method [%s]", requestURL, method)
-
- if isDebugLogEnabled() {
- auth := headers[HEADER_AUTH_CAMEL]
- delete(headers, HEADER_AUTH_CAMEL)
-
- var isSecurityToken bool
- var securityToken []string
- if securityToken, isSecurityToken = headers[HEADER_STS_TOKEN_AMZ]; isSecurityToken {
- headers[HEADER_STS_TOKEN_AMZ] = []string{"******"}
- } else if securityToken, isSecurityToken = headers[HEADER_STS_TOKEN_OBS]; isSecurityToken {
- headers[HEADER_STS_TOKEN_OBS] = []string{"******"}
- }
- doLog(LEVEL_DEBUG, "Request headers: %v", headers)
- headers[HEADER_AUTH_CAMEL] = auth
- if isSecurityToken {
- if obsClient.conf.signature == SignatureObs {
- headers[HEADER_STS_TOKEN_OBS] = securityToken
- } else {
- headers[HEADER_STS_TOKEN_AMZ] = securityToken
- }
- }
- }
-
- for key, value := range headers {
- if key == HEADER_HOST_CAMEL {
- req.Host = value[0]
- delete(headers, key)
- } else if key == HEADER_CONTENT_LENGTH_CAMEL {
- req.ContentLength = StringToInt64(value[0], -1)
- delete(headers, key)
- } else {
- req.Header[key] = value
- }
- }
-
- lastRequest = req
-
- req.Header[HEADER_USER_AGENT_CAMEL] = []string{USER_AGENT}
-
- if lastRequest != nil {
- req.Host = lastRequest.Host
- req.ContentLength = lastRequest.ContentLength
- }
-
- start := GetCurrentTimestamp()
- resp, err = obsClient.httpClient.Do(req)
- if isInfoLogEnabled() {
- doLog(LEVEL_INFO, "Do http request cost %d ms", (GetCurrentTimestamp() - start))
- }
-
- var msg interface{}
- if err != nil {
- msg = err
- respError = err
- resp = nil
- if !repeatable {
- break
- }
- } else {
- doLog(LEVEL_DEBUG, "Response headers: %v", resp.Header)
- if resp.StatusCode < 300 {
- break
- } else if !repeatable || (resp.StatusCode >= 400 && resp.StatusCode < 500) || resp.StatusCode == 304 {
- respError = ParseResponseToObsError(resp, obsClient.conf.signature == SignatureObs)
- resp = nil
- break
- } else if resp.StatusCode >= 300 && resp.StatusCode < 400 {
- if location := resp.Header.Get(HEADER_LOCATION_CAMEL); location != "" && redirectCount < maxRedirectCount {
- redirectURL = location
- doLog(LEVEL_WARN, "Redirect request to %s", redirectURL)
- msg = resp.Status
- maxRetryCount++
- redirectCount++
- if resp.StatusCode == 302 && method == HTTP_GET {
- redirectFlag = true
- } else {
- redirectFlag = false
- }
- } else {
- respError = ParseResponseToObsError(resp, obsClient.conf.signature == SignatureObs)
- resp = nil
- break
- }
- } else {
- msg = resp.Status
- }
- }
- if i != maxRetryCount {
- if resp != nil {
- _err := resp.Body.Close()
- if _err != nil {
- doLog(LEVEL_WARN, "Failed to close resp body")
- }
- resp = nil
- }
- if _, ok := headers[HEADER_AUTH_CAMEL]; ok {
- delete(headers, HEADER_AUTH_CAMEL)
- }
- doLog(LEVEL_WARN, "Failed to send request with reason:%v, will try again", msg)
- if r, ok := _data.(*strings.Reader); ok {
- _, err := r.Seek(0, 0)
- if err != nil {
- return nil, err
- }
- } else if r, ok := _data.(*bytes.Reader); ok {
- _, err := r.Seek(0, 0)
- if err != nil {
- return nil, err
- }
- } else if r, ok := _data.(*fileReaderWrapper); ok {
- fd, err := os.Open(r.filePath)
- if err != nil {
- return nil, err
- }
- defer func() {
- errMsg := fd.Close()
- if errMsg != nil {
- doLog(LEVEL_WARN, "Failed to close with reason: %v", errMsg)
- }
- }()
- fileReaderWrapper := &fileReaderWrapper{filePath: r.filePath}
- fileReaderWrapper.mark = r.mark
- fileReaderWrapper.reader = fd
- fileReaderWrapper.totalCount = r.totalCount
- _data = fileReaderWrapper
- _, err = fd.Seek(r.mark, 0)
- if err != nil {
- return nil, err
- }
- } else if r, ok := _data.(*readerWrapper); ok {
- _, err := r.seek(0, 0)
- if err != nil {
- return nil, err
- }
- }
- time.Sleep(time.Duration(float64(i+2) * rand.Float64() * float64(time.Second)))
- } else {
- doLog(LEVEL_ERROR, "Failed to send request with reason:%v", msg)
- if resp != nil {
- respError = ParseResponseToObsError(resp, obsClient.conf.signature == SignatureObs)
- resp = nil
- }
- }
- }
- return
- }
-
// connDelegate wraps a net.Conn together with two timeouts that its Read and
// Write methods use when setting deadlines on the wrapped connection.
type connDelegate struct {
	// conn is the wrapped connection that every call is forwarded to.
	conn net.Conn
	// socketTimeout is the deadline offset applied before a Read or Write.
	socketTimeout time.Duration
	// finalTimeout is the deadline offset applied after a Read or Write.
	finalTimeout time.Duration
}

// getConnDelegate wraps conn in a connDelegate. socketTimeout and finalTimeout
// are given in seconds and stored as time.Duration values.
func getConnDelegate(conn net.Conn, socketTimeout int, finalTimeout int) *connDelegate {
	delegate := new(connDelegate)
	delegate.conn = conn
	delegate.socketTimeout = time.Duration(socketTimeout) * time.Second
	delegate.finalTimeout = time.Duration(finalTimeout) * time.Second
	return delegate
}
-
- func (delegate *connDelegate) Read(b []byte) (n int, err error) {
- setReadDeadlineErr := delegate.SetReadDeadline(time.Now().Add(delegate.socketTimeout))
- flag := isDebugLogEnabled()
-
- if setReadDeadlineErr != nil && flag {
- doLog(LEVEL_DEBUG, "Failed to set read deadline with reason: %v, but it's ok", setReadDeadlineErr)
- }
-
- n, err = delegate.conn.Read(b)
- setReadDeadlineErr = delegate.SetReadDeadline(time.Now().Add(delegate.finalTimeout))
- if setReadDeadlineErr != nil && flag {
- doLog(LEVEL_DEBUG, "Failed to set read deadline with reason: %v, but it's ok", setReadDeadlineErr)
- }
- return n, err
- }
-
- func (delegate *connDelegate) Write(b []byte) (n int, err error) {
- setWriteDeadlineErr := delegate.SetWriteDeadline(time.Now().Add(delegate.socketTimeout))
- flag := isDebugLogEnabled()
- if setWriteDeadlineErr != nil && flag {
- doLog(LEVEL_DEBUG, "Failed to set write deadline with reason: %v, but it's ok", setWriteDeadlineErr)
- }
-
- n, err = delegate.conn.Write(b)
- finalTimeout := time.Now().Add(delegate.finalTimeout)
- setWriteDeadlineErr = delegate.SetWriteDeadline(finalTimeout)
- if setWriteDeadlineErr != nil && flag {
- doLog(LEVEL_DEBUG, "Failed to set write deadline with reason: %v, but it's ok", setWriteDeadlineErr)
- }
- setReadDeadlineErr := delegate.SetReadDeadline(finalTimeout)
- if setReadDeadlineErr != nil && flag {
- doLog(LEVEL_DEBUG, "Failed to set read deadline with reason: %v, but it's ok", setReadDeadlineErr)
- }
- return n, err
- }
-
- func (delegate *connDelegate) Close() error {
- return delegate.conn.Close()
- }
-
- func (delegate *connDelegate) LocalAddr() net.Addr {
- return delegate.conn.LocalAddr()
- }
-
- func (delegate *connDelegate) RemoteAddr() net.Addr {
- return delegate.conn.RemoteAddr()
- }
-
- func (delegate *connDelegate) SetDeadline(t time.Time) error {
- return delegate.conn.SetDeadline(t)
- }
-
- func (delegate *connDelegate) SetReadDeadline(t time.Time) error {
- return delegate.conn.SetReadDeadline(t)
- }
-
- func (delegate *connDelegate) SetWriteDeadline(t time.Time) error {
- return delegate.conn.SetWriteDeadline(t)
- }
|