Skip to content
Open
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ license-expression==30.4.4
lxml==5.4.0
MarkupSafe==3.0.2
more-itertools==10.7.0
multiregex==2.0.3
normality==2.6.1
packageurl-python==0.17.1
packaging==25.0
Expand Down
1 change: 1 addition & 0 deletions setup-mini.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ install_requires =
license_expression >= 30.4.4
lxml >= 5.4.0
MarkupSafe >= 2.1.2
multiregex >= 2.0.3
normality <= 2.6.1
packageurl_python >= 0.9.0
packvers >= 21.0.0
Expand Down
3 changes: 2 additions & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ install_requires =
colorama >= 0.3.9
commoncode >= 32.4.0
container-inspector >= 31.0.0
cyseq >= 0.0.2
debian-inspector >= 31.1.0
dparse2 >= 0.7.0
fasteners
Expand All @@ -90,6 +91,7 @@ install_requires =
license_expression >= 30.4.4
lxml >= 5.4.0
MarkupSafe >= 2.1.2
multiregex >= 2.0.3
normality <= 2.6.1
packageurl_python >= 0.9.0
packvers >= 21.0.0
Expand All @@ -116,7 +118,6 @@ install_requires =
typecode >= 30.0.1
typecode[full] >= 30.0.1
extractcode[full] >= 31.0.0
cyseq >= 0.0.2


[options.packages.find]
Expand Down
18 changes: 16 additions & 2 deletions src/packagedcode/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,15 +246,29 @@
win_reg.InstalledProgramFromDockerUtilityvmSoftwareHandler,
]


# These handlers are special as they use filetype to
# detect these binaries instead of datafile path patterns
# as these are optionally installed, we can skip checking
# for filetype if these are not available
BINARY_HANDLERS_PRESENT = False
BINARY_PACKAGE_DATAFILE_HANDLERS = []

try:
from go_inspector.binary import get_go_binary_handler
APPLICATION_PACKAGE_DATAFILE_HANDLERS.append(get_go_binary_handler())
handler = get_go_binary_handler()
APPLICATION_PACKAGE_DATAFILE_HANDLERS.append(handler)
BINARY_PACKAGE_DATAFILE_HANDLERS.append(handler)
BINARY_HANDLERS_PRESENT = True
except ImportError:
pass

try:
from rust_inspector.packages import get_rust_binary_handler
APPLICATION_PACKAGE_DATAFILE_HANDLERS.append(get_rust_binary_handler())
handler = get_rust_binary_handler()
APPLICATION_PACKAGE_DATAFILE_HANDLERS.append(handler)
BINARY_PACKAGE_DATAFILE_HANDLERS.append(handler)
BINARY_HANDLERS_PRESENT = True
except ImportError:
pass

Expand Down
200 changes: 200 additions & 0 deletions src/packagedcode/cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# ScanCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/nexB/scancode-toolkit for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

import os
import json
import attr
import fnmatch

from commoncode.fileutils import create_dir

from packagedcode import APPLICATION_PACKAGE_DATAFILE_HANDLERS
from packagedcode import SYSTEM_PACKAGE_DATAFILE_HANDLERS

from scancode_config import packagedcode_cache_dir
from scancode_config import scancode_cache_dir

"""
An on-disk persistent cache of package manifest patterns and related package
manifest handlers mapping. Loading and dumping the cached package manifest
patterns is safe to use across multiple processes using lock files.
"""

# global in-memory cache of the PkgManifestPatternsCache
_PACKAGE_CACHE = None

PACKAGE_INDEX_LOCK_TIMEOUT = 60 * 6
PACKAGE_INDEX_DIR = 'package_patterns_index'
PACKAGE_INDEX_FILENAME = 'index_cache'
PACKAGE_LOCKFILE_NAME = 'scancode_package_index_lockfile'
PACKAGE_CHECKSUM_FILE = 'scancode_package_index_tree_checksums'


