|
- // Copyright 2019 The Gitea Authors. All rights reserved.
- // Use of this source code is governed by a MIT-style
- // license that can be found in the LICENSE file.
-
- package issues
-
- import (
- "context"
- "errors"
- "fmt"
- "strconv"
- "time"
-
- "code.gitea.io/gitea/modules/log"
-
- "github.com/olivere/elastic/v7"
- )
-
- var (
- _ Indexer = &ElasticSearchIndexer{}
- )
-
- // ElasticSearchIndexer implements Indexer interface
- type ElasticSearchIndexer struct {
- client *elastic.Client
- indexerName string
- }
-
- type elasticLogger struct {
- *log.Logger
- }
-
- func (l elasticLogger) Printf(format string, args ...interface{}) {
- _ = l.Logger.Log(2, l.Logger.GetLevel(), format, args...)
- }
-
- // NewElasticSearchIndexer creates a new elasticsearch indexer
- func NewElasticSearchIndexer(url, indexerName string) (*ElasticSearchIndexer, error) {
- opts := []elastic.ClientOptionFunc{
- elastic.SetURL(url),
- elastic.SetSniff(false),
- elastic.SetHealthcheckInterval(10 * time.Second),
- elastic.SetGzip(false),
- }
-
- logger := elasticLogger{log.GetLogger(log.DEFAULT)}
-
- if logger.GetLevel() == log.TRACE || logger.GetLevel() == log.DEBUG {
- opts = append(opts, elastic.SetTraceLog(logger))
- } else if logger.GetLevel() == log.ERROR || logger.GetLevel() == log.CRITICAL || logger.GetLevel() == log.FATAL {
- opts = append(opts, elastic.SetErrorLog(logger))
- } else if logger.GetLevel() == log.INFO || logger.GetLevel() == log.WARN {
- opts = append(opts, elastic.SetInfoLog(logger))
- }
-
- client, err := elastic.NewClient(opts...)
- if err != nil {
- return nil, err
- }
-
- return &ElasticSearchIndexer{
- client: client,
- indexerName: indexerName,
- }, nil
- }
-
- const (
- defaultMapping = `{
- "mappings": {
- "properties": {
- "id": {
- "type": "integer",
- "index": true
- },
- "repo_id": {
- "type": "integer",
- "index": true
- },
- "title": {
- "type": "text",
- "index": true
- },
- "content": {
- "type": "text",
- "index": true
- },
- "comments": {
- "type" : "text",
- "index": true
- }
- }
- }
- }`
- )
-
- // Init will initialize the indexer
- func (b *ElasticSearchIndexer) Init() (bool, error) {
- ctx := context.Background()
- exists, err := b.client.IndexExists(b.indexerName).Do(ctx)
- if err != nil {
- return false, err
- }
-
- if !exists {
- var mapping = defaultMapping
-
- createIndex, err := b.client.CreateIndex(b.indexerName).BodyString(mapping).Do(ctx)
- if err != nil {
- return false, err
- }
- if !createIndex.Acknowledged {
- return false, errors.New("init failed")
- }
-
- return false, nil
- }
- return true, nil
- }
-
- // Index will save the index data
- func (b *ElasticSearchIndexer) Index(issues []*IndexerData) error {
- if len(issues) == 0 {
- return nil
- } else if len(issues) == 1 {
- issue := issues[0]
- _, err := b.client.Index().
- Index(b.indexerName).
- Id(fmt.Sprintf("%d", issue.ID)).
- BodyJson(map[string]interface{}{
- "id": issue.ID,
- "repo_id": issue.RepoID,
- "title": issue.Title,
- "content": issue.Content,
- "comments": issue.Comments,
- }).
- Do(context.Background())
- return err
- }
-
- reqs := make([]elastic.BulkableRequest, 0)
- for _, issue := range issues {
- reqs = append(reqs,
- elastic.NewBulkIndexRequest().
- Index(b.indexerName).
- Id(fmt.Sprintf("%d", issue.ID)).
- Doc(map[string]interface{}{
- "id": issue.ID,
- "repo_id": issue.RepoID,
- "title": issue.Title,
- "content": issue.Content,
- "comments": issue.Comments,
- }),
- )
- }
-
- _, err := b.client.Bulk().
- Index(b.indexerName).
- Add(reqs...).
- Do(context.Background())
- return err
- }
-
- // Delete deletes indexes by ids
- func (b *ElasticSearchIndexer) Delete(ids ...int64) error {
- if len(ids) == 0 {
- return nil
- } else if len(ids) == 1 {
- _, err := b.client.Delete().
- Index(b.indexerName).
- Id(fmt.Sprintf("%d", ids[0])).
- Do(context.Background())
- return err
- }
-
- reqs := make([]elastic.BulkableRequest, 0)
- for _, id := range ids {
- reqs = append(reqs,
- elastic.NewBulkDeleteRequest().
- Index(b.indexerName).
- Id(fmt.Sprintf("%d", id)),
- )
- }
-
- _, err := b.client.Bulk().
- Index(b.indexerName).
- Add(reqs...).
- Do(context.Background())
- return err
- }
-
- // Search searches for issues by given conditions.
- // Returns the matching issue IDs
- func (b *ElasticSearchIndexer) Search(keyword string, repoIDs []int64, limit, start int) (*SearchResult, error) {
- kwQuery := elastic.NewMultiMatchQuery(keyword, "title", "content", "comments")
- query := elastic.NewBoolQuery()
- query = query.Must(kwQuery)
- if len(repoIDs) > 0 {
- var repoStrs = make([]interface{}, 0, len(repoIDs))
- for _, repoID := range repoIDs {
- repoStrs = append(repoStrs, repoID)
- }
- repoQuery := elastic.NewTermsQuery("repo_id", repoStrs...)
- query = query.Must(repoQuery)
- }
- searchResult, err := b.client.Search().
- Index(b.indexerName).
- Query(query).
- Sort("id", true).
- From(start).Size(limit).
- Do(context.Background())
- if err != nil {
- return nil, err
- }
-
- hits := make([]Match, 0, limit)
- for _, hit := range searchResult.Hits.Hits {
- id, _ := strconv.ParseInt(hit.Id, 10, 64)
- hits = append(hits, Match{
- ID: id,
- })
- }
-
- return &SearchResult{
- Total: searchResult.TotalHits(),
- Hits: hits,
- }, nil
- }
-
- // Close implements indexer
- func (b *ElasticSearchIndexer) Close() {}
|