Documentation
¶
Index ¶
- Variables
- func Master(datadir string, body io.Reader) (written int64, err error)
- type Cat
- func (c *Cat) CatActPipeline(ctx context.Context, in <-chan cattrack.CatTrack) error
- func (c *Cat) CleanTracks(ctx context.Context, in <-chan cattrack.CatTrack) <-chan cattrack.CatTrack
- func (c *Cat) Close() error
- func (c *Cat) ExportInfluxDB(tracks []cattrack.CatTrack) error
- func (c *Cat) GetDefaultRgeoIndexer() (*reducer.CellIndexer, error)
- func (c *Cat) GetDefaultS2CellIndexer() (*reducer.CellIndexer, error)
- func (c *Cat) ImprovedActTracks(ctx context.Context, in <-chan cattrack.CatTrack) <-chan cattrack.CatTrack
- func (c *Cat) IsRPCEnabled() bool
- func (c *Cat) IsRgeoRPCEnabled() bool
- func (c *Cat) IsTilingRPCEnabled() bool
- func (c *Cat) LockOrLoadState(readOnly bool) error
- func (c *Cat) MustGetLapState() *lap.State
- func (c *Cat) MustGetNapState() *nap.State
- func (c *Cat) OffsetIndexer(ctx context.Context, in <-chan cattrack.CatTrack) error
- func (c *Cat) Populate(ctx context.Context, sort bool, in <-chan cattrack.CatTrack) error
- func (c *Cat) PopulateReader(ctx context.Context, sort bool, in io.Reader) (err error)
- func (c *Cat) ProducerPipelines(ctx context.Context, in <-chan cattrack.CatTrack) error
- func (c *Cat) RGeoIndexTracks(ctx context.Context, in <-chan cattrack.CatTrack) error
- func (c *Cat) RgeoCollectLevel(ctx context.Context, level int) ([]cattrack.CatTrack, error)
- func (c *Cat) RgeoDumpLevel(wr io.Writer, level int) error
- func (c *Cat) S2CollectLevel(ctx context.Context, level catS2.CellLevel) ([]cattrack.CatTrack, error)
- func (c *Cat) S2DumpLevel(wr io.Writer, level catS2.CellLevel) error
- func (c *Cat) S2IndexTracks(ctx context.Context, in <-chan cattrack.CatTrack) error
- func (c *Cat) StoreLapState(ls *lap.State) error
- func (c *Cat) StoreNapState(ns *nap.State) error
- func (c *Cat) StoreSnaps(ctx context.Context, in <-chan cattrack.CatTrack) (out chan cattrack.CatTrack, errs chan error)
- func (c *Cat) StoreTracks(ctx context.Context, in <-chan cattrack.CatTrack) (errCh chan error)
- func (c *Cat) StoreTracksYYYYMM(ctx context.Context, in <-chan cattrack.CatTrack) (errCh chan error)
- func (c *Cat) SubscribeFancyLogs()
- func (c *Cat) TrackLaps(ctx context.Context, in <-chan cattrack.CatTrack) (*lap.State, <-chan cattrack.CatLap)
- func (c *Cat) TrackNaps(ctx context.Context, in <-chan cattrack.CatTrack) <-chan cattrack.CatNap
- func (c *Cat) Unbacktrack(ctx context.Context, in <-chan cattrack.CatTrack) (<-chan cattrack.CatTrack, func() error)
- func (c *Cat) Validate(ctx context.Context, in <-chan cattrack.CatTrack) (valid chan cattrack.CatTrack, invalid chan cattrack.CatTrack)
- type CatHandler
- type TestCat
- type Window
Constants ¶
This section is empty.
Variables ¶
var PropKeyInvalid = "invalid"
var TestDatadirRoot = filepath.Join(os.TempDir(), "catd_api_test")
Functions ¶
func Master ¶
Master stores the incoming CatTracks in their original form (EXACTLY as posted; reading from the request body) in a file <datadir>/master.json.gz. Each call to Master appends to the file. The 'body' value is written in its entirety with a single newline following. Users of the master file should note that this results in lines which may be longer than bufio.MaxScanTokenSize, and should be prepared to handle this.
Types ¶
type Cat ¶
type Cat struct {
CatID conceptual.CatID
DataDir string
// Ok, actually we DO have to have/want a conn to state.
// An API function might use another API function,
// and they might want to share a state conn.
State *state.CatState
// contains filtered or unexported fields
}
Cat is the API representation of a cat. It does not reflect cat state. (Well, it can _reflect_ it, but not ~be~ it). It CAN reflect values about some (assumed, or inferred) cat, where data for the cat can come from some context, like a token (permissions), a CLI-flag, a URL parameter, or even an environment value. Anywhere cat data comes from, that is not the state of this app.
func NewCat ¶
func NewCat(catID conceptual.CatID, datadir string, backend *params.CatRPCServices) (*Cat, error)
NewCat inits a new Cat, but it does not access state. The given datadir should be the CAT datadir (not the daemon datadir).
func (*Cat) CatActPipeline ¶
func (*Cat) CleanTracks ¶
func (c *Cat) CleanTracks(ctx context.Context, in <-chan cattrack.CatTrack) <-chan cattrack.CatTrack
CleanTracks probably doesn't need a cat. FIXME? Turn it loose (method to func; no cat needed).
func (*Cat) GetDefaultRgeoIndexer ¶
func (c *Cat) GetDefaultRgeoIndexer() (*reducer.CellIndexer, error)
func (*Cat) GetDefaultS2CellIndexer ¶
func (c *Cat) GetDefaultS2CellIndexer() (*reducer.CellIndexer, error)
func (*Cat) ImprovedActTracks ¶
func (*Cat) IsRPCEnabled ¶
func (*Cat) IsRgeoRPCEnabled ¶
IsRgeoRPCEnabled returns true if the cat has an RPC client for rgeo. It does not check if the rgeo service is running, only if the client is configured.
func (*Cat) IsTilingRPCEnabled ¶
func (*Cat) LockOrLoadState ¶
LockOrLoadState makes sure a Cat has r|w CatState. If readOnly is false it will block unless already open.
func (*Cat) MustGetLapState ¶
func (*Cat) MustGetNapState ¶
func (*Cat) OffsetIndexer ¶
OffsetIndexer is a simple indexer that reduces tracks by count and time offsets.
func (*Cat) PopulateReader ¶
func (*Cat) ProducerPipelines ¶
func (*Cat) RGeoIndexTracks ¶
RGeoIndexTracks indexes incoming CatTracks for one cat.
func (*Cat) RgeoCollectLevel ¶
RgeoCollectLevel returns all indexed tracks for a given Rgeo level (dataset).
func (*Cat) S2CollectLevel ¶
func (c *Cat) S2CollectLevel(ctx context.Context, level catS2.CellLevel) ([]cattrack.CatTrack, error)
S2CollectLevel returns all indexed tracks for a given S2 cell level.
func (*Cat) S2DumpLevel ¶
S2CollectLevel writes all indexed tracks for a given S2 cell level.
func (*Cat) S2IndexTracks ¶
S2IndexTracks indexes incoming CatTracks for one cat.
func (*Cat) StoreSnaps ¶
func (c *Cat) StoreSnaps(ctx context.Context, in <-chan cattrack.CatTrack) (out chan cattrack.CatTrack, errs chan error)
StoreSnaps returns a channel of potentially-transformed CatTracks and a channel of errors. Incoming cattrack Snaps are mutated/transformed -- stripping Base64 data, adding storage URLs -- and likewise forwarded to the output. Output is unbuffered and blocking, requires a consumer. Remember: Snaps for which the handler errors ARE ALSO forwarded to the output channel. This ensures that subsequent track handlers do not miss _any_ tracks because of any Snap logic issues. Errors are forwarded to the error channel. The handler is idempotent and can be run multiple times on the same input.
Cat Snaps are originally uploaded by the client encoded in base64 in a properties attribute 'imgB64'. This handler attempts to decode the data, store it locally as a .jpg, and then upload it to S3. If decoding fails, the original track is forwarded to the output channel unmodified. If upload is successful, the track is modified in-place to include the S3 URL in an attribute 'imgS3', and the original `imgB64` attribute is removed. If upload fails, the original track is forwarded to the output channel unmodified. If the cat handler finds that the snap already exists in the cat state, it is not uploaded again, nor transformed.
func (*Cat) StoreTracks ¶
StoreTracks stores incoming CatTracks for one cat to disk.
func (*Cat) StoreTracksYYYYMM ¶
func (*Cat) SubscribeFancyLogs ¶
func (c *Cat) SubscribeFancyLogs()
func (*Cat) Unbacktrack ¶
func (c *Cat) Unbacktrack(ctx context.Context, in <-chan cattrack.CatTrack) (<-chan cattrack.CatTrack, func() error)
Unbacktrack is a dangerous device. It prevents tracks from getting populated. It drops (filters) tracks that lie WITHIN a Cat/UUID's already-populated time window. This prevents the cat from back-filling tracks that were missed, if they fall within the already-seen window. But it enables Cat.Populate runs to run idempotently given the same data source.
type TestCat ¶
type TestCat Cat
func NewTestCatReader ¶
func NewTestCatWriter ¶
NewTestCatWriter creates a new cat with a writable state in a temporary directory inside the TestDatadirRoot.