@attr.s
class PkgManifestPatternsCache:
    """
    Cachable package manifest path-matching data: regex patterns translated
    from datafile handler path patterns, their prematchers, and a mapping
    from each regex pattern to the datasource IDs of its handlers.
    """

    # mapping of {regex pattern string: [datasource_id, ...]}
    handler_by_regex = attr.ib(default=attr.Factory(dict))
    # list of (regex pattern, [prematcher, ...]) for system package handlers
    system_multiregex_patterns = attr.ib(default=attr.Factory(list))
    # list of (regex pattern, [prematcher, ...]) for application package handlers
    application_multiregex_patterns = attr.ib(default=attr.Factory(list))

    @property
    def all_multiregex_patterns(self):
        """
        Return application patterns followed by any system patterns not
        already present, preserving order without duplicates.
        """
        # NOTE: this is a property: callers access it as a plain attribute.
        # (It was previously a @staticmethod taking ``self``, which returned
        # the raw function object when accessed on an instance.)
        return self.application_multiregex_patterns + [
            multiregex_pattern
            for multiregex_pattern in self.system_multiregex_patterns
            if multiregex_pattern not in self.application_multiregex_patterns
        ]

    @classmethod
    def from_mapping(cls, cache_mapping):
        """
        Return a PkgManifestPatternsCache built from a ``cache_mapping``
        dict, as loaded from a JSON cache file.
        """
        return cls(**cache_mapping)

    @staticmethod
    def load_or_build(
        packagedcode_cache_dir=packagedcode_cache_dir,
        scancode_cache_dir=scancode_cache_dir,
        force=False,
        timeout=PACKAGE_INDEX_LOCK_TIMEOUT,
    ):
        """
        Load or build and save and return a PkgManifestPatternsCache object.

        We either load a cached PkgManifestPatternsCache or build and cache
        the patterns.

        - If the cache exists, it is returned unless corrupted.
        - If ``force`` is True, or if the cache does not exist, a new index
          is built and cached.
        """
        idx_cache_dir = os.path.join(packagedcode_cache_dir, PACKAGE_INDEX_DIR)
        create_dir(idx_cache_dir)
        cache_file = os.path.join(idx_cache_dir, PACKAGE_INDEX_FILENAME)
        has_cache = os.path.exists(cache_file) and os.path.getsize(cache_file)

        # bypass build if a usable cache exists
        if has_cache and not force:
            try:
                return load_cache_file(cache_file)
            except Exception as e:
                # work around some rare Windows quirks
                import traceback
                print('Inconsistent package cache: rebuilding index.')
                print(str(e))
                print(traceback.format_exc())

        from scancode import lockfile
        lock_file = os.path.join(scancode_cache_dir, PACKAGE_LOCKFILE_NAME)

        # here, we have no cache: lock, check and rebuild
        try:
            # acquire lock and wait until timeout to get a lock or die
            with lockfile.FileLock(lock_file).locked(timeout=timeout):
                # build_mappings_and_multiregex_patterns returns
                # (handler_by_regex, multiregex_patterns) in that order
                system_handlers_by_regex, system_multiregex_patterns = build_mappings_and_multiregex_patterns(
                    datafile_handlers=SYSTEM_PACKAGE_DATAFILE_HANDLERS,
                )
                application_handlers_by_regex, application_multiregex_patterns = build_mappings_and_multiregex_patterns(
                    datafile_handlers=APPLICATION_PACKAGE_DATAFILE_HANDLERS,
                )
                # merge the two mappings (dicts cannot be added with "+");
                # for a regex present in both, keep the union of IDs
                handler_by_regex = {
                    regex: list(datasource_ids)
                    for regex, datasource_ids in system_handlers_by_regex.items()
                }
                for regex, datasource_ids in application_handlers_by_regex.items():
                    merged = handler_by_regex.setdefault(regex, [])
                    for datasource_id in datasource_ids:
                        if datasource_id not in merged:
                            merged.append(datasource_id)

                package_cache = PkgManifestPatternsCache(
                    handler_by_regex=handler_by_regex,
                    system_multiregex_patterns=system_multiregex_patterns,
                    application_multiregex_patterns=application_multiregex_patterns,
                )
                package_cache.dump(cache_file)
                return package_cache

        except lockfile.LockTimeout:
            # TODO: handle unable to lock in a nicer way
            raise

    def dump(self, cache_file):
        """
        Dump this package cache as JSON on disk at the ``cache_file`` path.
        """
        # serialize the actual attributes: previously an empty mapping was
        # dumped, so the cache file never contained any data
        # NOTE(review): tuples round-trip as JSON lists — confirm multiregex
        # accepts list-shaped (pattern, prematchers) pairs after reload
        package_cache = attr.asdict(self)
        with open(cache_file, 'w') as f:
            json.dump(package_cache, f)


def get_prematchers_from_glob_pattern(pattern):
    """
    Return a list of lowercased literal prematcher strings derived from a
    glob ``pattern`` string, splitting on "*" wildcards and stripping
    leading path slashes.

    Segments that are empty, or become empty after stripping (e.g. a bare
    "/"), are dropped so no empty prematcher is ever returned.
    """
    prematchers = []
    for segment in pattern.split("*"):
        prematcher = segment.lower().lstrip("/")
        if prematcher:
            prematchers.append(prematcher)
    return prematchers


