diff --git a/.gitignore b/.gitignore index 72f7859..a2bc369 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,11 @@ cmd/escp/escp cmd/esdiff/esdiff +cmd/esshards/esshards +cmd/primeshards/primeshards +cmd/shardsize/shardsize + +vendor/**/ .*.swp + + diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..d955221 --- /dev/null +++ b/LICENSE @@ -0,0 +1,202 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." 
+ + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. 
+ + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. 
We also recommend that a
+   file or class name and description of purpose be included on the
+   same "printed page" as the copyright notice for easier
+   identification within third-party archives.
+
+   Copyright 2016 Lytics
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+
diff --git a/README.md b/README.md
index 369a9ef..0711c46 100644
--- a/README.md
+++ b/README.md
@@ -27,6 +27,15 @@ esdiff -d 4 http://host1:9200/ srcindex http://host2:9200 dstindex
 esdiff -d 1 http://host1:9200/ srcindex http://host2:9200 dstindex
 ```
+## Introspection tools
+
+* `primeshards`: Correlates which indexes have one or more primary shards on a subset of Elasticsearch nodes.
+* `shardsize`: Returns a list of indexes whose shards are larger than 10GB.
+
+### Disclaimer
+
+This toolkit has been built against Elasticsearch 2.3.2. Your mileage may vary on other versions.
+
 Other Tools
 -------------------------------
 * https://github.com/taskrabbit/elasticsearch-dump
diff --git a/cmd/primeshards/README.md b/cmd/primeshards/README.md
new file mode 100644
index 0000000..b832d19
--- /dev/null
+++ b/cmd/primeshards/README.md
@@ -0,0 +1,17 @@
+
+
+> primeshards
+Correlates which indexes have one or more primary shards on a subset of Elasticsearch nodes.
+
+Useful for narrowing down hot indexes and their constituent shards by host.
+An index is added to the returned list when each of the given nodes holds at least one of its primary shards.
+
+Usage: see -help
+
+
+
+
+
+
+- - -
+Generated by [godoc2md](http://godoc.org/github.com/davecheney/godoc2md)
diff --git a/cmd/primeshards/main.go b/cmd/primeshards/main.go
new file mode 100644
index 0000000..6644477
--- /dev/null
+++ b/cmd/primeshards/main.go
@@ -0,0 +1,54 @@
+// Correlates which indexes have one or more primary shards on a subset of Elasticsearch nodes.
+//
+// Useful for narrowing down hot indexes and their constituent shards by host.
+// An index is added to the returned list when each of the given nodes holds at least one of its primary shards.
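+//
+// A typical invocation might look like this (the node names are illustrative,
+// not part of the tool; the flags are defined below):
+//	primeshards -host http://localhost:9200/ -nodes "es1,es2,es3"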
+//
+// Usage: see -help
+package main
+
+import (
+	"flag"
+	"sort"
+	"strings"
+
+	log "github.com/Sirupsen/logrus"
+	"github.com/lytics/escp/esshards"
+)
+
+func main() {
+	var hostAddr string
+	var nodes []string
+	var nodesRaw string
+	log.SetLevel(log.InfoLevel)
+
+	flag.StringVar(&nodesRaw, "nodes", "", "Nodes to find common primary shards, e.g. \"es1,es2,es3\"")
+	flag.StringVar(&hostAddr, "host", "http://localhost:9200/", "Elasticsearch query address")
+	flag.Parse()
+
+	log.Debugf("nodesRaw: %v", nodesRaw)
+	nodes = strings.Split(nodesRaw, ",")
+
+	shardAddr := hostAddr + "_search_shards"
+	info, err := esshards.Get(shardAddr)
+	if err != nil {
+		log.Fatalf("Error querying shard info from Elasticsearch:\n%#v", err)
+	}
+
+	HRnodeSet := make(map[string]struct{})
+	for _, v := range nodes {
+		HRnodeSet[v] = struct{}{}
+	}
+	nodeIDs := esshards.NodesFromHRName(*info, HRnodeSet)
+	log.Debugf("Nodes associated: %#v", nodeIDs)
+
+	cmi := esshards.CommonPrimaryIndexes(info, nodeIDs)
+	log.Infof("Indices with primary shards common to %v:\n", nodeIDs)
+	indexes := make([]string, 0)
+	for k := range cmi {
+		indexes = append(indexes, k)
+	}
+	sort.Strings(indexes)
+	for _, v := range indexes {
+		log.Infof("%s", v)
+	}
+}
diff --git a/cmd/shardsize/README.md b/cmd/shardsize/README.md
new file mode 100644
index 0000000..d6131bd
--- /dev/null
+++ b/cmd/shardsize/README.md
@@ -0,0 +1,19 @@
+
+
+> shardsize
+Returns a list of indexes whose shards are larger than 10GB.
+
+Large shard sizes can be problematic in Elasticsearch. There is no clear optimal shard size; it depends largely on your data and how it is structured.
+
+`cmd/shardsize` calculates which indices have excessively large shards and prints them to stdout.
+
+#### Example
+`./shardsize --host=http://localhost:9200`
+
+
+
+
+
+
+- - -
+Generated by [godoc2md](http://godoc.org/github.com/davecheney/godoc2md)
diff --git a/cmd/shardsize/main.go b/cmd/shardsize/main.go
new file mode 100644
index 0000000..54bcf36
--- /dev/null
+++ b/cmd/shardsize/main.go
@@ -0,0 +1,70 @@
+// Returns a list of indexes whose shards are larger than 10GB.
+//
+// Large shard sizes can be problematic in Elasticsearch. There is no clear optimal shard size; it depends largely on your data and how it is structured.
+//
+// `cmd/shardsize` calculates which indices have excessively large shards and prints them to stdout.
+//
+// Example:
+//	./shardsize --host=http://localhost:9200
+package main
+
+import (
+	"flag"
+	"fmt"
+	"os"
+	"sort"
+
+	log "github.com/Sirupsen/logrus"
+	"github.com/lytics/escp/esshards"
+	"github.com/lytics/escp/esstats"
+	"github.com/lytics/escp/estypes"
+	"github.com/pivotal-golang/bytefmt"
+)
+
+func main() {
+	var hostAddr string
+	log.SetLevel(log.InfoLevel)
+
+	flag.Usage = func() {
+		fmt.Fprintf(os.Stderr, "Usage: %s -host http://localhost:9200/\n", os.Args[0])
+		flag.PrintDefaults()
+	}
+
+	flag.StringVar(&hostAddr, "host", "http://localhost:9200/", "Elasticsearch query address")
+	flag.Parse()
+
+	shardAddr := hostAddr + "_stats"
+	stats, err := esstats.Get(shardAddr)
+	if err != nil {
+		log.Fatalf("Error querying index stats from Elasticsearch:\n%#v", err)
+	}
+
+	indices := make(map[string]estypes.IndexInfo)
+	for k, v := range stats.Indices {
+		indices[k] = estypes.IndexInfo{Name: k, ByteSize: v.Primaries.Store.IndexByteSize}
+	}
+
+	shardInfo, err := esshards.Get(hostAddr + "_search_shards")
+	if err != nil {
+		log.Fatalf("Error querying shard info from Elasticsearch:\n%#v", err)
+	}
+
+	shardCount := esstats.CountShards(shardInfo.Shards)
+
+	indexList := make([]estypes.IndexInfo, 0)
+	for k, v := range shardCount {
+		ii := indices[k]
+		ii.ShardCount = v
+		ii.CalculateShards()
+		indices[k] = ii
+		indexList = append(indexList, ii)
+	}
+
+	sort.Sort(estypes.IndexSort(indexList))
+	log.Infof(" Index IndexSize ShardSize Shards (Optimal Shards)")
+	for _, sc := range indexList {
+		if sc.OptimalShards-sc.ShardCount > 10 {
+			log.Infof("%20s: %7s %12s %3d %8d", sc.Name, bytefmt.ByteSize(uint64(sc.ByteSize)), bytefmt.ByteSize(uint64(sc.BytesPerShard)), sc.ShardCount, sc.OptimalShards)
+		}
+	}
+}
diff --git a/esshards/doc.go b/esshards/doc.go
new file mode 100644
index 0000000..1f8470c
--- /dev/null
+++ b/esshards/doc.go
@@ -0,0 +1,2 @@
+// Package esshards encapsulates functionality for understanding your Elasticsearch cluster's shard allocation.
+package esshards
diff --git a/esshards/esshards.go b/esshards/esshards.go
new file mode 100644
index 0000000..a24b218
--- /dev/null
+++ b/esshards/esshards.go
@@ -0,0 +1,148 @@
+package esshards
+
+import (
+	"encoding/json"
+	"errors"
+	"fmt"
+	"net/http"
+
+	"github.com/lytics/escp/estypes"
+)
+
+var ErrMissing = errors.New("Error GETting _search_shards endpoint")
+
+// Get returns structured data from the _search_shards endpoint.
+// This includes information about the cluster's nodes and a flat listing of shards.
+func Get(dst string) (*estypes.SearchShardsEndpoint, error) {
+	resp, err := http.Get(dst)
+	if err != nil {
+		return nil, fmt.Errorf("error contacting source Elasticsearch: %v", err)
+	}
+
+	if resp.StatusCode == 404 {
+		return nil, ErrMissing
+	}
+	if resp.StatusCode != 200 {
+		return nil, fmt.Errorf("non-200 status code from source Elasticsearch: %d", resp.StatusCode)
+	}
+
+	shardInfo := estypes.NewSearchShards()
+	if err := json.NewDecoder(resp.Body).Decode(&shardInfo); err != nil {
+		return nil, err
+	}
+	return shardInfo, nil
+}
+
+// NodeIDs returns a map keyed by ES node ID with empty per-index shard-attribute maps, ready to be filled by ProcessShardList.
+func NodeIDs(endpoint estypes.SearchShardsEndpoint) map[string]map[string][]estypes.ShardAttributes {
+	nodemap := make(map[string]map[string][]estypes.ShardAttributes)
+
+	for k := range endpoint.Nodes {
+		nodemap[k] = make(map[string][]estypes.ShardAttributes)
+	}
+	return nodemap
+}
+
+// NodesFromHRName matches a set of human-readable node names and returns a map of internal node IDs to names.
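+// For example, given the name set {"es1", "es4"}, the result maps the matching
+// internal node IDs to those names, e.g. {"uid1": "es1", "uid4": "es4"} (the
+// IDs here are illustrative; real clusters use opaque node IDs).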
+func NodesFromHRName(endpoint estypes.SearchShardsEndpoint, esNames map[string]struct{}) map[string]string {
+	matching := make(map[string]string)
+	for k, v := range endpoint.Nodes {
+		if _, e := esNames[v.Name]; e {
+			matching[k] = v.Name
+		}
+	}
+	return matching
+}
+
+// PrimaryShards filters a list of ShardAttributes and returns only the primary shards.
+func PrimaryShards(shards []estypes.ShardAttributes) []estypes.ShardAttributes {
+	primaries := make([]estypes.ShardAttributes, 0, 0)
+
+	for _, v := range shards {
+		if v.Primary {
+			primaries = append(primaries, v)
+		}
+	}
+	return primaries
+}
+
+// FlatShardAttributes accepts a slice of ShardInfo ([]ShardAttributes) and
+// unpacks all of the contained ShardAttributes into a new flat list.
+func FlatShardAttributes(shardList []estypes.ShardInfo) []estypes.ShardAttributes {
+	shardAttrs := make([]estypes.ShardAttributes, 0, 0)
+	for _, si := range shardList {
+		for _, s := range si {
+			shardAttrs = append(shardAttrs, s)
+		}
+	}
+	return shardAttrs
+}
+
+// ProcessShardList builds a nested map[Node][Index][]ShardAttributes, i.e.
+// [NodeName][IndexName][]PrimaryShards, which is returned to the caller,
+// exposing which nodes host an index's primary shards.
+func ProcessShardList(shardList []estypes.ShardAttributes, nodemap map[string]map[string][]estypes.ShardAttributes) map[string]map[string][]estypes.ShardAttributes {
+	primaries := PrimaryShards(shardList)
+
+	for _, p := range primaries {
+		nodemap[p.Node][p.Index] = append(nodemap[p.Node][p.Index], p)
+	}
+	return nodemap
+}
+
+// NodeIndexSets discovers, per ES node, the set of indexes with a primary shard on that node.
+func NodeIndexSets(info estypes.SearchShardsEndpoint) map[string]map[string]struct{} {
+
+	primaryNodes := make(map[string]map[string]struct{})
+
+	nodeids := NodeIDs(info)
+	shardAttrs := FlatShardAttributes(info.Shards)
+	primeMap := ProcessShardList(shardAttrs, nodeids)
+
+	type empty struct{}
+	for pk, pv := range primeMap {
+		for _, iv := range pv {
+			for _, s := range iv {
+				// Create the set for this node if it is new
+				if _, e := primaryNodes[pk]; !e {
+					primaryNodes[pk] = make(map[string]struct{})
+				}
+				// Add the index to the node's set
+				if _, e := primaryNodes[pk][s.Index]; !e {
+					primaryNodes[pk][s.Index] = empty{}
+				}
+			}
+		}
+	}
+	return primaryNodes
+}
+
+// CommonPrimaryIndexes builds on NodeIndexSets(...) to produce the set of indexes common to
+// a set of nodes.
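+// The result is the intersection of the given nodes' primary-index sets: an
+// index is included only when every node in nodeIDs holds at least one of its
+// primary shards.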
+func CommonPrimaryIndexes(info *estypes.SearchShardsEndpoint, nodeIDs map[string]string) map[string]struct{} { + commonIndexes := make(map[string]struct{}) + nodeSets := NodeIndexSets(*info) + + for nk, nv := range nodeSets { + if _, exists := nodeIDs[nk]; exists { + if len(commonIndexes) == 0 { + commonIndexes = nv + } else { + commonIndexes = MatchMaps(nv, commonIndexes) + } + } + } + return commonIndexes +} + +// MatchMaps compares two map sets and returns the common keys +func MatchMaps(x, y map[string]struct{}) map[string]struct{} { + z := make(map[string]struct{}) + + for k, _ := range x { + if _, exists := y[k]; exists { + z[k] = struct{}{} + } + } + return z +} diff --git a/esshards/esshards_test.go b/esshards/esshards_test.go new file mode 100644 index 0000000..aec25e1 --- /dev/null +++ b/esshards/esshards_test.go @@ -0,0 +1,188 @@ +package esshards + +import ( + "fmt" + "os" + "testing" + + "github.com/lytics/escp/estypes" +) + +var endpointInfo *estypes.SearchShardsEndpoint + +func init() { + endpointInfo = estypes.NewSearchShards() + + na := make(map[string]estypes.NodeAttributes) + na["uid1"] = estypes.NodeAttributes{Name: "es1"} + na["uid2"] = estypes.NodeAttributes{Name: "es2"} + na["uid3"] = estypes.NodeAttributes{Name: "es3"} + na["uid4"] = estypes.NodeAttributes{Name: "es4"} + endpointInfo.Nodes = na + + sattr0 := estypes.ShardAttributes{Primary: true, Shard: 0, Node: "uid1", Index: "index0"} + sattr1 := estypes.ShardAttributes{Primary: false, Shard: 0, Node: "uid2", Index: "index0"} + saList := make([]estypes.ShardAttributes, 0) + saList = append(saList, sattr0) + saList = append(saList, sattr1) + + sinfo0 := make([]estypes.ShardInfo, 0) + sinfo0 = append(sinfo0, saList) + + sattr0 = estypes.ShardAttributes{Primary: true, Shard: 1, Node: "uid2", Index: "index0"} + sattr1 = estypes.ShardAttributes{Primary: false, Shard: 1, Node: "uid3", Index: "index0"} + saList = make([]estypes.ShardAttributes, 0) + saList = append(saList, sattr0) + saList = append(saList, sattr1) + sinfo0 = append(sinfo0, saList) + + sattr0 = estypes.ShardAttributes{Primary: true, Shard: 2, Node: "uid3", Index: "index0"} + sattr1 = estypes.ShardAttributes{Primary: false, Shard: 2, Node: "uid1", Index: "index0"} + saList = make([]estypes.ShardAttributes, 0) + saList = append(saList, sattr0) + saList = append(saList, sattr1) + sinfo0 = append(sinfo0, saList) + + //Index1 + sattr0 = estypes.ShardAttributes{Primary: true, Shard: 0, Node: "uid4", Index: "index1"} + sattr1 = estypes.ShardAttributes{Primary: false, Shard: 0, Node: "uid2", Index: "index1"} + saList = make([]estypes.ShardAttributes, 0) + saList = append(saList, sattr0) + saList = append(saList, sattr1) + sinfo0 = append(sinfo0, saList) + + sattr0 = estypes.ShardAttributes{Primary: true, Shard: 1, Node: "uid2", Index: "index1"} + sattr1 = estypes.ShardAttributes{Primary: false, Shard: 1, Node: "uid4", Index: "index1"} + saList = make([]estypes.ShardAttributes, 0) + saList = append(saList, sattr0) + saList = append(saList, sattr1) + sinfo0 = append(sinfo0, saList) + + sattr0 = estypes.ShardAttributes{Primary: false, Shard: 2, Node: "uid1", Index: "index1"} + sattr1 = estypes.ShardAttributes{Primary: true, Shard: 2, Node: "uid4", Index: "index1"} + saList = make([]estypes.ShardAttributes, 0) + saList = append(saList, sattr0) + saList = append(saList, sattr1) + sinfo0 = append(sinfo0, saList) + endpointInfo.Shards = sinfo0 +} + +func TestMatching(t *testing.T) { + + x := map[string]struct{}{"hi": struct{}{}, "cat": struct{}{}} + y := 
map[string]struct{}{"cat": struct{}{}, "neh": struct{}{}} + + match := MatchMaps(x, y) + if len(match) > 1 { + t.Errorf("Match size too large: %#v\n", match) + } + if _, ex := match["cat"]; !ex { + t.Errorf("Error matching common elements of set") + } +} + +func TestCommonNodes(t *testing.T) { + nodes := make(map[string]string) + nodes["uid1"] = "es1" + nodes["uid2"] = "es2" + nodes["uid3"] = "es3" + cmi := CommonPrimaryIndexes(endpointInfo, nodes) + + if _, e := cmi["index0"]; !e { + t.Errorf("index0 primary shards not matched between uids[1,2,3]") + } + + nodes = make(map[string]string) + nodes["uid2"] = "es2" + nodes["uid4"] = "es4" + cmi = CommonPrimaryIndexes(endpointInfo, nodes) + + if _, e := cmi["index1"]; !e { + t.Errorf("index1 primary shards not matched between uids[2,4]") + } +} + +func TestNodesHRNames(t *testing.T) { + HRNodes := make(map[string]struct{}) + HRNodes["es1"] = struct{}{} + HRNodes["es4"] = struct{}{} + + ids := NodesFromHRName(*endpointInfo, HRNodes) + if _, e := ids["uid4"]; !e { + t.Error("'uid4' not returned from human readable node names") + } + if _, e := ids["uid1"]; !e { + t.Error("'uid1' not returned from human readable node names") + } +} + +func TestPrimariesPerNode(t *testing.T) { + primaryNodes := make(map[string]map[string]struct{}) + info := endpointInfo + + nodeids := NodeIDs(*info) + shardAttrs := FlatShardAttributes(info.Shards) + primeMap := ProcessShardList(shardAttrs, nodeids) + + type empty struct{} + for pk, pv := range primeMap { + for _, iv := range pv { + for _, s := range iv { + //fmt.Printf("\t\t%s; %v\n", s.Index, s.Shard) + if _, e := primaryNodes[pk]; !e { + primaryNodes[pk] = make(map[string]struct{}) + } + if _, e := primaryNodes[pk][s.Index]; !e { + primaryNodes[pk][s.Index] = empty{} + } + } + } + } + if _, e := primaryNodes["uid4"]["index1"]; !e { + t.Error("Primary shard of index1 not on uid4") + } + + _, uid2_i0 := primaryNodes["uid2"]["index0"] + _, uid2_i1 := primaryNodes["uid2"]["index1"] + if !uid2_i0 || !uid2_i1 { + t.Error("Both indexes should have primary shards on uid2") + } + + /* + for k, _ := range primaryNodes { + fmt.Printf("%s\n", k) + for ki, _ := range primaryNodes[k] { + fmt.Printf("\t%s\n", ki) + } + } + */ + +} + +//Query a local ES node and display all the indexes +// Skipped unless TESTINT=1 is set in env +func TestFullIntegration(t *testing.T) { + //Skip visual data inspection if TESTINT environment variable isn't set to "1" + if s := os.Getenv("TESTINT"); s != "1" { + return + } + info, err := Get("http://localhost:9200/_search_shards") + if err != nil { + t.Error(err) + } + + nodeids := NodeIDs(*info) + shardAttrs := FlatShardAttributes(info.Shards) + + primeMap := ProcessShardList(shardAttrs, nodeids) + for pk, pv := range primeMap { + fmt.Printf("Node: %s\n", pk) + for ik, iv := range pv { + //fmt.Printf("\tIndex: %s\n%#v\n", ik, iv) + fmt.Printf("\tIndex: %s\n", ik) + for _, s := range iv { + fmt.Printf("\t\t%s; %v\n", s.Index, s.Shard) + } + } + } +} diff --git a/esstats/doc.go b/esstats/doc.go new file mode 100644 index 0000000..045b460 --- /dev/null +++ b/esstats/doc.go @@ -0,0 +1,3 @@ +// ES Stats package API wraps up all the data return from the _stats endpoint +// and returns it in Go data structures. 
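+//
+// A minimal sketch of typical use (the URLs are illustrative):
+//
+//	stats, _ := esstats.Get("http://localhost:9200/_stats")
+//	shards, _ := esshards.Get("http://localhost:9200/_search_shards")
+//	counts := esstats.CountShards(shards.Shards)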
+package esstats
diff --git a/esstats/esstats.go b/esstats/esstats.go
new file mode 100644
index 0000000..9fa65f4
--- /dev/null
+++ b/esstats/esstats.go
@@ -0,0 +1,42 @@
+package esstats
+
+import (
+	"encoding/json"
+	"fmt"
+	"net/http"
+
+	"github.com/lytics/escp/estypes"
+)
+
+// Get fetches and decodes the response from the _stats endpoint.
+func Get(qry string) (*estypes.Stats, error) {
+	resp, err := http.Get(qry)
+	if err != nil {
+		return nil, fmt.Errorf("error contacting source Elasticsearch: %v", err)
+	}
+
+	if resp.StatusCode != 200 {
+		return nil, fmt.Errorf("non-200 status code from source Elasticsearch: %d", resp.StatusCode)
+	}
+
+	statsInfo := &estypes.Stats{Indices: make(map[string]estypes.StatsIndices)}
+
+	if err := json.NewDecoder(resp.Body).Decode(&statsInfo); err != nil {
+		return nil, err
+	}
+	return statsInfo, nil
+}
+
+// CountShards creates a map of index name to number of shards. It iterates
+// over the given shard list and counts the shards belonging to each index name.
+func CountShards(shards estypes.ShardList) map[string]int {
+	shardCount := make(map[string]int)
+	for _, s := range shards {
+		if _, ok := shardCount[s[0].Index]; !ok {
+			shardCount[s[0].Index] = 1
+		} else {
+			shardCount[s[0].Index]++
+		}
+	}
+	return shardCount
+}
diff --git a/estypes/doc.go b/estypes/doc.go
index 1d769ac..6896d05 100644
--- a/estypes/doc.go
+++ b/estypes/doc.go
@@ -1,3 +1,3 @@
-// This package contains common Elasticsearch data types used by multiple other
+// This package contains common Elasticsearch data types used by other
 // packages.
 package estypes
diff --git a/estypes/estypes.go b/estypes/estypes.go
index 81e15c1..d8e69a9 100644
--- a/estypes/estypes.go
+++ b/estypes/estypes.go
@@ -3,6 +3,7 @@ package estypes
 import (
 	"encoding/json"
 	"errors"
+	"math"
 )
 
 type Meta struct {
@@ -60,12 +61,12 @@ type ShardList []ShardInfo
 type ShardInfo []ShardAttributes
 
 type ShardAttributes struct {
-	State string `json:"state"`
-	Primary bool `json:"primary"`
-	Node string `json:"node"`
-	//Relocating bool `json:"relocating_node"`
-	Shard int `json:"shard"`
-	Index string `json:"index"`
+	State      string `json:"state"`
+	Primary    bool   `json:"primary"`
+	Node       string `json:"node"`
+	Relocating bool   `json:"relocating_node"`
+	Shard      int    `json:"shard"`
+	Index      string `json:"index"`
 }
 
 /*
@@ -99,8 +100,37 @@ type IndexInfo struct {
 	ByteSize      int
 	ShardCount    int
 	BytesPerShard int
+	OptimalShards int
 }
 
+var GBytes = 10737418240
+
+// https://github.com/golang/go/issues/4594#issuecomment-135336012
+func round(f float64) int {
+	if math.Abs(f) < 0.5 {
+		return 0
+	}
+	return int(f + math.Copysign(0.5, f))
+}
+
+func optimalShards(ii IndexInfo) int {
+	proactiveSize := float64(ii.ByteSize) * 1.25
+	proactiveShardCount := round(proactiveSize / float64(GBytes))
+
+	if proactiveShardCount <= 3 {
+		return 3
+	} else {
+		return proactiveShardCount
+	}
+}
+
+// CalculateShards estimates an optimal number of shards from an index's byte size: pad the size by 25%, divide into ~10GiB shards, with a minimum of three.
+// There is not much science here; the heuristic grew out of backpressure observed on poorly performing indexes.
+func (ii *IndexInfo) CalculateShards() {
+	ii.OptimalShards = optimalShards(*ii)
+}
+
+// IndexSort defines sorting indexes by their byte size.
type IndexSort []IndexInfo func (is IndexSort) Len() int { return len(is) } diff --git a/vendor/vendor.json b/vendor/vendor.json new file mode 100644 index 0000000..6980440 --- /dev/null +++ b/vendor/vendor.json @@ -0,0 +1,91 @@ +{ + "comment": "", + "ignore": "test", + "package": [ + { + "checksumSHA1": "pv31U2z7ESu51w5x38+MvIO5kC4=", + "path": "github.com/Sirupsen/logrus", + "revision": "be8c024f59696b812ed7a498c6c7c4733c58a257", + "revisionTime": "2016-09-20T23:40:39Z" + }, + { + "checksumSHA1": "vJFveoepc+cD62r2X06n/VWioLg=", + "path": "github.com/lytics/escp/cmd/escp", + "revision": "cf7353bbee396594535cb17988b3e8cc9df5dc9a", + "revisionTime": "2016-12-22T18:35:37Z" + }, + { + "checksumSHA1": "2ECoa5rulLXFmGhDphdxbtwM69w=", + "path": "github.com/lytics/escp/cmd/esdiff", + "revision": "cf7353bbee396594535cb17988b3e8cc9df5dc9a", + "revisionTime": "2016-12-22T18:35:37Z" + }, + { + "checksumSHA1": "BpfnwtqtKZg21GBUCgOvhpj5Q8Y=", + "path": "github.com/lytics/escp/cmd/primeshards", + "revision": "cf7353bbee396594535cb17988b3e8cc9df5dc9a", + "revisionTime": "2016-12-22T18:35:37Z" + }, + { + "checksumSHA1": "07MA9V2Iruzf0mcJghBbcxLOAJo=", + "path": "github.com/lytics/escp/cmd/shardsize", + "revision": "cf7353bbee396594535cb17988b3e8cc9df5dc9a", + "revisionTime": "2016-12-22T18:35:37Z" + }, + { + "checksumSHA1": "DLQvabxe6xU0WAzQQTpT7CgOSXM=", + "path": "github.com/lytics/escp/esbulk", + "revision": "cf7353bbee396594535cb17988b3e8cc9df5dc9a", + "revisionTime": "2016-12-22T18:35:37Z" + }, + { + "checksumSHA1": "dYeS2GyRO+n9Rv6EWVwC+FkQCGg=", + "path": "github.com/lytics/escp/esdiff", + "revision": "cf7353bbee396594535cb17988b3e8cc9df5dc9a", + "revisionTime": "2016-12-22T18:35:37Z" + }, + { + "checksumSHA1": "2CvGraMr14qCujKqunw9QwzyBLQ=", + "path": "github.com/lytics/escp/esindex", + "revision": "cf7353bbee396594535cb17988b3e8cc9df5dc9a", + "revisionTime": "2016-12-22T18:35:37Z" + }, + { + "checksumSHA1": "RQq4XoCye59PxJZW2j2jEBk88NU=", + "path": "github.com/lytics/escp/esscroll", + "revision": "cf7353bbee396594535cb17988b3e8cc9df5dc9a", + "revisionTime": "2016-12-22T18:35:37Z" + }, + { + "checksumSHA1": "81BkWEtyLDzXXOI7EDv3D+dbooE=", + "path": "github.com/lytics/escp/esshards", + "revision": "cf7353bbee396594535cb17988b3e8cc9df5dc9a", + "revisionTime": "2016-12-22T18:35:37Z" + }, + { + "checksumSHA1": "/ymqbthHayfyDaofjT/OpzV22sE=", + "path": "github.com/lytics/escp/esstats", + "revision": "cf7353bbee396594535cb17988b3e8cc9df5dc9a", + "revisionTime": "2016-12-22T18:35:37Z" + }, + { + "checksumSHA1": "UVzCr7Aw0pORf5L2iTmX7KnJNak=", + "path": "github.com/lytics/escp/estypes", + "revision": "cf7353bbee396594535cb17988b3e8cc9df5dc9a", + "revisionTime": "2016-12-22T18:35:37Z" + }, + { + "checksumSHA1": "J1oI5fm4VdotEEBN+Kci8o9LJh4=", + "path": "github.com/pivotal-golang/bytefmt", + "revision": "263ce04fc1d71bc7d50c241293488b3f54c779e5", + "revisionTime": "2016-01-07T07:03:02Z" + }, + { + "checksumSHA1": "w/oQQFHCnE+S+2g5D62nfY8aDuQ=", + "path": "golang.org/x/sys/unix", + "revision": "62bee037599929a6e9146f29d10dd5208c43507d", + "revisionTime": "2016-06-14T22:52:37Z" + } + ], + "rootPath": "github.com/lytics/escp" +}