def build_mappings_and_multiregex_patterns(
    datafile_handlers,
):
    """
    Return a two-tuple of (handler_by_regex, multiregex_patterns) built from
    the path patterns of ``datafile_handlers``:

    - ``handler_by_regex`` is a mapping of
      {regex pattern: [datasource_id, ...]} listing every handler whose path
      patterns translate to that regex.
    - ``multiregex_patterns`` is a list of
      (regex pattern, [prematcher, ...]) tuples, one per distinct regex.

    Handlers without path patterns are skipped.
    """
    handler_by_regex = {}
    prematchers_by_regex = {}

    for handler in datafile_handlers:
        # skip handlers matched by other means (e.g. filetype) that carry
        # no path patterns
        if not handler.path_patterns:
            continue
        for path_pattern in handler.path_patterns:
            # translate the glob-style path pattern to a regex string
            regex_pattern = fnmatch.translate(path_pattern)
            prematchers_by_regex[regex_pattern] = get_prematchers_from_glob_pattern(path_pattern)
            handler_by_regex.setdefault(regex_pattern, []).append(handler.datasource_id)

    multiregex_patterns = [
        (regex, prematchers_by_regex.get(regex, []))
        for regex in handler_by_regex
    ]

    return handler_by_regex, multiregex_patterns


def get_cache(
    force=False,
):
    """
    Return the module-wide PkgManifestPatternsCache, loading or building it
    on first use (or whenever ``force`` is True) and memoizing the result.
    """
    global _PACKAGE_CACHE

    # reuse the in-memory cache unless a rebuild is requested
    if _PACKAGE_CACHE and not force:
        return _PACKAGE_CACHE

    _PACKAGE_CACHE = PkgManifestPatternsCache.load_or_build(
        packagedcode_cache_dir=packagedcode_cache_dir,
        scancode_cache_dir=scancode_cache_dir,
        force=force,
        # used for testing only
        timeout=PACKAGE_INDEX_LOCK_TIMEOUT,
    )
    return _PACKAGE_CACHE


def load_cache_file(cache_file):
    """
    Return a PkgManifestPatternsCache loaded from the JSON file at the
    ``cache_file`` path.
    """
    with open(cache_file) as cached:
        cache_mapping = json.load(cached)
    return PkgManifestPatternsCache.from_mapping(cache_mapping)
60 changes: 47 additions & 13 deletions src/packagedcode/recognize.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,16 @@
import os
import sys

import multiregex

from commoncode import filetype
from packagedcode import APPLICATION_PACKAGE_DATAFILE_HANDLERS
from packagedcode import SYSTEM_PACKAGE_DATAFILE_HANDLERS
from packagedcode import ALL_DATAFILE_HANDLERS
from commoncode.fileutils import as_posixpath

from packagedcode import HANDLER_BY_DATASOURCE_ID
from packagedcode import BINARY_HANDLERS_PRESENT
from packagedcode import BINARY_PACKAGE_DATAFILE_HANDLERS
from packagedcode import models
from packagedcode.cache import get_cache

TRACE = os.environ.get('SCANCODE_DEBUG_PACKAGE_API', False)

Expand Down Expand Up @@ -56,25 +61,19 @@ def recognize_package_data(
if not filetype.is_file(location):
return []

assert application or system or package_only
if package_only or (application and system):
datafile_handlers = ALL_DATAFILE_HANDLERS
elif application:
datafile_handlers = APPLICATION_PACKAGE_DATAFILE_HANDLERS
elif system:
datafile_handlers = SYSTEM_PACKAGE_DATAFILE_HANDLERS

return list(_parse(
location=location,
package_only=package_only,
datafile_handlers=datafile_handlers,
application=application,
system=system,
))


def _parse(
location,
application=True,
system=False,
package_only=False,
datafile_handlers=APPLICATION_PACKAGE_DATAFILE_HANDLERS,
):
"""
Yield parsed PackageData objects from ``location``. Raises Exceptions on errors.
Expand All @@ -83,6 +82,41 @@ def _parse(
Default to use application packages
"""

package_path = as_posixpath(location)
package_patterns = get_cache()

assert application or system or package_only
if package_only or (application and system):
multiregex_patterns = package_patterns.all_multiregex_patterns
elif application:
multiregex_patterns = package_patterns.application_multiregex_patterns
elif system:
multiregex_patterns = package_patterns.system_multiregex_patterns

package_matcher = multiregex.RegexMatcher(multiregex_patterns)
matched_patterns = package_matcher.match(package_path)

datafile_handlers = []
for matched_pattern in matched_patterns:
regex, _match = matched_pattern
handler_ids = package_patterns.handler_by_regex.get(regex.pattern)
if TRACE:
logger_debug(f'_parse:.handler_ids: {handler_ids}')

datafile_handlers = [
HANDLER_BY_DATASOURCE_ID.get(handler_id)
for handler_id in handler_ids
]

if not datafile_handlers:
if BINARY_HANDLERS_PRESENT:
datafile_handlers = BINARY_PACKAGE_DATAFILE_HANDLERS
else:
if TRACE:
logger_debug(f'_parse: no package datafile detected at {package_path}')

return

for handler in datafile_handlers:
if TRACE:
logger_debug(f'_parse:.is_datafile: {handler}')
Expand Down
2 changes: 1 addition & 1 deletion src/packagedcode/rubygems.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ def assemble(cls, package_data, resource, codebase, package_adder):
# TODO: https://stackoverflow.com/questions/41454333/meaning-of-new-block-git-sourcegithub-in-gemfile
class GemfileHandler(GemspecHandler):
datasource_id = 'gemfile'
path_patterns = ('*/Gemfile', '*/*.gemfile', '*/Gemfile-*')
path_patterns = ('*/Gemfile', '*.gemfile', '*/Gemfile-*')
default_package_type = 'gem'
default_primary_language = 'Ruby'
description = 'RubyGems Bundler Gemfile'
Expand Down
6 changes: 6 additions & 0 deletions src/scancode_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@
NOTE: this is essentially a copy of commoncode.fileutils.create_dir()
"""

if exists(location):

Check failure

Code scanning / CodeQL

Uncontrolled data used in path expression High

This path depends on a
user-provided value
.
This path depends on a user-provided value.
This path depends on a
user-provided value
.
This path depends on a user-provided value.
This path depends on a user-provided value.
if not os.path.isdir(location):

Check failure

Code scanning / CodeQL

Uncontrolled data used in path expression High

This path depends on a
user-provided value
.
This path depends on a user-provided value.
This path depends on a
user-provided value
.
This path depends on a user-provided value.
This path depends on a user-provided value.
err = ('Cannot create directory: existing file '
'in the way ''%(location)s.')
raise OSError(err % locals())
Expand All @@ -49,20 +49,20 @@
# may fail on win if the path is too long
# FIXME: consider using UNC ?\\ paths
try:
os.makedirs(location)

Check failure

Code scanning / CodeQL

Uncontrolled data used in path expression High

This path depends on a
user-provided value
.
This path depends on a user-provided value.
This path depends on a
user-provided value
.
This path depends on a user-provided value.
This path depends on a user-provided value.

# avoid multi-process TOCTOU conditions when creating dirs
# the directory may have been created since the exist check
except WindowsError as e:
# [Error 183] Cannot create a file when that file already exists
if e and e.winerror == 183:
if not os.path.isdir(location):

Check failure

Code scanning / CodeQL

Uncontrolled data used in path expression High

This path depends on a
user-provided value
.
This path depends on a user-provided value.
This path depends on a
user-provided value
.
This path depends on a user-provided value.
This path depends on a user-provided value.
raise
else:
raise
except (IOError, OSError) as o:
if o.errno == errno.EEXIST:
if not os.path.isdir(location):

Check failure

Code scanning / CodeQL

Uncontrolled data used in path expression High

This path depends on a
user-provided value
.
This path depends on a user-provided value.
This path depends on a
user-provided value
.
This path depends on a user-provided value.
This path depends on a user-provided value.
raise
else:
raise
Expand Down Expand Up @@ -185,7 +185,13 @@
__env_license_cache_dir = os.getenv('SCANCODE_LICENSE_INDEX_CACHE')
licensedcode_cache_dir = (__env_license_cache_dir or std_license_cache_dir)


# Default location of the package manifest patterns cache, overridable via
# the SCANCODE_PACKAGE_INDEX_CACHE environment variable.
std_package_cache_dir = join(scancode_src_dir, 'packagedcode', 'data', 'cache')
__env_package_cache_dir = os.getenv('SCANCODE_PACKAGE_INDEX_CACHE')
packagedcode_cache_dir = (__env_package_cache_dir or std_package_cache_dir)

# ensure all cache directories exist before first use
_create_dir(licensedcode_cache_dir)
_create_dir(packagedcode_cache_dir)
_create_dir(scancode_cache_dir)

# - scancode_temp_dir: for short-lived temporary files which are import- or run-
Expand Down
Loading